likes
comments
collection
share

SkyWalking8源码(三)OAP处理Segment

作者站长头像
站长
· 阅读数 25

前言

本章基于skywalking8.6.0分析OAP如何处理客户端发来的Segment。

  1. 客户端如何发送segment;
  2. OAP模块系统;
  3. 如何处理Segment;
  4. 如何分析Segment得到衍生metrics;
  5. 什么是L1和L2聚合;
  6. 什么是下采样;

一、客户端发送Segment

1、通讯连接管理

GRPCChannelManager负责管理与OAP的grpcChannel连接。

SkyWalking8源码(三)OAP处理Segment

GRPCChannelManager#boot:GRPCChannelManager实现BootService,在agent启动阶段开启定时任务,与OAP建立连接。

SkyWalking8源码(三)OAP处理Segment

GRPCChannelManager#run:与OAP建立连接成功,通知所有GRPCChannelListener

SkyWalking8源码(三)OAP处理Segment

GRPCChannelListener监听者有很多,大部分需要与OAP通讯。

SkyWalking8源码(三)OAP处理Segment

2、TraceSegmentServiceClient

注册channel监听(prepare)

TraceSegmentServiceClient负责发送TraceSegment给OAP,在prepare阶段注册监听。

SkyWalking8源码(三)OAP处理Segment

TraceSegmentServiceClient#statusChanged:当grpc连接建立,创建grpcStub。

SkyWalking8源码(三)OAP处理Segment

启动生产消费DataCarrier(boot)

TraceSegmentServiceClient#boot:启动DataCarrier。

SkyWalking8源码(三)OAP处理Segment

DataCarrier是一个生产消费模型的封装。

SkyWalking8源码(三)OAP处理Segment

  1. channel_size:5个buffer;
  2. buffer_size:每个buffer是一个能存储300个data的环形数组;
  3. BufferStrategy.IF_POSSIBLE:生产data分配到某个buffer,如果数组满了最多重试3次如果重试失败数据会被丢弃
  4. consume this:消费者逻辑由TraceSegmentServiceClient自己实现;
  5. 1:消费者线程数1个,消费5个buffer数组;

注:默认消费线程空闲会睡20ms。

注册TraceSegment监听(onComplete)

TraceSegmentServiceClient#onComplete:agent启动最后阶段,TraceSegmentServiceClient注册监听TraceSegment结束。

SkyWalking8源码(三)OAP处理Segment

3、TraceSegment结束

TracingContext#finish:当一个线程的TraceSegment结束,通知所有TracingContextListener

SkyWalking8源码(三)OAP处理Segment

SkyWalking8源码(三)OAP处理Segment

TraceSegmentServiceClient#afterFinished:

TraceSegmentServiceClient将TraceSegment直接放入DataCarrier异步消费。

SkyWalking8源码(三)OAP处理Segment

4、TraceSegment发送

TraceSegmentServiceClient#consume:单线程消费DataCarrier中的TraceSegment。

  1. 将TraceSegment转换为SegmentObject.proto;
  2. 发送SegmentObject给OAP;
  3. 阻塞等待OAP响应,最多30s;

SkyWalking8源码(三)OAP处理Segment

重点看一下模型转换。

TraceSegment#transform:

SkyWalking8源码(三)OAP处理Segment

  1. traceId:全局traceId;
  2. traceSegmentId:当前segment的id;
  3. spans:segment下的所有span;
  4. service:当前应用名;
  5. serviceInstance:当前实例名,如果配置未指定,启动时自动生成;
  6. isSizeLimited:span数量是否达到阈值SPAN_LIMIT_PER_SEGMENT,默认150;

AbstractTracingSpan#transform:除了的span属性,还包括span的TraceSegmentRef引用上游。

SkyWalking8源码(三)OAP处理Segment

StackBasedTracingSpan#transform:对于ExitSpan,还包含peer对端的ip-port。

SkyWalking8源码(三)OAP处理Segment

TraceSegmentRef#transform:span引用上游TraceSegmentRef。

SkyWalking8源码(三)OAP处理Segment

二、OAP模块系统

1、启动概述

OAPServerBootstrap#start:OAP启动会根据配置文件中的模块配置,由ModuleManager启动各Module

SkyWalking8源码(三)OAP处理Segment

