InfluxDB 源码分析(三):数据查询流程
相较于数据的写入, influxdb 作为一个时许数据库,数据的查询作为时许数据库的重中之重,必然是十分复杂的,influxdb有着非常优秀的查询性能,这一切离不开influxdb对于底层存储的一些设计。在上一篇文章中, 我们简单叙述了influxdb的数据写入流程。
数据查询流程概览
Route{
"query", // Query serving route.
"GET", "/query", true, true, h.serveQuery,
},
通过查询最终会走到 executeSelectStatement
里面。
func (e *StatementExecutor) executeSelectStatement(ctx *query.ExecutionContext, stmt *influxql.SelectStatement) error {
cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)
if err != nil {
return err
}
// Generate a row emitter from the iterator set.
em := query.NewEmitter(cur, ctx.ChunkSize)
defer em.Close()
// Emit rows to the results channel.
var writeN int64
var emitted bool
var pointsWriter *BufferedPointsWriter
if stmt.Target != nil {
pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000)
}
for {
row, partial, err := em.Emit()
if err != nil {
return err
} else if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
break
}
// Write points back into system for INTO statements.
if stmt.Target != nil {
n, err := e.writeInto(pointsWriter, stmt, row, e.StrictErrorHandling)
if err != nil {
return err
}
writeN += n
continue
}
result := &query.Result{
Series: []*models.Row{row},
Partial: partial,
}
// Send results or exit if closing.
if err := ctx.Send(result); err != nil {
return err
}
emitted = true
}
// Flush remaining points and emit write count if an INTO statement.
if stmt.Target != nil {
if err := pointsWriter.Flush(); err != nil {
return err
}
var messages []*query.Message
if ctx.ReadOnly {
messages = append(messages, query.ReadOnlyWarning(stmt.String()))
}
return ctx.Send(&query.Result{
Messages: messages,
Series: []*models.Row{{
Name: "result",
Columns: []string{"time", "written"},
Values: [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
}},
})
}
// Always emit at least one result.
if !emitted {
return ctx.Send(&query.Result{
Series: make([]*models.Row, 0),
})
}
return nil
}
inmem 存储结构解析
#inmem的实现相对来说比较简单,感兴趣的可以自己去探索相关的源码,这里只做简单的介绍,或者看完TSI在回来看会更容易一些
inmem 是 in-memory 的缩写,意思就是全部都放在内存中。 inmem的内存索引结构非常简单,只有几个map便足以支持我们查询。
// Index is the in memory index of a collection of measurements, time// series, and their tags. Exported functions are goroutine safe while
// un-exported functions assume the caller will use the appropriate locks.
type Index struct {
mu sync.RWMutex
database string
sfile *tsdb.SeriesFile
fieldset *tsdb.MeasurementFieldSet
// In-memory metadata index, built on load and updated when new series come in
measurements map[string]*measurement // measurement name to object and index
series map[string]*series // map series key to the Series object
seriesSketch, seriesTSSketch estimator.Sketch
measurementsSketch, measurementsTSSketch estimator.Sketch
// Mutex to control rebuilds of the index
rebuildQueue sync.Mutex
}
可以看到,inmem的实现非常简单,只实现了两个map结构,分别是measurements
和series
。
其中measurements
的key
是measurement_key
, value
就是对应的measurement对
象. series
的key是series_key
, value 是 series
对象。
这里我们重点看measurement对象。
type measurement struct {
Database string
Name string `json:"name,omitempty"`
NameBytes []byte
mu sync.RWMutex
fieldNames map[string]struct{}
seriesByID map[uint64]*series
seriesByTagKeyValue map[string]*tagKeyValue
sortedSeriesIDs seriesIDs
dirty bool
}
这里同样有两个map对象,其中一个是seriesByID
。 这个对象允许我们根seriesID
去查询相关的series
对象。第二个map是seriesByTagKeyValue
。
seriesByTagKeyValue
的key是tag_key
, value则是tag_value
。
通过 measurement
对象的SeriesIDsByTagValue
我们可以知道对应的 seriesIDs
是什么。
func (m *measurement) SeriesIDsByTagValue(key, value []byte) seriesIDs {
tagVals := m.seriesByTagKeyValue[string(key)]
if tagVals == nil {
return nil
}
return tagVals.Load(string(value))
}
一个完整的Inmem查询流程
- 用户在终端输入了:
select * from cpu where value='1'
- executeSelectStatement 解析生成对应的
Iterators
#TSI查询部分
- 先去缓存中尝试查询有没有对应的seriesID,如果没有,继续下一步
- 通过index下的measurements map 找到对应的measurement数据
- 通过 measurement 下的SeriesIDsByTagValue 获取到相关的seriesID
- 通过 seriesID 根据seriesByID查询到该 seriesID 对应的 seriesKey
TSI 存储结构解析
当我们将 index-version
的配置修改为 tsm1
时,我们会发现在influxdb中每个data
目录的shard
中发现多了一个 index
文件夹。
(base) hanshu@base index % ls
0 1 2 3 4 5 6 7
(base) hanshu@base index % cd 0
(base) hanshu@base % ls
L0-00000001.tsl MANIFEST
可以发现index存在0-7 八个文件夹,这里的设计非常像_series 文件夹的结构,所以在这里推测,这里也采用了 partition
的设计思路,把tsi的索引的建立分散在八个不同的partition中,以减少对单个partition的写入压力。
TSI 在源码中的入口位置为: influxdb/tsdb/index/tsi1/
在TSI包下非常罕见的出现了一个 doc.go
的文件,在这个文件里面概述了关于 tsi 索引的一些功能和概念。感兴趣的可以花时间出略读一下,不感兴趣的话我们把目光聚集在下面这段话就行,我们来看下:
Index File Layout
The index file is composed of 3 main block types: one series block, one or more
tag blocks, and one measurement block. At the end of the index file is a
trailer that records metadata such as the offsets to these blocks.
索引文件包含三个主要的 block 类型, 一个series block, 一个是tag block,一个 measurement block。在这些模块的末尾有一个存储元信息记录的trailer,它记录了每个block在文件中的迁移量。
需要注意的是 series block
只是一个文件上的概念,并不会在代码层面有所表示。所以我们重点关注的是 measurement block
和 tag block
// IndexFileTrailerSize is the size of the trailer. Currently 82 bytes.
IndexFileTrailerSize = IndexFileVersionSize +
8 + 8 + // measurement block offset + size
8 + 8 + // series id set offset + size
8 + 8 + // tombstone series id set offset + size
8 + 8 + // series sketch offset + size
8 + 8 + // tombstone series sketch offset + size
0
图示如下:
从这个定义我们可以知道,每个tsi文件的末尾有64个字节用于存储FileTrailer信息,包括
measurement block
在文件中的位置和大小。剩下的48
的字节则用来存储 series
相关的,比如 series sketch
和 tombstone series
。 tombstone 是series的一个标志位。当一个series
被删除时,并不会真正的删除该series,而是会将该series标记为已删除。
Measurement Block
我们接下来把目光移动到最下层的第一个 MeasureMent
。 具体在代码中的位置为:
// tsdb/index/tsi1/measurement_block.go:32
MeasurementTrailerSize = 0 +
2 + // version
8 + 8 + // data offset/size
8 + 8 + // hash index offset/size
8 + 8 + // measurement sketch offset/size
8 + 8 // tombstone measurement sketch offset/size
// MeasurementBlockTrailer represents meta data at the end of a MeasurementBlock.
type MeasurementBlockTrailer struct {
Version int // Encoding version
// Offset & size of data section. Data struct {
Offset int64
Size int64
}
// Offset & size of hash map section.
HashIndex struct {
Offset int64
Size int64
}
// Offset and size of cardinality sketch for measurements.
Sketch struct {
Offset int64
Size int64
}
// Offset and size of cardinality sketch for tombstoned measurements.
TSketch struct {
Offset int64
Size int64
}
}
可以看到 Measurement 的 Trailer
记录了以下信息。
- measurement 所对应的data信息的偏移量和大小。该存储在代码中的表示为
MeasurementBlockElem
对象。该对象存储了 measurement 的名称,series信息。以及非常重要的tagBlock
。 它记录了这张表的tag数据在文件中的偏移量和大小。为 where tag 时去哪里找数据指明了道路。 同时我们也看到了 MeasurementBlockElem 维护了一个 seriesIDSet。这里主要是为了加速查询,当我们select * from cpu
时, 我们可以直接从MeasurementBlockElem
拿到cpu这张表下所有的seriesID
集合。
// MeasurementBlockElem represents an internal measurement element.
type MeasurementBlockElem struct {
flag byte // flag
name []byte // measurement name
tagBlock struct {
offset int64
size int64
}
series struct {
n uint64 // series count data []byte // serialized series data
}
seriesIDSet *tsdb.SeriesIDSet
// size in bytes, set after unmarshaling.
size int
}
- 以hash索引的方式存储了
MeasurementBlockElement
在文件中的offset, 可以在不用读取整体的tsi文件的前提下,快速定位对某个measurementblockElement的文件位置,然后读取并解析。
Tag Block
现在当我们查询时,我们已经可以根据tsi查询到我们要查的哪张表的数据信息了,也就是我们现在的接口已经完全可以支持下面这类查询了。
select * from cpu
但是我们知道,influxdb提供了非常强大influxql 供我们查询数据。
select * from cpu where value='1'
这样需要二次过滤的场景只有 Measurement Block 显然就无能为力了。因为Measurement Block虽然记录了seriesIDSet。但是没有记录 value = '1' 相关 seriesID 在哪里。所以面对需要 series过滤的场景,我们只能请出来 Tag Block
帮我们解决问题了。
Tag Block
在代码中的位置为:
// TagBlockTrailerSize is the total size of the on-disk trailer.const
TagBlockTrailerSize = 0 +
8 + 8 + // value data offset/size
8 + 8 + // key data offset/size
8 + 8 + // hash index offset/size
8 + // size
2 // version
// TagBlockTrailer represents meta data at the end of a TagBlock.
type TagBlockTrailer struct {
Version int // Encoding version
Size int64 // Total size w/ trailer
// Offset & size of value data section.
ValueData struct {
Offset int64
Size int64
}
// Offset & size of key data section.
KeyData struct {
Offset int64
Size int64
}
// Offset & size of hash map section.
HashIndex struct {
Offset int64
Size int64
}
我们可以看到 TagBlockTrailer
记录了如下几个信息:
- value data 所有tag key对应的
value data
数据,注意,同一个tagkey对应的value data
在文件中是放在一块的。 - key data 存储改measurement下的所有tag key
- hash index 是一个 map<tag_key, offset> 的 映射,可以通过hash index 快速定位到某个对应的tag key
接下来我们来看下 tag key 具体在代码中的结构:
type TagBlockKeyElem struct {
flag byte
key []byte
// Value data
data struct {
offset uint64
size uint64
buf []byte
}
// Value hash index data
hashIndex struct {
offset uint64
size uint64
buf []byte
}
size int
}
图示如下:
其中 data
存储了该tag key 对应的value在文件中的位置。 hashIndex 存储了value索引在文件中的偏移量和大小。 key 则是该tag key 的name。
到此我们已经完成了where的上半部分了:
select * from cpu where value=
这个时候有人要说了,我 可以通过把所有value全筛选出来然后全部对比一下。 这样也不是不行,但是对于时许数据库这种动则上亿上十亿的点来说性能有点过于低了。
influxdb 解决的思路呢,其实就是value对应的hashIndex
。我们接下来看下tagvalue
的结构。
// TagBlockValueElem represents a tag value element.
type TagBlockValueElem struct {
flag byte
value []byte
// Legacy uvarint-encoded series data.
// Mutually exclusive with seriesIDSetData field.
series struct {
n uint64 // Series count
data []byte // Raw series data
}
// Roaring bitmap encoded series data.
// Mutually exclusive with series.data field.
seriesIDSetData []byte
size int
}
这个时候有人又要问了,为什么有个 series ,又有个 seriesIDSetData 呢,不是有一个就好了? 其实两个并不会同时使用,主要取决于 flag的值, 默认的是 seriesIDSetData 的方式:
// unmarshal unmarshals buf into e.
func (e *TagBlockValueElem) unmarshal(buf []byte) {
// Parse series data (original uvarint encoded or roaring bitmap).
if e.flag&TagValueSeriesIDSetFlag == 0 {
e.series.data, buf = buf[:sz], buf[sz:]
} else {
// buf = memalign(buf)
e.seriesIDSetData, buf = buf, buf[sz:]
}
// Save length of elem.
e.size = start - len(buf)
}
Tag Value 结构如图所示:
这个时候我们已经完全有能力执行这样一个sql
在数据库中查询到我们想要的数据了。
select * from cpu where value="1"
这个时候我们已经拿到了我们想要的seriesID。我们可以通过 SeriesIndex
根据seriesID 查询到对应的 seriesKey
是哪个。这块的内容等到 series详解到时候会单独说明。
在这里简单总结下:
seriesIndex 在内存中维护了两个map。我们可以通过seriesKey 拿ID,也可以通过ID 拿该seriesKey在文件中的位置。
keyIDMap *rhh.HashMap
idOffsetMap map[uint64]int64
// SeriesKey returns the series key for a given id.
func (f *SeriesFile) SeriesKey(id uint64) []byte {
if id == 0 {
return nil
}
p := f.SeriesIDPartition(id)
if p == nil {
return nil
}
return p.SeriesKey(id)
}
一个完整的TIS 查询流程
#前置处理部分
- 用户在终端输入了:
select * from cpu where value='1'
- executeSelectStatement 解析生成对应的
Iterators
#TSI查询部分
- 先去缓存中尝试查询有没有对应的seriesID,如果没有,继续下一步
- 通过IndexFile 的 Trailer 根据 meansurement name, 本例子中为
cpu
找到对应的meansurement
的 block - 通过 meansurement block 找到 meansurement 下的 tag key 数据,找到 value 这个tag_key
- 通过该tag_key 找到 tag_key 对应的value 的 hash index。对
1
进行hash index 得到对应tag value 的位置。 - 拿到该tag value 下的 seriesIDs
#seriesIndex
- 通过 seriesID 查询到该 seriesID 对应的 value
这个时候问们就实现了 TSI 所实现的倒排索引,即通过tag_key+value 反向查找 series_key。
TSI VS Inmem
在上面的两个章节中,我们分别了解TSI 和 Inmem 两种不同的索引实现。在实际的生产应用中比较推荐使用TSI, 因为TSI的出现就是为了弥补Inmem带来的巨大缺点。 相较于后续引入的更加完善的TSI, Inmem 的主要优势有:
- 实现简单
- 在series数量比较少的时候查询效率很高。 缺点则有很多了:
- 非常吃内存,因为所有的内存索引结构都在内存中维护,当我们有上千万,上亿个 series 序列到时候,内存中的两个map会变的异常的庞大无比。
- 启动时间会比较慢,因为inmem每次启动中都需要在内存中完整的构建这样索引结构,索引需要遍历所有的tsm文件。
TSI 倒排索引源码实现
当我们去influxdb输入这样一条语句的时候:
select * from cpu where value='1'
influxdb 最终会调用createCallIterator
这个方法,这个方法内部维护了一个迭代器。主要的作用是查询到所有的seriesID, 之后根据seriesID 查询到对应的 seriesKey。再根据seriesKey 去TSM中获取到对应的数据。
func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
if e.index.Type() == tsdb.InmemIndexName {
ts := e.index.(indexTagSets)
// 如果是in-mem 那就从内存里获取所有的seriesKey
tagSets, err = ts.TagSets([]byte(measurement), opt)
} else {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
}
if err != nil {
return nil, err
}
// Reverse the tag sets if we are ordering by descending.
if !opt.Ascending {
for _, t := range tagSets {
t.Reverse()
}
}
// 省略部分代码
return itrs, nil
}
关于上面这段代码,我们只需要关注以下部分:
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
我们点进去tsi相关的索引入口,TSI相关的索引就是从这里开始的。
// 该方法会返回一个排好序的tag set 集合
func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
release := is.SeriesFile.Retain()
defer release()
// 根据measurement name 和 tag value 条件,筛选出所有符合条件的seriesID
itr, err := is.measurementSeriesByExprIterator(name, opt.Condition)
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()
// measurementSeriesByExprIterator filters deleted series IDs; no need to
// do so here.
var dims []string
if len(opt.Dimensions) > 0 {
dims = make([]string, len(opt.Dimensions))
copy(dims, opt.Dimensions)
sort.Strings(dims)
}
// For every series, get the tag values for the requested tag keys i.e.
// dimensions. This is the TagSet for that series. Series with the same // TagSet are then grouped together, because for the purpose of GROUP BY // they are part of the same composite series. tagSets := make(map[string]*query.TagSet, 64)
var (
seriesN, maxSeriesN int
db = is.Database()
)
//MaxSeriesN 要查询的series数量,默认情况下 slimit = limit if opt.MaxSeriesN > 0 {
maxSeriesN = opt.MaxSeriesN
} else {
maxSeriesN = int(^uint(0) >> 1)
}
// The tag sets require a string for each series key in the set, The series
// file formatted keys need to be parsed into models format. Since they will // end up as strings we can re-use an intermediate buffer for this process. var keyBuf []byte
var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.
for {
se, err := itr.Next()
if err != nil {
return nil, err
} else if se.SeriesID == 0 {
break
}
// 根据seriesID 获取到对应的seriesKey
key := sfile.SeriesKey(se.SeriesID)
if len(key) == 0 {
continue
}
if seriesN&0x3fff == 0x3fff {
// check every 16384 series if the query has been canceled
select {
case <-opt.InterruptCh:
return nil, query.ErrQueryInterrupted
default:
}
}
if seriesN > maxSeriesN {
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
}
// NOTE - must not escape this loop iteration.
_, tagsBuf = ParseSeriesKeyInto(key, tagsBuf)
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) {
continue
}
var tagsAsKey []byte
if len(dims) > 0 {
tagsAsKey = MakeTagsKey(dims, tagsBuf)
}
tagSet, ok := tagSets[string(tagsAsKey)]
if !ok {
// This TagSet is new, create a new entry for it.
tagSet = &query.TagSet{
Tags: nil,
Key: tagsAsKey,
}
}
// Associate the series and filter with the Tagset.
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
tagSet.AddFilter(string(keyBuf), se.Expr)
keyBuf = keyBuf[:0]
// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
seriesN++
}
// Sort the series in each tag set.
for _, t := range tagSets {
sort.Sort(t)
}
// The TagSets have been created, as a map of TagSets. Just send
// the values back as a slice, sorting for consistency. sortedTagsSets := make([]*query.TagSet, 0, len(tagSets))
for _, v := range tagSets {
sortedTagsSets = append(sortedTagsSets, v)
}
sort.Sort(byTagKey(sortedTagsSets))
return sortedTagsSets, nil
}
我们继续关注这里, measurementSeriesByExprIterator
允许我们根据条件和measurement
来获取我们所需要的seriesID。同时我们可以看到,如果没有带where
条件,则会直接返回该表下所有的seriesID。
func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
// Return all series for the measurement if there are no tag expressions.
if expr == nil {
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
itr, err := is.seriesByExprIterator(name, expr)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
TSI 查询不带条件
我们先看最基础的不带条件的查询语句,类似于这样:
select * from cpu
如果我们的查询不带where
条件,influxdb
查询时会直接获取该表下的所有的seriesID
。在measurement这一层级维护了一个seriesID
的set。这样influxdb就不需要遍历所有的tagvalue block去获取所有的seriesID
。 加速了全表的查询。
func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
itr, err := idx.MeasurementSeriesIDIterator(name)
if err != nil {
SeriesIDIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
return MergeSeriesIDIterators(a...), nil
}
我们可以先不去深思merge的逻辑,merge本质上就是把多个迭代器合并成一个。我们主要看这段代码:
itr, err := idx.MeasurementSeriesIDIterator(name)
每个index文件下会有多个partitions
文件,在influxdb中 有8个,influxdb这样设置的目的是为了分单TSI的读写压力。和seriesFile的设计思路是一致的。一张 measurement
的数据可能会存储在多个 partitions
中,所以我们查询的时候,需要将每个partitions
都查一遍。
TSI partitions 在文件中的结构如下:
(base) hanshu@xxx-MB0 index % ls
0 1 2 3 4 5 6 7
func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
for _, p := range i.partitions {
// 获取某个measurement下的所有SeriesID
itr, err := p.MeasurementSeriesIDIterator(name)
if err != nil {
tsdb.SeriesIDIterators(itrs).Close()
return nil, err
} else if itr != nil {
itrs = append(itrs, itr)
}
}
// 将查询到的结果进行合并
return tsdb.MergeSeriesIDIterators(itrs...), nil
}
而每个parttion
下又维护了该parttion
下的FileSet
信息. 为什么呢,假设一个parttion
只有一个tsi索引文件,随着我们该shard数据的不断膨胀,单个tsi的文件将会剧烈的增加,导致性能的下降,所以每个parttion下会有多个tsi文件,单个tsi文件的大小上限是2g。
func (p *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
fs, err := p.RetainFileSet()
if err != nil {
return nil, err
}
return newFileSetSeriesIDIterator(fs, fs.MeasurementSeriesIDIterator(name)), nil
}
实际parttion是通过FileSet
这一层去查询seriesID的。一个parttion可能会存在多个Tsi文件。
func (fs *FileSet) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
// 这里需要遍历每一个 tsi file
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
for _, f := range fs.files {
itr := f.MeasurementSeriesIDIterator(name)
if itr != nil {
a = append(a, itr)
}
}
return tsdb.MergeSeriesIDIterators(a...)
}
LogFile
下方维护一个 mms
map。 其中key为measurement的name。value是logMeasurement
对象。这个mms 就像我们在上文提到的measurement block。
// MeasurementSeriesIDIterator returns an iterator over all series for a measurement.
func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
f.mu.RLock()
defer f.mu.RUnlock()
mm := f.mms[string(name)]
if mm == nil || mm.cardinality() == 0 {
return nil
}
return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
}
logMeasurement 缓存了该分区内该表所有的series信息,为了加速查询。
type logMeasurement struct {
name []byte
tagSet map[string]logTagKey
deleted bool
series map[uint64]struct{}
seriesSet *tsdb.SeriesIDSet
}
// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
// one
func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
if m.seriesSet != nil {
return m.seriesSet.CloneNoLock()
}
ss := tsdb.NewSeriesIDSet()
for seriesID := range m.series {
ss.AddNoLock(seriesID)
}
return ss
}
这部分信息在写入tsi和重启时加载TSI中的过程中都会通过execSeriesEntry
写入。
func (f *LogFile) execSeriesEntry(e *LogEntry) {
if !deleted {
# 这里
mm.addSeriesID(e.SeriesID)
} else {
mm.removeSeriesID(e.SeriesID)
}
// Read tag count.
tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)
}
到这里,假设我们不带tag条件查询的话,直接从全表去查询seriesId的流程就结束了,非常简单,因为在measurement这一层已经提前缓存了当前measurement
所有的 seriesId 信息,所以我们不需要遍历所有的tagkey 和 tagvalue 去获取。从而大大的提升了查询的索引速度。
TSI 查询带条件
如果我们的查询带条件怎么办呢,带条件查询通过measurement缓存的seriesID Set 就行不通了。就一定需要过滤出来符合条件的tagvalue。从而获取到目标的seriesID。
select * from cpu where host='serviceA'
当我们试图通过key,value这样的组合去查询时,那意味着我们查询到的seriesID 将是局部的,这个时候将只能沿着TSI的完整路径去查询。在下方的 seriesByBinaryExprStringIterator 方法中,传入了name: measurement的名称,key: tagkey,value: tagvalue。以及操作类型op。
op的存在主要是为了根据不同的操作符去进行处理,比如=
, !=
。
func (is IndexSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIDIterator, error) {
// Special handling for "_name" to match measurement name.
if bytes.Equal(key, []byte("_name")) {
if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) {
return is.measurementSeriesIDIterator(name)
}
return nil, nil
}
if op == influxql.EQ {
// Match a specific value.
if len(value) != 0 {
return is.tagValueSeriesIDIterator(name, key, value)
}
mitr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
kitr, err := is.tagKeySeriesIDIterator(name, key)
if err != nil {
if mitr != nil {
mitr.Close()
}
return nil, err
}
// Return all measurement series that have no values from this tag key.
return DifferenceSeriesIDIterators(mitr, kitr), nil
}
// Return all measurement series without this tag value.
if len(value) != 0 {
mitr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
vitr, err := is.tagValueSeriesIDIterator(name, key, value)
if err != nil {
if mitr != nil {
mitr.Close()
}
return nil, err
}
return DifferenceSeriesIDIterators(mitr, vitr), nil
}
// Return all series across all values of this tag key.
return is.tagKeySeriesIDIterator(name, key)
}
我们重点关注tagValueSeriesIDIterator
这个方法。从这个方法的参数签名我们就能发现,诶,有戏。这个函数要求我们传入表名。tagkey 和 tagvalue。 同时返回一个 SeriesID 的迭代器。
那一刻,我们就知道我们找对了。
func (is IndexSet) tagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
itr, err := idx.TagValueSeriesIDIterator(name, key, value)
if err != nil {
SeriesIDIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
return MergeSeriesIDIterators(a...), nil
}
同样的路径,我们来到了TagValueSeriesIDIterator
。需要注意的是,这里有一个cache的机制,也就是说我们的tsi同样是有缓存机制的,缓存的数据是我们经常查询的热数据,如果缓存中没有查到,则才会走实际的TSI逻辑。
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
// Check series ID set cache...
if i.tagValueCacheSize > 0 {
if ss := i.tagValueCache.Get(name, key, value); ss != nil {
// Return a clone because the set is mutable.
return tsdb.NewSeriesIDSetIterator(ss.Clone()), nil
}
}
a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
for _, p := range i.partitions {
itr, err := p.TagValueSeriesIDIterator(name, key, value)
if err != nil {
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
itr := tsdb.MergeSeriesIDIterators(a...)
if i.tagValueCacheSize == 0 {
return itr, nil
}
// 如果查询的数据不在缓存中,就搞进去。
if ssitr, ok := itr.(tsdb.SeriesIDSetIterator); ok {
ss := ssitr.SeriesIDSet()
i.tagValueCache.Put(name, key, value, ss)
}
return itr, nil
}
不用猜,下面肯定是Partition
。
func (p *Partition) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
fs, err := p.RetainFileSet()
if err != nil {
return nil, err
}
itr, err := fs.TagValueSeriesIDIterator(name, key, value)
if err != nil {
fs.Release()
return nil, err
} else if itr == nil {
fs.Release()
return nil, nil
}
return newFileSetSeriesIDIterator(fs, itr), nil
}
然后是FileSet
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
ss := tsdb.NewSeriesIDSet()
var ftss *tsdb.SeriesIDSet
for i := len(fs.files) - 1; i >= 0; i-- {
f := fs.files[i]
// Remove tombstones set in previous file.
if ftss != nil && ftss.Cardinality() > 0 {
ss = ss.AndNot(ftss)
}
// Fetch tag value series set for this file and merge into overall set.
fss, err := f.TagValueSeriesIDSet(name, key, value)
if err != nil {
return nil, err
} else if fss != nil {
ss.Merge(fss)
}
// Fetch tombstone set to be processed on next file.
if ftss, err = f.TombstoneSeriesIDSet(); err != nil {
return nil, err
}
}
return tsdb.NewSeriesIDSetIterator(ss), nil
}
然后是LogFile
, 大家注意看最终获取到tagValue的路径,是不是先从measurement block中找,然后找到tagKey的block,然后根据tag key的block可以找到tag value block。 然后获取该value对应的seriesID信息。这一部分内容和前面TSI存储结构解析中数据查询流程是一样的。
// TagValueSeriesIDSet returns a series iterator for a tag value.
func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
mm, ok := f.mms[string(name)]
if !ok {
return nil, nil
}
tk, ok := mm.tagSet[string(key)]
if !ok {
return nil, nil
}
tv, ok := tk.tagValues[string(value)]
if !ok {
return nil, nil
} else if tv.cardinality() == 0 {
return nil, nil
}
return tv.seriesIDSet(), nil
}
现在到这里我们已经通过倒排索引学习到如何通过tag查询到目标seriesID了。但是还有一个问题需要解决,我们需要有一种方式可以让我们根据seriesID 获取到对应的 seriesKey。还记得我们上面提到的TagSet方法吗。
Series 索引
可以看到serieskey是通SeriesFile的SeriesKey
去查询。不出意外,series也是分为了8个parttion。
key := sfile.SeriesKey(se.SeriesID)
// SeriesKey returns the series key for a given id.
func (f *SeriesFile) SeriesKey(id uint64) []byte {
if id == 0 {
return nil
}
p := f.SeriesIDPartition(id)
if p == nil {
return nil
}
return p.SeriesKey(id)
}
可以看到首先是根据seriesID的hash去确认改series存在了哪个parttion中,之后再从目标parttion中寻找相关的seriesKey。
// SeriesKey returns the series key for a given id.
func (p *SeriesPartition) SeriesKey(id uint64) []byte {
if id == 0 {
return nil
}
p.mu.RLock()
if p.closed {
p.mu.RUnlock()
return nil
}
key := p.seriesKeyByOffset(p.index.FindOffsetByID(id))
p.mu.RUnlock()
return key
}
parttion 维护了一个 SeriesIndex 的实例,该实例提供了FindOffsetByID
方法可以根据 SeriesId
获取到该ID在文件中的offset。
func (idx *SeriesIndex) FindOffsetByID(id uint64) int64 {
if offset := idx.idOffsetMap[id]; offset != 0 {
return offset
} else if len(idx.data) == 0 {
return 0
}
hash := rhh.HashUint64(id)
for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {
elem := idx.idOffsetData[(pos * SeriesIndexElemSize):]
elemID := binary.BigEndian.Uint64(elem[:8])
if elemID == id {
return int64(binary.BigEndian.Uint64(elem[8:]))
} else if elemID == 0 || d > rhh.Dist(rhh.HashUint64(elemID), pos, idx.capacity) {
return 0
}
}
}
之后 Partition
便会根据 offset信息去获取到对应的 seriesKey.
func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
if offset == 0 {
return nil
}
segmentID, pos := SplitSeriesOffset(offset)
for _, segment := range p.segments {
if segment.ID() != segmentID {
continue
}
key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize))
return key
}
return nil
}
关于Series索引本文只是点到即止,并不是本文的重点,重点到时候可以看 series 解析那一章。
TSM 文件结构解析
单个TSM的主要由四部分组成,分别是Header,Blocks, Index,Footer
他们的大概描述如下:
- Header: TSM 头文件,主要由五个字节组成,其中四个字节表示当前哪个存储引擎,如Tsm1。另外一个字节存储版本,如果tsm1 对应的值就是 1。
- Blocks influxdb 存储数据的最小部分,Data 中的内容根据数据类型的不同,在 InfluxDB 中会采用不同的压缩方式
- Index:TSM的索引,他们先按照key再按照时间戳以字典序排序,当我们要根据 serieskey 去查询时,由于serieskey在index是有序的,所以可以快速通过二分查找法定位到。同时根据时间戳就可以定位到存储数据的block。
- Footer: tsm file 的最后4字节的内容存放了 Index 部分的起始位置在 tsm file 中的偏移量,方便将索引信息加载到内存中。
Header
blocks 存储
每个block主要存储了两部分的内容
- CRC 通过使用CRC32算法去校验data的完整性
- Data: data存储的时间戳+value。value会根据字段的不同类型进行不同的压缩。
Index
注意:这里的key的格式时: seriesKey + 分隔符 + fieldName。
在influxdb启动之初,Index索引就会被加载到内存中,当我们需要根据serieskey查询数据时,我们只需要:
- 拼接 serieskey + 分隔符 + field_name
- 根据二分查找法快速查询到对应的serieskey
- 根据serieskey的min time 和 maxtime 以及 offset + size 就可以快速定位到一个block。之后就是查询出来数据并解压了。
TSM 文件读取
现在我们已经完成了从用户的查询语句得到seriesKey的过程。
inputs, err := e.createTagSetIterators(ctx, ref, measurement, t, opt)
一个seriesKey 下方可能会存在很多条数据。于是查询到所有的seriesKey之后,会去TSM中查询seriesKey下的数据。
func (e *Engine) createTagSetIterators(ctx context.Context, ref *influxql.VarRef, name string, t *query.TagSet, opt query.IteratorOptions) ([]query.Iterator, error) {
// Set parallelism by number of logical cpus.
...
// Read series groups in parallel.
var wg sync.WaitGroup
for i := range groups {
wg.Add(1)
go func(i int) {
defer wg.Done()
groups[i].itrs, groups[i].err = e.createTagSetGroupIterators(ctx, ref, name, groups[i].keys, t, groups[i].filters, opt)
}(i)
}
wg.Wait()
...
}
下面的方法 createTagSetGroupIterators
返回了一个迭代器,我们来看下函数的参数:
- ref 我们要查询的field是什么。由于influxdb的每个field是有类型的,并根据不同的field类型做了压缩,所以我们查询的时候是无法查出来所有的
field
的。需要单独查询所有的 field。并把它们最后混合到一起。 - name: 表名
- seriesKeys: 一个serieskey的列表,代表该次分组涉及到的serieskey列表。
// createTagSetGroupIterators creates a set of iterators for a subset of a tagset's series.
func (e *Engine) createTagSetGroupIterators(ctx context.Context, ref *influxql.VarRef, name string, seriesKeys []string, t *query.TagSet, filters []influxql.Expr, opt query.IteratorOptions) ([]query.Iterator, error) {
itrs := make([]query.Iterator, 0, len(seriesKeys))
//依次查询所有的 seriesKey
for i, seriesKey := range seriesKeys {
var conditionFields []influxql.VarRef
if filters[i] != nil {
// Retrieve non-time fields from this series filter and filter out tags.
conditionFields = influxql.ExprNames(filters[i])
}
itr, err := e.createVarRefSeriesIterator(ctx, ref, name, seriesKey, t, filters[i], conditionFields, opt)
if err != nil {
return itrs, err
} else if itr == nil {
continue
}
itrs = append(itrs, itr)
// Abort if the query was killed
select {
case <-opt.InterruptCh:
query.Iterators(itrs).Close()
return nil, query.ErrQueryInterrupted
default:
}
// Enforce series limit at creation time.
if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN {
query.Iterators(itrs).Close()
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", len(itrs), opt.MaxSeriesN)
}
}
return itrs, nil
}
然后每个方法进入到createVarRefSeriesIterator
这个方法中。这个方法最核心的就是buildCursor
。 他的作用就是根据字段的不同类型,生成不同的游标。最后根据不同的字段类型被包装在不同的迭代器里面。
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ctx context.Context, ref *influxql.VarRef, name string, seriesKey string, t *query.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt query.IteratorOptions) (query.Iterator, error) {
_, tfs := models.ParseKey([]byte(seriesKey))
tags := query.NewTags(tfs.Map())
// Create options specific for this series.
itrOpt := opt
itrOpt.Condition = filter
var curCounter, auxCounter, condCounter *metrics.Counter
if col := metrics.GroupFromContext(ctx); col != nil {
curCounter = col.GetCounter(numberOfRefCursorsCounter)
auxCounter = col.GetCounter(numberOfAuxCursorsCounter)
condCounter = col.GetCounter(numberOfCondCursorsCounter)
}
// Build main cursor.
var cur cursor
if ref != nil {
cur = e.buildCursor(ctx, name, seriesKey, tfs, ref, opt)
// If the field doesn't exist then don't build an iterator.
if cur == nil {
return nil, nil
}
if curCounter != nil {
curCounter.Add(1)
}
}
// 省略一部分代码
// 生成不同类型的迭代器。
switch cur := cur.(type) {
case floatCursor:
return newFloatIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case integerCursor:
return newIntegerIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case unsignedCursor:
return newUnsignedIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case stringCursor:
return newStringIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
case booleanCursor:
return newBooleanIterator(name, tags, itrOpt, cur, aux, conds, condNames), nil
default:
panic("unreachable")
}
}
这里可以看到,TSM同样也是有缓存的。后面查询的时候会优先从缓存中拿去数据。注意这个 keyCursor 后面会有大用。
// buildFloatCursor creates a cursor for a float field
func (e *Engine) buildFloatCursor(ctx context.Context, measurement, seriesKey, field string, opt query.IteratorOptions) floatCursor {
key := SeriesFieldKeyBytes(seriesKey, field)
cacheValues := e.Cache.Values(key)
keyCursor := e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
return newFloatCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}
我们继续去newFloatCursor
这里面去看。当需要根据生序返回的时候会返回newFloatAscendingCursor
. influxdb 默认的排序是根据时间降序的,所以我们直接看这一段.
newFloatDescendingCursor
func newFloatCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) floatCursor {
if ascending {
return newFloatAscendingCursor(seek, cacheValues, tsmKeyCursor)
}
return newFloatDescendingCursor(seek, cacheValues, tsmKeyCursor)
}
到这里就会实际把我们需要查询的数据从TSM
文件中读出来,再我们移动迭代器的时候返回了。
func newFloatDescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *floatDescendingCursor {
c := &floatDescendingCursor{}
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})
if t, _ := c.peekCache(); t != seek {
c.cache.pos--
}
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values)
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
return c.tsm.values[i].UnixNano() >= seek
})
if t, _ := c.peekTSM(); t != seek {
c.tsm.pos--
}
return c
}
注意这一行。在这一行我们该seriesKey下的field正式从文件中被加载到内存中了。
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatBlock(&c.tsm.values)
// 当我们需要查询数据的时候,如果缓存里面有,则优先从缓存中读取数据
// nextFloat returns the next key/value for the cursor.
func (c *floatDescendingCursor) nextFloat() (int64, float64) {
ckey, cvalue := c.peekCache()
tkey, tvalue := c.peekTSM()
// No more data in cache or in TSM files.
if ckey == tsdb.EOF && tkey == tsdb.EOF {
return tsdb.EOF, 0
}
// Both cache and tsm files have the same key, cache takes precedence.
if ckey == tkey {
c.nextCache()
c.nextTSM()
return ckey, cvalue
}
// Buffered cache key precedes that in TSM file.
if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) {
c.nextCache()
return ckey, cvalue
}
// Buffered TSM key precedes that in cache.
c.nextTSM()
return tkey, tvalue
}
这个时候有人可能就要迷惑了,我这里只查了一个字段,如果我这张表有很多个field。那要怎么查询呢? 答案是:每个field都查一遍,然后merge结果。
一个完整的数据查询流程
#前置处理部分
- 用户在终端输入了:
select * from cpu where value='1'
- executeSelectStatement 解析生成对应的
Iterators
#TSI查询部分
- 先去缓存中尝试查询有没有对应的seriesID,如果没有,继续下一步
- 通过IndexFile 的 Trailer 根据 meansurement name, 本例子中为
cpu
找到对应的meansurement
的 block - 通过 meansurement block 找到 meansurement 下的 tag key 数据,找到 value 这个tag_key
- 通过该tag_key 找到 tag_key 对应的value 的 hash index。对
1
进行hash index 得到对应tag value 的位置。 - 拿到该tag value 下的 seriesIDs
#seriesIndex查询部分
- 通过 seriesID 查询到该 seriesID 对应的 serieskey
#TSM查询部分
- 根据字段类型生成对应的迭代器
- 根据seriesKey 去查询相关的数据。
转载自:https://juejin.cn/post/7352792335904456731