likes
comments
collection
share

开箱即用!动态修改RocketMQ线程池

作者站长头像
站长
· 阅读数 62

前言

大家好,我是小郭,上一篇文章中提到了动态修改RocketMQ线程池,那我们如何才能够通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。

另外,从功能性以及健壮性而言还有许多值得我们思考的地方,可以通过查看线程池运行时指标、负载报警、配置日志管理来提高我们对线程池的管理。

开箱即用!动态修改RocketMQ线程池

SpringBoot RocketMQ 实践

版本信息:

rocketmq-spring-boot-starter : 2.2.2

spring-boot-starter :2.3.2.RELEASE

为了满足业务需求,我们初始化三个 Consumer。

并且设置成同一个 Topic 和不同的消费者组 ConsumerGroup,线程池数量都设置为 20。

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic", consumeThreadNumber = 20)
public class MessageConsume implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(Thread.currentThread().getName());
        log.info("Message: {}", message);
    }
}

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic_1", consumeThreadNumber = 20)
public class MessageConsume implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(Thread.currentThread().getName());
        log.info("Message: {}", message);
    }
}

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic_2", consumeThreadNumber = 20)
public class MessageConsume implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(Thread.currentThread().getName());
        log.info("Message: {}", message);
    }
}

生产者:

@GetMapping("/message/send")
public String sendMessage() {

    IntStream.range(0, 1000).forEach(i -> {
        rocketMQTemplate.convertAndSend("test-topic", new Date().toString());
    });
    return "success";
}

执行结果:

开箱即用!动态修改RocketMQ线程池

我们可以看到三个消费者都监听到了Topic的消息,并进行了消费。但是这个时候,由于生产消息数量暴增,需要提高消费者的并行消费速度。

思考片刻,只有最常规的使用方式就是修改线程池后,重新启动消费者。但是目前生产正在跑着业务数据,重启消费者必定会造成大量数据堆积甚至是丢失。

这时候如果能有一个页面动态修改线程池数量,那一定是非常赞的!

如何进行动态修改 RocketMQ 线程数?

思路:

  1. 收集消费者端信息。
  2. 将消费者端的线程池实例注册到服务中。
  3. 定时心跳检查实例状态。
  4. 从容器中拿到对应线程池,调用原生方法进行参数修改。

流程图:

开箱即用!动态修改RocketMQ线程池

实现逻辑:

第一步,收集消费者端信息。

开箱即用!动态修改RocketMQ线程池

第二步,通过DefaultRocketMQListenerContainer类获取到Bean对象的集合,将消费者执行器保存到容器中。

@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
    Map<String, DefaultRocketMQListenerContainer> containerMap =
            ApplicationContextHolder.getBeansOfType(DefaultRocketMQListenerContainer.class);
    try {
        for (DefaultRocketMQListenerContainer container : containerMap.values()) {
            DefaultMQPushConsumer defaultMQPushConsumer = container.getConsumer();
            if (defaultMQPushConsumer != null) {
                ConsumeMessageService consumeMessageService = defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getConsumeMessageService();
                ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor");
                rocketmqConsumeExecutor.put(container.getConsumerGroup(), consumeExecutor);
            }
        }
    } catch (Exception ex) {
        log.error("Failed to get RocketMQ thread pool.", ex);
    }
}

第三步,将消费者端实例注册到服务中。

开箱即用!动态修改RocketMQ线程池

第四步,调用接口将服务注册。

public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
    ...
    register();
}

boolean register() {
    log.info("{}{} - registering service...", PREFIX, appPathIdentifier);
    String urlPath = BASE_PATH + "/apps/register/";
    Result registerResult;
    try {
        registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo);
    } catch (Exception ex) {
        registerResult = Results.failure(ErrorCodeEnum.SERVICE_ERROR);
        log.error("{}{} - registration failed: {}", PREFIX, appPathIdentifier, ex.getMessage());
    }
    if (log.isInfoEnabled()) {
        log.info("{}{} - registration status: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ? "success" : "fail");
    }
    return registerResult.isSuccess();
}

第五步,定时心跳检查消费者实例状态。

开箱即用!动态修改RocketMQ线程池

第六步,初始化定时任务线程池,实现心跳线程,检查实例信息逻辑。

public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
    // ...
    initScheduledTasks();
}
// 定时任务线程池
private void initScheduledTasks() {
    scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS);
}
// 心跳线程
public class HeartbeatThread implements Runnable {

