likes
comments
collection
share

单机无锁线程安全队列-Disruptor

作者站长头像
站长
· 阅读数 20

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。 而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

单机无锁线程安全队列-Disruptor

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>4.0.0</version>
        </dependency>

定义一个商品

@Data
public class Goods {

    private String name;

}

定义生产者

public class Producer {
    private final RingBuffer<Goods> ringBuffer;

    public Producer(RingBuffer<Goods> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 生产货品
     * @param goodsName
     */
    public void onData(String goodsName) {
        long sequence = ringBuffer.next();
        try {
            Goods goods = ringBuffer.get(sequence);
            goods.setName(goodsName);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{

    private String name;

    public Consumer(String name){
        this.name = name;
    }

    @Override
    public void onEvent(Goods goods, long l, boolean b)  {
        //消费者接收到货品
        System.out.println(name+"消费了"+goods.getName());
    }

    @Override
    public void onBatchStart(long batchSize, long queueDepth) {
        EventHandler.super.onBatchStart(batchSize, queueDepth);
    }

    @Override
    public void onStart() {
        EventHandler.super.onStart();
    }

    @Override
    public void onShutdown() {
        EventHandler.super.onShutdown();
    }

    @Override
    public void onTimeout(long sequence) throws Exception {
        EventHandler.super.onTimeout(sequence);
    }

    @Override
    public void setSequenceCallback(Sequence sequenceCallback) {
        EventHandler.super.setSequenceCallback(sequenceCallback);
    }
}

一个生产者对一个消费者

单机无锁线程安全队列-Disruptor

public class DisruptorDemo {
    
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.SINGLE,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //单生产者,单消费者
        disruptor.handleEventsWith(new Consumer("Consumer1"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

一个生产者对多个消费者

消费者按顺序消费:

单机无锁线程安全队列-Disruptor

public class DisruptorDemo {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        //多个消费者按顺序消费
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer = new Producer(ringBuffer);
        while (true){
            producer.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

多播模式,同一事件可以交给多个消费者处理

单机无锁线程安全队列-Disruptor 只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费
   disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3"))
   .then(new Consumer("Consumer4"));

多个生产者对多个消费者

单机无锁线程安全队列-Disruptor

public class DisruptorDemo {

    public static void main(String[] args) throws InterruptedException {
        Disruptor<Goods> disruptor = new Disruptor<>(
                Goods::new,
                16,  // RingBuffer 大小,必须是 2 的 N 次方
                Executors.defaultThreadFactory(), //线程池
                ProducerType.MULTI,   //指定单生产者还是多生产者
                new YieldingWaitStrategy() //等待策略
        );
        RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));
        disruptor.start();
        Producer producer1 = new Producer(ringBuffer);
        Producer producer2 = new Producer(ringBuffer);
        Producer producer3 = new Producer(ringBuffer);
        while (true){
            producer1.onData("goods"+UUID.randomUUID());
            producer2.onData("goods"+UUID.randomUUID());
            producer3.onData("goods"+UUID.randomUUID());
            Thread.sleep(1000);
        }
    }
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

单机无锁线程安全队列-Disruptor 看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

单机无锁线程安全队列-Disruptor

单机无锁线程安全队列-Disruptor

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

单机无锁线程安全队列-Disruptor 1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。 2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。 3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。 4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

单机无锁线程安全队列-Disruptor 5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:tech.meituan.com/2016/11/18/…

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。 Disruptor目前一共内置了8种等待策略。

单机无锁线程安全队列-Disruptor

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,jym,点个赞再走啦!~