ModuleManager#init:模块系统启动,依次调用ModuleProvider的prepare、start、notifyAfterCompleted方法,和agent侧的BootService差不多。

SkyWalking8源码(三)OAP处理Segment

2、配置概述

application.yml是OAP的核心配置文件。

SkyWalking8源码(三)OAP处理Segment

  1. 第一层是模块,如cluster模块;
  2. 第二层是模块实现名,如cluster模块的实现有6种,如standalone、zookeeper;
  3. 每个模块有一个激活实现,通过selector指定,比如cluster的默认实现是standalone;
  4. 每个模块实现有各自的配置;

3、模块相关类

Module相关类图。

SkyWalking8源码(三)OAP处理Segment

  1. ModuleDefine:模块定义,通过JDK SPI加载,如ClusterModule对应cluster模块;
  2. ModuleProvider:模块实现,通过JDK SPI加载,运行期间一个模块只有一个模块实现;
  3. Service:服务,ModuleDefine定义模块需要实现哪些Service,ModuleProvider需要实现这些Service才能启动成功;
  4. ModuleConfig:模块实现的配置,会将OAP配置反射注入各ModuleConfig实现;

案例:Storage模块。

StorageModule是storage模块定义,要求模块实现必须提供Service实现包括:StorageBuilderFactory等。

SkyWalking8源码(三)OAP处理Segment

StorageModuleElasticsearch7Provider是storage模块的一种实现。

StorageModuleElasticsearch7Config是该模块实现的配置类。

prepare方法注册了StorageBuilderFactory的服务实现。

SkyWalking8源码(三)OAP处理Segment

依赖查找:其他业务可以通过ModuleManager-ModuleDefine-ModuleProvider-Service路径查找Service实现。

SkyWalking8源码(三)OAP处理Segment

三、Segment收集

SkyWalking8源码(三)OAP处理Segment

客户端有两种方式发送segment数据,一种是直连OAP通过grpc发送,一种是发送segment到kafka由OAP消费,本章关注前者。

1、core模块-接收grpc请求

CoreModuleProvider#notifyAfterCompleted:core模块启动grpcServer,监听11800端口。

SkyWalking8源码(三)OAP处理Segment

2、trace模块-处理segment请求

TraceModuleProvider#start:trace模块通过TraceSegmentReportServiceHandler接收segment请求。

SkyWalking8源码(三)OAP处理Segment

TraceSegmentReportServiceHandler#collect:

trace模块将segment发送给analyzer模块的ISegmentParserService服务实现。

SkyWalking8源码(三)OAP处理Segment

3、analyzer模块-分析segment

SegmentParserServiceImplISegmentParserService实现,创建TraceAnalyzer执行分析。

SkyWalking8源码(三)OAP处理Segment

TraceAnalyzer#doAnalysis:创建AnalysisListener,依次回调AnalysisListener的所有方法:

  1. parseSegment-处理segment;
  2. parseFirst-处理第一个span,parseExit-处理ExitSpan,parseEntry-处理EntrySpan,parseLocal-处理LocalSpan;
  3. build:最终构建为Source传递给core模块;

SkyWalking8源码(三)OAP处理Segment

AnalyzerModuleProvider#listenerManager:AnalysisListener有n个,这里主要看SegmentAnalysisListener,其他后面再看。

SkyWalking8源码(三)OAP处理Segment

SegmentAnalysisListener组装segment数据。

SkyWalking8源码(三)OAP处理Segment

SegmentAnalysisListener会被回调四个方法:parseSegment、parseFirst、parseEntry、build。

SegmentAnalysisListener#parseSegment:解析segment

SkyWalking8源码(三)OAP处理Segment

  1. duration:segment耗时=span最大endTime-span最小startTime;
  2. isError:segment是否异常,默认segmentStatusAnalysisStrategy=FROM_SPAN_STATUS,如果EntrySpan发生异常,则segment异常;
  3. appendSearchable:searchableTagKeys可搜索的tag的key,加入segment;
  4. sampleStatus:oap侧是否采集segment

1)概率采集:sampleRate/10000的概率采集,默认10000即必采集;

2)异常采集:如果发生异常,默认forceSampleErrorSegment=true,必采集;

3)慢采集:如果超出slowTraceSegmentThreshold毫秒,必采集,默认-1代表不生效;

SegmentAnalysisListener#parseFirst:解析segment的第一个span

