likes
comments
collection
share

InfluxDB 源码分析(七): Compact 压缩流程

作者站长头像
站长
· 阅读数 5

Compact 概述

influxdb 的存储引擎使用了基于LSM-Tree 改进的TSM文件结构。 TSM 与 LSM-Tree 类似,也有类似于MemTable(缓存), WAL 和 SSTable 的概念。LSM-Tree 会将内存中的Cache(MemTable) 持久化到硬盘,然后把硬盘上的若干个文件进行merge。这也可以有效的减少文件的个数,优化读的效率。(因为读的目标文件变少了)

influxdb 实现了这一套机制,在influxdb内部,compact 之前会先制定一个compact计划。该计划包含一组待压缩的文件,然后对这一组文件进行压缩。步骤如下:

  • 制定一个 compact plan, 该plan 包含一组待压缩的TSM 文件
  • 针对该组TSM 文件进行压缩。

TSM 文件名揭秘

在我们去服务器上查看influxdb的shard路径的时候,我们经常会发现shard下充斥了大量的tsm文件。他们的文件名由两组数字组成。 我们以 000000001-000000001.tsm 举例。

InfluxDB 源码分析(七):  Compact 压缩流程

在括号前面的,我们称之为一个世代,比如这个shard在初次从cache中持久化到磁盘的时候,那这个时候世代就是1。 第二次就是 2. 这个时候该shard下总会有两个文件, 为: 000000001-000000001.tsm000000002-000000001.tsm

这个时候有人要问了,为什么不是生成 000000001-000000002.tsm 文件呢? 这里要提前声明。 在 level 4 级别下的tsm文件,一个世代下只会存在一个级别。 当到level 4 及其 以上时,才会出现: 000000002-000000004.tsm000000002-000000005.tsm 这样同世代但是级别不同的文件出现。具体的原因我们后面会解释。

TSM 压缩过程

在去阅读源码之前,我们需要先知道TSM 是以怎么样一种规律去压缩的。详情大家看下图:

InfluxDB 源码分析(七):  Compact 压缩流程

假设我们这次图中有三个level为1的tsm文件,那么他们压缩完之后的结果是生成一个新的文件,叫 03 -02 .tsm 也就是最大的世代的级别+1的新文件。 图示如下:

InfluxDB 源码分析(七):  Compact 压缩流程 在这里我们就能理解为什么同一个世代下可能会有多个level 同时存在了。因为influxdb 规定了 单个 tsm的上限是2.1G。这个时候从 level3 -> level 4 的过程中,就有可能所有的level3文件的大小加起来远大于 2.1G。 以至于需要多个文件才能存下来。这个时候就会在当前世代的level+1 起一个新文件。

源码分析

压缩计划

源码位置:

// 表示一个可以被一起压缩的组。里面是待压缩的文件名, 根据文件名升序排列 [05-01.tsm, 06-01.tsm]
type CompactionGroup []string  
  
// CompactionPlanner 压缩计划  
type CompactionPlanner interface {  
   Plan(lastWrite time.Time) []CompactionGroup   // 只针对level 4 级别进行全量压缩 full
   PlanLevel(level int) []CompactionGroup   // 针对某个级别做压缩
   PlanOptimize() []CompactionGroup   
   Release(group []CompactionGroup)  
   FullyCompacted() bool  
  
   ForceFull()  
  
   SetFileStore(fs *FileStore)  
}

在这里我们主要关心两个核心方法:

  • Plan(lastWrite time.Time) []CompactionGroup 只针对level 4 级别进行全量压缩 full
  • PlanLevel(level int) []CompactionGroup 针对某个级别做压缩

这两个是生成压缩计划的核心。

在influxdb内部,提供了CompactionPlanner的默认实现DefaultPlanner。这里就不在展示相关的结构体定义了。 接下来我们直接看 DefaultPlanner 的 PlanLevel 默认实现,之后其他的结构体定义会在看源码的途中进行补充。

代码相关的内容请看注释;

