InfluxDB Source Code Analysis (Part 2): The Data Write Path
Looking at the HTTP service exposed by InfluxDB, we find that it registers a POST-type write endpoint.
Route{
    "write", // Data-ingest route.
    "POST", "/api/v2/write", true, writeLogEnabled, h.serveWriteV2,
},
Following the handler, we eventually arrive at serveWrite, which takes the following parameters:
- database: the target database
- retentionPolicy: the data retention policy
- precision: the timestamp precision
- user: the meta.User issuing the request
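Before diving into the handler, it helps to see what a client write looks like on the wire. Below is a minimal sketch that posts one line-protocol point to the classic 1.x /write endpoint, whose db, rp and precision query parameters end up as the serveWrite arguments above; the host address and database name are assumptions for illustration.

package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

func main() {
    // One point in line protocol: measurement,tags fields timestamp(ns).
    body := strings.NewReader("cpu,host=a,region=us value=0.64 1609459200000000000")

    // Hypothetical local instance and database.
    url := "http://localhost:8086/write?db=mydb&rp=autogen&precision=ns"
    resp, err := http.Post(url, "text/plain; charset=utf-8", body)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    msg, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.Status, string(msg)) // expect 204 No Content on success
}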
func (h *Handler) serveWrite(database, retentionPolicy, precision string, w http.ResponseWriter, r *http.Request, user meta.User) {
    // ... (some code omitted)
    writePoints := func() error {
        switch pw := h.PointsWriter.(type) {
        case pointsWriterWithContext:
            var npoints, nvalues int64
            ctx := context.WithValue(context.Background(), coordinator.StatPointsWritten, &npoints)
            ctx = context.WithValue(ctx, coordinator.StatValuesWritten, &nvalues)

            // for now, just store the number of values used.
            err := pw.WritePointsWithContext(ctx, database, retentionPolicy, consistency, user, points)
            atomic.AddInt64(&h.stats.ValuesWrittenOK, nvalues)
            if err != nil {
                return err
            }
            return nil
        default:
            return h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points)
        }
    }
    // ... (some code omitted)
}
Let's look at WritePointsWithContext, which simply forwards to WritePointsPrivilegedWithContext. Here is that code:
func (w *PointsWriter) WritePointsPrivilegedWithContext(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
    atomic.AddInt64(&w.stats.WriteReq, 1)
    atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))

    // If no retention policy was specified, fall back to the database's default.
    if retentionPolicy == "" {
        db := w.MetaClient.Database(database)
        if db == nil {
            return influxdb.ErrDatabaseNotFound(database)
        }
        retentionPolicy = db.DefaultRetentionPolicy
    }

    // Build the shard mapping: work out which points must be written to which
    // shard. The ShardMapping data structure is described below.
    shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
    if err != nil {
        return err
    }

    // Write each shard in its own goroutine and return as soon as one fails.
    ch := make(chan error, len(shardMappings.Points))
    // Iterate over each shard and the points destined for it.
    for shardID, points := range shardMappings.Points {
        go func(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            var numPoints, numValues int64
            ctx = context.WithValue(ctx, tsdb.StatPointsWritten, &numPoints)
            ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)

            // This is where the data is actually written.
            err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, points)
            if err == tsdb.ErrShardDeletion {
                err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
            }

            // Propagate the per-shard counts up to the handler-level stats set in serveWrite.
            if v, ok := ctx.Value(StatPointsWritten).(*int64); ok {
                atomic.AddInt64(v, numPoints)
            }
            if v, ok := ctx.Value(StatValuesWritten).(*int64); ok {
                atomic.AddInt64(v, numValues)
            }

            ch <- err
        }(ctx, shardMappings.Shards[shardID], database, retentionPolicy, points)
    }
    // ... (collection of errors from ch omitted)
}
From the code above we can see that InfluxDB scatters the points to be written across different shards and then writes each shard's batch. From the available documentation we can infer that a shard is selected by hashing, where the hashed value is the point's series key: the string formed from the measurement name plus the tag set.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
    // Fetch the retention policy info object.
    rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
    // ... (non-essential code omitted)

    // Build a mapping structure.
    mapping := NewShardMapping(len(wp.Points))
    for _, p := range wp.Points {
        // For each point, pick the shard group covering its timestamp.
        sg := list.ShardGroupAt(p.Time())
        if sg == nil {
            // We didn't create a shard group because the point was outside the
            // scope of the RP.
            mapping.Dropped = append(mapping.Dropped, p)
            atomic.AddInt64(&w.stats.WriteDropped, 1)
            continue
        }

        // Pick the shard within the group that the point should be written to.
        sh := sg.ShardFor(p)
        mapping.MapPoint(&sh, p)
    }
    return mapping, nil
}
Looking at the code below, the ShardMapping structure is as follows: n is the number of points to write; Points is a map whose key is a shard ID and whose value is the list of points destined for that shard; Shards is keyed by shard ID as well, but its value is the corresponding ShardInfo object, which records the shard's basic metadata.
// NewShardMapping creates an empty ShardMapping.
func NewShardMapping(n int) *ShardMapping {
    return &ShardMapping{
        n:      n,
        Points: map[uint64][]models.Point{},
        Shards: map[uint64]*meta.ShardInfo{},
    }
}
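For completeness, the mapping.MapPoint call seen in MapShards above just fills both maps for the chosen shard. A simplified sketch of its shape, based on the 1.x source:

// MapPoint adds a point to the ShardMapping for the given shard: the point is
// appended to the shard's batch and the shard's metadata is recorded.
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
    s.Points[shardInfo.ID] = append(s.Points[shardInfo.ID], p)
    s.Shards[shardInfo.ID] = shardInfo
}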
Next, let's look at how a shard group selects the right shard for a point.
sh := sg.ShardFor(p)
func (sgi *ShardGroupInfo) ShardFor(p hashIDer) ShardInfo {
    if len(sgi.Shards) == 1 {
        return sgi.Shards[0]
    }
    return sgi.Shards[p.HashID()%uint64(len(sgi.Shards))]
}
As we can see, if the group contains more than one shard, the target is chosen by hashing the point. The hash uses the FNV-1a 64-bit algorithm, computed over the point's key, which consists of the measurement and the tags.
func (p *point) HashID() uint64 {
    h := NewInlineFNV64a()
    h.Write(p.key)
    sum := h.Sum64()
    return sum
}
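To make this concrete, here is a small self-contained sketch that reproduces the selection logic using the standard library's hash/fnv (the same FNV-1a 64-bit algorithm that InlineFNV64a implements); the series keys and shard count are made up for illustration:

package main

import (
    "fmt"
    "hash/fnv"
)

// hashID mimics point.HashID: FNV-1a 64-bit over the series key.
func hashID(seriesKey string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(seriesKey))
    return h.Sum64()
}

func main() {
    numShards := uint64(3) // assume the shard group holds 3 shards
    for _, key := range []string{
        "cpu,host=a,region=us",
        "cpu,host=b,region=us",
        "mem,host=a,region=eu",
    } {
        fmt.Printf("%-22s -> shard index %d\n", key, hashID(key)%numShards)
    }
}

The thing to notice is that the shard index depends only on the series key, so all points of a given series within one shard group always land in the same shard.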
Having worked out how points are assigned to different shards, let's continue with InfluxDB's write path, i.e. this part:
err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, points)
func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
    atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

    // This is a small wrapper to make type-switching over w.TSDBStore a little
    // less verbose.
    writeToShard := func() error {
        type shardWriterWithContext interface {
            WriteToShardWithContext(context.Context, uint64, []models.Point) error
        }
        switch sw := w.TSDBStore.(type) {
        case shardWriterWithContext:
            if err := sw.WriteToShardWithContext(ctx, shard.ID, points); err != nil {
                return err
            }
        default:
            if err := w.TSDBStore.WriteToShard(shard.ID, points); err != nil {
                return err
            }
        }
        return nil
    }
    // ... (some code omitted)
}
The core of the code above is the call to sw.WriteToShardWithContext. Let's find the corresponding implementation.
func (s *Store) WriteToShardWithContext(ctx context.Context, shardID uint64, points []models.Point) error {
    // ... (some code omitted)

    // Look up the shard by its ID.
    sh := s.shards[shardID]
    if sh == nil {
        s.mu.RUnlock()
        return ErrShardNotFound
    }

    // Grab the epoch tracker for this shard.
    epoch := s.epochs[shardID]
    s.mu.RUnlock()

    // enter the epoch tracker
    guards, gen := epoch.StartWrite()
    defer epoch.EndWrite(gen)

    // wait for any guards before writing the points.
    for _, guard := range guards {
        if guard.Matches(points) {
            guard.Wait()
        }
    }

    // Ensure snapshot compactions are enabled since the shard might have been cold
    // and disabled by the monitor.
    if isIdle, _ := sh.IsIdle(); isIdle {
        sh.SetCompactionsEnabled(true)
    }

    return sh.WritePointsWithContext(ctx, points)
}
As we can see, the actual write is ultimately delegated to the shard's WritePointsWithContext.
func (s *Shard) WritePointsWithContext(ctx context.Context, points []models.Point) error {
    engine, err := s.engineNoLock()
    if err != nil {
        return err
    }

    // see if our engine is capable of WritePointsWithContext
    type contextWriter interface {
        WritePointsWithContext(context.Context, []models.Point) error
    }
    switch eng := engine.(type) {
    case contextWriter:
        // Here the shard delegates to the engine's WritePointsWithContext.
        if err := eng.WritePointsWithContext(ctx, points); err != nil {
            atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
            atomic.AddInt64(&s.stats.WriteReqErr, 1)
            return fmt.Errorf("engine: %s", err)
        }
    default:
        // Write to the engine.
        if err := engine.WritePoints(points); err != nil {
            atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
            atomic.AddInt64(&s.stats.WriteReqErr, 1)
            return fmt.Errorf("engine: %s", err)
        }
    }
    // ... (some code omitted)
}
From the implementation above we can see that the shard in turn delegates WritePointsWithContext to the engine.
func (e *Engine) WritePointsWithContext(ctx context.Context, points []models.Point) error {
    // Buffer for the decoded values, keyed by series key + field name.
    values := make(map[string][]Value, len(points))

    var (
        keyBuf    []byte
        baseLen   int
        seriesErr error
        npoints   int64 // total points processed
        nvalues   int64 // total values (fields) processed
    )

    // Iterate over all points.
    for _, p := range points {
        keyBuf = append(keyBuf[:0], p.Key()...)
        keyBuf = append(keyBuf, keyFieldSeparator...)
        baseLen = len(keyBuf)
        iter := p.FieldIterator()
        t := p.Time().UnixNano()

        npoints++
        for iter.Next() {
            // Skip fields named "time"; they are illegal.
            if bytes.Equal(iter.FieldKey(), timeBytes) {
                continue
            }

            keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)

            if e.seriesTypeMap != nil {
                // Fast-path check to see if the field for the series already exists.
                if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {
                    if typ, err := e.Type(keyBuf); err != nil {
                        // Field type is unknown, we can try to add it.
                    } else if typ != iter.Type() {
                        // Existing type is different from what was passed in, we need to drop
                        // this write and refresh the series type map.
                        seriesErr = tsdb.ErrFieldTypeConflict
                        e.seriesTypeMap.Insert(keyBuf, int(typ))
                        continue
                    }

                    // Doesn't exist, so try to insert
                    vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))

                    // We didn't insert and the type that exists isn't what we tried to insert, so
                    // we have a conflict and must drop this field/series.
                    if !ok || vv != int(iter.Type()) {
                        seriesErr = tsdb.ErrFieldTypeConflict
                        continue
                    }
                } else if v != int(iter.Type()) {
                    // The series already exists, but with a different type. This is also a type
                    // conflict and we need to drop this field/series.
                    seriesErr = tsdb.ErrFieldTypeConflict
                    continue
                }
            }

            var v Value
            switch iter.Type() {
            case models.Float:
                fv, err := iter.FloatValue()
                if err != nil {
                    return err
                }
                v = NewFloatValue(t, fv)
            case models.Integer:
                iv, err := iter.IntegerValue()
                if err != nil {
                    return err
                }
                v = NewIntegerValue(t, iv)
            case models.Unsigned:
                iv, err := iter.UnsignedValue()
                if err != nil {
                    return err
                }
                v = NewUnsignedValue(t, iv)
            case models.String:
                v = NewStringValue(t, iter.StringValue())
            case models.Boolean:
                bv, err := iter.BooleanValue()
                if err != nil {
                    return err
                }
                v = NewBooleanValue(t, bv)
            default:
                return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
            }

            nvalues++
            values[string(keyBuf)] = append(values[string(keyBuf)], v)
        }
    }

    e.mu.RLock()
    defer e.mu.RUnlock()

    // first try to write to the cache
    if err := e.Cache.WriteMulti(values); err != nil {
        return err
    }

    if e.WALEnabled {
        if _, err := e.WAL.WriteMulti(values); err != nil {
            return err
        }
    }

    // if requested, store points written stats
    if pointsWritten, ok := ctx.Value(tsdb.StatPointsWritten).(*int64); ok {
        *pointsWritten = npoints
    }

    // if requested, store values written stats
    if valuesWritten, ok := ctx.Value(tsdb.StatValuesWritten).(*int64); ok {
        *valuesWritten = nvalues
    }

    return seriesErr
}
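To illustrate the buffering scheme above: every entry of the values map is keyed by the series key, a field separator, and the field name, so each field of each series gets its own slice of timestamped values. A tiny standalone sketch of the key construction (the separator constant matches the one used by the tsm1 engine; the series key and field names are made up):

package main

import "fmt"

// keyFieldSeparator separates the series key from the field name in cache keys.
const keyFieldSeparator = "#!~#"

func main() {
    seriesKey := "cpu,host=a,region=us"
    // A point with two fields produces two cache keys, one per field.
    for _, field := range []string{"value", "count"} {
        fmt.Println(seriesKey + keyFieldSeparator + field)
    }
}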
Once the points have been decoded into per-field values, they are written to the cache and the WAL.
Of course, at this stage the data only lives in the cache and the WAL files; nothing has actually landed in our data directory yet. When the right conditions are met, a compactCache operation flushes the cached data to the data directory. There are two triggers: first, the cache size exceeding a threshold, configurable via the cache-snapshot-memory-size parameter (default 25 MB); second, no time-series data having been written to the WAL for a certain period also triggers a flush, with a default threshold of 10 minutes, configurable via the cache-snapshot-write-cold-duration parameter.
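For reference, both thresholds live in the [data] section of influxdb.conf; the values shown below are simply the defaults:

[data]
  # Flush the cache to a TSM file once it exceeds this size.
  cache-snapshot-memory-size = "25m"
  # Trigger a flush if the WAL has gone this long without a write.
  cache-snapshot-write-cold-duration = "10m"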
The compactCache function mainly starts a ticker that periodically checks whether the in-memory snapshot needs to be flushed to disk.
func (e *Engine) compactCache() {
    t := time.NewTicker(time.Second)
    defer t.Stop()
    for {
        e.mu.RLock()
        quit := e.snapDone
        e.mu.RUnlock()

        select {
        case <-quit:
            return

        case <-t.C:
            e.Cache.UpdateAge()
            if e.ShouldCompactCache(time.Now()) {
                start := time.Now()
                e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
                err := e.WriteSnapshot()
                if err != nil && err != errCompactionsDisabled {
                    e.logger.Info("Error writing snapshot", zap.Error(err))
                    atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
                } else {
                    atomic.AddInt64(&e.stats.CacheCompactions, 1)
                }
                atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
            }
        }
    }
}
Looking at the ShouldCompactCache function confirms what we described above:
func (e *Engine) ShouldCompactCache(t time.Time) bool {
    sz := e.Cache.Size()
    if sz == 0 {
        return false
    }
    if sz > e.CacheFlushMemorySizeThreshold {
        return true
    }
    return t.Sub(e.Cache.LastWriteTime()) > e.CacheFlushWriteColdDuration
}
WriteSnapshot does two things: first, it closes all current segments, i.e. the WAL files; second, it takes a snapshot of the Cache.
func (e *Engine) WriteSnapshot() (err error) {
    // Lock and grab the cache snapshot along with all the closed WAL
    // filenames associated with the snapshot
    started := time.Now()

    log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "tsm1_cache_snapshot")
    defer func() {
        elapsed := time.Since(started)
        e.Cache.UpdateCompactTime(elapsed)
        if err == nil {
            log.Info("Snapshot for path written", zap.String("path", e.path), zap.Duration("duration", elapsed))
        }
        logEnd()
    }()

    closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
        e.mu.Lock()
        defer e.mu.Unlock()

        if e.WALEnabled {
            if err = e.WAL.CloseSegment(); err != nil {
                return
            }

            segments, err = e.WAL.ClosedSegments()
            if err != nil {
                return
            }
        }

        snapshot, err = e.Cache.Snapshot()
        if err != nil {
            return
        }

        return
    }()

    if err != nil {
        return err
    }

    if snapshot.Size() == 0 {
        e.Cache.ClearSnapshot(true)
        return nil
    }

    // The snapshotted cache may have duplicate points and unsorted data. We need to deduplicate
    // it before writing the snapshot. This can be very expensive so it's done while we are not
    // holding the engine write lock.
    dedup := time.Now()
    snapshot.Deduplicate()
    e.traceLogger.Info("Snapshot for path deduplicated",
        zap.String("path", e.path),
        zap.Duration("duration", time.Since(dedup)))

    return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}
Afterwards, writeSnapshotAndCommit is called to write the snapshot to disk and to clear the cache snapshot along with the associated WAL files.
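We won't walk through writeSnapshotAndCommit line by line, but a simplified sketch of its shape (condensed from the 1.x source, with most error logging trimmed) shows the commit sequence: write new TSM files from the snapshot, swap them into the file store, then drop the snapshot and the old WAL segments.

// Simplified sketch of writeSnapshotAndCommit.
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
    defer func() {
        if err != nil {
            e.Cache.ClearSnapshot(false) // keep the snapshot data on failure
        }
    }()

    // Write the snapshot out to new TSM files.
    newFiles, err := e.Compactor.WriteSnapshot(snapshot)
    if err != nil {
        return err
    }

    e.mu.RLock()
    defer e.mu.RUnlock()

    // Make the new TSM files visible to queries.
    if err := e.FileStore.Replace(nil, newFiles); err != nil {
        return err
    }

    // Drop the snapshot from the in-memory cache, then the old WAL segments.
    e.Cache.ClearSnapshot(true)
    if e.WALEnabled {
        if err := e.WAL.Remove(closedFiles); err != nil {
            log.Info("Error removing closed WAL segments", zap.Error(err))
        }
    }
    return nil
}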
Reposted from: https://juejin.cn/post/7352789840352083977