Redisson 实现 发布/订阅原理,优缺点 工具类和 单频道,多频道订阅示例
Redisson 实现发布/订阅(Pub/Sub)的原理是基于 Redis 的发布/订阅消息模式。在这种模式下,发布者将消息发送到一个频道(channel),而订阅者监听这个频道并接收消息。
Redisson 对这个模式进行了封装,使其更加易于在 Java 应用程序中使用。
发布/订阅原理:
-
发布者:发布者是发送消息到特定频道的客户端。在 Redis 中,这通过
PUBLISH
命令实现。 -
订阅者:订阅者是监听频道并接收消息的客户端。Redis 提供了
SUBSCRIBE
命令来实现订阅功能。 -
频道:频道是消息传递的媒介。发布者和订阅者通过频道进行通信。
-
消息:消息是通过频道从发布者传递到订阅者的数据。
Redisson 使用这些概念并提供了一套 Java API,允许你在 Java 程序中方便地实现发布和订阅功能。
优点:
-
易用性:Redisson 提供了简单直观的 API,使得在 Java 应用中实现发布/订阅变得非常容易。
-
高性能:Redis 本身是一个内存数据结构存储,因此 Redisson 的发布/订阅特性能够提供高性能和低延迟的消息传递。
-
可扩展性:Redis 可以通过集群模式来提供更高的可用性和分区容错性,Redisson 作为客户端能够很好地与 Redis 集群协同工作。
-
支持多种数据结构:Redisson 不仅支持发布/订阅,还支持丰富的数据结构和同步器,如分布式锁、原子变量、计数器、队列等。
-
语言无关性:由于 Redis 支持多种编程语言的客户端,不同语言写的服务可以通过 Redis 进行交互。
缺点:
-
消息持久性:Redis 的发布/订阅模式不保证消息的持久性。如果没有订阅者在线,消息将会丢失。如果需要持久性,可能需要结合其他消息队列系统。
-
消息传递保证:Redis 提供的是最基本的消息传递保证,不支持事务或消息队列的高级特性,如死信队列、消息确认机制等。
-
单点故障:如果未使用 Redis 集群,单个 Redis 节点可能成为单点故障。即使在集群模式下,网络分区也可能导致问题。
-
资源消耗:订阅者需要保持长连接以接收消息,这可能会消耗服务器资源,尤其是当订阅者数量很多时。
-
缩放限制:尽管 Redis 可以通过集群来缩放,但在极高的负载下,Redis 集群也可能面临性能瓶颈。
-
复杂的部署和管理:对于大型系统,维护 Redis 集群可能需要专业知识和额外的运维工作。
在选择 Redisson 作为发布/订阅解决方案时,需要根据实际应用的需求权衡这些优缺点。对于需要高吞吐量、低延迟和简单实现的场景,Redisson 是一个很好的选择。
然而,如果你的系统需要复杂的消息传递保证或持久性,可能需要考虑其他消息队列系统,如 RabbitMQ、Apache Kafka 等。
工具类和示例:
使用 Redisson 实现发布/订阅模式可以通过创建一个服务类来封装 Redisson 的发布/订阅功能,并使用 Spring 的注解来定义单频道和多频道的订阅。
下面是一个实现的例子,包括了工具类的创建和单频道订阅、多频道订阅的示例。
Redisson 配置首先,需要配置 Redisson 客户端作为一个 Spring Bean,以便在整个应用程序中使用。
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
RedisPubSubService 工具类:创建一个服务类来封装 Redisson 的发布/订阅逻辑。
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RedisPubSubService {
private final RedissonClient redissonClient;
@Autowired
public RedisPubSubService(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
// 订阅单个频道
public <T> void subscribeToTopic(String topicName, MessageListener<T> listener) {
RTopic topic = redissonClient.getTopic(topicName);
topic.addListener(listener);
}
// 订阅多个频道
public <T> void subscribeToTopics(String[] topicNames, MessageListener<T> listener) {
for (String topicName : topicNames) {
RTopic topic = redissonClient.getTopic(topicName);
topic.addListener(listener);
}
}
// 发布消息到频道
public <T> long publishToTopic(String topicName, T message) {
RTopic topic = redissonClient.getTopic(topicName);
return topic.publish(message);
}
}
单频道订阅示例:创建一个 Spring 组件,用于订阅单个频道。
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class SingleTopicSubscriber {
private final RedisPubSubService redisPubSubService;
@Autowired
public SingleTopicSubscriber(RedisPubSubService redisPubSubService) {
this.redisPubSubService = redisPubSubService;
}
@PostConstruct
public void subscribeToTopic() {
redisPubSubService.subscribeToTopic("mySingleTopic", new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String msg) {
System.out.println("Received message on single topic: " + msg);
}
});
}
}
多频道订阅示例:创建一个 Spring 组件,用于同时订阅多个频道。
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MultiTopicSubscriber {
private final RedisPubSubService redisPubSubService;
@Autowired
public MultiTopicSubscriber(RedisPubSubService redisPubSubService) {
this.redisPubSubService = redisPubSubService;
}
@PostConstruct
public void subscribeToTopics() {
String[] topics = new String[]{"topic1", "topic2", "topic3"};
redisPubSubService.subscribeToTopics(topics, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String msg) {
System.out.println("Received message on " + channel + ": " + msg);
}
});
}
}
发布消息示例:创建一个 REST 控制器,用于发布消息到指定的频道。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/pubsub")
public class PubSubController {
private final RedisPubSubService redisPubSubService;
@Autowired
public PubSubController(RedisPubSubService redisPubSubService) {
this.redisPubSubService = redisPubSubService;
}
@PostMapping("/publish/{topic}")
public String publishMessage(@PathVariable String topic, @RequestBody String message) {
redisPubSubService.publishToTopic(topic, message);
return "Message published to " + topic;
}
}
在这个控制器中,publishMessage 方法允许客户端通过发送 POST 请求到 /pubsub/publish/{topic}
来发布消息到指定的频道。
这些示例展示了如何在 Spring 应用程序中使用 Redisson 实现发布/订阅模式。RedisPubSubService 服务类封装了 Redisson 的发布/订阅操作,
而 SingleTopicSubscriber 和 MultiTopicSubscriber 组件分别用于订阅单个频道和多个频道。PubSubController 提供了一个 RESTful 接口来发布消息。
转载自:https://juejin.cn/post/7354019135992823848