likes
comments
collection
share

【SkyWalking】改造SkyWalking实现性能低损耗采集Dubbo参数(一)

作者站长头像
站长
· 阅读数 13

一、背景

业务方需要了解Dubbo请求的数据,公司要求实现SkyWalking agent采集Dubbo的入参、出参,并展示到链路的Tag上。

下面讲解我们的方案,因为公司对源码的安全管控,所以下面主要讲解思路。

如果需要采集Http请求的参数,也可以参考这个方案。

二、设计思路

1 低损耗采集

SkyWalking 现有Dubbo参数采集功能

所以尽可能减少内存、CPU、网络的性能损耗,且基本不影响Dubbo接口的处理时长。

功能SkyWalking现状改造方案
可以控制采集频率不支持
参数采集后转换的格式toString()方式Json序列化
异步转换参数同步方式,对于大对象会增加较多请求耗时异步方式,对请求耗时基本没影响
限定长度截断数据支持限定数据大小,但是是在对象转String后截取支持按照指定长度序列化部分数据,性能消耗可控
指定endpoint不采集不支持支持
控制Dubbo provider、consumer端是否采集支持支持
支持采集出参不支持支持

2 自动降级和恢复

自动降级的一些场景:

  • 频繁GC时,参数采集频率自动降低,GC不频繁时再恢复采集频率
  • 对于大对象,降低采集频率

SkyWalking当前不支持。

3 支持动态配置

为了能适配不同应用的特殊场景,参数采集的一些设置需要实现动态配置。

  • 可以动态调整采集频率、序列化后数据长度等等配置

SkyWalking提供了Collector(OapServer)动态下发配置到Agent的功能,我们可以基于该该功能做二次开发。

4 单个链路全采集

一个trace链路中参数一般都是有相关性,如果判断trace要采集,尽可能单个链路全采集,如此更方便业务方排错。

  • 一个trace如果要采集参数,这个trace下的Dubbo接口尽可能都采集参数

SkyWalking提供了跨进程传播协议,我们可以基于该该协议实现单个链路的全采集。

三、获取Dubbo入参、出参

如果要实现Dubbo的入参、出参,修改SkyWalking的apm-dubbo-2.7.x-plugin、apm-dubbo-3.x-plugin的DubboInterceptor即可,DubboInterceptor会拦截dubbo框架的MonitorFilter类的invoke()方法进行插桩,在invoke()方法前后分别获取入参、出参。

SkyWalking Dubbo plugin的设计思路如下:

【SkyWalking】改造SkyWalking实现性能低损耗采集Dubbo参数(一)

如何通过DubboInterceptor采集Dubbo参数,可以参考下面代码:

public class DubboInterceptor implements InstanceMethodsAroundInterceptor {

    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        ......
        boolean isConsumer = rpcContext.isConsumerSide();
        ......
        AbstractSpan span;
        ......
        if (isConsumer) {
            ....
            needCollectParam = needCollectParam(isConsumer, ....); // 是否采集参数
            ....
        } else {
            ....
            needCollectParam = needCollectParam(isConsumer, ....); // 是否采集参数
            ....
        }
        ....
        collectInputParam(invocation.getArguments(), span); // 采集入参
        ....
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        ....
        collectOutputParam(ret, span); // 采集出参
        ....
        ContextManager.stopSpan();
        return ret;
    }
  
    /** 是否采集参数 */
    private void needCollectParam(..) {
        // 判断采集频率、采集开关、endpoint黑名单等等
        .....
    }

    /** 采集入参 */
    private void collectInputParam(..) {
        // 异步执行参数的序列化
        .....
    }
  
    /** 采集出参 */
    private void collectOutputParam(..) {
        // 异步执行参数的序列化
        .....
    }
}

四、低损耗采集

1 控制采集频率

控制每秒、每分钟时间窗口的参数采集频率。

实现思路参考了org.apache.skywalking.apm.agent.core.sampling.SamplingService

摘录SamplingService的核心代码:

