SkyWalking8源码(三)OAP处理Segment
前言
本章基于skywalking8.6.0分析OAP如何处理客户端发来的Segment。
- 客户端如何发送segment;
- OAP模块系统;
- 如何处理Segment;
- 如何分析Segment得到衍生metrics;
- 什么是L1和L2聚合;
- 什么是下采样;
一、客户端发送Segment
1、通讯连接管理
GRPCChannelManager负责管理与OAP的grpcChannel连接。
GRPCChannelManager#boot:GRPCChannelManager实现BootService,在agent启动阶段开启定时任务,与OAP建立连接。
GRPCChannelManager#run:与OAP建立连接成功,通知所有GRPCChannelListener。
GRPCChannelListener监听者有很多,大部分需要与OAP通讯。
2、TraceSegmentServiceClient
注册channel监听(prepare)
TraceSegmentServiceClient负责发送TraceSegment给OAP,在prepare阶段注册监听。
TraceSegmentServiceClient#statusChanged:当grpc连接建立,创建grpcStub。
启动生产消费DataCarrier(boot)
TraceSegmentServiceClient#boot:启动DataCarrier。
DataCarrier是一个生产消费模型的封装。
- channel_size:5个buffer;
- buffer_size:每个buffer是一个能存储300个data的环形数组;
- BufferStrategy.IF_POSSIBLE:生产data分配到某个buffer,如果数组满了最多重试3次,如果重试失败数据会被丢弃;
- consume this:消费者逻辑由TraceSegmentServiceClient自己实现;
- 1:消费者线程数1个,消费5个buffer数组;
注:默认消费线程空闲会睡20ms。
注册TraceSegment监听(onComplete)
TraceSegmentServiceClient#onComplete:agent启动最后阶段,TraceSegmentServiceClient注册监听TraceSegment结束。
3、TraceSegment结束
TracingContext#finish:当一个线程的TraceSegment结束,通知所有TracingContextListener。
TraceSegmentServiceClient#afterFinished:
TraceSegmentServiceClient将TraceSegment直接放入DataCarrier异步消费。
4、TraceSegment发送
TraceSegmentServiceClient#consume:单线程消费DataCarrier中的TraceSegment。
- 将TraceSegment转换为SegmentObject.proto;
- 发送SegmentObject给OAP;
- 阻塞等待OAP响应,最多30s;
重点看一下模型转换。
TraceSegment#transform:
- traceId:全局traceId;
- traceSegmentId:当前segment的id;
- spans:segment下的所有span;
- service:当前应用名;
- serviceInstance:当前实例名,如果配置未指定,启动时自动生成;
- isSizeLimited:span数量是否达到阈值SPAN_LIMIT_PER_SEGMENT,默认150;
AbstractTracingSpan#transform:除了的span属性,还包括span的TraceSegmentRef引用上游。
StackBasedTracingSpan#transform:对于ExitSpan,还包含peer对端的ip-port。
TraceSegmentRef#transform:span引用上游TraceSegmentRef。
二、OAP模块系统
1、启动概述
OAPServerBootstrap#start:OAP启动会根据配置文件中的模块配置,由ModuleManager启动各Module。
ModuleManager#init:模块系统启动,依次调用ModuleProvider的prepare、start、notifyAfterCompleted方法,和agent侧的BootService差不多。
2、配置概述
application.yml是OAP的核心配置文件。
- 第一层是模块,如cluster模块;
- 第二层是模块实现名,如cluster模块的实现有6种,如standalone、zookeeper;
- 每个模块有一个激活实现,通过selector指定,比如cluster的默认实现是standalone;
- 每个模块实现有各自的配置;
3、模块相关类
Module相关类图。
- ModuleDefine:模块定义,通过JDK SPI加载,如ClusterModule对应cluster模块;
- ModuleProvider:模块实现,通过JDK SPI加载,运行期间一个模块只有一个模块实现;
- Service:服务,ModuleDefine定义模块需要实现哪些Service,ModuleProvider需要实现这些Service才能启动成功;
- ModuleConfig:模块实现的配置,会将OAP配置反射注入各ModuleConfig实现;
案例:Storage模块。
StorageModule是storage模块定义,要求模块实现必须提供Service实现包括:StorageBuilderFactory等。
StorageModuleElasticsearch7Provider是storage模块的一种实现。
StorageModuleElasticsearch7Config是该模块实现的配置类。
prepare方法注册了StorageBuilderFactory的服务实现。
依赖查找:其他业务可以通过ModuleManager-ModuleDefine-ModuleProvider-Service路径查找Service实现。
三、Segment收集
客户端有两种方式发送segment数据,一种是直连OAP通过grpc发送,一种是发送segment到kafka由OAP消费,本章关注前者。
1、core模块-接收grpc请求
CoreModuleProvider#notifyAfterCompleted:core模块启动grpcServer,监听11800端口。
2、trace模块-处理segment请求
TraceModuleProvider#start:trace模块通过TraceSegmentReportServiceHandler接收segment请求。
TraceSegmentReportServiceHandler#collect:
trace模块将segment发送给analyzer模块的ISegmentParserService服务实现。
3、analyzer模块-分析segment
SegmentParserServiceImpl:ISegmentParserService实现,创建TraceAnalyzer执行分析。
TraceAnalyzer#doAnalysis:创建AnalysisListener,依次回调AnalysisListener的所有方法:
- parseSegment-处理segment;
- parseFirst-处理第一个span,parseExit-处理ExitSpan,parseEntry-处理EntrySpan,parseLocal-处理LocalSpan;
- build:最终构建为Source传递给core模块;
AnalyzerModuleProvider#listenerManager:AnalysisListener有n个,这里主要看SegmentAnalysisListener,其他后面再看。
SegmentAnalysisListener组装segment数据。
SegmentAnalysisListener会被回调四个方法:parseSegment、parseFirst、parseEntry、build。
SegmentAnalysisListener#parseSegment:解析segment
- duration:segment耗时=span最大endTime-span最小startTime;
- isError:segment是否异常,默认segmentStatusAnalysisStrategy=FROM_SPAN_STATUS,如果EntrySpan发生异常,则segment异常;
- appendSearchable:searchableTagKeys可搜索的tag的key,加入segment;
- sampleStatus:oap侧是否采集segment
1)概率采集:sampleRate/10000的概率采集,默认10000即必采集;
2)异常采集:如果发生异常,默认forceSampleErrorSegment=true,必采集;
3)慢采集:如果超出slowTraceSegmentThreshold毫秒,必采集,默认-1代表不生效;
SegmentAnalysisListener#parseFirst:解析segment的第一个span
- timebucket:segment的timebucket截断到秒;
- serviceId、endpointId、instanceId:对service、endpoint、instance进行base64编码;
- dataBinary:segment原始数据都变成byte数组,放入segment;
SegmentAnalysisListener#parseEntry:解析EntrySpan
和第一个span里逻辑重复,endpoint可以覆盖第一个span。
SegmentAnalysisListener#build:调用core模块的SourceReceiver接收segment。
4、core模块-转发分析结果
OAP侧分析结果是Segment,继承Source。
SourceReceiverImpl将Source交给DispatcherManager。
DispatcherManager根据Source的scope,找到所有处理器SourceDispatcher。
SegmentDispatcher将Segment转换为SegmentRecord,交给RecordStreamProcessor。
RecordStreamProcessor#in:交给SegmentRecord对应RecordPersistentWorker实例。
RecordPersistentWorker#in:调用storage模块执行存储。
5、storage模块-存储segment
以es存储为例。
RecordEsDAO#prepareBatchInsert:
- 将SegmentRecord转换为map作为文档内容;
- 索引名,segment是sw_segment-日期;
- id=segment_id;
BatchProcessEsDAO#asynchronous:
将单个segment写入请求放入BulkProcessor异步批量写入es。
bulkActions,根据数据量写入,默认满1000条触发;
flushInterval,根据时间写入,默认满10s触发;
concurrentRequests,并发请求es的数量,默认2;
四、Segment衍生Metrics
1、概述
通过segment可以分析得到其他有效信息。
元数据类:如服务列表、实例列表、端点列表、服务拓扑、接口拓扑。
metrics类:如接口cpm(Calls Per Minute)。
在Analysis模块中,Segment分析还包含两个AnalysisListener:
- NetworkAddressAliasMappingListener:转换EntrySpan的SegmentRef为ip-port到服务名的映射关系;
- MultiScopesAnalysisListener:转换Segment为多个Scope模型;
这些AnalysisListener和SegmentAnalysisListener流程类似:
- AnalysisListener将SegmentObject转换为Source,发送给SourceReceiver;
- SourceReceiver将Source发送给DispatcherManager;
- DispatcherManager根据Source的scope找到对应SourceDispatcher;
- SourceDispatcher将Source转换为存储模型StorageData发送给指定的StreamProcessor;
- StreamProcessor将StorageData发送给Worker;
- Worker将StorageData持久化;
衍生数据主要来源于MultiScopesAnalysisListener对SegmentObject的Source转换,而不同Source会发送给不同SourceDispatcher实现 。
2、MultiScopesAnalysisListener
MultiScopesAnalysisListener处理所有类型Span,将Span转换为4个类型的SourceBuilder,再将SourceBuilder转换为多个Source后发送给SourceReceiver。
EntrySpan
EntrySpan原始数据如下。如feign调用,service-a调用service-b。
parentSpanId: -1
startTime: 1719379473514
endTime: 1719379473524
refs {
traceId: "40040aef1ca4402eaccadb2a7191807b.105.17193794734470001"
parentTraceSegmentId: "40040aef1ca4402eaccadb2a7191807b.105.17193794734470000"
parentSpanId: 1
parentService: "prod::service-a"
parentServiceInstance: "87a58cc0e149400ca2458a6aabe991ae@127.0.0.1"
parentEndpoint: "{GET}/api/v1/feign"
networkAddressUsedAtPeer: "127.0.0.1:8092"
}
operationName: "{GET}/api/v1/b/hello"
spanLayer: Http
componentId: 14
tags {
key: "url"
value: "http://127.0.0.1:8092/api/v1/b/hello"
}
tags {
key: "http.method"
value: "GET"
}
如RabbitMQ消费,service-a消费service-b发送的消息。
parentSpanId: -1
startTime: 1719381174500
endTime: 1719381174502
refs {
traceId: "28478c7f47a94de19ee4f44870487f0d.99.17193811744680001"
parentTraceSegmentId: "28478c7f47a94de19ee4f44870487f0d.99.17193811744680000"
parentSpanId: 1
parentService: "prod::service-b"
parentServiceInstance: "43398fcb6dc542c188a0886179501124@127.0.0.1"
parentEndpoint: "{GET}/rabbit/send"
networkAddressUsedAtPeer: "127.0.0.1:5672"
}
operationName: "RabbitMQ/Topic/Queue/test-queue2/Consumer"
spanLayer: MQ
componentId: 53
tags {
key: "mq.broker"
}
tags {
key: "mq.topic"
}
tags {
key: "mq.queue"
value: "test-queue2"
}
tags {
key: "transmission.latency"
value: "12"
}
MultiScopesAnalysisListener#parseEntry:将EntrySpan转换为SourceBuilder。
如果EntrySpan没有ref关联上游Segment和Span,则来源service/instance/endpoint都是User;
MultiScopesAnalysisListener#parseEntry:
EntrySpan如果有ref,循环生成n个SourceBuilder,一般rpc只有一个,mq批量消费有多个。
来源服务名sourceServiceName有两种情况:mq消费=broker的ip:port,rpc=正常服务名;
MultiScopesAnalysisListener#setPublicAttrs:
公共属性中timebucket用span的开始时间,分钟级别。
注:timebucket是skywalking中metrics的重要概念,这里是分钟timebucket格式如yyyyMMddHHmm。
MultiScopesAnalysisListener#build:最终EntrySpan的SourceBuilder会构造出n个类型的Source。
- All:全局数据;
- Service:服务;
- ServiceInstance:服务实例;
- Endpoint:端点;
- ServiceRelation:服务关系(服务拓扑);
- ServiceInstanceRelation:服务实例关系;
- EndpointRelation:接口关系(接口拓扑);
Service,服务。
EndpointRelation,接口关系。(接口拓扑)
ExitSpan
ExitSpan如下。
如一次feign调用,service-a调用service-b的/api/v1/b/hello。
spanId: 1
startTime: 1719379972188
endTime: 1719379972231
operationName: "/api/v1/b/hello"
peer: "127.0.0.1:8092"
spanType: Exit
spanLayer: Http
componentId: 11
tags {
key: "http.method"
value: "GET"
}
tags {
key: "url"
value: "http://127.0.0.1:8092/api/v1/b/hello"
}
MultiScopesAnalysisListener#parseExit:分析ExitSpan
关键点是如何拿到目标serviceName,客户端ExitSpan中只有服务端的ip和port(peer)。
通过服务端的EntrySpan能构造服务端ip和port到服务名的映射关系,即NetworkAddressAliasMappingListener的处理逻辑(这里忽略)。
对于一次db调用ExitSpan如下。
spanId: 1
startTime: 1719389799264
endTime: 1719389799265
operationName: "H2/JDBI/PreparedStatement/execute"
peer: "localhost:-1"
spanType: Exit
spanLayer: Database
componentId: 32
tags {
key: "db.type"
value: "sql"
}
tags {
key: "db.instance"
value: "test"
}
tags {
key: "db.statement"
value: "select * from user where id = ?"
}
MultiScopesAnalysisListener#parseExit:对于db慢调用会成为Source继续分析。
- serviceName:peer,db的ip和port;
- timebucket:秒级timebucket,取span开始时间;
MultiScopesAnalysisListener#parseExit:只有调用超过阈值才会生成Source。
根据tag[db.type]得到db类型,根据配置slowDBAccessThreshold=default:200,mongodb:100,获取对应阈值。
MultiScopesAnalysisListener#build:ExitSpan生成4种Source:
- ServiceRelation:服务关系;
- ServiceInstanceRelation:服务实例关系;
- ServiceMeta:DB调用,服务元数据,和普通Service的区别是ServiceMeta没有衍生metrics;
- DatabaseAccess:DB调用,DB访问;
注:所以大部分数据来源都是EntrySpan。
LocalSpan
MultiScopesAnalysisListener#parseLogicEndpoints:
对于EntrySpan和LocalSpan都存在逻辑endpoint概念。
LocalSpan:tag[x-le]={"logic-span":true};
EntrySpan:tag[x-le]={"name":"接口名","latency":耗时,"status":是否成功}
MultiScopesAnalysisListener#build:
最终也会和普通EntrySpan一样,能转换为Endpoint,支持Endpoint相关metrics。
常见的逻辑Endpoint如SpringSche
3、SourceDispatcher
根据Source的scope不同,会经过n个不同的SourceDispatcher将Source转换为最终的存储模型,比如端点的scope=3。
Endpoint端点,经过EndpointTrafficDispatcher转换为EndpointTraffic。
EndpointRelation端点关系,经过EndpointCallRelationDispatcher转换为EndpointRelationServerSideMetrics。
上述都是元数据和拓扑相关数据,更多metrics数据要通过SkyWalking的OAL引擎实现。
4、OAL
AnalyzerModuleProvider#start:Analyzer模块启动阶段会用OAL引擎加载核心OAL定义。
具体代码不分析了,分析大致含义。
第一步,读取core.oal描述文件,文件中包含众多metrics的定义。(server-bootstrap)
第二步,使用antlr4解析oal文件得到OALScripts。(oal-grammar)
第三步,OAL引擎根据OALScripts使用freemarker模板+javassist动态生成Metric和Dispatcher类。(oal-rt)
如果设置环境变量SW_OAL_ENGINE_DEBUG,则能在oal-rt目录下看到自动生成的class。
比如端点平均响应时间EndpointAvgMetrics。
EndpointDispatcher解析Source=Endpoint得到EndpointAvgMetrics。
5、Metric异步处理
MetricsStreamProcessor#in:
根据Metric类不同,进入不同的MetricsAggregateWorker实例处理。
Metric和Segment处理的一大区别是:
RecordPersistentWorker直接调用底层storage模块存储。
MetricsAggregateWorker走生产消费模型,将数据丢入DataCarrier异步消费。
MetricsAggregateWorker构造确定DataCarrier的生产消费模式。
每个Metric类有一个MetricsAggregateWorker实例,每个Worker有一个DataCarrier,DataCarrier有2个1w容量的buffer,生产采用阻塞模式,如果buffer数组满了将阻塞生产线程。
所有MetricsAggregateWorker中的DataCarrier公用一个BulkConsumePool,用4倍核数线程消费。
6、Worker
每个Metric类对应一个MetricsAggregateWorker实例。
MetricsAggregateWorker背后有n个Worker,涉及L1聚合(内存聚合)和L2聚合(db聚合)概念。在L2聚合中还有DownSampling概念。
MetricsStreamProcessor#create:组装MetricsAggregateWorker。
7、L1聚合
L1聚合,将同类型metric,按照id分组,按照聚合函数聚合。
MetricsAggregateWorker
MetricsAggregateWorker#onWork:从DataCarrier中消费metrics执行聚合。
MergableBufferedData#accept:将DataCarrier中的metrics按照id分组聚合。
LongAvgMetrics
端点平均耗时,id是分钟timebucket+service+endpoint。
combine将端点耗时latency累积到summation,并统计metric数量count。
EndpointTraffic
EndpointTraffic端点元数据,id是service+endpoint,combine空实现,最终只是将Endpoint去重。
MetricsAggregateWorker将聚合后的metrics循环发送给MetricsRemoteWorker。
MetricsRemoteWorker
MetricsRemoteWorker调用RemoteSenderService发送metrics。
RemoteSenderService#send:
- 获取OAP集群中的实例;
- 通过hash(metric)选择其中一个实例;
- 发送metric给目标OAP实例;
OAP注册与发现
OAP支持多种注册中心,通过cluster模块配置。
RemoteClientManager每5s从注册中心拉取OAP实例列表,与对应OAP实例建立连接,组装为RemoteClient通讯客户端。
CoreModuleProvider#start:启动阶段,特定角色的oap实例会做服务注册。
- mixed:L1+L2聚合,默认角色,会做服务注册;
- aggregator:L2聚合,会做服务注册;
- receiver:L1聚合;
选择OAP实例
HashCodeSelector#select:
使用metric的remoteHashCode模OAP实例数量,将同样的metric发送到同一个OAP实例做L2聚合。
EndpointAvgMetrics,端点平均耗时,根据entityId=service+endpoint发送到同一个OAP实例。
发送Metric
RemoteClient#push:这里发送Metric有两种情况。
SelfRemoteClient#push:如果hash到当前oap节点,不走远程调用,直接走下一个worker。
GRPCRemoteClient#push:如果hash到其他oap节点,将数据再次放到一个DataCarrier由其他线程消费。
这个DataCarrier有1个3000容量的buffer,采用BLOCK策略。
GRPCRemoteClient.RemoteMessageConsumer#consume:消费线程向OAP发送请求。
8、L2聚合
接收L1聚合结果
每个metric对应一个MetricsPersistentWorker实例,接收L1聚合结果,将数据放入DataCarrier,采取BLOCK策略,有2000长度的一个buffer。
内存聚合
MetricsPersistentWorker.PersistentConsumer#consume:
所有DataCarrier由一个核数/4的线程池消费,进行内存聚合。
MergableBufferedData#accept:和L1聚合一样在内存中做一次聚合。
db聚合
PersistenceTimer#start:每隔3s消费一波MetricsPersistentWorker中缓存的数据。
PersistenceTimer#extractDataAndSave:
- buildBatchRequests:将db更新请求放入prepareRequests列表;
- endOfRound:后置处理;
PersistenceWorker#buildBatchRequests:消费内存聚合数据。
MetricsPersistentWorker#prepareBatch:
- 将数据发送给MetricsTransWorker做DownSampling;
- 2k一批做db聚合;
MetricsPersistentWorker#flushDataToStorage:db聚合
- 从db批量查询metrics,加载到内存;
- 循环每个新的metrics,与内存metrics做合并;
- 组装插入或更新请求到prepareRequests;
- 将合并后的metrics,发送到AlarmNotifyWorker(告警)和ExportWorker(暴露给三方扩展,grpc接收metrics)
MetricsPersistentWorker#loadFromStorage:
根据id批量查询底层存储,比如es是根据ids查询。
enableDatabaseSession默认为true,代表允许db数据缓存在内存中。
Metrics#combine和L1聚合一样。
但是部分Metrics不支持更新(service和endpoint的元数据),这里将db数据加载到内存后去重跳过。
Metrics#calculate,对聚合结果做计算得到最终数值。
比如LongAvgMetrics#calculate,value平均值=summation总和/count数量,比如Endpoint平均响应时间EndpointAvgMetrics。
MetricsPersistentWorker#endOfRound:
批量更新prepareRequests组装完毕后,移除超过70s的缓存metrics。
写db
经过db聚合得到更新请求prepareRequests。
PersistenceTimer#extractDataAndSave:
prepareRequests分成maxSyncOperationNum=5w一批,提交到线程池处理,线程数量syncOperationThreadsNum=2。
BatchProcessEsDAO#synchronous:对于es存储同步提交BulkRequest。
DownSampling
DownSampling表示下采样。
聚合后的metrics的timebucket是分钟级别的,比如下面endpoint平均响应时间。
bucket | 总耗时 | 请求次数 | 平均耗时 |
---|---|---|---|
202407061230 | 200 | 2 | 100 |
202407061231 | 50 | 1 | 50 |
DownSampling可以将分钟bucket下采样到小时级别。
bucket | 总耗时 | 请求次数 | 平均耗时 |
---|---|---|---|
2024070612 | 250 | 3 | 83 |
SkyWalking默认支持Hour和Day级别的下采样,可以在application.yml中修改。
部分metrics不支持下采样,如endpoint接口元数据。
MetricsTransWorker#in:收到L2聚合流程中的分钟metrics
- toHour/Day:转换为小时/日维度metrics;
- 下发给小时/日维度的MetricsPersistentWorker;
EndpointAvgMetrics#toHour:
metric转换往往只需要截断timebucket即可。
小时和日维度的MetricsPersistentWorker处理流程同L2db聚合+写db,由PersistenceTimer触发,只是没有后续worker(告警、export)。
总结
DataCarrier
DataCarrier是一个生产消费模型的封装,核心参数:
- channelSize:代表有n个环形数组;
- bufferSize:每个环形数组的大小;
- BufferStrategy:如果环形数组满了,如何处理
-
- 默认BLOCKING,阻塞生产
- 可选IF_POSSIBLE,重试3次,失败丢弃
- consume:指定消费线程数(线程池)和消费者逻辑;
当buffer空闲时消费者会sleep 20ms。
agent侧
TraceSegment结束放入DataCarrier。
TraceSegmentServiceClient异步单线程消费DataCarrier,转换为SegmentObject发送至OAP。
注意,DataCarrier采用IF_POSSIBLE策略,如果OAP响应速度跟不上,会导致Segment丢失。
OAP侧
模块系统
- ModuleDefine:模块定义,通过JDK SPI加载;
- ModuleProvider:模块实现,通过JDK SPI加载,运行期间一个模块只有一个模块实现;
- Service:服务,ModuleDefine定义模块需要实现哪些Service,ModuleProvider需要实现这些Service才能启动成功;
- ModuleConfig:模块实现的配置,会将OAP配置反射注入各ModuleConfig实现;
依赖查找,通过ModuleManager-ModuleDefine-ModuleProvider-Service路径查找Service实现。
角色
OAP节点分为两种角色:
- Receiver:接收agent的Segment数据并持久化;对Segment分析得到Metrics并进行L1聚合;
- Aggregator:接收Receiver的L1聚合结果,做L2聚合和下采样并持久化;
默认Mixed角色,即Receiver+Aggregator。
Receiver
启动阶段,trace模块注册TraceSegmentReportServiceHandler接收SegmentObject。
启动完毕,core模块启动GrpcServer监听11800端口。
客户端发送SegmentObject,TraceSegmentReportServiceHandler转发给analyzer模块的TraceAnalyzer。
TraceAnalyzer使用多个AnalysisListener分析SegmentObject,得到多个Source:
- SegmentAnalysisListener:OAP侧判断是否采集,将SegmentObject转换为Segment。(概率采集:sampleRate=10000默认全采集;发生异常必采集;慢调用采集:slowTraceSegmentThreshold=-1,默认未开启);
- NetworkAddressAliasMappingListener:将span的ref转换为ip-port和服务的映射关系;
- MultiScopesAnalysisListener:将SegmentObject转换为众多Source,如端点、端点关系、服务、服务关系、db慢调用、逻辑端点;
TraceAnalyzer调用SourceReceiverImpl将Source转发至DispatcherManager。
DispatcherManager根据Source的实现不同,转发至不同的SourceDispatcher实现。
SourceDispatcher将Source发送至不同的StreamProcessor处理(涉及oal生成metrics):
- Segment转换为SegmentRecord,发送至RecordStreamProcessor;
- 其他Source转换为Metrics,发送至MetricsStreamProcessor;
StreamProcessor使用DataCarrier接收数据,至此grpc线程释放。
RecordStreamProcessor,消费SegmentRecord数据,将数据写入底层存储,es存储采用BulkProcessor异步插入:
- SW_STORAGE_ES_BULK_ACTIONS:1000,满1000条插入;
- SW_STORAGE_ES_FLUSH_INTERVAL:10,满10秒插入;
- SW_STORAGE_ES_CONCURRENT_REQUESTS:2,控制并发请求es数量;
MetricsStreamProcessor,消费其他metrics数据,进行L1聚合并发送至Aggregator。
MetricsAggregateWorker,L1聚合,对segment分析结果得到的分钟timebucket的metrics进行内存级别聚合
- 元数据/拓扑类型metrics:简单去重,如接口、服务、接口依赖、服务依赖;
- 其他metrics:如接口平均响应时间,合并接口的总耗时和总次数,计算平均值交给L2聚合处理;
MetricsRemoteWorker,将L1聚合结果发送至Aggregator。
为了保证L2聚合的数据正确性,必须将同样的metrics发送至同一个Aggregator节点。
Receiver使用hash(metrics)%Aggregator.size的方式选择Aggregator节点发送。
如果当前节点是Mixed角色,也可能被命中。
Aggregator
grpc线程,接收L1聚合结果,放入DataCarrier;
L2聚合线程,消费DataCarrier中的metrics,再次进行内存聚合;
PersistenceTimer单线程每隔3s做db聚合:
- 从底层存储(如es)中,根据metrics的id批量加载数据到内存;
- 将L1聚合结果与db中的数据在内存中做聚合计算;部分metrics不支持update,如端点元数据,这里会跳过;其他metrics,如端点平均响应时间,计算总耗时/次数;
- 将聚合结果分成5w一批,多线程写底层存储(更新或插入),如果是es存储,使用BulkRequest;
在L2聚合过程中还涉及下采样逻辑。
将L1聚合结果的分钟metrics转换为小时/日metrics,并再次走db聚合。
欢迎大家评论或私信讨论问题。
本文原创,未经许可不得转载。
欢迎关注公众号【程序猿阿越】。
转载自:https://juejin.cn/post/7388347353408258085