SkyWalking8源码(三)OAP处理Segment

  1. timebucket:segment的timebucket截断到秒;
  2. serviceId、endpointId、instanceId:对service、endpoint、instance进行base64编码;
  3. dataBinary:segment原始数据都变成byte数组,放入segment;

SegmentAnalysisListener#parseEntry:解析EntrySpan

和第一个span里逻辑重复,endpoint可以覆盖第一个span。

SkyWalking8源码(三)OAP处理Segment

SegmentAnalysisListener#build:调用core模块的SourceReceiver接收segment。

SkyWalking8源码(三)OAP处理Segment

4、core模块-转发分析结果

OAP侧分析结果是Segment,继承Source

SkyWalking8源码(三)OAP处理Segment

SourceReceiverImplSource交给DispatcherManager

SkyWalking8源码(三)OAP处理Segment

DispatcherManager根据Source的scope,找到所有处理器SourceDispatcher

SkyWalking8源码(三)OAP处理Segment

SegmentDispatcherSegment转换为SegmentRecord,交给RecordStreamProcessor

SkyWalking8源码(三)OAP处理Segment

RecordStreamProcessor#in:交给SegmentRecord对应RecordPersistentWorker实例。

SkyWalking8源码(三)OAP处理Segment

RecordPersistentWorker#in:调用storage模块执行存储。

SkyWalking8源码(三)OAP处理Segment

5、storage模块-存储segment

以es存储为例。

RecordEsDAO#prepareBatchInsert:

  1. 将SegmentRecord转换为map作为文档内容;
  2. 索引名,segment是sw_segment-日期;
  3. id=segment_id;

SkyWalking8源码(三)OAP处理Segment

BatchProcessEsDAO#asynchronous:

将单个segment写入请求放入BulkProcessor异步批量写入es。

bulkActions,根据数据量写入,默认满1000条触发;

flushInterval,根据时间写入,默认满10s触发;

concurrentRequests,并发请求es的数量,默认2;

SkyWalking8源码(三)OAP处理Segment

四、Segment衍生Metrics

1、概述

通过segment可以分析得到其他有效信息。

元数据类:如服务列表、实例列表、端点列表、服务拓扑、接口拓扑。

SkyWalking8源码(三)OAP处理Segment

metrics类:如接口cpm(Calls Per Minute)。

SkyWalking8源码(三)OAP处理Segment

在Analysis模块中,Segment分析还包含两个AnalysisListener:

  1. NetworkAddressAliasMappingListener:转换EntrySpan的SegmentRef为ip-port到服务名的映射关系;
  2. MultiScopesAnalysisListener:转换Segment为多个Scope模型;

SkyWalking8源码(三)OAP处理Segment

这些AnalysisListener和SegmentAnalysisListener流程类似:

SkyWalking8源码(三)OAP处理Segment

  1. AnalysisListenerSegmentObject转换为Source,发送给SourceReceiver
  2. SourceReceiverSource发送给DispatcherManager
  3. DispatcherManager根据Sourcescope找到对应SourceDispatcher
  4. SourceDispatcherSource转换为存储模型StorageData发送给指定的StreamProcessor
  5. StreamProcessorStorageData发送给Worker
  6. WorkerStorageData持久化;

衍生数据主要来源于MultiScopesAnalysisListener对SegmentObject的Source转换,而不同Source会发送给不同SourceDispatcher实现

2、MultiScopesAnalysisListener

MultiScopesAnalysisListener处理所有类型Span,将Span转换为4个类型的SourceBuilder,再将SourceBuilder转换为多个Source后发送给SourceReceiver

SkyWalking8源码(三)OAP处理Segment

EntrySpan

EntrySpan原始数据如下。如feign调用,service-a调用service-b。

parentSpanId: -1
startTime: 1719379473514
endTime: 1719379473524
refs {
  traceId: "40040aef1ca4402eaccadb2a7191807b.105.17193794734470001"
  parentTraceSegmentId: "40040aef1ca4402eaccadb2a7191807b.105.17193794734470000"
  parentSpanId: 1
  parentService: "prod::service-a"
  parentServiceInstance: "87a58cc0e149400ca2458a6aabe991ae@127.0.0.1"
  parentEndpoint: "{GET}/api/v1/feign"
  networkAddressUsedAtPeer: "127.0.0.1:8092"
}
operationName: "{GET}/api/v1/b/hello"
spanLayer: Http
componentId: 14
tags {
  key: "url"
  value: "http://127.0.0.1:8092/api/v1/b/hello"
}
tags {
  key: "http.method"
  value: "GET"
}

