基于Redission高级应用26-RStream事件流处理工具类实现
概述
正文
Redis 5.0引入的Redis Streams为实现事件驱动架构提供了坚实的基础。在Spring应用程序中,可以通过封装常见操作的工具类来简化Redis Streams的集成。本文介绍了RedisStreamUtil
,这是一个为Spring应用程序设计的工具类,它简化了与Redis Streams的交互。
Redis Streams和Redisson
Redis Streams是一种数据结构,它模拟了日志数据类型,允许多个生产者和消费者以高度并发的方式读写数据。Redisson是一个Java内存数据网格,提供了分布式和可扩展的数据结构和服务。它的特点之一就是能够以Java友好的方式与Redis Streams进行交互。
RedisStreamUtil工具类
RedisStreamUtil
是一个工具类,旨在抽象与Redis Streams交互的复杂性。它提供了创建流和消费者组、向流中添加事件、读取事件和确认事件处理完成的方法。
以下是RedisStreamUtil
类的代码:
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.client.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* @Author derek_smart
* @Date 2024/7/24 8:30
* @Description RStream 工具类
*/
@Service
public class RedisStreamUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisStreamUtil.class);
@Autowired
private RedissonClient redissonClient;
// 创建流和消费者组,如果它们还不存在
public void createStreamAndConsumerGroup(String streamName, String groupName) {
RStream<String, String> stream = redissonClient.getStream(streamName);
try {
// 尝试创建消费者组,如果流不存在,它将被自动创建
stream.createGroup(groupName);
} catch (RedisException e) {
// 如果消费者组已经存在,我们将得到一个错误
if (e.getMessage().contains("BUSYGROUP Consumer Group name already exists")) {
logger.info("Consumer group '{}' already exists for stream '{}'", groupName, streamName);
} else {
// 如果是其他错误,记录日志
logger.error("Error creating consumer group '{}' for stream '{}'", groupName, streamName, e);
}
}
}
// 添加事件到流
public StreamMessageId addEventToStream(String streamName, Map<String, String> event) {
try {
RStream<String, String> stream = redissonClient.getStream(streamName);
return stream.add(StreamAddArgs.entries(event).trim(TrimStrategy.MAXLEN, 1000));
} catch (Exception e) {
logger.error("Error adding event to stream", e);
return null;
}
}
// 读取事件
public Map<StreamMessageId, Map<String, String>> readEventsFromStream(String streamName, String groupName, String consumerName) {
try {
RStream<String, String> stream = redissonClient.getStream(streamName);
return stream.readGroup(groupName, consumerName, StreamReadGroupArgs.neverDelivered());
} catch (Exception e) {
logger.error("Error reading events from stream", e);
return null;
}
}
// 确认事件处理完成
public void acknowledgeEvent(String streamName, String groupName, StreamMessageId messageId) {
try {
RStream<String, String> stream = redissonClient.getStream(streamName);
stream.ack(groupName, messageId);
} catch (Exception e) {
logger.error("Error acknowledging event", e);
}
}
// 读取并自动确认事件
public Map<StreamMessageId, Map<String, String>> readAndAcknowledgeEventsFromStream(String streamName, String groupName, String consumerName) {
Map<StreamMessageId, Map<String, String>> messages = readEventsFromStream(streamName, groupName, consumerName);
if (messages != null) {
messages.keySet().forEach(messageId -> acknowledgeEvent(streamName, groupName, messageId));
}
return messages;
}
// 允许自定义 StreamAddArgs
public StreamMessageId addEventToStreams(String streamName, Map<String, String> event) {
try {
RStream<String, String> stream = redissonClient.getStream(streamName);
// 创建 StreamAddArgs 实例并设置条目
StreamAddArgs<String, String> streamAddArgs = StreamAddArgs.<String, String>entries(event);
// 添加事件到流
return stream.add(streamAddArgs);
} catch (Exception e) {
logger.error("Error adding event to stream", e);
return null;
}
}
// 允许自定义 StreamReadGroupArgs
public Map<StreamMessageId, Map<String, String>> readEventsFromStream(String streamName, String groupName, String consumerName, StreamReadGroupArgs readGroupArgs) {
try {
RStream<String, String> stream = redissonClient.getStream(streamName);
return stream.readGroup(groupName, consumerName, readGroupArgs);
} catch (Exception e) {
logger.error("Error reading events from stream with custom StreamReadGroupArgs", e);
return null;
}
}
}
该类使用Spring的@Service
注解将其标记为一个bean,并使用@Autowired
注入RedissonClient
依赖。
创建流和消费者组
createStreamAndConsumerGroup
方法检查流和消费者组是否已经存在。如果不存在,它会创建它们。这种方法确保了幂等性,意味着多次调用不会产生不良效果。
public void createStreamAndConsumerGroup(String streamName, String groupName) {
// 实现...
}
向流中添加事件
addEventToStream
方法将事件添加到指定的流中。它接受一个流名称和一个表示事件数据的映射。
public StreamMessageId addEventToStream(String streamName, Map<String, String> event) {
// 实现...
}
读取事件
readEventsFromStream
方法为特定的消费者组和消费者从流中读取消息。它使用StreamReadGroupArgs.neverDelivered()
来确保只读取未交付的消息。
public Map<StreamMessageId, Map<String, String>> readEventsFromStream(String streamName, String groupName, String consumerName) {
// 实现...
}
确认事件
acknowledgeEvent
方法确认消息已被处理,这对于消费者组中的可靠消息处理至关重要。
public void acknowledgeEvent(String streamName, String groupName, StreamMessageId messageId) {
// 实现...
}
时序图:
时序图将包括以下步骤:
- 客户端请求处理事件流。
- 服务层调用
RedisStreamUtil
来创建流和消费者组。 - 服务层调用
RedisStreamUtil
来添加事件到流中。 - 服务层调用
RedisStreamUtil
来读取事件。 - 服务层处理读取到的事件。
- 服务层调用
RedisStreamUtil
来确认事件已处理。
流程图:
在这个流程图中,我们从开始节点"A"出发,首先检查流是否存在(节点"B")。如果流不存在,我们创建流(节点"D"),然后检查消费者组是否存在(节点"C")。如果消费者组不存在,我们创建它(节点"F")。一旦流和消费者组都准备好了(节点"E"),我们就可以添加事件到流中(节点"G")。
添加事件后,我们检查操作是否成功(节点"H")。如果成功,我们继续读取事件(节点"I")。如果事件读取成功,我们处理事件(节点"L")并确认事件处理完成(节点"N")。如果在任何步骤中遇到错误,我们记录相应的错误(节点"J", "M", "P")并最终到达结束节点"O"。
在Spring服务中使用RedisStreamUtil
为了演示RedisStreamUtil
的实际应用,考虑以下使用该工具类与Redis Streams交互的服务示例:
import org.redisson.api.StreamMessageId;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* @Author derek_smart
* @Date 2024/7/24 8:30
* @Description RStream 使用示例
*/
@Service
public class MyStreamService {
@Autowired
private RedisStreamUtil redisStreamUtil;
public void processStream() {
String streamName = "myStream";
String groupName = "myGroup";
String consumerName = "myConsumer";
// 创建流和消费者组
redisStreamUtil.createStreamAndConsumerGroup(streamName, groupName);
// 添加事件到流
Map<String, String> event = new HashMap<>();
event.put("eventKey1", "eventValue1");
event.put("eventKey2", "eventValue2");
StreamMessageId messageId = redisStreamUtil.addEventToStream(streamName, event);
// 读取事件
Map<StreamMessageId, Map<String, String>> messages = redisStreamUtil.readEventsFromStream(streamName, groupName, consumerName);
// 处理事件并确认
if (messages != null) {
for (Map.Entry<StreamMessageId, Map<String, String>> messageEntry : messages.entrySet()) {
StreamMessageId msgId = messageEntry.getKey();
Map<String, String> msgBody = messageEntry.getValue();
// 处理消息逻辑...
System.out.println("Processing message with ID: " + msgId + " and body: " + msgBody);
// 确认事件处理完成
redisStreamUtil.acknowledgeEvent(streamName, groupName, msgId);
}
}
// 使用自定义 StreamReadGroupArgs 读取事件
StreamReadGroupArgs readGroupArgs = StreamReadGroupArgs.neverDelivered().count(10);
Map<StreamMessageId, Map<String, String>> customMessages = redisStreamUtil.readEventsFromStream(streamName, groupName, consumerName, readGroupArgs);
// 处理自定义参数读取的事件...
}
}
服务使用RedisStreamUtil
来创建流、添加事件、读取事件和确认它们。这种抽象允许开发人员专注于业务逻辑,而不是Redis Streams的复杂性。
结论
RedisStreamUtil
为Spring应用程序中的Redis Streams操作提供了简化和高效的方式。通过封装Redis Streams操作,它允许开发人员以最少的样板代码利用事件流的强大功能。该工具类确保应用程序可以可靠和高效地处理事件,为可扩展的事件驱动架构铺平了道路。
有了RedisStreamUtil
,开发人员可以轻松地将Redis Streams集成到他们的Spring应用程序中,享受到强大、高性能和易于使用的事件流系统带来的好处。
转载自:https://juejin.cn/post/7394790673613864972