// PlanLevel 返回一组要为特定级别重写的 TSM 文件。  
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {  
   // 如果一个完全压缩的请求已经开始,请不要应用任何可以阻止它的级别  
   c.mu.RLock()  
   if c.forceFull {  
      c.mu.RUnlock()  
      return nil  
   }  
   c.mu.RUnlock()  
  
   // 确定磁盘上所有文件的代世代  
   //我们需要在概念上将一代视为单个文件,即使它可能按顺序拆分为多个文件。  
   // generations 生序输出,比如 02 02
   generations := c.findGenerations(true)  
    
   //  如果当前只有一代,并且只有一个tsm文件,则没有压缩的必要了  
   if len(generations) <= 1 && !generations.hasTombstones() {  
      return nil  
   }  
  
   // 按级别对每一代进行分组,使同一级别中的两个相邻代成为同一组的一部分  
   // 当前的世代  
   var currentGen tsmGenerations  
  
   // 世代组  
   var groups []tsmGenerations  
  
   // 遍历所有的世代  
   for i := 0; i < len(generations); i++ {  
      // 获取到当前世代  
      cur := generations[i]  
  
      if i < len(generations)-1 {  
         // 如果当前的 level 小于 下一个 level   if cur.level() < generations[i+1].level() {  
            // 将该  
            currentGen = append(currentGen, cur)  
            continue  
         }  
      }  
  
      if len(currentGen) == 0 || currentGen.level() == cur.level() {  
         currentGen = append(currentGen, cur)  
         continue  
      }  
      
      // 如果当前的level 大于 下一个 level,放入groups  
      groups = append(groups, currentGen)  
  
      currentGen = tsmGenerations{}  
      currentGen = append(currentGen, cur)  
   }  
  
   if len(currentGen) > 0 {  
      groups = append(groups, currentGen)  
   }  
   
   // 过滤level, 只保留目标组
   // 双重列表  
   var levelGroups []tsmGenerations  
   for _, cur := range groups {  
      // 只过滤特定的级别的  
      if cur.level() == level {  
         levelGroups = append(levelGroups, cur)  
      }  
   }  
  
   // 最低一次压缩4个世代的数据  
   minGenerations := 4  
   if level == 1 {  
	  // 如果世代等于1,每次必须凑够8个文件才能开启压缩,避免因为缓存的快速刷新导致频繁的压缩。
      minGenerations = 8  
   }  
  
   var cGroups []CompactionGroup  
   for _, group := range levelGroups {  
      // 对于每一个世代而言,需要分块  
      for _, chunk := range group.chunk(minGenerations) {  
         var cGroup CompactionGroup  
         var hasTombstones bool  
         for _, gen := range chunk {  
            if gen.hasTombstones() {  
               hasTombstones = true  
            }  
            for _, file := range gen.files {  
               cGroup = append(cGroup, file.Path)  
            }  
         }  
		 // 如果凑不够最小的,则直接跳过。为什么不放前面去?提前判断不是更好吗
         if len(chunk) < minGenerations && !hasTombstones {  
            continue  
         }  
  
         cGroups = append(cGroups, cGroup)  
      }  
   }  
  
   if !c.acquire(cGroups) {  
      return nil  
   }  
  
   return cGroups  
}

我们重点关注一下这段逻辑,这段逻辑是找到符合条件的世代,并根据世代的大小生序返回:

generations := c.findGenerations(true)  

type tsmGenerations []*tsmGeneration

type tsmGeneration struct {  
   id            int  
   files         []FileStat  
   parseFileName ParseFileNameFunc  
}