如RabbitMQ消费,service-a消费service-b发送的消息。

parentSpanId: -1
startTime: 1719381174500
endTime: 1719381174502
refs {
  traceId: "28478c7f47a94de19ee4f44870487f0d.99.17193811744680001"
  parentTraceSegmentId: "28478c7f47a94de19ee4f44870487f0d.99.17193811744680000"
  parentSpanId: 1
  parentService: "prod::service-b"
  parentServiceInstance: "43398fcb6dc542c188a0886179501124@127.0.0.1"
  parentEndpoint: "{GET}/rabbit/send"
  networkAddressUsedAtPeer: "127.0.0.1:5672"
}
operationName: "RabbitMQ/Topic/Queue/test-queue2/Consumer"
spanLayer: MQ
componentId: 53
tags {
  key: "mq.broker"
}
tags {
  key: "mq.topic"
}
tags {
  key: "mq.queue"
  value: "test-queue2"
}
tags {
  key: "transmission.latency"
  value: "12"
}

MultiScopesAnalysisListener#parseEntry:将EntrySpan转换为SourceBuilder。

如果EntrySpan没有ref关联上游Segment和Span,则来源service/instance/endpoint都是User

SkyWalking8源码(三)OAP处理Segment

MultiScopesAnalysisListener#parseEntry:

EntrySpan如果有ref,循环生成n个SourceBuilder,一般rpc只有一个,mq批量消费有多个。

来源服务名sourceServiceName有两种情况:mq消费=broker的ip:port,rpc=正常服务名;

SkyWalking8源码(三)OAP处理Segment

MultiScopesAnalysisListener#setPublicAttrs:

公共属性中timebucket用span的开始时间分钟级别

注:timebucket是skywalking中metrics的重要概念,这里是分钟timebucket格式如yyyyMMddHHmm。

SkyWalking8源码(三)OAP处理Segment

MultiScopesAnalysisListener#build:最终EntrySpan的SourceBuilder会构造出n个类型的Source。

  1. All:全局数据;
  2. Service:服务;
  3. ServiceInstance:服务实例;
  4. Endpoint:端点;
  5. ServiceRelation:服务关系(服务拓扑);
  6. ServiceInstanceRelation:服务实例关系;
  7. EndpointRelation:接口关系(接口拓扑);

SkyWalking8源码(三)OAP处理Segment

Service,服务。

SkyWalking8源码(三)OAP处理Segment

EndpointRelation,接口关系。(接口拓扑)

SkyWalking8源码(三)OAP处理Segment

ExitSpan

ExitSpan如下。

如一次feign调用,service-a调用service-b的/api/v1/b/hello。

spanId: 1
startTime: 1719379972188
endTime: 1719379972231
operationName: "/api/v1/b/hello"
peer: "127.0.0.1:8092"
spanType: Exit
spanLayer: Http
componentId: 11
tags {
  key: "http.method"
  value: "GET"
}
tags {
  key: "url"
  value: "http://127.0.0.1:8092/api/v1/b/hello"
}

MultiScopesAnalysisListener#parseExit:分析ExitSpan

关键点是如何拿到目标serviceName,客户端ExitSpan中只有服务端的ip和port(peer)。

通过服务端的EntrySpan能构造服务端ip和port到服务名的映射关系,即NetworkAddressAliasMappingListener的处理逻辑(这里忽略)。

SkyWalking8源码(三)OAP处理Segment

对于一次db调用ExitSpan如下。

spanId: 1
startTime: 1719389799264
endTime: 1719389799265
operationName: "H2/JDBI/PreparedStatement/execute"
peer: "localhost:-1"
spanType: Exit
spanLayer: Database
componentId: 32
tags {
  key: "db.type"
  value: "sql"
}
tags {
  key: "db.instance"
  value: "test"
}
tags {
  key: "db.statement"
  value: "select * from user where id = ?"
}

MultiScopesAnalysisListener#parseExit:对于db慢调用会成为Source继续分析。

  1. serviceName:peer,db的ip和port;
  2. timebucket:秒级timebucket,取span开始时间;

SkyWalking8源码(三)OAP处理Segment