/** 时间窗口内已采集次数 */
private volatile AtomicInteger samplingFactorHolder;
/**
 * 尝试采集
*/
public boolean trySampling(String operationName) {
      int factor = samplingFactorHolder.get();
      if (factor < samplingRateWatcher.getSamplingRate()) {
          return samplingFactorHolder.compareAndSet(factor, factor + 1);
      } else {
          return false;
      }
}
public void handleSamplingRateChanged() {
    if (samplingRateWatcher.getSamplingRate() > 0) {
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("SamplingService"));
            // 定时重置已采集次数为0
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(
                this::resetSamplingFactor, t -> LOGGER.error("unexpected exception.", t)), 0, 3, TimeUnit.SECONDS);
    }
}

2 Json序列化

2.1 选择Gson框架

为何选择Json?

因为参数的数据结构是不固定的,所以不能使用Protobuf、Hession等序列化方式,只能选择toString()、Json等。

最终toString()的数据的可读性不如gson,且不支持反序列化,所以选择Json。

为何选择Gson框架?

因为如下考虑:

  • 兼容性:Gson已在SkyWalking agent中引入,且Gson框架简单,所以不用担心兼容性。而agent引入fastjson可能影响宿主应用,参考别在 agent 中依赖 fastjson

  • 安全性:jackson、fastjson历史安全漏洞比Gson多。

  • 性能:jackson、fastjson性能相比Gson更好,但是差距并非那么大。通过控制采集频率,Json序列化频率不是非常高,Gson性能足矣。

2.2 序列化-异步执行、限定长度截取

这部分是精华! 保障了低性能损耗地完成单次参数采集。

下图是一次dubbo 请求的流程(描述了consumer端详细流程,而provider也是同理):

【SkyWalking】改造SkyWalking实现性能低损耗采集Dubbo参数(一)

异步序列化

参数序列化的任务提交到独立线程池实现异步处理,TraceSegment(一个Segment可以简单理解为某个应用中1次Dubbo的请求过程,包含多个span)会提交到由KafkaTraceSegmentServiceClient中本地队列DataCarrier。然后由KafkaTraceSegmentServiceClient中DataCarrier内的线程消费队列中TraceSegment,再通过transform()转换为SegmentObject数据,transform()方法通过future.get()获得序列化的结果,SegmentObject会发送到Kafka,最终由SkyWalking Collector消费。对应的核心代码如下:

/** 
 * 线程池,要求:
 * 队列不能太长,否则积压过多参数数据,导致GC频繁、甚至OOM
 */
ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,  // 核心线程数
            30,  // 最大线程数
            60L, // 非核心线程的超时时间
            TimeUnit.SECONDS, // 超时时间的单位
            new LinkedBlockingQueue<>(200), // 队列长度
            namedThreadFactory // 使用自定义的线程工厂
        );

/** 采集入参 */
private void collectInputParam(..) {
    // 异步执行参数的序列化
    Future<String> inputParamFuture = paramSerializerThreadPool.submit(()-> ...params...);
    span.setInputParamFuture(inputParamFuture);
}

/** TraceSegment */
public class TraceSegment {
  /** 此处由DataCarrier的线程调用,消费DataCarrier队列中TraceSegment,执行transform()转换为最终上报的数据 */
  public SpanObject.Builder transform() {
        SpanObject.Builder spanBuilder = SpanObject.newBuilder();
        ....
        // 通过future.get()的方式从线程池获取序列化的结果,最长等待1000毫秒
        String inputParam = span.getInputParamFuture().get(1000, TimeUnit.MILLISECONDS);
        String outputParam = span.getOutputParamFuture().get(1000, TimeUnit.MILLISECONDS);
        // 记录到span tag中
        spanBuilder.addTags(new TagValuePair(new AbstractTag("inputParam"), inputParam));
        spanBuilder.addTags(new TagValuePair(new AbstractTag("outputParam"), outputParam));
        ....  
        return spanBuilder;
    }
}  

限定长度截取