// findGenerations 根据文件名按世代对所有 TSM 文件进行分组,然后按降序(最新的优先)返回世代。  
// 如果 skipInUse 为真,则不返回属于现有压缩计划一部分的 tsm 文件。  
func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {  
   c.mu.Lock()  
   defer c.mu.Unlock()  
  
   last := c.lastFindGenerations  
   lastGen := c.lastGenerations  
  
   if !last.IsZero() && c.FileStore.LastModified().Equal(last) {  
      return lastGen  
   }  
  
   genTime := c.FileStore.LastModified()  
   tsmStats := c.FileStore.Stats()  
   // 世代  
   generations := make(map[int]*tsmGeneration, len(tsmStats))  
  
   // tsm Stats  
   for _, f := range tsmStats {  
      gen, _, _ := c.ParseFileName(f.Path)  
  
      // Skip any files that are assigned to a current compaction plan  
      // 如果文件正在使用,则跳过本次压缩  
      if _, ok := c.filesInUse[f.Path]; skipInUse && ok {  
         continue  
      }  
      // 如果当前世代没有  
      group := generations[gen]  
      if group == nil {  
         // 生成一个新的组  
         group = newTsmGeneration(gen, c.ParseFileName)  
         // 生成一个group  
         generations[gen] = group  
      }  
      //将该文件填加到指定到 gen 下  
      group.files = append(group.files, f)  
   }  
  

   orderedGenerations := make(tsmGenerations, 0, len(generations))  
  
   for _, g := range generations {  
      orderedGenerations = append(orderedGenerations, g)  
   }  
  
   // 降序排列  
   if !orderedGenerations.IsSorted() {  
	  // 如果没有排序好,则升序排列好 
      sort.Sort(orderedGenerations)  
   }  
  
   c.lastFindGenerations = genTime  
   c.lastGenerations = orderedGenerations  
  
   return orderedGenerations  
}

需要注意的是,PlanLevel 只负责 level 级别小于4的tsm 文件,代码上是这样表现的,对于level>4的, 则需要使用 Plan 函数实现。

// 计划返回一组 TSM 文件以重写 4 级或更高级别。如果可能的话,计划会返回多个组,以允许压缩同时运行  
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {  
   // 只压缩level>4的组
   // 依旧是寻找所有可以压缩的文件
   generations := c.findGenerations(true)  
  
   c.mu.RLock()  
   forceFull := c.forceFull  
   c.mu.RUnlock()  
  
   // 完全压缩是有CD的  
   // 当某个shard过期时,调用完整压缩将该shard所有级别的的压缩一遍,这覆盖了所有的级别
   if forceFull || c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {  
  
      // Reset the full schedule if we planned because of it.  
      // 重置full 压缩的计划表  
      if forceFull {  
         c.mu.Lock()  
         c.forceFull = false  
         c.mu.Unlock()  
      }  
  
      // tsmFiles  
      var tsmFiles []string  
      var genCount int  
  
      // 遍历所有的世代  
      for i, group := range generations {  
         var skip bool  
  
         // 如果超过两个世代的文件, 并且超过了最大的文件大小,这个时候就要跳过,没有压缩的必要了
         if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() {  
            skip = true  
         }  
   
          // 需要检查下一代的文件级别,因为下一代的文件有可能会和这一代进行合并。这个地方没有太懂
         if i < len(generations)-1 {  
            if generations[i+1].level() <= 3 {  
               skip = false  
            }  
         }  
  
         if skip {  
            continue  
         }  
  
         for _, f := range group.files {  
            // 遍历该世代的下的所有文件,并放置在tsmFiles列表中  
            tsmFiles = append(tsmFiles, f.Path)  
         }  
         genCount += 1  
      }  
      // 根据文件名排好序  
      sort.Strings(tsmFiles)  
  
      // Make sure we have more than 1 file and more than 1 generation  
      // 如果只有一个世代或者文件只有一个,则放弃本次压缩  
      if len(tsmFiles) <= 1 || genCount <= 1 {  
         return nil  
      }  
      // 生成压缩组  
      group := []CompactionGroup{tsmFiles}  
      if !c.acquire(group) {  
         return nil  
      }  
      return group  
   }  
  
   // 如果上一次plan的时间在文件的修改时间之后,说明这段时间内文件是没有被改动的,此时停止压缩。  
   if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {  
      return nil  
   }  
  
   c.lastPlanCheck = time.Now()  

   // 如果只有一代,需要提前返回,这样可以避免一直迫害同一个文件  
   if len(generations) <= 1 && !generations.hasTombstones() {  
      return nil  
   }  

   // 需要找到第 4 级文件的终点。它们将是最旧的文件。一旦我们看到小于 4 的文件,我们就会按降序扫描每一代  
   // 因为是升序的  
   end := 0  
   start := 0  
   for i, g := range generations {  
      if g.level() <= 3 {  
         break  
      }  
      end = i + 1  
   }  
  
   // 随着压缩的运行,最旧的文件会变大。如果它们已达到极限,我们不想在此计划期间重新压缩它们,因此请跳过我们看到的任何内容。  
   var hasTombstones bool  
   for i, g := range generations[:end] {  
  
      // 如果文件到极限,比如2.1G, 就不压了  
      if g.hasTombstones() {  
         hasTombstones = true  
      }  
  
      if hasTombstones {  
         continue  
      }  
  
      // 如果文件超过了2GB的限制, 就不压缩这个文件了,跳过
      if g.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {  
         start = i + 1  
      }  
  
      // This is an edge case that can happen after multiple compactions run.  The files at the beginning  
      // can become larger faster than ones after them.  We want to skip those really big ones and just      // compact the smaller ones until they are closer in size.     
       if i > 0 {  
         if g.size()*2 < generations[i-1].size() {  
            start = i  
            break  
         }  
      }  
   }  
  
   // step is how may files to compact in a group.  We want to clamp it at 4 but also stil  
   // return groups smaller than 4.   step := 4  
   if step > end {  
      step = end  
   }  
  
   // 拿到上文过滤的所有level>=4 的可以压缩的集合
   generations = generations[start:end]  
  
    groups := []tsmGenerations{}  
   // 每step为一组  
   for i := 0; i < len(generations); i += step {  
      var skipGroup bool  
      startIndex := i  
  
      for j := i; j < i+step && j < len(generations); j++ {  
         gen := generations[j]  
         lvl := gen.level()  
     
         // 如果这个级别有小于3的,则跳过 , 这个时候应该不会有小于3的level出现了,为了保险还是选择继续判断一下
         if lvl <= 3 {  
            skipGroup = true  
            break         
        }  
		  //。为了保险,继续判断下这个文件是可以被压缩的,如果不能,则跳过 
         if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {  
            startIndex++  
            continue  
         }  
      }  
  
      if skipGroup {  
         continue  
      }  
  
      endIndex := i + step  
      if endIndex > len(generations) {  
         endIndex = len(generations)  
      }  
      if endIndex-startIndex > 0 {  
         groups = append(groups, generations[startIndex:endIndex])  
      }  
   }  
  
   if len(groups) == 0 {  
      return nil  
   }  

   // 对于组,我们需要评估是是否可以对组进行压缩  
   compactable := []tsmGenerations{}  
   for _, group := range groups {  
      // 如果没有足够的世代去压缩,这一轮压缩的条目小于4个,则跳过  
      if len(group) < 4 && !group.hasTombstones() {  
         continue  
      }  
      compactable = append(compactable, group)  
   }  
   // 所有要压缩的问题都是按照顺序压缩  
   var tsmFiles []CompactionGroup  
   for _, c := range compactable {  
      var cGroup CompactionGroup  
      for _, group := range c {  
         for _, f := range group.files {  
            cGroup = append(cGroup, f.Path)  
         }  
      }  
      sort.Strings(cGroup)  
      tsmFiles = append(tsmFiles, cGroup)  
   }  
  
   if !c.acquire(tsmFiles) {  
      return nil  
   }  
   return tsmFiles  
}