MultiScopesAnalysisListener#parseExit:只有调用超过阈值才会生成Source。

根据tag[db.type]得到db类型,根据配置slowDBAccessThreshold=default:200,mongodb:100,获取对应阈值。

SkyWalking8源码(三)OAP处理Segment

MultiScopesAnalysisListener#build:ExitSpan生成4种Source:

  1. ServiceRelation:服务关系;
  2. ServiceInstanceRelation:服务实例关系;
  3. ServiceMeta:DB调用,服务元数据,和普通Service的区别是ServiceMeta没有衍生metrics;
  4. DatabaseAccess:DB调用,DB访问;

注:所以大部分数据来源都是EntrySpan。

SkyWalking8源码(三)OAP处理Segment

LocalSpan

MultiScopesAnalysisListener#parseLogicEndpoints:

对于EntrySpan和LocalSpan都存在逻辑endpoint概念。

LocalSpan:tag[x-le]={"logic-span":true};

EntrySpan:tag[x-le]={"name":"接口名","latency":耗时,"status":是否成功}

SkyWalking8源码(三)OAP处理Segment

MultiScopesAnalysisListener#build:

最终也会和普通EntrySpan一样,能转换为Endpoint,支持Endpoint相关metrics。

SkyWalking8源码(三)OAP处理Segment

常见的逻辑Endpoint如SpringSche

SkyWalking8源码(三)OAP处理Segment

3、SourceDispatcher

根据Source的scope不同,会经过n个不同的SourceDispatcher将Source转换为最终的存储模型,比如端点的scope=3。

SkyWalking8源码(三)OAP处理Segment

Endpoint端点,经过EndpointTrafficDispatcher转换为EndpointTraffic

SkyWalking8源码(三)OAP处理Segment

EndpointRelation端点关系,经过EndpointCallRelationDispatcher转换为EndpointRelationServerSideMetrics

SkyWalking8源码(三)OAP处理Segment

上述都是元数据和拓扑相关数据,更多metrics数据要通过SkyWalking的OAL引擎实现。

4、OAL

AnalyzerModuleProvider#start:Analyzer模块启动阶段会用OAL引擎加载核心OAL定义

SkyWalking8源码(三)OAP处理Segment

具体代码不分析了,分析大致含义。

第一步,读取core.oal描述文件,文件中包含众多metrics的定义。(server-bootstrap)

SkyWalking8源码(三)OAP处理Segment

第二步,使用antlr4解析oal文件得到OALScripts。(oal-grammar)

SkyWalking8源码(三)OAP处理Segment

第三步,OAL引擎根据OALScripts使用freemarker模板+javassist动态生成MetricDispatcher类。(oal-rt)

SkyWalking8源码(三)OAP处理Segment

SkyWalking8源码(三)OAP处理Segment

如果设置环境变量SW_OAL_ENGINE_DEBUG,则能在oal-rt目录下看到自动生成的class。

SkyWalking8源码(三)OAP处理Segment

比如端点平均响应时间EndpointAvgMetrics

SkyWalking8源码(三)OAP处理Segment

EndpointDispatcher解析Source=Endpoint得到EndpointAvgMetrics

SkyWalking8源码(三)OAP处理Segment

5、Metric异步处理

MetricsStreamProcessor#in:

根据Metric类不同,进入不同的MetricsAggregateWorker实例处理。

SkyWalking8源码(三)OAP处理Segment

Metric和Segment处理的一大区别是:

RecordPersistentWorker直接调用底层storage模块存储。

MetricsAggregateWorker走生产消费模型,将数据丢入DataCarrier异步消费。

SkyWalking8源码(三)OAP处理Segment

MetricsAggregateWorker构造确定DataCarrier的生产消费模式。

SkyWalking8源码(三)OAP处理Segment

每个Metric类有一个MetricsAggregateWorker实例,每个Worker有一个DataCarrier,DataCarrier有2个1w容量的buffer生产采用阻塞模式,如果buffer数组满了将阻塞生产线程。

所有MetricsAggregateWorker中的DataCarrier公用一个BulkConsumePool,用4倍核数线程消费。

SkyWalking8源码(三)OAP处理Segment

6、Worker

SkyWalking8源码(三)OAP处理Segment

每个Metric类对应一个MetricsAggregateWorker实例。

