likes
comments
collection
share

基于Redission高级应用26-RStream事件流处理工具类实现

作者站长头像
站长
· 阅读数 76

概述

正文

Redis 5.0引入的Redis Streams为实现事件驱动架构提供了坚实的基础。在Spring应用程序中,可以通过封装常见操作的工具类来简化Redis Streams的集成。本文介绍了RedisStreamUtil,这是一个为Spring应用程序设计的工具类,它简化了与Redis Streams的交互。

Redis Streams和Redisson

Redis Streams是一种数据结构,它模拟了日志数据类型,允许多个生产者和消费者以高度并发的方式读写数据。Redisson是一个Java内存数据网格,提供了分布式和可扩展的数据结构和服务。它的特点之一就是能够以Java友好的方式与Redis Streams进行交互。

RedisStreamUtil工具类

RedisStreamUtil是一个工具类,旨在抽象与Redis Streams交互的复杂性。它提供了创建流和消费者组、向流中添加事件、读取事件和确认事件处理完成的方法。

以下是RedisStreamUtil类的代码:

import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.redisson.api.stream.TrimStrategy;
import org.redisson.client.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;
/**
 * @Author derek_smart
 * @Date 2024/7/24 8:30
 * @Description RStream 工具类
 */
@Service
public class RedisStreamUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisStreamUtil.class);
    @Autowired
    private RedissonClient redissonClient;

    // 创建流和消费者组,如果它们还不存在
    public void createStreamAndConsumerGroup(String streamName, String groupName) {
        RStream<String, String> stream = redissonClient.getStream(streamName);
        try {
            // 尝试创建消费者组,如果流不存在,它将被自动创建
            stream.createGroup(groupName);
        } catch (RedisException e) {
            // 如果消费者组已经存在,我们将得到一个错误
            if (e.getMessage().contains("BUSYGROUP Consumer Group name already exists")) {
                logger.info("Consumer group '{}' already exists for stream '{}'", groupName, streamName);
            } else {
                // 如果是其他错误,记录日志
                logger.error("Error creating consumer group '{}' for stream '{}'", groupName, streamName, e);
            }
        }
    }
    

    // 添加事件到流
    public StreamMessageId addEventToStream(String streamName, Map<String, String> event) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamName);
            return stream.add(StreamAddArgs.entries(event).trim(TrimStrategy.MAXLEN, 1000));
        } catch (Exception e) {
            logger.error("Error adding event to stream", e);
            return null;
        }
    }

    // 读取事件
    public Map<StreamMessageId, Map<String, String>> readEventsFromStream(String streamName, String groupName, String consumerName) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamName);
            return stream.readGroup(groupName, consumerName, StreamReadGroupArgs.neverDelivered());
        } catch (Exception e) {
            logger.error("Error reading events from stream", e);
            return null;
        }
    }

    // 确认事件处理完成
    public void acknowledgeEvent(String streamName, String groupName, StreamMessageId messageId) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamName);
            stream.ack(groupName, messageId);
        } catch (Exception e) {
            logger.error("Error acknowledging event", e);
        }
    }


    // 读取并自动确认事件
    public Map<StreamMessageId, Map<String, String>> readAndAcknowledgeEventsFromStream(String streamName, String groupName, String consumerName) {
        Map<StreamMessageId, Map<String, String>> messages = readEventsFromStream(streamName, groupName, consumerName);
        if (messages != null) {
            messages.keySet().forEach(messageId -> acknowledgeEvent(streamName, groupName, messageId));
        }
        return messages;
    }

    // 允许自定义 StreamAddArgs
    public StreamMessageId addEventToStreams(String streamName, Map<String, String> event) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamName);
            // 创建 StreamAddArgs 实例并设置条目
            StreamAddArgs<String, String> streamAddArgs = StreamAddArgs.<String, String>entries(event);
            // 添加事件到流
            return stream.add(streamAddArgs);
        } catch (Exception e) {
            logger.error("Error adding event to stream", e);
            return null;
        }
    }

    // 允许自定义 StreamReadGroupArgs
    public Map<StreamMessageId, Map<String, String>> readEventsFromStream(String streamName, String groupName, String consumerName, StreamReadGroupArgs readGroupArgs) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamName);
            return stream.readGroup(groupName, consumerName, readGroupArgs);
        } catch (Exception e) {
            logger.error("Error reading events from stream with custom StreamReadGroupArgs", e);
            return null;
        }
    }
}

基于Redission高级应用26-RStream事件流处理工具类实现

该类使用Spring的@Service注解将其标记为一个bean,并使用@Autowired注入RedissonClient依赖。

创建流和消费者组

createStreamAndConsumerGroup方法检查流和消费者组是否已经存在。如果不存在,它会创建它们。这种方法确保了幂等性,意味着多次调用不会产生不良效果。

public void createStreamAndConsumerGroup(String streamName, String groupName) {
    // 实现...
}

向流中添加事件

addEventToStream方法将事件添加到指定的流中。它接受一个流名称和一个表示事件数据的映射。

public StreamMessageId addEventToStream(String streamName, Map<String, String> event) {
    // 实现...
}

读取事件

