Springboot使用Redis Stream实现轻量消息队列依赖 说明:此部分定义了 Redis 相关的依赖,确保项
Redis Stream 是 Redis 5.0 引入的一种数据结构,用于处理日志类型的数据。它提供了高效、可靠的方式来处理和存储时间序列数据,如事件、消息等。其设计灵感源于 Kafka 和类似的消息队列系统,且完全集成在 Redis 中,利用了 Redis 的高性能和持久化特性。
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
说明:此部分定义了 Redis 相关的依赖,确保项目能够引入并使用 Spring Boot 提供的 Redis 启动器。
RedisTemplate 配置
package com.mjg.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
// 注册 Java 8 日期时间模块
om.registerModule(new JavaTimeModule());
om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
jackson2JsonRedisSerializer.serialize(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key 采用 String 的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash 的 key 也采用 String 的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value 序列化方式采用 jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash 的 value 序列化方式采用 jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
说明:此配置类用于设置 RedisTemplate 的序列化方式,以满足不同数据类型的存储和读取需求。
RedisStreamConfig
package com.mjg.config;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Properties;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class RedisStreamConfig implements InitializingBean, DisposableBean {
private final RedisTemplate<String, Object> redisTemplate;
public static String streamName = "user-event-stream";
public static String userEventGroup = "user-event-group";
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* 消息侦听器容器,用于监听 Redis Stream 中的消息
*
* @param connectionFactory Redis 连接工厂,用于创建 Redis 连接
* @param messageConsumer 消息消费者,用于处理接收到的消息
* @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 类型的消息侦听器容器
*/
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer);
listenerContainer.start();
return listenerContainer;
}
/**
* 创建一个流容器,用于监听 Redis Stream 中的数据
*
* @param streamName Redis Stream 的名称
* @param connectionFactory Redis 连接工厂
* @param streamListener 绑定的监听类
* @return 返回 StreamMessageListenerContainer 对象
*/
@SneakyThrows
private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(5)) // 拉取消息超时时间
.batchSize(10) // 批量抓取消息
.targetType(String.class) // 传递的数据类型
.executor(threadPoolTaskExecutor)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, options);
// 指定消费最新的消息
StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed());
// 创建消费者
StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener);
// 指定消费者对象
container.register(streamReadRequest, streamListener);
return container;
}
/**
* 生成流读取请求
*
* @param offset 偏移量,用于指定从 Redis Stream 中的哪个位置开始读取消息
* @param streamListener 流侦听器,用于处理接收到的消息
* @return 返回一个 StreamReadRequest 对象,表示一个流读取请求
* @throws Exception 当 streamListener 无法识别为 MessageConsumer 类型时,抛出异常
*/
private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {
Consumer consumer;
if (streamListener instanceof MessageConsumer) {
consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName());
} else {
throw new Exception("无法识别的 stream key");
}
// 关闭自动 ack 确认
return StreamMessageListenerContainer.StreamReadRequest.builder(offset)
.errorHandler((error) -> {
log.error(error.getMessage());
})
.cancelOnError(e -> false)
.consumer(consumer)
// 关闭自动 ack 确认
.autoAcknowledge(false)
.build();
}
/**
* 检查 Redis 版本是否符合要求
*
* @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,抛出该异常
*/
private void checkRedisVersion() {
// 获得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
Assert.notNull(info, "Redis info is null");
Object redisVersion = info.get("redis_version");
Integer anInt = Convert.toInt(redisVersion);
if (anInt < 5) {
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", redisVersion));
}
}
@Override
public void destroy() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
checkRedisVersion();
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) {
streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup);
}
}
}
说明:该配置类实现了对 Redis Stream 的相关配置,包括消息监听容器的创建、流读取请求的生成、Redis 版本的检查以及组的创建等功能。
生产者
package com.mjg.config;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
@Component
@RequiredArgsConstructor
@Slf4j
public class MessageProducer {
private final RedisTemplate<String, Object> redisTemplate;
public void sendMessage(String streamKey, Object message) {
RecordId recordId = redisTemplate
.opsForStream().add(StreamRecords.newRecord()
.ofMap(Collections.singletonMap("data", message))
.withStreamKey(streamKey));
if (recordId!= null) {
log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, recordId);
}
}
}
说明:MessageProducer 类负责向 Redis Stream 发送消息。
消费者
package com.mjg.config;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@Component
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {
private final RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, String> message) {
String stream = message.getStream();
String messageId = message.getId().toString();
String messageBody = message.getValue();
System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
System.out.println("Message body: " + messageBody);
// 消息应答
redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId());
}
}
说明:MessageConsumer 类实现了 StreamListener 接口,用于处理从 Redis Stream 接收到的消息,并进行相应的应答操作。
测试
@RequiredArgsConstructor
@Slf4j
@RestController
public class MessageController {
public static String streamName = "user-event-stream";
private final MessageProducer messageProducer;
@GetMapping("/send")
public void send() {
messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now());
}
}
说明:MessageController 类中的 send 方法通过调用 MessageProducer 来发送消息到指定的 Redis Stream 中。
转载自:https://juejin.cn/post/7405526698937352219