
InfluxDB Source Code Analysis (Part 2): The Data Write Path


Looking at the HTTP service that InfluxDB exposes, we can see that it provides a POST write endpoint:

Route{  
   "write", // Data-ingest route.  
   "POST", "/api/v2/write", true, writeLogEnabled, h.serveWriteV2,  
},
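
Before digging into the handler, it helps to see what a write looks like from the client's side. The sketch below uses only the Go standard library; the database mydb and the point are made-up examples. It targets the classic 1.x /write endpoint, which ends up in the same serveWrite handler; the /api/v2/write route above accepts an equivalent request with a bucket=db/rp parameter instead of db and rp.

package main

import (
   "fmt"
   "net/http"
   "strings"
)

func main() {
   // One point in line protocol: measurement "cpu", tag host=server01,
   // field value=0.64, with an explicit nanosecond timestamp.
   body := strings.NewReader("cpu,host=server01 value=0.64 1434055562000000000")

   // db, rp and precision correspond to the database, retentionPolicy and
   // precision parameters that serveWrite receives.
   resp, err := http.Post("http://localhost:8086/write?db=mydb&rp=autogen&precision=ns", "text/plain", body)
   if err != nil {
      panic(err)
   }
   defer resp.Body.Close()
   fmt.Println(resp.Status) // a successful write returns 204 No Content
}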

Following the handler, we eventually land in serveWrite, which takes the following parameters:

  • database: the target database
  • retentionPolicy: the retention policy to write under
  • precision: the time precision of the timestamps
  • user: the authenticated meta.User

func (h *Handler) serveWrite(database, retentionPolicy, precision string, w http.ResponseWriter, r *http.Request, user meta.User) {  
	// some code omitted
	writePoints := func() error {  
   switch pw := h.PointsWriter.(type) {  
   case pointsWriterWithContext:  
      var npoints, nvalues int64  
      ctx := context.WithValue(context.Background(), coordinator.StatPointsWritten, &npoints)  
      ctx = context.WithValue(ctx, coordinator.StatValuesWritten, &nvalues)  
  
      // for now, just store the number of values used.  
      err := pw.WritePointsWithContext(ctx, database, retentionPolicy, consistency, user, points)  
      atomic.AddInt64(&h.stats.ValuesWrittenOK, nvalues)  
      if err != nil {  
         return err  
      }  
      return nil  
   default:  
      return h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points)  
   }  
  }
  // some code omitted
}
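
The type switch inside writePoints is a pattern that recurs throughout the write path (we will meet it again in writeToShardWithContext and in Shard.WritePointsWithContext): probe whether the concrete value also implements a richer, context-aware interface, and fall back to the plain method if it does not. Here is a minimal, self-contained sketch of the pattern; the interfaces and types are made up for illustration and are not InfluxDB's own:

package main

import (
   "context"
   "fmt"
)

// The basic capability every writer has.
type pointsWriter interface {
   WritePoints(points []string) error
}

// An optional, richer capability that some writers also implement.
type pointsWriterWithContext interface {
   WritePointsWithContext(ctx context.Context, points []string) error
}

type modernWriter struct{}

func (modernWriter) WritePoints(points []string) error { return nil }
func (modernWriter) WritePointsWithContext(ctx context.Context, points []string) error {
   fmt.Println("context-aware write of", len(points), "points")
   return nil
}

func write(w pointsWriter, points []string) error {
   // Probe for the richer interface; fall back to the basic one.
   switch pw := w.(type) {
   case pointsWriterWithContext:
      return pw.WritePointsWithContext(context.Background(), points)
   default:
      return w.WritePoints(points)
   }
}

func main() {
   _ = write(modernWriter{}, []string{"cpu,host=server01 value=0.64"})
}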

Let's look at the code behind WritePointsWithContext, which delegates to WritePointsPrivilegedWithContext:

func (w *PointsWriter) WritePointsPrivilegedWithContext(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {  
   atomic.AddInt64(&w.stats.WriteReq, 1)  
   atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))  
   
   // If no retention policy was specified, fall back to the database's default
   if retentionPolicy == "" {  
      db := w.MetaClient.Database(database)  
      if db == nil {  
         return influxdb.ErrDatabaseNotFound(database)  
      }  
      retentionPolicy = db.DefaultRetentionPolicy  
   } 
    
   // Build the shard mapping: work out which points go to which shard. The ShardMapping structure is described below.
   shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})  
   if err != nil {  
      return err  
   }  
  
   // Write each shard in its own goroutine and return as soon as one fails.  
   ch := make(chan error, len(shardMappings.Points))  
   // Iterate over each shard and the points destined for it
   for shardID, points := range shardMappings.Points {  
      go func(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {  
         var numPoints, numValues int64  
         ctx = context.WithValue(ctx, tsdb.StatPointsWritten, &numPoints)  
         ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)  
		 // This is where the data is actually written
         err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, points)  
         if err == tsdb.ErrShardDeletion {  
            err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}  
         }  
  
         if v, ok := ctx.Value(StatPointsWritten).(*int64); ok {  
            atomic.AddInt64(v, numPoints)  
         }  
  
         if v, ok := ctx.Value(StatValuesWritten).(*int64); ok {  
            atomic.AddInt64(v, numValues)  
         }  
  
         ch <- err  
      }(ctx, shardMappings.Shards[shardID], database, retentionPolicy, points)  
   }  
  
   // The code that collects the per-shard errors from ch (and the final return) is omitted here.
}

From the code above we can see that InfluxDB distributes the points to be written across different shards and then writes each shard's batch separately. The shard is chosen by hashing, and as the code below shows, the hash input is the point's series key: the string formed from the measurement and its tag set.

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {  
   // Fetch the retention policy info
   rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)  
   // some non-essential code omitted (it builds `list`, the shard groups to write into)
   // Build a ShardMapping structure
   mapping := NewShardMapping(len(wp.Points))  
   for _, p := range wp.Points {  
	  // For each point, pick the shard group that covers its timestamp
      sg := list.ShardGroupAt(p.Time())  
      if sg == nil {  
          // We didn't create a shard group because the point was outside the
          // scope of the RP.
          mapping.Dropped = append(mapping.Dropped, p)
         atomic.AddInt64(&w.stats.WriteDropped, 1)  
         continue  
      }  
	  // Pick the shard within the group that the point hashes to
      sh := sg.ShardFor(p)  
      mapping.MapPoint(&sh, p)  
   }  
   return mapping, nil  
}

The ShardMapping structure is shown below. n is the number of points to be written. Points is a map keyed by shard ID whose value is the list of points destined for that shard. Shards is also keyed by shard ID, but its value is the corresponding ShardInfo object, which records the shard's basic metadata.

// NewShardMapping creates an empty ShardMapping.  
func NewShardMapping(n int) *ShardMapping {  
   return &ShardMapping{  
      n:      n,  
      Points: map[uint64][]models.Point{},  
      Shards: map[uint64]*meta.ShardInfo{},  
   }  
}
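
For reference, the ShardMapping struct itself looks roughly like this. It is reconstructed from the fields used in the excerpts above (n, Points, Shards, plus the Dropped slice appended to in MapShards), so treat it as a sketch rather than a verbatim copy of the source:

type ShardMapping struct {
   n       int                        // number of points the mapping was sized for
   Points  map[uint64][]models.Point  // shard ID -> points destined for that shard
   Shards  map[uint64]*meta.ShardInfo // shard ID -> metadata for that shard
   Dropped []models.Point             // points outside the retention policy's time range
}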

Next, let's see how a shard group selects the right shard for a given point.

sh := sg.ShardFor(p)  

func (sgi *ShardGroupInfo) ShardFor(p hashIDer) ShardInfo {  
   if len(sgi.Shards) == 1 {  
      return sgi.Shards[0]  
   }  
   return sgi.Shards[p.HashID()%uint64(len(sgi.Shards))]  
}

We can see that when the shard group contains more than one shard, the target is chosen by hashing the point. The hash uses the FNV-64a algorithm over the point's key, which consists of the measurement and its tags.

func (p *point) HashID() uint64 {  
   h := NewInlineFNV64a()  
   h.Write(p.key)  
   sum := h.Sum64()  
   return sum  
}
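
To make the two steps concrete (HashID followed by the modulo in ShardFor), here is a minimal, self-contained sketch. It uses the standard library's hash/fnv, which implements the same FNV-64a algorithm as InlineFNV64a; the series key and shard count are made-up examples:

package main

import (
   "fmt"
   "hash/fnv"
)

// shardIndexFor mimics point.HashID() followed by ShardGroupInfo.ShardFor():
// hash the series key (measurement plus tag set) with FNV-64a, then take the
// result modulo the number of shards in the shard group.
func shardIndexFor(seriesKey []byte, numShards uint64) uint64 {
   h := fnv.New64a()
   h.Write(seriesKey)
   return h.Sum64() % numShards
}

func main() {
   key := []byte("cpu,host=server01,region=us-west") // example series key
   fmt.Println(shardIndexFor(key, 3))                // index of the shard (0, 1 or 2) for this series
}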

Having seen how points are assigned to shards, let's continue with the actual write path, i.e. this call:

err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, points)



func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {  
   atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))  
   // This is a small wrapper to make type-switching over w.TSDBStore a little
   // less verbose.
   writeToShard := func() error {
      type shardWriterWithContext interface {  
         WriteToShardWithContext(context.Context, uint64, []models.Point) error  
      }  
      switch sw := w.TSDBStore.(type) {  
      case shardWriterWithContext:  
         if err := sw.WriteToShardWithContext(ctx, shard.ID, points); err != nil {  
            return err  
         }  
      default:  
         if err := w.TSDBStore.WriteToShard(shard.ID, points); err != nil {  
            return err  
         }  
      }  
      return nil  
   }  
  // some code omitted here
}

The core of the logic above is the call to sw.WriteToShardWithContext. Let's find its implementation.

func (s *Store) WriteToShardWithContext(ctx context.Context, shardID uint64, points []models.Point) error {  
   // some code omitted here (among other things it acquires s.mu.RLock, matching the RUnlock calls below)
   // Look up the shard object for this shard ID
   sh := s.shards[shardID]  
   if sh == nil {  
      s.mu.RUnlock()  
      return ErrShardNotFound  
   }  
   // Grab the epoch tracker for this shard
   epoch := s.epochs[shardID]  
  
   s.mu.RUnlock()  
  
   // enter the epoch tracker  
   guards, gen := epoch.StartWrite()  
   defer epoch.EndWrite(gen)  
  
   // wait for any guards before writing the points.  
   for _, guard := range guards {  
      if guard.Matches(points) {  
         guard.Wait()  
      }  
   }  
  
   // Ensure snapshot compactions are enabled since the shard might have been cold
   // and disabled by the monitor.
   if isIdle, _ := sh.IsIdle(); isIdle {
      sh.SetCompactionsEnabled(true)  
   }  
  
   return sh.WritePointsWithContext(ctx, points)  
}

We can see that the actual write is ultimately delegated to the shard's WritePointsWithContext.

func (s *Shard) WritePointsWithContext(ctx context.Context, points []models.Point) error {  
  
   engine, err := s.engineNoLock()  
   if err != nil {  
      return err  
   }    
  
   // see if our engine is capable of WritePointsWithContext  
   type contextWriter interface {  
      WritePointsWithContext(context.Context, []models.Point) error  
   }  
   switch eng := engine.(type) {  
   case contextWriter:  
	  // Here the shard delegates the write to the engine's WritePointsWithContext
      if err := eng.WritePointsWithContext(ctx, points); err != nil {  
         atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))  
         atomic.AddInt64(&s.stats.WriteReqErr, 1)  
         return fmt.Errorf("engine: %s", err)  
      }  
   default:  
      // Write to the engine.  
      if err := engine.WritePoints(points); err != nil {  
         atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))  
         atomic.AddInt64(&s.stats.WriteReqErr, 1)  
         return fmt.Errorf("engine: %s", err)  
      }  
   }  
  
   // remaining code, including the final return, is omitted here
}

From the implementation above we can see that the shard, in turn, delegates the write to the engine's WritePointsWithContext.

func (e *Engine) WritePointsWithContext(ctx context.Context, points []models.Point) error {  
   // Buffer that groups the encoded values by series key and field
   values := make(map[string][]Value, len(points))  
   var (  
      keyBuf    []byte  
      baseLen   int  
      seriesErr error  
      npoints   int64 // total points processed  
      nvalues   int64 // total values (fields) processed  
   )  
   // Iterate over all points
   for _, p := range points {  
      keyBuf = append(keyBuf[:0], p.Key()...)  
      keyBuf = append(keyBuf, keyFieldSeparator...)  
      baseLen = len(keyBuf)  
      iter := p.FieldIterator()  
      t := p.Time().UnixNano()  
  
      npoints++  
      for iter.Next() {  
         // Skip fields named "time"; they are illegal
         if bytes.Equal(iter.FieldKey(), timeBytes) {  
            continue  
         }  
         
         keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)  
		 
         if e.seriesTypeMap != nil {  
            // Fast-path check to see if the field for the series already exists.  
            if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {  
               if typ, err := e.Type(keyBuf); err != nil {  
                  // Field type is unknown, we can try to add it.  
               } else if typ != iter.Type() {  
                  // Existing type is different from what was passed in, we need to drop
                  // this write and refresh the series type map.
                  seriesErr = tsdb.ErrFieldTypeConflict
                  e.seriesTypeMap.Insert(keyBuf, int(typ))  
                  continue  
               }  
  
               // Doesn't exist, so try to insert  
               vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))  
  
               // We didn't insert and the type that exists isn't what we tried to insert, so
               // we have a conflict and must drop this field/series.
               if !ok || vv != int(iter.Type()) {
                  seriesErr = tsdb.ErrFieldTypeConflict  
                  continue  
               }  
            } else if v != int(iter.Type()) {  
               // The series already exists, but with a different type.  This is also a type conflict
               // and we need to drop this field/series.
               seriesErr = tsdb.ErrFieldTypeConflict
               continue  
            }  
         }  
  
         var v Value  
         switch iter.Type() {  
         case models.Float:  
            fv, err := iter.FloatValue()  
            if err != nil {  
               return err  
            }  
            v = NewFloatValue(t, fv)  
         case models.Integer:  
            iv, err := iter.IntegerValue()  
            if err != nil {  
               return err  
            }  
            v = NewIntegerValue(t, iv)  
         case models.Unsigned:  
            iv, err := iter.UnsignedValue()  
            if err != nil {  
               return err  
            }  
            v = NewUnsignedValue(t, iv)  
         case models.String:  
            v = NewStringValue(t, iter.StringValue())  
         case models.Boolean:  
            bv, err := iter.BooleanValue()  
            if err != nil {  
               return err  
            }  
            v = NewBooleanValue(t, bv)  
         default:  
            return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())  
         }  
  
         nvalues++  
         values[string(keyBuf)] = append(values[string(keyBuf)], v)  
      }  
   }  
  
   e.mu.RLock()  
   defer e.mu.RUnlock()  
  
   // first try to write to the cache  
   if err := e.Cache.WriteMulti(values); err != nil {  
      return err  
   }  
  
   if e.WALEnabled {  
      if _, err := e.WAL.WriteMulti(values); err != nil {  
         return err  
      }  
   }  
  
   // if requested, store points written stats  
   if pointsWritten, ok := ctx.Value(tsdb.StatPointsWritten).(*int64); ok {  
      *pointsWritten = npoints  
   }  
  
   // if requested, store values written stats  
   if valuesWritten, ok := ctx.Value(tsdb.StatValuesWritten).(*int64); ok {  
      *valuesWritten = nvalues  
   }  
  
   return seriesErr  
}

Once the values have been encoded, they are written to the cache and the WAL. Note that at this point the data only lives in the in-memory cache and the WAL files; nothing has landed in the data directory yet. When the right conditions are met, a compactCache operation flushes the cached data into the data directory. There are two triggers: the first is the cache size exceeding a threshold, configurable via 'cache-snapshot-memory-size' (25 MB by default); the second is a period with no time-series data written to the WAL, which also triggers a flush, with a default threshold of 10 minutes configurable via 'cache-snapshot-write-cold-duration'.
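
For reference, both thresholds live in the [data] section of influxdb.conf; the values below are the defaults mentioned above:

[data]
  # Snapshot the in-memory cache to a TSM file once it grows past this size.
  cache-snapshot-memory-size = "25m"
  # Also snapshot if the shard has received no writes for this long.
  cache-snapshot-write-cold-duration = "10m"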

The compactCache function essentially starts a ticker that periodically checks whether the in-memory snapshot should be flushed to disk.

func (e *Engine) compactCache() {  
   t := time.NewTicker(time.Second)  
   defer t.Stop()  
   for {  
      e.mu.RLock()  
      quit := e.snapDone  
      e.mu.RUnlock()  
  
      select {  
      case <-quit:  
         return  
  
      case <-t.C:  
         e.Cache.UpdateAge()  
         if e.ShouldCompactCache(time.Now()) {  
            start := time.Now()  
            e.traceLogger.Info("Compacting cache", zap.String("path", e.path))  
            err := e.WriteSnapshot()  
            if err != nil && err != errCompactionsDisabled {  
               e.logger.Info("Error writing snapshot", zap.Error(err))  
               atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)  
            } else {  
               atomic.AddInt64(&e.stats.CacheCompactions, 1)  
            }  
            atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())  
         }  
      }  
   }  
}

Looking at ShouldCompactCache, it matches exactly what we described above.

func (e *Engine) ShouldCompactCache(t time.Time) bool {  
   sz := e.Cache.Size()  
  
   if sz == 0 {  
      return false  
   }  
  
   if sz > e.CacheFlushMemorySizeThreshold {  
      return true  
   }  
  
   return t.Sub(e.Cache.LastWriteTime()) > e.CacheFlushWriteColdDuration  
}

WriteSnapshot does two things: first, it closes the current WAL segments (the WAL files); second, it takes a snapshot of the Cache.

func (e *Engine) WriteSnapshot() (err error) {  
   // Lock and grab the cache snapshot along with all the closed WAL  
   // filenames associated with the snapshot  
   started := time.Now()  
   log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "tsm1_cache_snapshot")  
   defer func() {  
      elapsed := time.Since(started)  
      e.Cache.UpdateCompactTime(elapsed)  
  
      if err == nil {  
         log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed))  
      }  
      logEnd()  
   }()  
  
   closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {  
      e.mu.Lock()  
      defer e.mu.Unlock()  
  
      if e.WALEnabled {  
         if err = e.WAL.CloseSegment(); err != nil {  
            return  
         }  
  
         segments, err = e.WAL.ClosedSegments()  
         if err != nil {  
            return  
         }  
      }  
  
      snapshot, err = e.Cache.Snapshot()  
      if err != nil {  
         return  
      }  
  
      return  
   }()  
  
   if err != nil {  
      return err  
   }  
  
   if snapshot.Size() == 0 {  
      e.Cache.ClearSnapshot(true)  
      return nil  
   }  
  
   // The snapshotted cache may have duplicate points and unsorted data.  We need to deduplicate
   // it before writing the snapshot.  This can be very expensive so it's done while we are not
   // holding the engine write lock.
   dedup := time.Now()
   snapshot.Deduplicate()  
   e.traceLogger.Info("Snapshot for path deduplicated",  
      zap.String("path", e.path),  
      zap.Duration("duration", time.Since(dedup)))  
  
   return e.writeSnapshotAndCommit(log, closedFiles, snapshot)  
}

Afterwards, writeSnapshotAndCommit is called to write the snapshot out to TSM files and then clear the cache snapshot along with the associated WAL files.