plan 函数主要承担了两个具体的职能:

  • 对该shard下所有的tsm进行一波全量压缩的计划
  • 只针对大于3的level文件进行制定压缩计划

这个时候到这里,我们已经拥有制定计划的人了,并且已经制定好了相关的压缩计划,那就需要有一个执行者去执行这些压缩计划。

压缩的执行者

结构体路径: 碍于篇幅,将不再展示该结构题属性部分。

// Compactor 将多个 TSM 文件合并为新文件或将 Cache 写入 1 个或多个 TSM 文件。  
type Compactor struct { }

Compactor是具体的压缩执行器,它主要承担了两部分的工作:

  • 将内存中的MemTable,也就是缓存中的内容,持久化到磁盘变成tsm文件
  • 对一组tsm文件进行压缩

我们先来看第一部分工作,对应的Compactor的函数是WriteSnapshot:

// WriteSnapshot 将缓存快照写入一个或多个新的 TSM 文件。  
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {  
   c.mu.RLock()  
   enabled := c.snapshotsEnabled  
   intC := c.snapshotsInterrupt  
   c.mu.RUnlock()  
  
   if !enabled {  
      return nil, errSnapshotsDisabled  
   }  
  
   start := time.Now()  
   card := cache.Count()  
  
   // Enable throttling if we have lower cardinality or snapshots are going fast.  
   throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second  
  
   // Write snapshost concurrently if cardinality is relatively high.  
  
   // 如果基数很高,并发写  
   concurrency := card / 2e6  
   if concurrency < 1 {  
      concurrency = 1  
   }  
  
   // 如果数量过高,并发写快照  
   if card >= 3e6 {  
      concurrency = 4  
      throttle = false  
   }  
  
   // 根据并发分割成4份  
   splits := cache.Split(concurrency)  
  
   type res struct {  
      files []string  
      err   error  
   }  
  
   resC := make(chan res, concurrency)  
   for i := 0; i < concurrency; i++ {  
      go func(sp *Cache) {  
         iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)  
         // 这里NextGeneration()写新文件的时候,提升了一个世代
         files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)  
         resC <- res{files: files, err: err}  
  
      }(splits[i])  
   }  
  
   var err error  
   files := make([]string, 0, concurrency)  
   for i := 0; i < concurrency; i++ {  
      result := <-resC  
      if result.err != nil {  
         err = result.err  
      }  
      files = append(files, result.files...)  
   }  
  
   dur := time.Since(start).Truncate(time.Second)  
  
   c.mu.Lock()  
  
   // See if we were disabled while writing a snapshot  
   enabled = c.snapshotsEnabled  
   c.lastSnapshotDuration = dur  
   c.snapshotLatencies.add(time.Since(start))  
   c.mu.Unlock()  
  
   if !enabled {  
      return nil, errSnapshotsDisabled  
   }  
  
   return files, err  
}

