InfluxDB 源码分析(四):Meta 数据结构
meta 指的是influxdb的源信息,在磁盘上的位置是meta目录下的meta.db, meta.db 存储了一些数据库,RP, Shard, 连续查询等信息。
关于meta部分的代码在 influxdb/services/meta/config.go
这个目录下。
我们把目光聚集在data.go
这个文件中。
第一部分 Meta 的数据结构
首先点进去映入眼帘的就是一个名叫Data的结构体定义。
// Data表示所有元数据的最上层集合(来对地方了)
// Data represents the top level collection of all metadata.
type Data struct {
Term uint64 // associated raft term
Index uint64 // associated raft index
ClusterID uint64
Databases []DatabaseInfo
Users []UserInfo
// adminUserExists provides a constant time mechanism for determining
// if there is at least one admin user.
adminUserExists bool
MaxShardGroupID uint64
MaxShardID uint64
}
这里的话,Term和Index这两个属性在开源版是没有用到的,主要用在集群版里面,通过注释也可以看出来一些端倪,集群版分布式一致性算法采用的是raft算法。
既然注释中说Data是元数据的最上层的概念,那说明Databases,Users 位于Data的下层。而MaxShardGroupID,MaxShardID 应该是为了便于查询或者计算而定义的。我们先从DatabaseInfo
来看。
type DatabaseInfo struct {
Name string
DefaultRetentionPolicy string
RetentionPolicies []RetentionPolicyInfo
ContinuousQueries []ContinuousQueryInfo
}
DatabaseInfo 存的是database的详情信息,包含Name,默认的RP,该数据库下保留的RP列表, 以及连续查询策略。其他没有了。
我们继续往下看,看看RetentionPolicyInfo
是不是还有什么。
// 数据保留策略代表一个 retention policy 的元数据
// RetentionPolicyInfo represents metadata about a retention policy.
type RetentionPolicyInfo struct {
Name string //RP的名次
ReplicaN int // 副本数
Duration time.Duration //时间范围
ShardGroupDuration time.Duration // shardGroup实际范围
ShardGroups []ShardGroupInfo //对应的ShardGroup
Subscriptions []SubscriptionInfo
}
RetentionPolicyInfo 下并不会直接关联Shard,而是会关联到一个ShardGroup的概念上去。ShardGroup是shard的逻辑分组,逻辑分组的意思表示,shardGroup和shard不一样,并不会实际存储在磁盘中,shardGroup和rp策略是强相关,大家可以理解为一个ShardGroup存储的是某个特定时间范围内的数据。
我们继续往下看ShardGroups
// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can// safely delete any associated shards.
type ShardGroupInfo struct {
ID uint64
StartTime time.Time
EndTime time.Time
DeletedAt time.Time
Shards []ShardInfo
TruncatedAt time.Time
}
DeletedAt
字段标记了一个shardGroup的删除时间,如果被设置,则说明该ShardGroup下的Shard是可以被安全删除的。Shards
是一组Shard
列表。下面是Shard下面的概念。
// ShardInfo represents metadata about a shard.
type ShardInfo struct {
ID uint64
Owners []ShardOwner
}
到Shard这一层,我们发现没有什么再往下可以探寻的结构了,Owners
虽然是一个ShardOwner
的列表,但是ShardOwner
表示的是这个Shard的归属的节点,不需要继续向下看了。
到这里 RetentionPolicies
这条线我们已经看到最下层了,我们接下来看ContinuousQueries
连续查询。熟悉influxdb的应该可以看出来,ContinuousQueries
应该只是记录一些连续查询的配置,不会有复杂的信息。
// ContinuousQueryInfo represents metadata about a continuous query.
type ContinuousQueryInfo struct {
Name string
Query string
}
到这一步,相信大家对meta最基本的数据结构已经有一个最基本的认识了,我们甚至还可以画一张图。
第二部分
ShardGroup 创建
我们继续从data目录寻找线索,看一下什么时候会创建一个shardGroup并写进去元信息中呢?果不其然,找到了这样一个方法。
可以看到创建一个shardGroup的时候指定了StartTime
和EndTime
可以看到EndTime
- StartTime
的值就是一个ShardGroupDuration
的值。
这个怎么理解呢? 假设我们的某个RP的值是2w
(2周),我们指定每一周为一个shardGroup
,那当influxdb运行两周之后,元数据的数据结构可能就是这样:
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
// 在数据库给定的时间戳上创建一个shardgroup
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
// Find retention policy.
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
return err
} else if rpi == nil {
return influxdb.ErrRetentionPolicyNotFound(policy)
}
// Verify that shard group doesn't already exist for this timestamp.
if rpi.ShardGroupByTimestamp(timestamp) != nil {
return nil
}
// Create the shard group.
data.MaxShardGroupID++
sgi := ShardGroupInfo{}
sgi.ID = data.MaxShardGroupID
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
// Shard group range is [start, end) so add one to the max time.
sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
}
data.MaxShardID++
sgi.Shards = []ShardInfo{
{ID: data.MaxShardID},
}
// Retention policy has a new shard group, so update the policy. Shard
// Groups must be stored in sorted order, as other parts of the system // assume this to be the case. rpi.ShardGroups = append(rpi.ShardGroups, sgi)
sort.Sort(ShardGroupInfos(rpi.ShardGroups))
return nil
}
那什么时候会调用CreateShardGroup
进行ShardGroup的创建呢? 根据我们对上图的理解,shardGroup的创建只会在一个ShardGroupDuration
结束的时候创建,比如第二周的数据,那肯定要到第一周过完了才会需要一个新的 ShardGroup
, 但是这个ShardGroup
创建的时机是系统定期检查自动创建的呢? 还是用到的时候发现没有临时创建出来的呢?
说实话,我不知道。我们只能顺藤摸瓜向上找。我们通过查询调用信息,发现除了单元测试之外,有个地方调用了data模块下的 CreateShardGroup
函数。位于同样属于meta模块下的clinet.go
文件中。
这个方法主要是对data模块的调用,可见关于shardGroup以及对元信息模块的调用基本上都是委托给data模块的(influxdb的设计美学,权责还是比较分明的)
func createShardGroup(data *Data, database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// It is the responsibility of the caller to check if it exists before calling this method.
if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
return nil, ErrShardGroupExists
}
if err := data.CreateShardGroup(database, policy, timestamp); err != nil {
return nil, err
}
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
return nil, err
} else if rpi == nil {
return nil, errors.New("retention policy deleted after shard group created")
}
// 按照时间排序
sgi := rpi.ShardGroupByTimestamp(timestamp)
return sgi, nil
}
但是clinet下的createShardGroup
显然还不是我们要找的答案。我们先找下同模块下有没有对这个方法的调用,没有再去其他地方碰碰运气。
运气真好,clinet模块下刚好有我们想要的东西。
这个函数叫预创建ShardGroups
,看来我们今天运气不错,我们先大致看下描述。大致意思就是需要在下一个shardgroup写进来之前把shardgroup创建好。
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
var changed bool
// 遍历所有的database
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
if len(rp.ShardGroups) == 0 {
// No data was ever written to this group, or all groups have been deleted.
continue
}
g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
// Group is not deleted, will end before the future time, but is still yet to expire.
// This last check is important, so the system doesn't create shards groups wholly // in the past.
// Create successive shard group. nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
// if it already exists, continue
if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
c.logger.Info("Shard group already exists",
logger.ShardGroup(sg.ID),
logger.Database(di.Name),
logger.RetentionPolicy(rp.Name))
continue
}
newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
if err != nil {
c.logger.Info("Failed to precreate successive shard group",
zap.Uint64("group_id", g.ID), zap.Error(err))
continue
}
changed = true
c.logger.Info("New shard group successfully precreated",
logger.ShardGroup(newGroup.ID),
logger.Database(di.Name),
logger.RetentionPolicy(rp.Name))
}
}
}
if changed {
if err := c.commit(data); err != nil {
return err
}
}
return nil
}
到这一步仍然不是我们想要寻找的东西,但是线索已经尽在眼前了,我们找一下PrecreateShardGroups
调用的位置。运气不错,一下子就找到了。
我们最终在/Users/hanshu/Code/go_project/influxdb/services/precreator/service.go
找到了相关的调用,从注释可以看到,influxdb内部维护了一个定时器,周期检查是否需要创建新的shardGroup
。至此关于ShardGroup的内容就暂时分析完了。
// 不断检查是否需要预创建
// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {
defer s.wg.Done()
for {
select {
case <-time.After(s.checkInterval):
if err := s.precreate(time.Now().UTC()); err != nil {
s.Logger.Info("Failed to precreate shards", zap.Error(err))
}
case <-s.done:
s.Logger.Info("Terminating precreation service")
return
}
}
}
// precreate performs actual resource precreation.
func (s *Service) precreate(now time.Time) error {
cutoff := now.Add(s.advancePeriod).UTC()
return s.MetaClient.PrecreateShardGroups(now, cutoff)
}
RP 的创建
相较于ShardGroup
的创建,RP的创建要简单许多,RP意为数据保留策略,数据保留策略的创建只会有两个地方会调用。
- 创建数据库的时候,(如果指定了RP, 创建该RP,如果没有指定,则默认RP,autogen)
- 通过命令行/http的方式使用influxdb sql 创建新的rp
// CreateRetentionPolicy creates a new retention policy on a database.
// It returns an error if name is blank or if the database does not exist.
// 为database创建一个新的rp,如果name为空或者数据库不存在将会报错
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error {
// Validate retention policy.
// 检查rpinfo
if rpi == nil {
return ErrRetentionPolicyRequired
} else if rpi.Name == "" {
return ErrRetentionPolicyNameRequired
} else if len(rpi.Name) > MaxNameLen {
return ErrNameTooLong
} else if rpi.ReplicaN < 1 {
return ErrReplicationFactorTooLow
}
// Normalise ShardDuration before comparing to any existing
// retention policies. The client is supposed to do this, but
// do it again to verify input.
// 归一化ShardGroupDuration到一个合理的范围内
rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)
// 如果rp的Duration小于ShardGroupDuration 是不合理的,会报错
if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration {
return ErrIncompatibleDurations
}
// Find database.
di := data.Database(database)
if di == nil {
return influxdb.ErrDatabaseNotFound(database)
} else if rp := di.RetentionPolicy(rpi.Name); rp != nil {
// RP with that name already exists. Make sure they're the same.
if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {
return ErrRetentionPolicyExists
}
// if they want to make it default, and it's not the default, it's not an identical command so it's an error
if makeDefault && di.DefaultRetentionPolicy != rpi.Name {
return ErrRetentionPolicyConflict
}
return nil
}
// Append copy of new policy.
di.RetentionPolicies = append(di.RetentionPolicies, *rpi)
// Set the default if needed
if makeDefault {
di.DefaultRetentionPolicy = rpi.Name
}
return nil
}
RP的创建位置
接下来我们截取两个代码片段去看一下RP具体是在什么位置进行创建的。
在创建数据库的时候顺便创建
// CreateDatabase creates a database or returns it if it already exists.
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if db := data.Database(name); db != nil {
return db, nil
}
if err := data.CreateDatabase(name); err != nil {
return nil, err
}
// create default retention policy
if c.retentionAutoCreate {
rpi := DefaultRetentionPolicyInfo()
if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
return nil, err
}
}
db := data.Database(name)
if err := c.commit(data); err != nil {
return nil, err
}
return db, nil
}
使用influx sql在命令行创建
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {
if !meta.ValidName(stmt.Name) {
// TODO This should probably be in `(*meta.Data).CreateRetentionPolicy`
// but can't go there until 1.1 is used everywhere
return meta.ErrInvalidName
}
spec := meta.RetentionPolicySpec{
Name: stmt.Name,
Duration: &stmt.Duration,
ReplicaN: &stmt.Replication,
ShardGroupDuration: stmt.ShardGroupDuration,
}
// Create new retention policy.
_, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default)
return err
}
转载自:https://juejin.cn/post/7352789840352116745