likes
comments
collection
share

【SpringBoot】使用 Redis 就可以当作MQ使用啦 ~

作者站长头像
站长
· 阅读数 22

工作中遇到的问题

提出问题

在最近的一个项目中,我们需要用到一个消息队列(MQ)来处理实时消息传递的需求。但是这个项目环境并没有(MQ)中间件。怎么办呢? 装一个 MQ ?

Redis 会不会是一个好的方案

因为我们项目使用 (MQ) 的场景不多,所以我们打算采用 Redis 作为中间件来实现这一个功能。使用 Redis 实现 (MQ) 这个功能并不难。但是如何可靠、稳定 的实现这个 (MQ) 就是问题的关键。

用 Redis 的几种可实施的方案

    1. Redis List:Redis的List数据结构可以被用作简单的消息队列。生产者将消息推送到List的尾部,消费者则从List的头部获取消息。这种方式简单易用,但在高并发情况下可能存在性能瓶颈。
    1. Redis Pub/Sub:Redis的发布订阅功能可以用来实现消息队列。生产者发布消息到指定的频道,而消费者订阅这个频道以接收消息。虽然简单,但Pub/Sub不支持消息持久化,且消息无法在消费者离线时被保存。
    1. Redis Streams:Redis 5.0引入了Streams数据结构,它提供了更复杂的消息队列功能,包括持久性、分组、消息消费确认等功能。因此,Redis Streams通常被视为更适合用作消息队列的数据结构。

Redis Streams 特点和好处

  1. 持久性:Redis Stream 支持消息持久化,可以将消息存储在内存中,也可以选择将消息保存到磁盘上,保证消息的持久性。
  2. 多消费者分组:Stream 支持将消费者分组,每个消费者组内的消费者可以共享一个消息流,多个消费者组可以并行消费消息。
  3. 消息消费确认:消费者可以对已经处理的消息进行确认,这样可以确保消息被正确处理,避免消息的重复消费。
  4. 消息时间序列:消息在 Stream 中是有序存储的,每条消息都有唯一的ID和时间戳,可以按照时间顺序进行消费。
  5. 复杂数据结构:Stream 支持复杂的消息结构,每个消息可以包含多个字段和值,这样可以存储更加丰富的消息内容。
  6. 消费者消费位置:消费者可以控制自己的消费位置,可以从头开始消费,也可以从某个特定的消息ID开始消费。

解决方案

所以我们采用 Redis Streams 来充当 (MQ)

SpringBoot 集成 Redis Streams 需要怎么做呢?

跟着我的步骤一步一步来,集成 RedisStreams 很简单。

Redis 的基础配置

首先我们项目中就已经有Redis了,而且已经集成好了,所以我们打算的是用现成的,Redis的基础配置请自行百度。

yml 文件配置

redis:
  mq:
    streams:
      # 生产者
      - keyName:  stream:key
        groups:
          # 消费组
          - groupName: stream_group
            consumers:
              # 消费者
              - consumerName : stream_consumers
                # 监听类
                listenerClass : com.xx.streamListener

实体类配置

主要用于装载基础配置的载体。

RedisMq

@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
public class RedisMq {
    private List<RedisMqStream> streams;

    public List<RedisMqStream> getStreams() {
        return streams;
    }

    public void setStreams(List<RedisMqStream> streams) {
        this.streams = streams;
    }
}

RedisMqConsumers

public class RedisMqConsumers {

    private String consumerName;

    private String listenerClass;

    public String getConsumerName() {
        return consumerName;
    }

    public void setConsumerName(String consumerName) {
        this.consumerName = consumerName;
    }

    public String getListenerClass() {
        return listenerClass;
    }

    public void setListenerClass(String listenerClass) {
        this.listenerClass = listenerClass;
    }
}

RedisMqGroup

public class RedisMqGroup {

    private String groupName;
    private List<RedisMqConsumers> consumers;

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public List<RedisMqConsumers> getConsumers() {
        return consumers;
    }