对于第二部分工作,Compactor 提供了两个压缩方法,分别是

  • CompactFull 全量压缩
  • CompactFast 快速压缩

但是他们本质上都是调用的 Compactor 的compact 方法。


// compact 将多个较小的 TSM 文件写入 1 个或多个较大的文件。  
func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {  
   size := c.Size  
   if size <= 0 {  
      size = tsdb.DefaultMaxPointsPerBlock  
   }  
  
   c.mu.RLock()  
   intC := c.compactionsInterrupt  
   c.mu.RUnlock()  
  
   // The new compacted files need to added to the max generation in the  
   // set.  We need to find that max generation as well as the max sequence   // number to ensure we write to the next unique location.  
   var maxGeneration, maxSequence int  
   for _, f := range tsmFiles {  
      gen, seq, err := c.parseFileName(f)  
      // 世代 和 level     
       if err != nil {  
         return nil, err  
      }  
  
      // 如果当前世代大于最大的世代  
      if gen > maxGeneration {  
         maxGeneration = gen  
         maxSequence = seq  
      }  
  
      if gen == maxGeneration && seq > maxSequence {  
         maxSequence = seq  
      }  
   }  
  
   // For each TSM file, create a TSM reader  
   // 对待每一个TSM file,创建一个 TSM reader 
   var trs []*TSMReader  
   for _, file := range tsmFiles {  
      select {  
      case <-intC:  
         return nil, errCompactionAborted{}  
      default:  
      }  
  
      tr := c.FileStore.TSMReader(file)  
      // 度该文件  
      if tr == nil {  
         // This would be a bug if this occurred as tsmFiles passed in should only be  
         // assigned to one compaction at any one time.  A nil tr would mean the file         // doesn't exist.         
         return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}  
      }  
      defer tr.Unref() // inform that we're done with this reader when this method returns.  
      trs = append(trs, tr)  
   }  
  
   if len(trs) == 0 {  
      return nil, nil  
   }  
   // 针对待压缩的文件生成一个TSMBatchKeyIterator
   tsm, err := NewTSMBatchKeyIterator(size, fast, intC, tsmFiles, trs...)  
   if err != nil {  
      return nil, err  
   }  
  
   return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true)  
}