readEventsFromStream方法为特定的消费者组和消费者从流中读取消息。它使用StreamReadGroupArgs.neverDelivered()来确保只读取未交付的消息。

public Map<StreamMessageId, Map<String, String>> readEventsFromStream(String streamName, String groupName, String consumerName) {
    // 实现...
}

确认事件

acknowledgeEvent方法确认消息已被处理,这对于消费者组中的可靠消息处理至关重要。

public void acknowledgeEvent(String streamName, String groupName, StreamMessageId messageId) {
    // 实现...
}

时序图:

客户端服务层RedisStreamUtil工具类Redis服务器请求处理事件流createStreamAndConsumerGroup(streamName, groupName)验证流和组是否存在返回结果流和消费者组创建完成addEventToStream(streamName, event)添加事件到流返回消息ID事件添加完成readEventsFromStream(streamName, groupName, consumerName)读取未交付的事件返回事件数据事件读取完成处理事件acknowledgeEvent(streamName, groupName, messageId)确认事件已处理确认完成事件确认完成客户端服务层RedisStreamUtil工具类Redis服务器

时序图将包括以下步骤:

  1. 客户端请求处理事件流。
  2. 服务层调用RedisStreamUtil来创建流和消费者组。
  3. 服务层调用RedisStreamUtil来添加事件到流中。
  4. 服务层调用RedisStreamUtil来读取事件。
  5. 服务层处理读取到的事件。
  6. 服务层调用RedisStreamUtil来确认事件已处理。

流程图:

开始
流是否存在?
消费者组是否存在?
创建流
流和消费者组已准备
创建消费者组
添加事件到流
事件添加成功?
读取事件
记录添加事件错误
事件读取成功?
处理事件
记录读取事件错误
确认事件处理完成?
结束
记录确认事件错误

在这个流程图中,我们从开始节点"A"出发,首先检查流是否存在(节点"B")。如果流不存在,我们创建流(节点"D"),然后检查消费者组是否存在(节点"C")。如果消费者组不存在,我们创建它(节点"F")。一旦流和消费者组都准备好了(节点"E"),我们就可以添加事件到流中(节点"G")。

添加事件后,我们检查操作是否成功(节点"H")。如果成功,我们继续读取事件(节点"I")。如果事件读取成功,我们处理事件(节点"L")并确认事件处理完成(节点"N")。如果在任何步骤中遇到错误,我们记录相应的错误(节点"J", "M", "P")并最终到达结束节点"O"。

在Spring服务中使用RedisStreamUtil

为了演示RedisStreamUtil的实际应用,考虑以下使用该工具类与Redis Streams交互的服务示例:

import org.redisson.api.StreamMessageId;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
/**
 * @Author derek_smart
 * @Date 2024/7/24 8:30
 * @Description RStream 使用示例
 */
@Service
public class MyStreamService {

    @Autowired
    private RedisStreamUtil redisStreamUtil;

    public void processStream() {
        String streamName = "myStream";
        String groupName = "myGroup";
        String consumerName = "myConsumer";

        // 创建流和消费者组
        redisStreamUtil.createStreamAndConsumerGroup(streamName, groupName);

        // 添加事件到流
        Map<String, String> event = new HashMap<>();
        event.put("eventKey1", "eventValue1");
        event.put("eventKey2", "eventValue2");
        StreamMessageId messageId = redisStreamUtil.addEventToStream(streamName, event);

        // 读取事件
        Map<StreamMessageId, Map<String, String>> messages = redisStreamUtil.readEventsFromStream(streamName, groupName, consumerName);

        // 处理事件并确认
        if (messages != null) {
            for (Map.Entry<StreamMessageId, Map<String, String>> messageEntry : messages.entrySet()) {
                StreamMessageId msgId = messageEntry.getKey();
                Map<String, String> msgBody = messageEntry.getValue();
                // 处理消息逻辑...
                System.out.println("Processing message with ID: " + msgId + " and body: " + msgBody);

                // 确认事件处理完成
                redisStreamUtil.acknowledgeEvent(streamName, groupName, msgId);
            }
        }

        // 使用自定义 StreamReadGroupArgs 读取事件
        StreamReadGroupArgs readGroupArgs = StreamReadGroupArgs.neverDelivered().count(10);
        Map<StreamMessageId, Map<String, String>> customMessages = redisStreamUtil.readEventsFromStream(streamName, groupName, consumerName, readGroupArgs);

        // 处理自定义参数读取的事件...
    }
}

服务使用RedisStreamUtil来创建流、添加事件、读取事件和确认它们。这种抽象允许开发人员专注于业务逻辑,而不是Redis Streams的复杂性。

结论

RedisStreamUtil为Spring应用程序中的Redis Streams操作提供了简化和高效的方式。通过封装Redis Streams操作,它允许开发人员以最少的样板代码利用事件流的强大功能。该工具类确保应用程序可以可靠和高效地处理事件,为可扩展的事件驱动架构铺平了道路。

有了RedisStreamUtil,开发人员可以轻松地将Redis Streams集成到他们的Spring应用程序中,享受到强大、高性能和易于使用的事件流系统带来的好处。

转载自:https://juejin.cn/post/7394790673613864972
评论
请登录