    @Override
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

private boolean renew() {
    Result renewResult;
    try {
        InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew()
                .setAppName(instanceInfo.getAppName())
                .setInstanceId(instanceInfo.getInstanceId())
                .setLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp().toString())
                .setStatus(instanceInfo.getStatus().toString());
        // 检查实例信息是否正常
        renewResult = httpAgent.httpPostByDiscovery(BASE_PATH + "/apps/renew", instanceRenew);
        if (Objects.equals(ErrorCodeEnum.NOT_FOUND.getCode(), renewResult.getCode())) {
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class);
            adapterRegister.register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return renewResult.isSuccess();
    } catch (Exception ex) {
        log.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, ex);
        return false;
    }
}
// 注册实例信息
public void register() {
    Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
    List<ThreadPoolAdapterCacheConfig> threadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap);
    doRegister(threadPoolAdapterCacheConfigs);
}

第七步,从容器中拿到对应线程池,调用原生方法进行参数修改。

开箱即用!动态修改RocketMQ线程池

最后,通过 threadPoolKey 拿到执行器,更新核心线程数和最大线程数。

@Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
    String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
    // 通过threadPoolKey拿到执行器
    ThreadPoolExecutor rocketMQConsumeExecutor = rocketmqConsumeExecutor.get(threadPoolKey);
    if (rocketMQConsumeExecutor != null) {
        int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
        int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
        // 更新核心线程数和最大线程数
        rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
        rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
        log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
                threadPoolKey,
                String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),
                String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize()));
        return true;
    }
    log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey);
    return false;
}

我们在消费者端建立了三个消费者 Consumer,通过前端页面可以看到对应 RocketMQ 消费者线程池。

开箱即用!动态修改RocketMQ线程池

我们现在对第一个消费者进行参数的修改。

开箱即用!动态修改RocketMQ线程池

通过后台的日志,我们可以看到已经修改成功了。

2022-12-26 15:35:35.472  INFO 9726 --- [nio-8099-exec-2] c.h.s.s.c.ThreadPoolAdapterController    : [RocketMQ] Change third-party thread pool data. key: my-consumer_test-topic, coreSize: 100, maximumSize: 100
2022-12-26 15:35:35.473  INFO 9726 --- [nio-8099-exec-2] c.h.a.r.RocketMQThreadPoolAdapter        : [my-consumer_test-topic] RocketMQ consumption thread pool parameter change. coreSize: 20 => 100, maximumSize: 20 => 100

我们重新进行消息的生产,看看具体的效果,我们看到线程数量已经发生了变化/

开箱即用!动态修改RocketMQ线程池

利用这种方式,我们就可以解决常规重启系统来修改参数的问题。

上面我们主要利用了Hippo4j对RocketMQ线程池数量进行了修改,为了提高应用的健壮性,我们还有可以利用Hippo4j做到实时查看线程池运行时指标、负载报警、配置日志管理等事情。

如果大家对具体的代码实现感兴趣,欢迎大家到以下两个平台进行指导~

GitHub:github.com/opengoofy/h…

Gitee:gitee.com/magegoofy/h…

什么是 Hippo4j

为了避免大家重复的造轮子,大家只要在项目中引入 Hippo4j ,就能够进行 Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。

Hippo4j 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力。

提供以下功能支持:

  • 全局管控 - 管理应用线程池实例。
  • 动态变更 - 应用运行时动态变更线程池参数,包括不限于:核心、最大线程数、阻塞队列容量、拒绝策略等。
  • 通知报警 - 内置四种报警通知策略,线程池活跃度、容量水位、拒绝策略以及任务执行时间超长。
  • 运行监控 - 实时查看线程池运行时数据,最近半小时线程池运行数据图表展示。
  • 功能扩展 - 支持线程池任务传递上下文;项目关闭时,支持等待线程池在指定时间内完成任务。
  • 多种模式 - 内置两种使用模式:依赖配置中心无中间件依赖
  • 容器管理 - Tomcat、Jetty、Undertow 容器线程池运行时查看和线程数变更。
  • 框架适配 - Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。

开箱即用!动态修改RocketMQ线程池

快速开始

对于本地演示目的,请参阅 Quick start

演示环境: console.hippo4j.cn/index.html

转载自:https://juejin.cn/post/7182017271399710778
评论
请登录