likes
comments
collection
share

rocketmq高可用之二开broker实现topic限流

作者站长头像
站长
· 阅读数 9

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

源码版本

  • 5.1.0

背景

rocketmq 高可用设计中必不可少的就是限流,如果我们想让我们的rocketmq集群稳定不被客户端打爆,我们最好是在topic添加限流,防止某个topic的生产者异常发送大量的消息打爆rocketmq集群

调研

通过调研发现rocketmq官方并未提供相关的限流方案,其实像一些云厂商的rocketmq也一般会有限流,比如火山引擎的rocketmq

rocketmq高可用之二开broker实现topic限流

topic限流

通过源码研究最终打算先在topic上面限流

实现

我们先简单研究下rocketmq的netty线程模型配置 入口是在NettyRemotingServer 在构造方法NettyRemotingServer中初始化bossGroupworkGroup两个线程池

rocketmq高可用之二开broker实现topic限流

        this.eventLoopGroupBoss = buildBossEventLoopGroup();
        this.eventLoopGroupSelector = buildEventLoopGroupSelector();

bossGroup

我们先看bossGroup是使用几个线程,一般都是一个

可以看到bossGroup 不管是使用epoll还是nio,都是使用一个线程,专门用来处理连接事件

rocketmq高可用之二开broker实现topic限流

并且是写死的,实际我们也并需要去修改成多个线程,毕竟我们只监听一个端口

workGroup

rocketmq高可用之二开broker实现topic限流

实际wrokGroup的线程数量是由nettyServerConfig.getServerSelectorThreads()决定的,我们进去看看默认数量

rocketmq高可用之二开broker实现topic限流

可以看到默认是三个

ChannelPipeline 初始化

ChannelPipeline初始化的方法主要是在configChannel方法中初始化的

        protected ChannelPipeline configChannel(SocketChannel ch) {
        return ch.pipeline()
            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
            .addLast(defaultEventExecutorGroup,
                encoder,
                new NettyDecoder(),
                distributionHandler,
                new IdleStateHandler(0, 0,
                    nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                connectionManageHandler,
                serverHandler
            );
    }

我们可以看到业务处理+编解码啥的都是用的一个新的线程池defaultEventExecutorGroup

defaultEventExecutorGroup默认的线程数是8个,这里我们可以通过defaultEventExecutorGroup初试的源码看到

rocketmq高可用之二开broker实现topic限流

这里是通过方法nettyServerConfig.getServerWorkerThreads() 获取的,默认就是8

rocketmq高可用之二开broker实现topic限流

rocketmq netty通行模型

所以我们可以总结出rocketmq的通信模型如下

rocketmq高可用之二开broker实现topic限流

我们需要添加限流只需要在业务处理器serverHandler前添加一个LimiterHandlerhandler即可。

LimiterHandler 细节

添加LimiterHandler我们需要注意一下细节

  1. LimiterHandler最好使用新的线程池,不影响原先的处理,防止限流线程池会耗尽原先的DefaultEventExecutorGroup线程池
  2. LimiterHandler线程池的阻塞策略应该是如果阻塞则跳过等待继续向下执行,不能影响事件的传播
  3. 等待队列尽量很小,防止等待太久影响原有的业务逻辑

限流算法选择

我们这里直接使用guava包里提供的限流算法即可

实现

  1. 新增是否开启限流配置
  • NettyServerConfig.class

添加属性

private boolean limiterHandlerEnable = false;

private int limiterThreads = 2;
  • 限流默认false,不开启
  • 线程池大小默认为2
  1. 新增限流的EventExecutorGroup

NettyRemotingServer中新增如下属性

private EventExecutorGroup limitEventExecutorGroup;

初始化我们和defaultEventExecutorGroup初始化保持一致在start()中完成初始化

this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        if (nettyServerConfig.isLimiterHandlerEnable()) {
            this.limitEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getLimiterThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "limiterThread_" + this.threadIndex.incrementAndGet());
                }
            }, 16, (task, executor) -> {
                String threadPoolName = executor.parent().toString();
                log.error("RateLimitHandler reject task, {} pendingTasks:{}", threadPoolName,
                    executor.pendingTasks());
                // 继续执行任务
                task.run();
            });
        }

注意这里我们重写的拒绝策略是继续执行任务,防止影响到正常的时间传播 队列大小我们给的最小16,小于16 netty这里也会给16

新增限流handler

我们直接在NettyRemotingServer中新建一个内部类

    @ChannelHandler.Sharable
    class NettyTopicLimiter extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
            try {
                // 非当前线程池直接触发下一个事件
                if (!ctx.executor().inEventLoop()) {
                    ctx.fireChannelRead(msg);
                    return;
                }
                //非发送消息
                int code = msg.getCode();
                if (code == RequestCode.SEND_MESSAGE || code == RequestCode.SEND_MESSAGE_V2 || code == RequestCode.SEND_BATCH_MESSAGE) {
                    int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
                    NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);

                    Pair<NettyRequestProcessor, ExecutorService> pair = remotingAbstract.processorTable.get(RequestCode.SEND_MESSAGE_LIMIT);
                    if (pair == null) {
                        ctx.fireChannelRead(msg);
                        return;
                    }
                    //限流
                    if (pair.getObject1().rejectRequest(ctx, msg)) {
                        RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "rateLimit");
                        writeResponse(ctx.channel(), msg, response);
                        return;
                    }
                    ctx.fireChannelRead(msg);

                } else {
                    ctx.fireChannelRead(msg);
                }
                
            } catch (Exception e) {
                log.error("limiter error", e);
                ctx.fireChannelRead(msg);
            }

        }
    }

