【SkyWalking】改造SkyWalking实现性能低损耗采集Dubbo参数(一)
一、背景
业务方需要了解Dubbo请求的数据,公司要求实现SkyWalking agent采集Dubbo的入参、出参,并展示到链路的Tag上。
下面讲解我们的方案,因为公司对源码的安全管控,所以下面主要讲解思路。
如果需要采集Http请求的参数,也可以参考这个方案。
二、设计思路
1 低损耗采集
SkyWalking 现有Dubbo参数采集功能
所以尽可能减少内存、CPU、网络的性能损耗,且基本不影响Dubbo接口的处理时长。
功能 | SkyWalking现状 | 改造方案 |
---|---|---|
可以控制采集频率 | 不支持 | |
参数采集后转换的格式 | toString()方式 | Json序列化 |
异步转换参数 | 同步方式,对于大对象会增加较多请求耗时 | 异步方式,对请求耗时基本没影响 |
限定长度截断数据 | 支持限定数据大小,但是是在对象转String后截取 | 支持按照指定长度序列化部分数据,性能消耗可控 |
指定endpoint不采集 | 不支持 | 支持 |
控制Dubbo provider、consumer端是否采集 | 支持 | 支持 |
支持采集出参 | 不支持 | 支持 |
2 自动降级和恢复
自动降级的一些场景:
- 频繁GC时,参数采集频率自动降低,GC不频繁时再恢复采集频率
- 对于大对象,降低采集频率
SkyWalking当前不支持。
3 支持动态配置
为了能适配不同应用的特殊场景,参数采集的一些设置需要实现动态配置。
- 可以动态调整采集频率、序列化后数据长度等等配置
SkyWalking提供了Collector(OapServer)动态下发配置到Agent的功能,我们可以基于该该功能做二次开发。
4 单个链路全采集
一个trace链路中参数一般都是有相关性,如果判断trace要采集,尽可能单个链路全采集,如此更方便业务方排错。
- 一个trace如果要采集参数,这个trace下的Dubbo接口尽可能都采集参数
SkyWalking提供了跨进程传播协议,我们可以基于该该协议实现单个链路的全采集。
三、获取Dubbo入参、出参
如果要实现Dubbo的入参、出参,修改SkyWalking的apm-dubbo-2.7.x-plugin、apm-dubbo-3.x-plugin的DubboInterceptor即可,DubboInterceptor会拦截dubbo框架的MonitorFilter类的invoke()方法进行插桩,在invoke()方法前后分别获取入参、出参。
SkyWalking Dubbo plugin的设计思路如下:
如何通过DubboInterceptor采集Dubbo参数,可以参考下面代码:
public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
......
boolean isConsumer = rpcContext.isConsumerSide();
......
AbstractSpan span;
......
if (isConsumer) {
....
needCollectParam = needCollectParam(isConsumer, ....); // 是否采集参数
....
} else {
....
needCollectParam = needCollectParam(isConsumer, ....); // 是否采集参数
....
}
....
collectInputParam(invocation.getArguments(), span); // 采集入参
....
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
....
collectOutputParam(ret, span); // 采集出参
....
ContextManager.stopSpan();
return ret;
}
/** 是否采集参数 */
private void needCollectParam(..) {
// 判断采集频率、采集开关、endpoint黑名单等等
.....
}
/** 采集入参 */
private void collectInputParam(..) {
// 异步执行参数的序列化
.....
}
/** 采集出参 */
private void collectOutputParam(..) {
// 异步执行参数的序列化
.....
}
}
四、低损耗采集
1 控制采集频率
控制每秒、每分钟时间窗口的参数采集频率。
实现思路参考了org.apache.skywalking.apm.agent.core.sampling.SamplingService
。
摘录SamplingService
的核心代码:
/** 时间窗口内已采集次数 */
private volatile AtomicInteger samplingFactorHolder;
/**
* 尝试采集
*/
public boolean trySampling(String operationName) {
int factor = samplingFactorHolder.get();
if (factor < samplingRateWatcher.getSamplingRate()) {
return samplingFactorHolder.compareAndSet(factor, factor + 1);
} else {
return false;
}
}
public void handleSamplingRateChanged() {
if (samplingRateWatcher.getSamplingRate() > 0) {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("SamplingService"));
// 定时重置已采集次数为0
scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(
this::resetSamplingFactor, t -> LOGGER.error("unexpected exception.", t)), 0, 3, TimeUnit.SECONDS);
}
}
2 Json序列化
2.1 选择Gson框架
为何选择Json?
因为参数的数据结构是不固定的,所以不能使用Protobuf、Hession等序列化方式,只能选择toString()、Json等。
最终toString()的数据的可读性不如gson,且不支持反序列化,所以选择Json。
为何选择Gson框架?
因为如下考虑:
-
兼容性:Gson已在SkyWalking agent中引入,且Gson框架简单,所以不用担心兼容性。而agent引入fastjson可能影响宿主应用,参考别在 agent 中依赖 fastjson。
-
安全性:jackson、fastjson历史安全漏洞比Gson多。
-
性能:jackson、fastjson性能相比Gson更好,但是差距并非那么大。通过控制采集频率,Json序列化频率不是非常高,Gson性能足矣。
2.2 序列化-异步执行、限定长度截取
这部分是精华! 保障了低性能损耗地完成单次参数采集。
下图是一次dubbo 请求的流程(描述了consumer端详细流程,而provider也是同理):
异步序列化
参数序列化的任务提交到独立线程池实现异步处理,TraceSegment(一个Segment可以简单理解为某个应用中1次Dubbo的请求过程,包含多个span)会提交到由KafkaTraceSegmentServiceClient中本地队列DataCarrier。然后由KafkaTraceSegmentServiceClient中DataCarrier内的线程消费队列中TraceSegment,再通过transform()转换为SegmentObject数据,transform()方法通过future.get()获得序列化的结果,SegmentObject会发送到Kafka,最终由SkyWalking Collector消费。对应的核心代码如下:
/**
* 线程池,要求:
* 队列不能太长,否则积压过多参数数据,导致GC频繁、甚至OOM
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
30, // 最大线程数
60L, // 非核心线程的超时时间
TimeUnit.SECONDS, // 超时时间的单位
new LinkedBlockingQueue<>(200), // 队列长度
namedThreadFactory // 使用自定义的线程工厂
);
/** 采集入参 */
private void collectInputParam(..) {
// 异步执行参数的序列化
Future<String> inputParamFuture = paramSerializerThreadPool.submit(()-> ...params...);
span.setInputParamFuture(inputParamFuture);
}
/** TraceSegment */
public class TraceSegment {
/** 此处由DataCarrier的线程调用,消费DataCarrier队列中TraceSegment,执行transform()转换为最终上报的数据 */
public SpanObject.Builder transform() {
SpanObject.Builder spanBuilder = SpanObject.newBuilder();
....
// 通过future.get()的方式从线程池获取序列化的结果,最长等待1000毫秒
String inputParam = span.getInputParamFuture().get(1000, TimeUnit.MILLISECONDS);
String outputParam = span.getOutputParamFuture().get(1000, TimeUnit.MILLISECONDS);
// 记录到span tag中
spanBuilder.addTags(new TagValuePair(new AbstractTag("inputParam"), inputParam));
spanBuilder.addTags(new TagValuePair(new AbstractTag("outputParam"), outputParam));
....
return spanBuilder;
}
}
限定长度截取
使用Gson、fastjson等框架序列化大对象时,速度并非很快,测试发现序列化1M到10M的数据,耗时在几毫秒到上百毫秒。
但是,大对象并不常见,但是SkyWalking作为基础中间件,应该尽可能减少对业务应用的影响,所以经过考量,我们限定入参、出差最大在5K到50K,超出部分进行截取,这个长度也基本满足业务需要。
此处,我们借助Gson框架自定义Writer,Writer中统计已序列化的长度,在到达指定长度后自动抛出异常,结束序列化转换。
通过该方式,不管序列化的对象多大,我们最多序列化50K的数据,耗时稳定在0ms到1ms,该方式的优点是:用完整序列化整个对象,序列化耗时短,内存、CPU消耗低,且上传给服务端的数据量小,所以网络开销也小。
代码如下:
package com.jyp.json;
import com.google.gson.Gson;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
public class GsonUtil {
public static final String JSON_TRUNCATION_SUFFIX = "... ...";
private static final Gson GSON = new Gson();
/** 指定最大序列化长度 */
public static String toJsonStr(int length, Object param) {
if (param == null) {
return null;
}
StringWriter stringWriter = new StringWriter();
try (LimitedSizeStringWriter limitedWriter = new LimitedSizeStringWriter(stringWriter, length);
JsonWriter jsonWriter = new JsonWriter(limitedWriter)) {
GSON.toJson(param, param.getClass(), jsonWriter);
return stringWriter.toString();
} catch (LimitedStringException e) {
// Reached the limit
return stringWriter + JSON_TRUNCATION_SUFFIX;
} catch (IOException e) {
// Handle other IO exceptions, perhaps by logging or re-throwing
throw new RuntimeException("Error generating JSON", e);
}
}
static class LimitedStringException extends RuntimeException {
public LimitedStringException(String message) {
super(message);
}
}
public static class LimitedSizeStringWriter extends Writer {
private final Writer writer;
private final int limit;
private int currentLength = 0;
public LimitedSizeStringWriter(Writer writer, int limit) {
this.writer = writer;
this.limit = limit;
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
if (currentLength + len > limit) {
writer.write(cbuf, off, limit - currentLength);
currentLength = limit;
// 通过抛异常的方式,可以立即停止对象转json。比使用return的方式更高效
throw new LimitedStringException("Reached character limit");
}
writer.write(cbuf, off, len);
currentLength += len;
}
@Override
public void flush() throws IOException {
writer.flush();
}
@Override
public void close() throws IOException {
writer.close();
}
}
}
五、自动降级和恢复
参数采集需要尽量不影响应用,以及在应用自身压力大时,采集参数的功能能自动降级。
1 GC频繁时降低采集频率
频繁GC时,参数采集频率自动降低,GC不频繁时再恢复采集频率。
定时通过GarbageCollectorMXBean.getCollectionCount()和GarbageCollectorMXBean.getCollectionCount.getCollectionTime()可以获取GC的次数和耗时。如果每分钟YGC超5次或FullGC超过2次,则降低采集的频率,这个规则不同应用根据情况自定义。
如果一段时间内GC不再频繁,则自动恢复。
2 对于大对象,降低采集频率
如果采集的参数的长度频繁到达序列化的最长长度限制,则可以自动降低对应endpoint的采集频率。
如果一段时间内大对象比较少,则自动恢复。
除了上面的情况,也可以扩展更全面的降级策略,例如根据CPU使用率、内存使用率等情况来控制。
六、小结
上面讲解采集参数的设计思路:
- 通过修改Dubbo plugin实现采集Dubbo出入参数
- 通过控制采集频率、异步序列化、自定义Json序列化策略的方式实现性能低损耗采集
- 根据GC、大对象等情况适当降级采集频率
后面将再写一篇文章讲解如何实现参数采集相关的动态配置、以及如何实现单个链路全采集。
转载自:https://juejin.cn/post/7276712861513023529