RocketMQ High Availability: Customizing the Broker to Implement Topic-Level Rate Limiting
This is weihubeats. If you find the article useful, follow the WeChat account 小奏技术, where articles are published first. No marketing accounts, no clickbait.
Source version
- 5.1.0
Background
Rate limiting is an essential part of any high-availability RocketMQ design. If we want our RocketMQ cluster to stay stable and not be overwhelmed by clients, the best place to add rate limiting is at the topic level, so that a misbehaving producer on a single topic cannot flood the cluster with messages.
调研
通过调研发现rocketmq
官方并未提供相关的限流方案,其实像一些云厂商的rocketmq
也一般会有限流,比如火山引擎的rocketmq
Topic rate limiting
After studying the source code, we decided to start by rate limiting at the topic level.
Implementation
Let's first take a quick look at how RocketMQ configures its Netty threading model.
The entry point is NettyRemotingServer, whose constructor initializes two thread pools, bossGroup and workGroup:
```java
this.eventLoopGroupBoss = buildBossEventLoopGroup();
this.eventLoopGroupSelector = buildEventLoopGroupSelector();
```
bossGroup
Let's first look at how many threads the bossGroup uses; typically it is one.
Whether epoll or NIO is used, the bossGroup runs a single thread dedicated to handling connection (accept) events.
The count is hard-coded, and in practice there is no need to change it to multiple threads, since we only listen on a single port.
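For reference, a sketch of that construction, paraphrased from the 5.1.0 source (the exact thread-factory naming may differ slightly):

```java
// paraphrased from NettyRemotingServer (5.1.0): one thread in either case,
// since the boss group only accepts connections on a single listening port
private EventLoopGroup buildBossEventLoopGroup() {
    if (useEpoll()) {
        return new EpollEventLoopGroup(1, new ThreadFactoryImpl("NettyEPOLLBoss_"));
    }
    return new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyNIOBoss_"));
}
```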
workGroup
The workGroup's thread count is determined by nettyServerConfig.getServerSelectorThreads(). Looking at the default value, we can see it is three.
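The default lives in NettyServerConfig; roughly, per the 5.1.0 source:

```java
// NettyServerConfig: thread count for eventLoopGroupSelector (the workGroup)
private int serverSelectorThreads = 3;
```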
ChannelPipeline initialization
The ChannelPipeline is set up in the configChannel method:
```java
protected ChannelPipeline configChannel(SocketChannel ch) {
    return ch.pipeline()
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
        .addLast(defaultEventExecutorGroup,
            encoder,
            new NettyDecoder(),
            distributionHandler,
            new IdleStateHandler(0, 0,
                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
            connectionManageHandler,
            serverHandler
        );
}
```
We can see that business handling, encoding/decoding, and the rest all run on a separate thread pool, defaultEventExecutorGroup.
Its default thread count is 8, which can be seen from the initialization of defaultEventExecutorGroup: the value is obtained via nettyServerConfig.getServerWorkerThreads(), whose default is 8.
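Again from NettyServerConfig, roughly:

```java
// NettyServerConfig: thread count for defaultEventExecutorGroup
private int serverWorkerThreads = 8;
```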
RocketMQ's Netty communication model
So we can summarize RocketMQ's communication model as follows.
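The original post illustrates the model with a diagram; in text form, a request flows roughly like this:

```
client --connect--> bossGroup (1 thread): accept connections
                        |
                        v
      eventLoopGroupSelector (3 threads): network read/write
                        |
                        v
defaultEventExecutorGroup (8 threads): handshake, codec, idle check,
                 connection management, serverHandler
                        |
                        v
     per-request-code business thread pools (processorTable)
```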
To add rate limiting, we only need to insert a LimiterHandler in front of the business handler serverHandler.
LimiterHandler details
When adding the LimiterHandler we need to pay attention to a few details:
- The LimiterHandler should run on its own thread pool so it does not affect existing processing; otherwise the rate-limiting work could exhaust the original defaultEventExecutorGroup.
- The pool's rejection policy should skip waiting and continue execution when saturated, so event propagation is never blocked.
- The pending queue should be kept small, so requests never wait long enough to hurt the normal business logic.
Choosing a rate-limiting algorithm
Here we simply use the rate limiter provided by the Guava library.
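For readers unfamiliar with it, Guava's RateLimiter is a token-bucket style limiter. A minimal, self-contained sketch of the non-blocking usage we rely on below:

```java
import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
    public static void main(String[] args) {
        // 100 permits per second; tryAcquire() returns immediately instead of blocking
        RateLimiter limiter = RateLimiter.create(100);
        for (int i = 0; i < 200; i++) {
            if (limiter.tryAcquire()) {
                System.out.println("request " + i + " allowed");
            } else {
                System.out.println("request " + i + " rejected");
            }
        }
    }
}
```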
Implementation steps
- Add a configuration switch for rate limiting
Add the following properties to NettyServerConfig.class:
```java
private boolean limiterHandlerEnable = false;
private int limiterThreads = 2;
```
- Rate limiting defaults to false, i.e. disabled
- The limiter thread pool size defaults to 2
- Add an EventExecutorGroup for rate limiting
Add the following field to NettyRemotingServer:
```java
private EventExecutorGroup limitEventExecutorGroup;
```
We initialize it in start(), consistent with how defaultEventExecutorGroup is initialized:
```java
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    nettyServerConfig.getServerWorkerThreads(),
    new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
        }
    });

if (nettyServerConfig.isLimiterHandlerEnable()) {
    this.limitEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getLimiterThreads(), new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "limiterThread_" + this.threadIndex.incrementAndGet());
        }
    }, 16, (task, executor) -> {
        String threadPoolName = executor.parent().toString();
        log.error("RateLimitHandler reject task, {} pendingTasks:{}", threadPoolName,
            executor.pendingTasks());
        // run the task anyway so event propagation is not interrupted
        task.run();
    });
}
```
Note that the rejection policy we pass in simply runs the task anyway, so normal event propagation is never disrupted.
We use 16, the minimum queue size; if you pass anything smaller than 16, Netty raises it to 16.
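This clamping happens in Netty's SingleThreadEventExecutor; roughly, from the Netty source:

```java
// io.netty.util.concurrent.SingleThreadEventExecutor's constructor clamps it:
this.maxPendingTasks = Math.max(16, maxPendingTasks);
```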
Adding the rate-limiting handler
We create an inner class directly inside NettyRemotingServer:
```java
@ChannelHandler.Sharable
class NettyTopicLimiter extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
        try {
            // not running on this handler's executor: just fire the next event
            if (!ctx.executor().inEventLoop()) {
                ctx.fireChannelRead(msg);
                return;
            }
            // only send-message requests are rate limited; everything else passes through
            int code = msg.getCode();
            if (code == RequestCode.SEND_MESSAGE || code == RequestCode.SEND_MESSAGE_V2 || code == RequestCode.SEND_BATCH_MESSAGE) {
                int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
                NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
                Pair<NettyRequestProcessor, ExecutorService> pair = remotingAbstract.processorTable.get(RequestCode.SEND_MESSAGE_LIMIT);
                if (pair == null) {
                    ctx.fireChannelRead(msg);
                    return;
                }
                // apply the rate limit
                if (pair.getObject1().rejectRequest(ctx, msg)) {
                    RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "rateLimit");
                    writeResponse(ctx.channel(), msg, response);
                    return;
                }
                ctx.fireChannelRead(msg);
            } else {
                ctx.fireChannelRead(msg);
            }
        } catch (Exception e) {
            log.error("limiter error", e);
            ctx.fireChannelRead(msg);
        }
    }
}
```
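Two pieces of wiring are implied here but not shown in the post: the rejectRequest(ctx, cmd) overload on NettyRequestProcessor, and registering the limiter processor under the new request code (SEND_MESSAGE_LIMIT is itself a new constant added to RequestCode). A hypothetical sketch of both:

```java
// (a) Assumed extension to NettyRequestProcessor: a default method, so the
//     existing processors need no changes.
public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;

    boolean rejectRequest();

    // new overload used by NettyTopicLimiter above
    default boolean rejectRequest(ChannelHandlerContext ctx, RemotingCommand cmd)
            throws RemotingCommandException {
        return false;
    }
}

// (b) Assumed registration (e.g. in BrokerController#registerProcessor) so the
//     processorTable lookup in NettyTopicLimiter can find the limiter:
// this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_LIMIT,
//     new SendMessageLimiterProcessor(this.topicConfigManager), null);
```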
Limiter field
We add a nettyTopicLimiter field to NettyRemotingServer:
```java
private NettyTopicLimiter nettyTopicLimiter;
```
Limiter handler initialization
We initialize it in NettyRemotingServer's prepareSharableHandlers method, consistent with the existing handlers:
```java
private void prepareSharableHandlers() {
    handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
    encoder = new NettyEncoder();
    connectionManageHandler = new NettyConnectManageHandler();
    serverHandler = new NettyServerHandler();
    if (nettyServerConfig.isLimiterHandlerEnable()) {
        this.nettyTopicLimiter = new NettyTopicLimiter();
    }
    distributionHandler = new RemotingCodeDistributionHandler();
}
```
Adding the handler to the pipeline
```java
protected ChannelPipeline configChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline()
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
        .addLast(defaultEventExecutorGroup,
            encoder,
            new NettyDecoder(),
            distributionHandler,
            new IdleStateHandler(0, 0,
                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
            connectionManageHandler);
    if (nettyServerConfig.isLimiterHandlerEnable()) {
        pipeline.addLast(limitEventExecutorGroup, nettyTopicLimiter);
    }
    return pipeline.addLast(defaultEventExecutorGroup, serverHandler);
}
```
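Note the ordering: the limiter sits after connectionManageHandler and before serverHandler, so a rejected request is answered without ever reaching the business dispatch. And because it runs on limitEventExecutorGroup rather than defaultEventExecutorGroup, a saturated limiter pool cannot stall the codec threads.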
Adding the rate-limiting business processor
We need a processor that implements the actual rate-limiting logic, placed alongside the broker's other request processors.
```java
public class SendMessageLimiterProcessor implements NettyRequestProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    protected final TopicConfigManager topicConfigManager;

    protected static volatile Map<String, RateLimiter> limiterMap = new ConcurrentHashMap<>();

    public SendMessageLimiterProcessor(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
        // never dispatched as a regular processor; only rejectRequest is used
        return null;
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    @Override
    public boolean rejectRequest(ChannelHandlerContext ctx, RemotingCommand cmd) throws RemotingCommandException {
        // resolve the topic from the send-message request header
        SendMessageRequestHeader header = SendMessageRequestHeader.parseRequestHeader(cmd);
        String topic = header.getTopic();
        TopicConfig topicConfig = topicConfigManager.getTopicConfigTable().get(topic);
        // unknown topic or no limit configured (qps <= 0): let the request through;
        // RateLimiter.create also requires a positive rate
        if (topicConfig == null || topicConfig.getQps() <= 0) {
            return false;
        }
        RateLimiter limiter = getRateLimiter(topic, topicConfig.getQps());
        return !limiter.tryAcquire();
    }

    public static RateLimiter getRateLimiter(String topic, int qps) {
        RateLimiter limiter = limiterMap.get(topic);
        if (Objects.isNull(limiter)) {
            synchronized (SendMessageLimiterProcessor.class) {
                // re-read under the lock: another thread may have created it already
                limiter = limiterMap.get(topic);
                if (Objects.isNull(limiter)) {
                    limiter = RateLimiter.create(qps);
                    limiterMap.put(topic, limiter);
                }
            }
        }
        return limiter;
    }

    /**
     * Update the qps of an existing limiter.
     *
     * @param topic topic
     * @param qps qps
     */
    public static void updateTopicRateLimiter(String topic, int qps) {
        RateLimiter limiter = limiterMap.get(topic);
        if (Objects.nonNull(limiter)) {
            limiter.setRate(qps);
        }
    }
}
```
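The post does not show where updateTopicRateLimiter is invoked; presumably it is hooked into the broker's topic-config update path so a changed qps takes effect without a restart. A hypothetical call site:

```java
// Hypothetical hook, not shown in the original post: refresh the cached limiter
// whenever the broker's topic config is updated
// (e.g. at the end of TopicConfigManager#updateTopicConfig):
SendMessageLimiterProcessor.updateTopicRateLimiter(
    topicConfig.getTopicName(), topicConfig.getQps());
```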
qps configuration
- TopicConfig.class: for per-topic qps limiting we extend the topic metadata with a new field:
```java
private int qps = 0;
```
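With the guard in rejectRequest above, the default of 0 means "no limit". A getter/setter pair is assumed alongside the field, so that the new field is serialized with the rest of the topic metadata:

```java
// assumed accessors on TopicConfig (not shown in the original post)
public int getQps() {
    return qps;
}

public void setQps(int qps) {
    this.qps = qps;
}
```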
Testing
We can first test with a simple concurrency utility:
```java
public class ConcurrentControlUtils {
    public static final int MESSAGE_COUNT = 10;
    public static final String PRODUCER_GROUP = "xiaozou-topic-producer";
    public static final String TOPIC = "xiaozou-topic";
    public static final String TAG = "xiaozou";

    public static void main(String[] args) throws Exception {
        int count = 20;
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        System.setProperty("rocketmq.namesrv.domain", "xiaozou:80");
        System.setProperty("rocketmq.namesrv.domain.subgroup", "nsaddr-1");
        // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ConcurrentControlUtils.process(() -> {
            try {
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg, 10000);
                System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
            } catch (Exception e) {
                // ignore the exception, just print it
                e.printStackTrace();
            }
        }, count);
        producer.shutdown();
    }

    public static void process(Runnable runnable) {
        process(runnable, 200);
    }

    public static void process(Runnable runnable, int concurrentThreadNum) {
        // concurrency level: fall back to 200 if a non-positive value is passed
        if (concurrentThreadNum <= 0) {
            concurrentThreadNum = 200;
        }
        CountDownLatch blockLatch = new CountDownLatch(concurrentThreadNum);
        ExecutorService threadPool = Executors.newFixedThreadPool(concurrentThreadNum);
        for (int i = 0; i < concurrentThreadNum; i++) {
            threadPool.submit(() -> {
                try {
                    // wait until every task is submitted, then fire them together
                    blockLatch.await();
                    runnable.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            if (i == concurrentThreadNum - 1) {
                System.out.printf("concurrent test started%n");
            }
            blockLatch.countDown();
        }
        blockingMainThread(threadPool);
    }

    private static void blockingMainThread(ExecutorService threadPool) {
        threadPool.shutdown();
        while (!threadPool.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("thread pool shut down%n");
    }
}
```
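If the topic's qps is configured below the test concurrency, we would expect part of the sends to come back with the SYSTEM_BUSY / "rateLimit" response produced by NettyTopicLimiter, while the remainder succeed.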
Result
总结
核心源码大致是如上,我们扩展了netty
通信模块,同时修改了topic
的元数据管理。
Reposted from: https://juejin.cn/post/7216587974007226428