MetricsAggregateWorker背后有n个Worker,涉及L1聚合(内存聚合)和L2聚合(db聚合)概念。在L2聚合中还有DownSampling概念。

MetricsStreamProcessor#create:组装MetricsAggregateWorker

SkyWalking8源码(三)OAP处理Segment

7、L1聚合

L1聚合,将同类型metric,按照id分组,按照聚合函数聚合

MetricsAggregateWorker

MetricsAggregateWorker#onWork:从DataCarrier中消费metrics执行聚合。

SkyWalking8源码(三)OAP处理Segment

MergableBufferedData#accept:将DataCarrier中的metrics按照id分组聚合。

SkyWalking8源码(三)OAP处理Segment

LongAvgMetrics

端点平均耗时,id是分钟timebucket+service+endpoint。

combine将端点耗时latency累积到summation,并统计metric数量count。

SkyWalking8源码(三)OAP处理Segment

EndpointTraffic

EndpointTraffic端点元数据,id是service+endpoint,combine空实现,最终只是将Endpoint去重。

SkyWalking8源码(三)OAP处理Segment

MetricsAggregateWorker将聚合后的metrics循环发送给MetricsRemoteWorker

MetricsRemoteWorker

MetricsRemoteWorker调用RemoteSenderService发送metrics。

SkyWalking8源码(三)OAP处理Segment

RemoteSenderService#send:

  1. 获取OAP集群中的实例;
  2. 通过hash(metric)选择其中一个实例;
  3. 发送metric给目标OAP实例;

SkyWalking8源码(三)OAP处理Segment

OAP注册与发现

OAP支持多种注册中心,通过cluster模块配置。

RemoteClientManager每5s从注册中心拉取OAP实例列表,与对应OAP实例建立连接,组装为RemoteClient通讯客户端。

SkyWalking8源码(三)OAP处理Segment

CoreModuleProvider#start:启动阶段,特定角色的oap实例会做服务注册。

  • mixed:L1+L2聚合,默认角色,会做服务注册;
  • aggregator:L2聚合,会做服务注册;
  • receiver:L1聚合;

SkyWalking8源码(三)OAP处理Segment

选择OAP实例

HashCodeSelector#select:

使用metric的remoteHashCode模OAP实例数量,将同样的metric发送到同一个OAP实例做L2聚合。

SkyWalking8源码(三)OAP处理Segment

EndpointAvgMetrics,端点平均耗时,根据entityId=service+endpoint发送到同一个OAP实例。

SkyWalking8源码(三)OAP处理Segment

发送Metric

RemoteClient#push:这里发送Metric有两种情况。

SkyWalking8源码(三)OAP处理Segment

SelfRemoteClient#push:如果hash到当前oap节点,不走远程调用,直接走下一个worker。

SkyWalking8源码(三)OAP处理Segment

GRPCRemoteClient#push:如果hash到其他oap节点,将数据再次放到一个DataCarrier由其他线程消费。

这个DataCarrier有1个3000容量的buffer,采用BLOCK策略。

SkyWalking8源码(三)OAP处理Segment

GRPCRemoteClient.RemoteMessageConsumer#consume:消费线程向OAP发送请求。

SkyWalking8源码(三)OAP处理Segment

8、L2聚合

SkyWalking8源码(三)OAP处理Segment

接收L1聚合结果

每个metric对应一个MetricsPersistentWorker实例,接收L1聚合结果,将数据放入DataCarrier,采取BLOCK策略,有2000长度的一个buffer。

SkyWalking8源码(三)OAP处理Segment

内存聚合

MetricsPersistentWorker.PersistentConsumer#consume:

所有DataCarrier由一个核数/4的线程池消费,进行内存聚合。

SkyWalking8源码(三)OAP处理Segment

SkyWalking8源码(三)OAP处理Segment

MergableBufferedData#accept:和L1聚合一样在内存中做一次聚合。

SkyWalking8源码(三)OAP处理Segment

db聚合

PersistenceTimer#start:每隔3s消费一波MetricsPersistentWorker中缓存的数据。

SkyWalking8源码(三)OAP处理Segment

PersistenceTimer#extractDataAndSave:

  1. buildBatchRequests:将db更新请求放入prepareRequests列表;
  2. endOfRound:后置处理;

SkyWalking8源码(三)OAP处理Segment

PersistenceWorker#buildBatchRequests:消费内存聚合数据。

