likes
comments
collection
share

[多线程] Disruptor-高性能队列

作者站长头像
站长
· 阅读数 12

Disruptor是什么?

官网介绍:github.com/LMAX-Exchan…(github.com/LMAX-Exchan…)

Disruptor是一个无锁的并发框架

Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。

特点:

  • 大大简化了并发程序开发的难度,性能上比Java本身提供的一些并发包好。
  • 是一个高性能异步处理框架,实现了观察者模式。
  • 是无锁的,CPU友好。它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制的启动频率。
  • 业务逻辑是纯内存操作,使用事件驱动方式。

Disruptor核心

Disruptor核心是一个RingBuffer

  • RingBuffer是一个数组,没有首尾指针。
  • RingBuffer是一个首尾相接的环,用于不同线程间传递数据的Buffer。
  • RingBuffer有一个序号,这个序号指向数组中下一个可用的元素。
  • 随着数据不断的填充这个数组,这个序号会一直增长,直到绕过这个环。
  • 前序号指向的元素,可以通过mod计算:序号%长度=索引。
  • 建议将长度设置为2的N次方,有利于二进制计算:序号&(长度-1)=索引。

如果RingBuffer满了,那么是继续覆盖,还是等待消费,这个是由生产者和消费者决定的。

假设RingBuffer满了,生产者有两个选择

  1. 等待RingBuffer有空位在填充
  2. 直接覆盖

同时消费者也可以做出选择

  1. 等待RingBuffer满了再消费
  2. RingBuffer填充一个就消费一个

Sequence

  • 通过顺序递增的序号来编号,管理正在进行交换的数据(事件)
  • 对数据处理的过程总是沿着需要逐个递增处理,实现线程安全
  • 一个Sequence用于跟踪标识某个特定的事件处理者的处理进度
    • 生产者和消费者都各自拥有各自的Sequence
    • 如果多个生产者和多个消费者,他们每一个都会拥有各自的Sequence
  • Sequence可以看成是一个volatile long 类型的数值

Sequencer

  • Sequencer是Disruptor高性能的核心
  • 他是一个interface,主要实现生产者和消费者的快速、正确的传递数据的并发算法

SequenceBarrier

  • 用于保持RingBuffer的生产者和消费者之间的平衡关系
  • 决定消费者是否还有可处理事件的逻辑

EventProcessor

主要事件循环,处理Disruptor中的Event,拥有消费者的EventProcessor

Disruptor Quick Start

官网参考:github.com/LMAX-Exchan…(github.com/LMAX-Exchan…)

开发模型

  1. 定义Event,代表Disruptor所处理的数据单元
/**
 * @author fangxi
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {

    private Long id;
    private String productName;
    private BigDecimal price;

}
  1. 定义Event工厂,实现EventFactocy<?>接口,用来填充RingBuffer容器,如果使用1.8的话,则不需要这个工厂对象,直接在构造方法里面OrderEvent::new
/**
 * @author fangxi
 * <p>
 * 创建orderEvent对象
 */
public class OrderEventFactory implements EventFactory<OrderEvent> {

    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }

}
  1. 定义Event处理器(消费者),实现EventHandler<?>,用来从RingBuffer中取出数据并处理。如果使用1.8处理,可以不需要这个类,用lambda代替
/**
 * @author fangxi
 * 消费者
 */
public class OrderEventHandler implements EventHandler<OrderEvent> {

    /**
     * 有消息发送,这个就会被监听
     */
    @Override
    public void onEvent(OrderEvent orderEvent,long sequence, boolean endOfBatch) throws Exception {
        System.out.println(orderEvent);
    }
}
  1. 组合1-3步
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.math.BigDecimal;

/**
 * @author fangxi
 */
public class DisruptorTest {

    public static void main(String[] args) {
        int bufferSize = 1024;
        // 实例化Disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
        // 添加消费者监听
        // disruptor.handleEventsWith(new OrderEventHandler());
        // 消费者,lambda表示
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("消费者:" + event));
        // 启动
        // 实际存储数据的容器
        RingBuffer<OrderEvent> ringBuffer = disruptor.start();
        // 投递100条数据
        for (int i = 0; i < 100; i++) {
            // 生产者,就用lambda表示
            ringBuffer.publishEvent((event, sequence) -> {
                event.setId(sequence);
                event.setPrice(new BigDecimal("100").add(new BigDecimal(sequence)));
                event.setProductName(sequence + "号商品");
            });
        }
        disruptor.shutdown();
    }

}

Disruptor高阶操作

涉及到的Event对象

@Data
public class Trade {

    private String id;
    private String name;
    private String nickname;
    private BigDecimal price;

}

串形操作和并行操作

让消费者按照我们制定的顺序操作

先设置三个消费者

// 三个消费者,将分配设置name,id和打印
EventHandler<Trade> nameHandler = (trade, sequence, endOfBatch) -> {
    System.out.println("Set Name");
    trade.setName("name" + sequence);
		// 模拟耗时操作
    TimeUnit.SECONDS.sleep(1);
};
EventHandler<Trade> idHandler = (trade, sequence, endOfBatch) -> {
    System.out.println("Set Id");
    trade.setId(UUID.randomUUID().toString());
    TimeUnit.SECONDS.sleep(1);
};
EventHandler<Trade> printHandler = (trade, sequence, endOfBatch) -> {
    System.out.println(trade);
};

串形操作

// 串形操作,按照顺序操作
disruptor.handleEventsWith(idHandler).handleEventsWith(nameHandler).handleEventsWith(printHandler);

串形操作打印的结果和handleEventsWith的链式调用顺序有关

并行操作

// 并行操作,三个handler并行执行
disruptor.handleEventsWith(idHandler, nameHandler, printHandler);

生产者

Random random = new Random();
// 也可以直接通过Disruptor对象提交任务
disruptor.publishEvent((trade, sequence) -> {
    double price = random.nextDouble() * 9999;
    trade.setPrice(new BigDecimal(String.valueOf(price)));
});

菱形操作

让串行操作和并行操作同时存在

上面的例子中,可以设置idHandler和设置nameHandler可以同时执行,让printHandler在这两个之后执行

// 菱形操作,让idHandler和nameHandler同时执行,printHandler在这两个操作之后在执行
disruptor.handleEventsWith(idHandler, nameHandler).handleEventsWith(printHandler);
// 也可以这样写
disruptor.handleEventsWith(idHandler, nameHandler).then(printHandler);

多边形操作

实现多边形操作,需要再新增两个handler

// 新增两个handler
EventHandler<Trade> nicknameHandler = (trade, sequence, endOfBatch) -> {
    System.out.println("nicknameHandler");
    trade.setNickname("[" + trade.getName() + "]");
};
EventHandler<Trade> idNotLineHandler = (trade, sequence, endOfBatch) -> {
    System.out.println("idNotLineHandler");
    trade.setId(trade.getId().replaceAll("-", ""));
};

要实现一下的操作

[多线程] Disruptor-高性能队列

// idHander和nameHandler并行
disruptor.handleEventsWith(idHandler, nameHandler);
// idHander之后执行idNotLineHandler
disruptor.after(idHandler).handleEventsWith(idNotLineHandler);
// nameHandler之后执行nicknameHandler
disruptor.after(nameHandler).handleEventsWith(nicknameHandler);
// 在idNotLineHandler, nicknameHandler之后执行printHandler
disruptor.after(idNotLineHandler, nicknameHandler).handleEventsWith(printHandler);