【SpringBoot】使用 Redis 就可以当作MQ使用啦 ~
工作中遇到的问题
提出问题
在最近的一个项目中,我们需要用到一个消息队列(MQ)来处理实时消息传递的需求。但是这个项目环境并没有(MQ)中间件。怎么办呢? 装一个 MQ ?
Redis 会不会是一个好的方案
因为我们项目使用 (MQ) 的场景不多,所以我们打算采用 Redis 作为中间件来实现这一个功能。使用 Redis 实现 (MQ) 这个功能并不难。但是如何可靠、稳定 的实现这个 (MQ) 就是问题的关键。
用 Redis 的几种可实施的方案
-
- Redis List:Redis的List数据结构可以被用作简单的消息队列。生产者将消息推送到List的尾部,消费者则从List的头部获取消息。这种方式简单易用,但在高并发情况下可能存在性能瓶颈。
-
- Redis Pub/Sub:Redis的发布订阅功能可以用来实现消息队列。生产者发布消息到指定的频道,而消费者订阅这个频道以接收消息。虽然简单,但Pub/Sub不支持消息持久化,且消息无法在消费者离线时被保存。
-
- Redis Streams:Redis 5.0引入了Streams数据结构,它提供了更复杂的消息队列功能,包括持久性、分组、消息消费确认等功能。因此,Redis Streams通常被视为更适合用作消息队列的数据结构。
Redis Streams 特点和好处
- 持久性:Redis Stream 支持消息持久化,可以将消息存储在内存中,也可以选择将消息保存到磁盘上,保证消息的持久性。
- 多消费者分组:Stream 支持将消费者分组,每个消费者组内的消费者可以共享一个消息流,多个消费者组可以并行消费消息。
- 消息消费确认:消费者可以对已经处理的消息进行确认,这样可以确保消息被正确处理,避免消息的重复消费。
- 消息时间序列:消息在 Stream 中是有序存储的,每条消息都有唯一的ID和时间戳,可以按照时间顺序进行消费。
- 复杂数据结构:Stream 支持复杂的消息结构,每个消息可以包含多个字段和值,这样可以存储更加丰富的消息内容。
- 消费者消费位置:消费者可以控制自己的消费位置,可以从头开始消费,也可以从某个特定的消息ID开始消费。
解决方案
所以我们采用 Redis Streams 来充当 (MQ)
SpringBoot 集成 Redis Streams 需要怎么做呢?
跟着我的步骤一步一步来,集成 RedisStreams 很简单。
Redis 的基础配置
首先我们项目中就已经有Redis了,而且已经集成好了,所以我们打算的是用现成的,Redis的基础配置请自行百度。
yml 文件配置
redis:
mq:
streams:
# 生产者
- keyName: stream:key
groups:
# 消费组
- groupName: stream_group
consumers:
# 消费者
- consumerName : stream_consumers
# 监听类
listenerClass : com.xx.streamListener
实体类配置
主要用于装载基础配置的载体。
RedisMq
@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
public class RedisMq {
private List<RedisMqStream> streams;
public List<RedisMqStream> getStreams() {
return streams;
}
public void setStreams(List<RedisMqStream> streams) {
this.streams = streams;
}
}
RedisMqConsumers
public class RedisMqConsumers {
private String consumerName;
private String listenerClass;
public String getConsumerName() {
return consumerName;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public String getListenerClass() {
return listenerClass;
}
public void setListenerClass(String listenerClass) {
this.listenerClass = listenerClass;
}
}
RedisMqGroup
public class RedisMqGroup {
private String groupName;
private List<RedisMqConsumers> consumers;
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public List<RedisMqConsumers> getConsumers() {
return consumers;
}
public void setConsumers(List<RedisMqConsumers> consumers) {
this.consumers = consumers;
}
}
RedisMqStream
public class RedisMqStream {
private String keyName;
private List<RedisMqGroup> groups;
public String getKeyName() {
return keyName;
}
public void setKeyName(String keyName) {
this.keyName = keyName;
}
public List<RedisMqGroup> getGroups() {
return groups;
}
public void setGroups(List<RedisMqGroup> groups) {
this.groups = groups;
}
}
RedisMqConfig ( Spring Bean 配置 ) 重点
RedisMqConfig
类是一个用于配置 Redis Stream 消息队列的 Spring Bean,它负责创建和配置 Redis Stream 的监听容器,并通过注解声明了相关的组件依赖。在配置过程中,它使用了自定义的 RedisStreamUtil
工具类,以及 RedisMq
对象,这些对象包含了关于 Redis Stream 的配置信息。主要功能包括:
- 订阅配置: 通过遍历
RedisMq
中定义的多个RedisMqStream
,对每个 Stream 创建对应的监听容器,并初始化相关的 Stream 和消费组。 - 多实例支持: 通过使用 Spring 的
@Component
注解,该类被声明为一个 Spring Bean,可以在应用程序的其他部分通过依赖注入来使用。同时,它通过@Resource
注解注入了RedisStreamUtil
、RedisMq
和线程池ThreadPoolTaskExecutor
。 - 动态实例化监听器: 在订阅配置中,通过反射机制动态实例化指定类的监听器,这增强了代码的灵活性,使得可以根据配置动态选择不同的监听器。
@Component
public class RedisMqConfig {
@Resource
private RedisStreamUtil redisStreamUtil;
@Resource
private RedisMq redisMq;
@Resource(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor executor;
/**
* 配置Redis Stream的订阅
*
* @param factory Redis连接工厂
* @return 包含所有订阅的Subscription列表
* @throws ClassNotFoundException
* @throws InstantiationException
* @throws IllegalAccessException
*/
@Bean
public List<Subscription> subscription(RedisConnectionFactory factory) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
List<Subscription> resultList = new ArrayList<>();
// 配置Stream的监听容器选项
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.batchSize(5)
.executor(executor)
.pollTimeout(Duration.ofSeconds(1))
.build();
// 遍历所有RedisMqStream
for (RedisMqStream redisMqStream : redisMq.getStreams()) {
String keyName = redisMqStream.getKeyName();
List<RedisMqGroup> groups = redisMqStream.getGroups();
// 初始化Stream,创建Stream并创建对应的消费组
for (RedisMqGroup group : groups) {
initStream(keyName, group.getGroupName());
}
// 创建Stream监听容器
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
StreamMessageListenerContainer.create(factory, options);
// 遍历所有消费组
for (RedisMqGroup group : groups) {
// 遍历每个消费者配置
for (RedisMqConsumers redisMqConsumers : group.getConsumers()) {
Consumer consumer = Consumer.from(group.getGroupName(), redisMqConsumers.getConsumerName());
String listenerClass = redisMqConsumers.getListenerClass();
// 动态创建监听器实例
StreamListener listener = (StreamListener) Class.forName(listenerClass).newInstance();
/**
* 订阅
*/
Subscription subscription = listenerContainer.receiveAutoAck(consumer,
StreamOffset.create(keyName, ReadOffset.lastConsumed()), listener);
resultList.add(subscription);
}
}
// 启动监听容器
listenerContainer.start();
}
return resultList;
}
/**
* 初始化Stream,如果不存在,则创建Stream和对应的消费组
*
* @param key Stream的键名
* @param group 消费组的名称
*/
private void initStream(String key, String group) {
boolean hasKey = redisStreamUtil.hasKey(key);
if (!hasKey) {
// 如果Stream不存在,则创建Stream、添加初始数据和创建消费组
Map<String, Object> map = Collections.singletonMap("key", "value");
String result = redisStreamUtil.addMap(key, map);
redisStreamUtil.createGroup(key, group);
redisStreamUtil.del(key, result);
}
}
}
配置 RedisStreamUtil
RedisStreamUtil
类是一个用于操作 Redis Stream 的工具类,主要封装了对 Redis Stream 的一些常用操作方法,提供了方便的接口供其他组件或服务使用。
@Service
public class RedisStreamUtil {
@Autowired
@Qualifier("redisTemplate")
public RedisTemplate redisTemplate;
public RedisStreamUtil() {
}
/**
* 创建消费组
*/
public String createGroup(String key, String group){
return redisTemplate.opsForStream().createGroup(key, group);
}
/**
* 获取消费者信息
*/
public StreamInfo.XInfoConsumers queryConsumers(String key, String group){
return redisTemplate.opsForStream().consumers(key, group);
}
/**
* 添加Map消息
*/
public String addMap(String key, Map<String, Object> value){
return redisTemplate.opsForStream().add(key, value).getValue();
}
/**
* 读取消息
*/
public List<MapRecord<String, Object, Object>> read(String key){
return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
}
/**
* 确认消费
*/
public Long ack(String key, String group, String... recordIds){
return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
}
/**
* 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
*/
public Long del(String key, String... recordIds){
return redisTemplate.opsForStream().delete(key, recordIds);
}
/**
* 判断是否存在key
*/
public boolean hasKey(String key){
Boolean aBoolean = redisTemplate.hasKey(key);
return aBoolean==null?false:aBoolean;
}
}
Listener 监听订阅消息 (订阅)
@Service
public class ActCertListener implements StreamListener<String, MapRecord<String, String, String>> {
private RedisStreamUtil redisStreamUtil = SpringUtils.getBean(RedisStreamUtil.class);
@Override
public void onMessage(MapRecord<String, String, String> message) {
try{
String streamKey = message.getStream();
RecordId recordId = message.getId();
Map<String, String> msg = message.getValue();
// TODO 逻辑实现
redisStreamUtil.ack(streamKey, "stream_group", recordId.getValue());
redisStreamUtil.del(streamKey, recordId.getValue());
}catch (Exception e){
e.printStackTrace();
}
}
}
为什么我在这个类中注入不进来 RedisStreamUtil
,有没有大佬能解答一下。
发布消息 (发布)
@RestController
@RequestMapping("/nihao")
public class TestNihaoController {
@Autowired
private RedisStreamUtil redisStreamUtil;
@GetMapping("/nibuhao")
public AjaxResult list() {
Map<String,Object> message = new HashMap<>(2);
message.put("body","消息主题" );
message.put("sendTime", DateUtils.getDateTime());
String streamKey = "stream:key";
redisStreamUtil.addMap(streamKey, message);
return AjaxResult.success();
}
}
如何使用呢?
1、配置yml文件
2、编写生产者
- 在你需要生产消息的地方编写这个代码。
stream key
为必须和你配置的生产者相同。- 使用一个你自定义的Object生产消息。
- 使用
RedisStreamUtil
中的addMap
生产消息。
3、编写执行者
创建你的监听类。实现 StreamListener<String, MapRecord<String, String, String>>
接口
实现onMessage
方法,里边就是你要编写的逻辑
- message能直接拿到你生产者发布的消息。
RedisStreamUtil
ack
检测你的消息。RedisStreamUtil
del
执行完成之后删除你的消息。
4、完活
总结
一定要多思考,如果人永远待在舒适圈的话,人永远不会成长。共勉
觉得作者写的不错的,值得你们借鉴的话,就请点一个免费的赞吧!这个对我来说真的很重要。૮(˶ᵔ ᵕ ᵔ˶)ა
转载自:https://juejin.cn/post/7317158687441371171