SkyWalking8源码(三)OAP处理Segment

MetricsPersistentWorker#prepareBatch:

  1. 将数据发送给MetricsTransWorkerDownSampling
  2. 2k一批做db聚合;

SkyWalking8源码(三)OAP处理Segment

MetricsPersistentWorker#flushDataToStorage:db聚合

  1. 从db批量查询metrics,加载到内存;
  2. 循环每个新的metrics,与内存metrics做合并;
  3. 组装插入或更新请求到prepareRequests;
  4. 将合并后的metrics,发送到AlarmNotifyWorker(告警)和ExportWorker(暴露给三方扩展,grpc接收metrics)

SkyWalking8源码(三)OAP处理Segment

MetricsPersistentWorker#loadFromStorage:

根据id批量查询底层存储,比如es是根据ids查询。

enableDatabaseSession默认为true,代表允许db数据缓存在内存中。

SkyWalking8源码(三)OAP处理Segment

Metrics#combine和L1聚合一样。

但是部分Metrics不支持更新(service和endpoint的元数据),这里将db数据加载到内存后去重跳过。

SkyWalking8源码(三)OAP处理Segment

Metrics#calculate,对聚合结果做计算得到最终数值。

比如LongAvgMetrics#calculate,value平均值=summation总和/count数量,比如Endpoint平均响应时间EndpointAvgMetrics。

SkyWalking8源码(三)OAP处理Segment

MetricsPersistentWorker#endOfRound:

批量更新prepareRequests组装完毕后,移除超过70s的缓存metrics。

SkyWalking8源码(三)OAP处理Segment

写db

经过db聚合得到更新请求prepareRequests。

PersistenceTimer#extractDataAndSave:

prepareRequests分成maxSyncOperationNum=5w一批,提交到线程池处理,线程数量syncOperationThreadsNum=2。

SkyWalking8源码(三)OAP处理Segment

BatchProcessEsDAO#synchronous:对于es存储同步提交BulkRequest

SkyWalking8源码(三)OAP处理Segment

DownSampling

DownSampling表示下采样

聚合后的metrics的timebucket是分钟级别的,比如下面endpoint平均响应时间。

bucket总耗时请求次数平均耗时
2024070612302002100
20240706123150150

DownSampling可以将分钟bucket下采样到小时级别。

bucket总耗时请求次数平均耗时
2024070612250383

SkyWalking默认支持HourDay级别的下采样,可以在application.yml中修改。

SkyWalking8源码(三)OAP处理Segment

部分metrics不支持下采样,如endpoint接口元数据。

SkyWalking8源码(三)OAP处理Segment

MetricsTransWorker#in:收到L2聚合流程中的分钟metrics

  1. toHour/Day:转换为小时/日维度metrics;
  2. 下发给小时/日维度的MetricsPersistentWorker;

SkyWalking8源码(三)OAP处理Segment

EndpointAvgMetrics#toHour:

metric转换往往只需要截断timebucket即可。

SkyWalking8源码(三)OAP处理Segment

小时和日维度的MetricsPersistentWorker处理流程同L2db聚合+写db,由PersistenceTimer触发,只是没有后续worker(告警、export)。

SkyWalking8源码(三)OAP处理Segment

总结

DataCarrier

DataCarrier是一个生产消费模型的封装,核心参数:

  1. channelSize:代表有n个环形数组;
  2. bufferSize:每个环形数组的大小;
  3. BufferStrategy:如果环形数组满了,如何处理
    1. 默认BLOCKING,阻塞生产
    2. 可选IF_POSSIBLE,重试3次,失败丢弃
  1. consume:指定消费线程数(线程池)和消费者逻辑;

当buffer空闲时消费者会sleep 20ms。

agent侧

TraceSegment结束放入DataCarrier。

SkyWalking8源码(三)OAP处理Segment

TraceSegmentServiceClient异步单线程消费DataCarrier,转换为SegmentObject发送至OAP。

注意,DataCarrier采用IF_POSSIBLE策略,如果OAP响应速度跟不上,会导致Segment丢失。

OAP侧

模块系统

SkyWalking8源码(三)OAP处理Segment

  1. ModuleDefine:模块定义,通过JDK SPI加载
  2. ModuleProvider:模块实现,通过JDK SPI加载,运行期间一个模块只有一个模块实现;
  3. Service:服务,ModuleDefine定义模块需要实现哪些Service,ModuleProvider需要实现这些Service才能启动成功;
  4. ModuleConfig:模块实现的配置,会将OAP配置反射注入各ModuleConfig实现;