使用Gson、fastjson等框架序列化大对象时,速度并非很快,测试发现序列化1M到10M的数据,耗时在几毫秒到上百毫秒。

但是,大对象并不常见,但是SkyWalking作为基础中间件,应该尽可能减少对业务应用的影响,所以经过考量,我们限定入参、出差最大在5K到50K,超出部分进行截取,这个长度也基本满足业务需要。

此处,我们借助Gson框架自定义Writer,Writer中统计已序列化的长度,在到达指定长度后自动抛出异常,结束序列化转换。

通过该方式,不管序列化的对象多大,我们最多序列化50K的数据,耗时稳定在0ms到1ms,该方式的优点是:用完整序列化整个对象,序列化耗时短,内存、CPU消耗低,且上传给服务端的数据量小,所以网络开销也小。

代码如下:

package com.jyp.json;

import com.google.gson.Gson;
import com.google.gson.stream.JsonWriter;

import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

public class GsonUtil {

    public static final String JSON_TRUNCATION_SUFFIX = "... ...";

    private static final Gson GSON = new Gson();

    /** 指定最大序列化长度 */
    public static String toJsonStr(int length, Object param) {
        if (param == null) {
            return null;
        }

        StringWriter stringWriter = new StringWriter();
        try (LimitedSizeStringWriter limitedWriter = new LimitedSizeStringWriter(stringWriter, length);
             JsonWriter jsonWriter = new JsonWriter(limitedWriter)) {
            GSON.toJson(param, param.getClass(), jsonWriter);
            return stringWriter.toString();
        } catch (LimitedStringException e) {
            // Reached the limit
            return stringWriter + JSON_TRUNCATION_SUFFIX;
        } catch (IOException e) {
            // Handle other IO exceptions, perhaps by logging or re-throwing
            throw new RuntimeException("Error generating JSON", e);
        }

    }

    static class LimitedStringException extends RuntimeException {
        public LimitedStringException(String message) {
            super(message);
        }
    }

    public static class LimitedSizeStringWriter extends Writer {
        private final Writer writer;
        private final int limit;
        private int currentLength = 0;

        public LimitedSizeStringWriter(Writer writer, int limit) {
            this.writer = writer;
            this.limit = limit;
        }

        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            if (currentLength + len > limit) {
                writer.write(cbuf, off, limit - currentLength);
                currentLength = limit;
                // 通过抛异常的方式,可以立即停止对象转json。比使用return的方式更高效
                throw new LimitedStringException("Reached character limit");
            }

            writer.write(cbuf, off, len);
            currentLength += len;
        }

        @Override
        public void flush() throws IOException {
            writer.flush();
        }

        @Override
        public void close() throws IOException {
            writer.close();
        }
    }
}

五、自动降级和恢复

参数采集需要尽量不影响应用,以及在应用自身压力大时,采集参数的功能能自动降级。

1 GC频繁时降低采集频率

频繁GC时,参数采集频率自动降低,GC不频繁时再恢复采集频率。

定时通过GarbageCollectorMXBean.getCollectionCount()和GarbageCollectorMXBean.getCollectionCount.getCollectionTime()可以获取GC的次数和耗时。如果每分钟YGC超5次或FullGC超过2次,则降低采集的频率,这个规则不同应用根据情况自定义。

如果一段时间内GC不再频繁,则自动恢复。

2 对于大对象,降低采集频率

如果采集的参数的长度频繁到达序列化的最长长度限制,则可以自动降低对应endpoint的采集频率。

如果一段时间内大对象比较少,则自动恢复。

除了上面的情况,也可以扩展更全面的降级策略,例如根据CPU使用率、内存使用率等情况来控制。

六、小结

上面讲解采集参数的设计思路:

  • 通过修改Dubbo plugin实现采集Dubbo出入参数
  • 通过控制采集频率、异步序列化、自定义Json序列化策略的方式实现性能低损耗采集
  • 根据GC、大对象等情况适当降级采集频率

后面将再写一篇文章讲解如何实现参数采集相关的动态配置、以及如何实现单个链路全采集。