likes
comments
collection
share

熔断器防止重试风暴 | 并发编程

作者站长头像
站长
· 阅读数 10

简介

实现一个熔断器,防止视频上传的重试风暴,关注其中遇到的并发问题。

关键词:LongAdder、ConcurrentLinkedQueue、读写锁、volatile。

优雅重试

重试的风险:重试会加大直接下游的负载、重试会存在链路放大的效应。

退避策略

对于一些暂时性的错误,如网络抖动等,可能立即重试还是会失败,通常等待一小会儿再重试的话成功率会较高,并且也可能打散上游重试的时间,较少因为同时都重试而导致的下游瞬间流量高峰。

如何优雅地重试 (qq.com)

退避策略:决定等多久再重试。

  • 线性退避:Thread.sleep(200);
  • 随机退避:Thread.sleep(100L * (new Random().nextInt(3) + 1));
  • 指数退避:Thread.sleep((long) (100 * Math.pow(2, retryCount)));

防止重试风暴

首先要在单点进行限制,一个服务不能不受限制的重试下游,很容易造成下游被打挂。除了限制用户设定的重试次数上限外,更重要的是限制重试请求的成功率。

如何优雅地重试 (qq.com)

我使用一个熔断器限制单点重试,防止熔断风暴,主要组件为一个滑动窗口。

组件

  1. Bucket:记录了1s内请求的结果
    • 1s内的成功数:用private final修饰
    • 1s内的失败数,用private final修饰
  2. window:滑动窗口,使用队列实现,包含10个Bucket
    • 新1s到来时,出队最老的Bucket,入队新的Bucket
    • 有失败时,通过统计window中成功与失败的比例,判断是否熔断
  3. fused:熔断标志位,使用volatile修饰
  4. 两个LongAdder
    • 记录最新1s内的成功数
    • 记录最新1s内的失败数
  5. 线程池ScheduledThreadPool:使用周期为1s的定时任务,控制滑动窗口

组件分析

Bucket

  • private成员常量两个,用于存储1s内的成功数和1s内的失败数
  • 有参构造方法一个,用于初始化两个私有成员常量
  • public方法两个,分别用于获取两个成员常量的值

为什么Bucket中是两个成员常量,而不是成员变量?后面讲滑动窗口的时候会说。

LongAdder

两个LongAdder用来记录当前1s内的成功数和失败数,成功或失败时调用对应的increment()方法。

关于LongAdder我应该知道的:

  • Java8引入,空间换时间
  • 多段锁。竞争激烈时,不同线程对应不同cell上
  • AtomicLong保证并发安全的原理
    • private volatile long value;
    • Unsafe的CAS
  • LongAdder保证并发安全的原理
    • volatile long base:竞争不激烈,直接累加到该变量上
    • volatile Cell[] cells:竞争激烈,各个线程分散累加到自己的槽cells[i]中

window

window是一个队列,我使用ConcurrentLinkedQueue实现。

逻辑上队列中有10个Bucket,表示10s前到现在这10s间成功和失败的情况。但是基于并发安全和效率的考虑,队列中只存储了不包括当前1s的前9s的数据,也就是说队列中只有9个Bucket。

这也是为什么Bucket中设计为成员常量,而不是成员变量,因为前9s的数据已经成为历史了,不会被修改了。而当前1s的信息由LongAdder记录。

调整窗口

public void adjustWindow() {
    window.poll();
    Bucket bucket = new Bucket(successfulCount.sumThenReset(), failedCount.sumThenReset());
    window.offer(bucket);
}

注意:要使用sumThenReset()方法,而不是sum()方法,因为LongAdder是重复使用的。

判断是否需要熔断

上传视频出现失败时会判断是否需要熔断。

public boolean isNeedBlow() {
    long successful = successfulCount.sum();
    long failed = failedCount.sum();
    for (Bucket bucket : window) {
        successful += bucket.getSuccessfulCount();
        failed += bucket.getFailedCount();
    }
    return ((double) failed / successful > CRITICAL_POINT);
}