    public void setConsumers(List<RedisMqConsumers> consumers) {
        this.consumers = consumers;
    }
}

RedisMqStream

public class RedisMqStream {

    private String keyName;

    private List<RedisMqGroup> groups;

    public String getKeyName() {
        return keyName;
    }

    public void setKeyName(String keyName) {
        this.keyName = keyName;
    }

    public List<RedisMqGroup> getGroups() {
        return groups;
    }

    public void setGroups(List<RedisMqGroup> groups) {
        this.groups = groups;
    }
}

RedisMqConfig ( Spring Bean 配置 ) 重点

RedisMqConfig 类是一个用于配置 Redis Stream 消息队列的 Spring Bean,它负责创建和配置 Redis Stream 的监听容器,并通过注解声明了相关的组件依赖。在配置过程中,它使用了自定义的 RedisStreamUtil 工具类,以及 RedisMq 对象,这些对象包含了关于 Redis Stream 的配置信息。主要功能包括:

  1. 订阅配置: 通过遍历 RedisMq 中定义的多个 RedisMqStream,对每个 Stream 创建对应的监听容器,并初始化相关的 Stream 和消费组。
  2. 多实例支持: 通过使用 Spring 的 @Component 注解,该类被声明为一个 Spring Bean,可以在应用程序的其他部分通过依赖注入来使用。同时,它通过 @Resource 注解注入了 RedisStreamUtilRedisMq 和线程池 ThreadPoolTaskExecutor
  3. 动态实例化监听器: 在订阅配置中,通过反射机制动态实例化指定类的监听器,这增强了代码的灵活性,使得可以根据配置动态选择不同的监听器。
@Component
public class RedisMqConfig {

    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisMq redisMq;
    @Resource(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor;

    /**
     * 配置Redis Stream的订阅
     *
     * @param factory Redis连接工厂
     * @return 包含所有订阅的Subscription列表
     * @throws ClassNotFoundException
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    @Bean
    public List<Subscription> subscription(RedisConnectionFactory factory) throws ClassNotFoundException, InstantiationException, IllegalAccessException {

        List<Subscription> resultList = new ArrayList<>();

        // 配置Stream的监听容器选项
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        .build();

        // 遍历所有RedisMqStream
        for (RedisMqStream redisMqStream : redisMq.getStreams()) {

            String keyName = redisMqStream.getKeyName();
            List<RedisMqGroup> groups = redisMqStream.getGroups();

            // 初始化Stream,创建Stream并创建对应的消费组
            for (RedisMqGroup group : groups) {
                initStream(keyName, group.getGroupName());
            }

            // 创建Stream监听容器
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
                    StreamMessageListenerContainer.create(factory, options);

            // 遍历所有消费组
            for (RedisMqGroup group : groups) {

                // 遍历每个消费者配置
                for (RedisMqConsumers redisMqConsumers : group.getConsumers()) {
                    Consumer consumer = Consumer.from(group.getGroupName(), redisMqConsumers.getConsumerName());

                    String listenerClass = redisMqConsumers.getListenerClass();

                    // 动态创建监听器实例
                    StreamListener listener = (StreamListener) Class.forName(listenerClass).newInstance();

                    /**
                     * 订阅
                     */
                    Subscription subscription = listenerContainer.receiveAutoAck(consumer,
                            StreamOffset.create(keyName, ReadOffset.lastConsumed()), listener);
                    resultList.add(subscription);
                }

            }

            // 启动监听容器
            listenerContainer.start();
        }

        return resultList;
    }

    /**
     * 初始化Stream,如果不存在,则创建Stream和对应的消费组
     *
     * @param key   Stream的键名
     * @param group 消费组的名称
     */
    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            // 如果Stream不存在,则创建Stream、添加初始数据和创建消费组
            Map<String, Object> map = Collections.singletonMap("key", "value");
            String result = redisStreamUtil.addMap(key, map);
            redisStreamUtil.createGroup(key, group);
            redisStreamUtil.del(key, result);
        }
    }

}