限流属性

我们在NettyRemotingServer中新增nettyTopicLimiter属性 private NettyTopicLimiter nettyTopicLimiter;

限流handler初始化

我们在NettyRemotingServerprepareSharableHandlers方法中初始化,和原来保持一致

    private void prepareSharableHandlers() {
        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
        encoder = new NettyEncoder();
        connectionManageHandler = new NettyConnectManageHandler();
        serverHandler = new NettyServerHandler();
        if (nettyServerConfig.isLimiterHandlerEnable()) {
            this.nettyTopicLimiter = new NettyTopicLimiter();
        }
        distributionHandler = new RemotingCodeDistributionHandler();
    }

pipeline添加handerl

protected ChannelPipeline configChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline()
            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
            .addLast(defaultEventExecutorGroup,
                encoder,
                new NettyDecoder(),
                distributionHandler,
                new IdleStateHandler(0, 0,
                    nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                connectionManageHandler);
        if (nettyServerConfig.isLimiterHandlerEnable()) {
            pipeline.addLast(limitEventExecutorGroup, nettyTopicLimiter);
        }
        return pipeline.addLast(defaultEventExecutorGroup, serverHandler);
    }

添加限流业务处理器

我们需要添加一个限流业务处理器,实现实际的限流逻辑

我们放在这个包下 rocketmq高可用之二开broker实现topic限流

public class SendMessageLimiterProcessor implements NettyRequestProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    protected final TopicConfigManager topicConfigManager;

    protected static volatile Map<String, RateLimiter> limiterMap = new ConcurrentHashMap<>();
    

    public SendMessageLimiterProcessor(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
        return null;
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    @Override
    public boolean rejectRequest(ChannelHandlerContext ctx, RemotingCommand cmd) throws RemotingCommandException {
        //获取topic
        SendMessageRequestHeader header = SendMessageRequestHeader.parseRequestHeader(cmd);
        String topic = header.getTopic();
        TopicConfig topicConfig = topicConfigManager.getTopicConfigTable().get(topic);
        int qps = topicConfig.getQps();
        RateLimiter limiter = getRateLimiter(topic, qps);
        return !limiter.tryAcquire();
    }
    
    public static RateLimiter getRateLimiter(String topic, int qps) {
        RateLimiter limiter = limiterMap.get(topic);
        if (Objects.isNull(limiter)) {
            synchronized (SendMessageLimiterProcessor.class) {
                if (Objects.isNull(limiter)) {
                    limiter = RateLimiter.create(qps);
                    limiterMap.put(topic, limiter);
                }
            }
        }
        return limiter;
    }

    /**
     * update qps
     * @param topic topic
     * @param qps qps
     */
    public static void updateTopicRateLimiter(String topic, int qps) {
        RateLimiter limiter = limiterMap.get(topic);
        if (Objects.nonNull(limiter)) {
            limiter.setRate(qps);
        }
    }
}

qps限流配置

  • TopicConfig.class qps限流配置我们扩展topic的元数据,新增qps字段
private int qps = 0;

测试

我们可以先一个并发测试工具类测试

public class ConcurrentControlUtils {

    public static final int MESSAGE_COUNT = 10;
    public static final String PRODUCER_GROUP = "xiaozou-topic-producer";
    public static final String TOPIC = "xiaozou-topic";

    public static final String TAG = "xiaozou";

    public static void main(String[] args) throws Exception {
        int count = 20;
        
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        System.setProperty("rocketmq.namesrv.domain", "xiaozou:80");
        System.setProperty("rocketmq.namesrv.domain.subgroup", "nsaddr-1");
//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();

        DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ConcurrentControlUtils.process(() -> {
            try {
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg, 10000);
                System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
            } catch (Exception e) {
                // ignore Exception
                e.printStackTrace();

            }

        }, count);

        producer.shutdown();
    }

    public static void process(Runnable runnable) {
        process(runnable, 200);
    }

    public static void process(Runnable runnable, int concurrentThreadNum) {

        /**
         * 并发数
         */
        if (concurrentThreadNum <= 0) {
            concurrentThreadNum = 200;
        }

        CountDownLatch blockLatch = new CountDownLatch(concurrentThreadNum);
        ExecutorService threadPool = Executors.newFixedThreadPool(concurrentThreadNum);

        for (int i = 0; i < concurrentThreadNum; i++) {
            threadPool.submit(() -> {
                try {
                    blockLatch.await();
                    runnable.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            });
            if (i == concurrentThreadNum - 1) {
                System.out.printf("并发测试开始");
            }
            blockLatch.countDown();

        }

        blockingMainThread(threadPool);

    }

    private static void blockingMainThread(ExecutorService threadPool) {
        threadPool.shutdown();
        while (!threadPool.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("线程池关闭");
    }
}

效果

rocketmq高可用之二开broker实现topic限流

总结

核心源码大致是如上,我们扩展了netty通信模块,同时修改了topic的元数据管理。