依赖查找,通过ModuleManager-ModuleDefine-ModuleProvider-Service路径查找Service实现。

角色

OAP节点分为两种角色:

  1. Receiver:接收agent的Segment数据并持久化;对Segment分析得到Metrics并进行L1聚合;
  2. Aggregator:接收Receiver的L1聚合结果,做L2聚合和下采样并持久化;

默认Mixed角色,即Receiver+Aggregator。

Receiver

SkyWalking8源码(三)OAP处理Segment

启动阶段,trace模块注册TraceSegmentReportServiceHandler接收SegmentObject。

启动完毕,core模块启动GrpcServer监听11800端口。

客户端发送SegmentObject,TraceSegmentReportServiceHandler转发给analyzer模块的TraceAnalyzer

TraceAnalyzer使用多个AnalysisListener分析SegmentObject,得到多个Source

  1. SegmentAnalysisListener:OAP侧判断是否采集,将SegmentObject转换为Segment。(概率采集:sampleRate=10000默认全采集;发生异常必采集;慢调用采集:slowTraceSegmentThreshold=-1,默认未开启);
  2. NetworkAddressAliasMappingListener:将span的ref转换为ip-port和服务的映射关系;
  3. MultiScopesAnalysisListener:将SegmentObject转换为众多Source,如端点、端点关系、服务、服务关系、db慢调用、逻辑端点;

TraceAnalyzer调用SourceReceiverImpl将Source转发至DispatcherManager

DispatcherManager根据Source的实现不同,转发至不同的SourceDispatcher实现。

SourceDispatcher将Source发送至不同的StreamProcessor处理(涉及oal生成metrics):

  1. Segment转换为SegmentRecord,发送至RecordStreamProcessor
  2. 其他Source转换为Metrics,发送至MetricsStreamProcessor

StreamProcessor使用DataCarrier接收数据,至此grpc线程释放

RecordStreamProcessor,消费SegmentRecord数据,将数据写入底层存储,es存储采用BulkProcessor异步插入:

  1. SW_STORAGE_ES_BULK_ACTIONS:1000,满1000条插入;
  2. SW_STORAGE_ES_FLUSH_INTERVAL:10,满10秒插入;
  3. SW_STORAGE_ES_CONCURRENT_REQUESTS:2,控制并发请求es数量;

MetricsStreamProcessor,消费其他metrics数据,进行L1聚合并发送至Aggregator。

MetricsAggregateWorker,L1聚合,对segment分析结果得到的分钟timebucket的metrics进行内存级别聚合

  1. 元数据/拓扑类型metrics:简单去重,如接口、服务、接口依赖、服务依赖;
  2. 其他metrics:如接口平均响应时间,合并接口的总耗时和总次数,计算平均值交给L2聚合处理;

MetricsRemoteWorker,将L1聚合结果发送至Aggregator。

为了保证L2聚合的数据正确性,必须将同样的metrics发送至同一个Aggregator节点。

Receiver使用hash(metrics)%Aggregator.size的方式选择Aggregator节点发送。

如果当前节点是Mixed角色,也可能被命中。

SkyWalking8源码(三)OAP处理Segment

Aggregator

SkyWalking8源码(三)OAP处理Segment

grpc线程,接收L1聚合结果,放入DataCarrier;

L2聚合线程,消费DataCarrier中的metrics,再次进行内存聚合;

PersistenceTimer单线程每隔3s做db聚合:

  1. 从底层存储(如es)中,根据metrics的id批量加载数据到内存;
  2. 将L1聚合结果与db中的数据在内存中做聚合计算;部分metrics不支持update,如端点元数据,这里会跳过;其他metrics,如端点平均响应时间,计算总耗时/次数;
  3. 将聚合结果分成5w一批,多线程写底层存储(更新或插入),如果是es存储,使用BulkRequest;

在L2聚合过程中还涉及下采样逻辑。

将L1聚合结果的分钟metrics转换为小时/日metrics,并再次走db聚合。

欢迎大家评论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎关注公众号【程序猿阿越】。

转载自:https://juejin.cn/post/7388347353408258085
评论
请登录