配置 RedisStreamUtil

RedisStreamUtil 类是一个用于操作 Redis Stream 的工具类,主要封装了对 Redis Stream 的一些常用操作方法,提供了方便的接口供其他组件或服务使用。

@Service
public class RedisStreamUtil {

    @Autowired
    @Qualifier("redisTemplate")
    public RedisTemplate redisTemplate;

    public RedisStreamUtil() {
    }

    /**
     * 创建消费组
     */
    public String createGroup(String key, String group){
        return redisTemplate.opsForStream().createGroup(key, group);
    }

    /**
     * 获取消费者信息
     */
    public StreamInfo.XInfoConsumers queryConsumers(String key, String group){
        return redisTemplate.opsForStream().consumers(key, group);
    }

    /**
     * 添加Map消息
     */
    public String addMap(String key, Map<String, Object> value){
        return redisTemplate.opsForStream().add(key, value).getValue();
    }

    /**
     * 读取消息
     */
    public List<MapRecord<String, Object, Object>> read(String key){
        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    }

    /**
     * 确认消费
     */
    public Long ack(String key, String group, String... recordIds){
        return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
    }

    /**
     * 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
     */
    public Long del(String key, String... recordIds){
        return redisTemplate.opsForStream().delete(key, recordIds);
    }

    /**
     * 判断是否存在key
     */
    public boolean hasKey(String key){
        Boolean aBoolean = redisTemplate.hasKey(key);
        return aBoolean==null?false:aBoolean;
    }
}

Listener 监听订阅消息 (订阅)

@Service
public class ActCertListener implements StreamListener<String, MapRecord<String, String, String>> {
    private RedisStreamUtil redisStreamUtil = SpringUtils.getBean(RedisStreamUtil.class);
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try{
            String streamKey = message.getStream();
            RecordId recordId = message.getId();
            Map<String, String> msg = message.getValue();
            // TODO  逻辑实现
           
            redisStreamUtil.ack(streamKey, "stream_group", recordId.getValue());
            redisStreamUtil.del(streamKey, recordId.getValue());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

为什么我在这个类中注入不进来 RedisStreamUtil,有没有大佬能解答一下。

发布消息 (发布)

@RestController
@RequestMapping("/nihao")
public class TestNihaoController {

    @Autowired
    private RedisStreamUtil redisStreamUtil;

    @GetMapping("/nibuhao")
    public AjaxResult list() {
        Map<String,Object> message = new HashMap<>(2);
        message.put("body","消息主题" );
        message.put("sendTime", DateUtils.getDateTime());
        String streamKey = "stream:key";
        redisStreamUtil.addMap(streamKey, message);
        return AjaxResult.success();
    }
}

如何使用呢?

1、配置yml文件

【SpringBoot】使用 Redis 就可以当作MQ使用啦 ~

2、编写生产者

  • 在你需要生产消息的地方编写这个代码。
  • stream key为必须和你配置的生产者相同。
  • 使用一个你自定义的Object生产消息。
  • 使用RedisStreamUtil中的addMap生产消息。

【SpringBoot】使用 Redis 就可以当作MQ使用啦 ~

3、编写执行者

创建你的监听类。实现 StreamListener<String, MapRecord<String, String, String>>接口

【SpringBoot】使用 Redis 就可以当作MQ使用啦 ~

实现onMessage方法,里边就是你要编写的逻辑

  • message能直接拿到你生产者发布的消息。
  • RedisStreamUtil ack检测你的消息。
  • RedisStreamUtil del执行完成之后删除你的消息。

【SpringBoot】使用 Redis 就可以当作MQ使用啦 ~

4、完活

总结

一定要多思考,如果人永远待在舒适圈的话,人永远不会成长。共勉

觉得作者写的不错的,值得你们借鉴的话,就请点一个免费的赞吧!这个对我来说真的很重要。૮(˶ᵔ ᵕ ᵔ˶)ა

转载自:https://juejin.cn/post/7317158687441371171
评论
请登录