// writeNewFiles 从迭代器写入新的 TSM 文件,一旦达到最大 TSM 文件大小就轮换到新文件。  
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {  
   // These are the new TSM files written  
  
   // 这些是新写入的tsm文件  
   var files []string  
  
   for {  
	  // sequence 就是level
      sequence++  
  
      // New TSM files are written to a temp file and renamed when fully completed.  
      // 新的 TSM 文件被写入临时文件并在完全完成后重命名。  
      // 此时生成了temp文件  
      fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)  
  
      // Write as much as possible to this file  
      // 尽可能的多写入此文件  
      err := c.write(fileName, iter, throttle)  
  
       // 如果文件达到了最大的限制,则创建新的文件继续压缩  
      if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {  
         files = append(files, fileName)  
         continue  
      } else if err == ErrNoValues {  
         // If the file only contained tombstoned entries, then it would be a 0 length  
         // file that we can drop.         if err := os.RemoveAll(fileName); err != nil {  
            return nil, err  
         }  
         break  
      } else if _, ok := err.(errCompactionInProgress); ok {    
          return nil, err  
      } else if err != nil {  
         //  压缩完成之后移除所有的temp文件
         for _, f := range files {  
            if err := os.RemoveAll(f); err != nil {  
               return nil, err  
            }  
         }  
         // We hit an error and didn't finish the compaction.  Remove the temp file and abort.  
         if err := os.RemoveAll(fileName); err != nil {  
            return nil, err  
         }  
         return nil, err  
      }  
  
      files = append(files, fileName)  
      break  
   }  
  
   return files, nil  
}

这里就触碰到了文章最开始的逻辑,对于level>4的tsm文件,我们总是很容易发现他们属于同一个世代,其实就是因为压缩的时候一个文件装不下了。

到这里我们已经初步窥探的 influxbd tsm 的整个大概的压缩流程了。但是压缩具体是执行的哪部分的压缩内容呢?我们接着往下讲:

Compact 压缩核心

还记得上文中提到的NewTSMBatchKeyIterator 这个吗,NewTSMBatchKeyIterator 可以同时遍历多个tsm文件。实际上压缩的核心就在于此。

由于tsm文件中的key都是按照从小到大进行排序的,所以压缩实际上是将多个tsm中的key merge 到一起的过程。

  1. 先将各tsm文件中的第一个key对应的block一一取出
  2. 扫描1中获取到的所有每一个key,确定一个当前最小的key
  3. 从1中获取到的所有block中提取出key等于2中获取的最小key的block,存在k.blocks
  4. 对3中获取的所有block作merge, 主要是按minTime排序,这样基本就完成了一个Next的操作

tsm文件结构:

InfluxDB 源码分析(七):  Compact 压缩流程

block的data里面存储的就是,时间戳+对应的value,这里的value经过了对应的算法压缩。这里的源码有点小复杂,就不展开了