根据successfulfailed的比例判断是否需要熔断。

fused

熔断标记变量,视频上传前会检查fused,如果已经熔断将拒绝本次请求。

使用volatile关键字修饰,可以保证共享变量在多处理开发中的可见性,且成本更低,因为不会引起线程的切换和调度。

问题解决

熔断器的核心是滑动窗口和LongAdder,我们解决其中出现的并发问题。


为什么会出现并发问题?

我们先看一下操作滑动窗口的两个时机:

  • 定时任务每1s会调用adjustWindow(),调整滑动窗口,统计并归零LongAdder
  • 出现上传失败会调用isNeedBlow(),统计LongAdder,遍历滑动窗口

很显然,两个时机可能同时进行,这样就产生了并发问题。


但是滑动窗口我们使用了ConcurrentLinkedQueue实现,而两个LongAdder也都是并发安全的,这样还会出现并发安全问题么?

如果我们使用上面的实现,我认为是会出现并发问题的。我们使用了三个并发安全的组件:ConcurrentLinkedQueue、LongAdder1、LongAdder2,他们单独使用时一定是并发安全的,但是组合使用时就需要额外考虑并发安全问题。

这就是著名的并发不等式:并发安全 + 并发安全 != 并发安全。(我编的😋)


我们将使用加锁锁解决这个问题。

再分析adjustWindow()isNeedBlow()两个函数

  • adjustWindow():调整了队列中的元素,且修改了LongAdder的值(归零)
  • isNeedBlow():获取了LongAdder的值,遍历了队列

可以看出,isNeedBlow()是读操作,adjustWindow()是写操作,且读操作更频繁。所以我选择使用ReentrantReadWriteLock用来解决上面提到的并发安全问题。


我们可以使用读锁将整个isNeedBlow()锁住,使用写锁将整个adjustWindow()锁住,但是这样锁的粒度太大了,我们试试能不能进行优化。


写到这里出现了个重大问题:ConcurrentLinkedQueue的遍历不是线程安全的。

ConcurrentLinkedQueue 遍历是线程不安全的, 线程1遍历,线程2很有可能进行入列出列操作, 所以ConcurrentLinkedQueue 的size是变化。换句话说,要想安全遍历ConcurrentLinkedQueue 队列,必须额外加锁。

ConcurrentLinkedQueue深度解析_撇不完的博客-CSDN博客

目前有两个解决方案,使用一个普通的队列+读写锁,或者使用一个遍历是线程安全的队列。

我使用LinkedBlockingQueue代替ConcurrentLinkedQueue 实现滑动窗口中的队列。

但是经过我的测试,遍历LinkedBlockingQueue时还是可以向其中添加或者删除元素,这与我们的要求不符合,所以我将直接把操作队列的代码加锁,也就是我们优化锁粒度的尝试失败了。


修改后的代码如下:

public void adjustWindow() {
    writeLock.lock();
    try {
        window.poll();
        Bucket bucket = new Bucket(successfulCount.sumThenReset(), failedCount.sumThenReset());
        window.offer(bucket);
    }finally {
        writeLock.unlock();
    }
}
public boolean isNeedBlow() {
    readLock.lock();
    long successful;
    long failed;
    try {
        successful = successfulCount.sum();
        failed = failedCount.sum();
        for (Bucket bucket : window) {
            successful += bucket.getSuccessfulCount();
            failed += bucket.getFailedCount();
        }
    }finally {
        readLock.unlock();
    }
    return ((double) failed / successful > CRITICAL_POINT);
}

到此,我们实现了一个可以在并发环境下使用的熔断器,在上传失败比例过大时进行熔断,防止出现重试风暴。

其他

感谢字节跳动技术团队的文章:如何优雅地重试 (qq.com)