// Next returns true if there are any values remaining in the iterator.
func (k *tsmBatchKeyIterator) Next() bool {  
RETRY:  
   // Any merged blocks pending?  
   if len(k.merged) > 0 {  
      k.merged = k.merged[1:]  
      if len(k.merged) > 0 {  
         return true  
      }  
   }  
  
   // Any merged values pending?  
   if k.hasMergedValues() {  
      k.merge()  
      if len(k.merged) > 0 || k.hasMergedValues() {  
         return true  
      }  
   }  
  
   // If we still have blocks from the last read, merge them  
   if len(k.blocks) > 0 {  
      k.merge()  
      if len(k.merged) > 0 || k.hasMergedValues() {  
         return true  
      }  
   }  
  
   // Read the next block from each TSM iterator  
   // 读每一个tsm文件,将其第一组block都存到k.buf里,看起来是要合并排序 
   // 每个tsm文件对应一个blocks 
   // 这个blocks和tsm的index是一样的,是按key从小到大排序的
   for i, v := range k.buf {  
      if len(v) != 0 {  
         continue  
      }  
  
      iter := k.iterators[i]  
      k.currentTsm = k.tsmFiles[i]  
      if iter.Next() {  
         key, minTime, maxTime, typ, _, b, err := iter.Read()  
         if err != nil {  
            k.AppendError(errBlockRead{k.currentTsm, err})  
         }  
  
         // This block may have ranges of time removed from it that would  
         // reduce the block min and max time.         tombstones := iter.r.TombstoneRange(key)  
  
         var blk *block  
         if cap(k.buf[i]) > len(k.buf[i]) {  
            k.buf[i] = k.buf[i][:len(k.buf[i])+1]  
            blk = k.buf[i][len(k.buf[i])-1]  
            if blk == nil {  
               blk = &block{}  
               k.buf[i][len(k.buf[i])-1] = blk  
            }  
         } else {  
            blk = &block{}  
            k.buf[i] = append(k.buf[i], blk)  
         }  
         blk.minTime = minTime  
         blk.maxTime = maxTime  
         blk.key = key  
         blk.typ = typ  
         blk.b = b  
         blk.tombstones = tombstones  
         blk.readMin = math.MaxInt64  
         blk.readMax = math.MinInt64  
  
         blockKey := key  
         for bytes.Equal(iter.PeekNext(), blockKey) {  
            iter.Next()  
            key, minTime, maxTime, typ, _, b, err := iter.Read()  
            if err != nil {  
               k.AppendError(errBlockRead{k.currentTsm, err})  
            }  
  
            tombstones := iter.r.TombstoneRange(key)  
  
            var blk *block  
            if cap(k.buf[i]) > len(k.buf[i]) {  
               k.buf[i] = k.buf[i][:len(k.buf[i])+1]  
               blk = k.buf[i][len(k.buf[i])-1]  
               if blk == nil {  
                  blk = &block{}  
                  k.buf[i][len(k.buf[i])-1] = blk  
               }  
            } else {  
               blk = &block{}  
               k.buf[i] = append(k.buf[i], blk)  
            }  
  
            blk.minTime = minTime  
            blk.maxTime = maxTime  
            blk.key = key  
            blk.typ = typ  
            blk.b = b  
            blk.tombstones = tombstones  
            blk.readMin = math.MaxInt64  
            blk.readMax = math.MinInt64  
         }  
      }  
  
      if iter.Err() != nil {  
         k.AppendError(errBlockRead{k.currentTsm, iter.Err()})  
      }  
   }  
  
   // Each reader could have a different key that it's currently at, need to find  
   // the next smallest one to keep the sort ordering.   var minKey []byte  
   var minType byte  
   for _, b := range k.buf {  
      // block could be nil if the iterator has been exhausted for that file  
      if len(b) == 0 {  
         continue  
      }  
      if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {  
         minKey = b[0].key  
         minType = b[0].typ  
      }  
   }  
   k.key = minKey  
   k.typ = minType  
  
   // Now we need to find all blocks that match the min key so we can combine and dedupe  
   // the blocks if necessary   for i, b := range k.buf {  
      if len(b) == 0 {  
         continue  
      }  
      if bytes.Equal(b[0].key, k.key) {  
         k.blocks = append(k.blocks, b...)  
         k.buf[i] = k.buf[i][:0]  
      }  
   }  
  
   if len(k.blocks) == 0 {  
      return false  
   }  
  
   k.merge()  
  
   // After merging all the values for this key, we might not have any.  (e.g. they were all deleted  
   // through many tombstones).  In this case, move on to the next key instead of ending iteration.   if len(k.merged) == 0 {  
      goto RETRY  
   }  
  
   return len(k.merged) > 0  
}

Compact 的入口

我们现在已经了解了整个Compact的逻辑,我们需要生成一个plan,然后交给Compactor去执行。那什么时候 influxdb才会尝试开启一次压缩的呢?

这个函数会在shard open 的时候打开,也就是每个shard都维护了一个定时任务定期去进行压缩,执行轮询的周期是 1s。 如果这个时候存在未压缩完成的任务,此时就会继续等15分钟,到下一个周期继续检查。

func (e *Engine) compact(wg *sync.WaitGroup) {  
	// 依次生成压缩任务,去压缩,可以看到这里会同时压缩所有级别的压缩任务
	 // Find our compaction plans  
	 level1Groups := e.CompactionPlan.PlanLevel(1)  
	 level2Groups := e.CompactionPlan.PlanLevel(2)  
	 level3Groups := e.CompactionPlan.PlanLevel(3)  
	 level4Groups := e.CompactionPlan.Plan(e.LastModified())  
	 atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))  

	 // If no full compactions are need, see if an optimize is needed  
	 if len(level4Groups) == 0 {  
		level4Groups = e.CompactionPlan.PlanOptimize()  
		atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))  
	 }  
}

·