基于Redission高级应用11-RSet原理及工具类封装及实战应用
概述:
RSet
是 Redisson 提供的一个分布式 Java Set 实现,它基于 Redis 的 set 数据结构。Redis set 是字符串类型的无序集合,且集合中的元素具有唯一性,即集合中不允许有重复的元素。
原理:
RSet
的原理是将 Java Set 接口的操作映射到 Redis 的 set 数据结构的命令。Redis 提供了一系列操作 set 的命令,
SADD
:向集合添加一个或多个成员SREM
:移除集合中一个或多个成员SMEMBERS
:返回集合中的所有成员SISMEMBER
:判断成员元素是否是集合的成员SCARD
:获取集合的成员数SUNION
,SINTER
,SDIFF
:集合的并集、交集、差集操作
Redisson 的 RSet
通过调用这些命令来实现分布式的 set 功能。
优点:
- 分布式特性:
RSet
可以跨多个节点使用,适用于分布式系统和微服务架构。 - 性能:由于 Redis 是基于内存的数据结构存储,所以
RSet
提供快速的读写性能。 - 高可用和持久化:Redis 支持数据的持久化,并且可以配置为主从复制模式,提供高可用性。
- 原子操作:Redis 的操作是原子性的,这意味着
RSet
的操作也是原子性的,适合并发环境。 - 丰富的操作:
RSet
支持 Redis set 提供的丰富操作,如集合运算等。 - 简单易用:Redisson 提供了与 Java 标准 Set 接口一致的操作,使得开发者可以方便地使用
RSet
。
缺点:
- 内存限制:由于 Redis 是基于内存的,大量数据的存储可能受到物理内存的限制。
- 成本:相比于基于磁盘的数据库,维护足够内存可能导致更高的成本。
- 数据一致性:在 Redis 集群模式下,网络分区或其他问题可能会影响数据的最终一致性。
- 复杂的集群管理:在分布式环境下,管理 Redis 集群可能比较复杂,需要处理节点的添加、移除和故障转移。
- 数据安全性:如果没有合适的配置和管理,Redis 数据可能会面临安全风险,如未授权的访问等。
流程图:
流程图:
- 开始操作,然后添加一个元素到
RSet
。 - 添加元素后,检查该元素是否存在于
RSet
中。 - 根据元素是否存在,决定是移除元素还是结束流程。
- 如果元素存在于
RSet
,则将其移除。 - 如果元素不存在,或者在移除元素之后,流程结束。
时序图:
时序图:
- 客户端首先创建或获取
RSet
的引用。 RSet
询问 Redis 服务器该集合是否存在。- Redis 服务器返回结果给
RSet
,然后RSet
将引用返回给客户端。 - 客户端请求向
RSet
添加一个元素。 RSet
向 Redis 服务器发送SADD
命令添加元素。- Redis 服务器返回添加结果,
RSet
将结果返回给客户端。 - 客户端检查一个元素是否存在于
RSet
中。 RSet
向 Redis 服务器发送SISMEMBER
命令检查元素。- Redis 服务器返回存在结果,
RSet
将结果返回给客户端。 - 如果元素存在,客户端请求从
RSet
移除该元素。 RSet
向 Redis 服务器发送SREM
命令移除元素。- Redis 服务器返回移除结果,
RSet
将结果返回给客户端。 - 客户端检查
RSet
是否为空。 RSet
向 Redis 服务器发送SCARD
命令获取集合大小。- Redis 服务器返回集合大小,
RSet
将结果返回给客户端。 - 如果集合为空,客户端请求清空
RSet
。 RSet
向 Redis 服务器发送DEL
命令删除集合。- Redis 服务器返回删除结果,
RSet
将结果返回给客户端。
工具类:
功能点:
- 容错机制:实现重试逻辑、异常处理和可能的回退策略。
- 易用性:提供简单的API,隐藏底层的复杂性。
- 高可用性:确保在 Redis 实例不可用时有策略来处理。
- 功能丰富:除了基本的 Set 操作,还可以添加监听器、排序、和元素过期等高级特性。
- 配置化重试策略:允许自定义重试次数和重试间隔,甚至使用更复杂的重试库,如 Resilience4j。
- 异步操作支持:为了不阻塞当前线程,可以提供异步API,让调用者能够以非阻塞的方式处理操作结果。
- 更细粒度的异常处理:区分不同类型的异常,并为每种异常类型提供不同的处理策略。
- 事件监听:允许注册事件监听器,以便在添加、删除或更新元素时触发自定义行为。
- 集群支持:确保在 Redis 集群环境中
RSet
操作的正确性,处理可能的跨槽(cross-slot)操作。 - 日志记录增强:提供更详细的日志记录,包括操作成功时的日志。
- 资源管理:确保在操作完成后释放所有资源,例如关闭连接。
- 监控和度量:集成监控系统来跟踪操作的性能和成功率。
源码:
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 202/4/25 15:26
* @Description RSet 工具类
* <p>
*/
public class EnhancedRSet<E> {
private static final Logger logger = LoggerFactory.getLogger(EnhancedRSet.class);
private final RedissonClient redissonClient;
private final RSet<E> rSet;
// Executor for asynchronous operations
private final ExecutorService executorService = Executors.newCachedThreadPool();
public EnhancedRSet(RedissonClient redissonClient, String setName) {
this.redissonClient = redissonClient;
this.rSet = redissonClient.getSet(setName);
}
public boolean add(E e) {
try {
return retry(() -> rSet.add(e));
} catch (Exception ex) {
logger.error("Failed to add element to set", ex);
// Implement fallback strategy if needed
return false;
}
}
public boolean remove(Object o) {
try {
return retry(() -> rSet.remove(o));
} catch (Exception ex) {
logger.error("Failed to remove element from set", ex);
return false;
}
}
public boolean contains(Object o) {
try {
return retry(() -> rSet.contains(o));
} catch (Exception ex) {
logger.error("Failed to check if element is in set", ex);
return false;
}
}
public int size() {
try {
return retry(() -> rSet.size());
} catch (Exception ex) {
logger.error("Failed to get the size of the set", ex);
return -1;
}
}
public void clear() {
try {
retry(() -> {
rSet.clear();
return null;
});
} catch (Exception ex) {
logger.error("Failed to clear the set", ex);
}
}
// Implement other methods like addAll, removeAll, iterator, etc.
// Retry logic for operations
private <T> T retry(Callable<T> operation) throws Exception {
int attempts = 3;
for (int i = 0; i < attempts; i++) {
try {
return operation.call();
} catch (Exception ex) {
logger.warn("Operation failed on attempt {}", i + 1, ex);
if (i == attempts - 1) {
throw ex;
}
// Optionally, add some delay here
}
}
return null;
}
// Example of additional features: Element expiration
public void addWithExpiration(E e, long timeToLive, TimeUnit timeUnit) {
try {
retry(() -> {
rSet.add(e);
rSet.expire(timeToLive, timeUnit);
return null;
});
} catch (Exception ex) {
logger.error("Failed to add element with expiration to set", ex);
}
}
// Callable interface for retry operations
@FunctionalInterface
private interface Callable<T> {
T call() throws Exception;
}
// Improved retry with custom configuration
private <T> T retry(Callable<T> operation, int maxAttempts, long delay, TimeUnit timeUnit) throws Exception {
for (int i = 0; i < maxAttempts; i++) {
try {
return operation.call();
} catch (Exception ex) {
logger.warn("Operation failed on attempt {}", i + 1, ex);
if (i < maxAttempts - 1) {
timeUnit.sleep(delay);
} else {
throw ex;
}
}
}
return null;
}
// Asynchronous add operation
public CompletableFuture<Boolean> addAsync(E e) {
return CompletableFuture.supplyAsync(() -> {
try {
return retry(() -> rSet.add(e), 3, 100, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
logger.error("Failed to add element to set asynchronously", ex);
throw new RuntimeException("Async operation failed", ex);
}
}, executorService);
}
// ... other asynchronous operations ...
// Enhanced logging
public boolean addAyns(E e) {
try {
boolean result = retry(() -> rSet.add(e), 3, 100, TimeUnit.MILLISECONDS);
logger.info("Element added to set: {}", e);
return result;
} catch (Exception ex) {
logger.error("Failed to add element to set", ex);
return false;
}
}
// ... existing code ...
// Cleanup resources
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
使用示例:
让通过一个示例来演示如何使用 EnhancedRSet
类,并调用其中的所有方法。将创建一个 EnhancedRSet
实例,执行一系列操作,并在结束时清理资源。
假设已经有了一个 RedissonClient
实例。
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 202/4/25 17:25
* @Description EnhancedRSet 测试类
* <p>
*/
public class EnhancedRSetExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 配置 Redisson
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
// 创建 EnhancedRSet 实例
EnhancedRSet<String> enhancedRSet = new EnhancedRSet<>(redissonClient, "mySet");
// 添加元素
enhancedRSet.add("element1");
enhancedRSet.add("element2");
// 异步添加元素
enhancedRSet.addAsync("element3").thenAccept(result -> {
if (result) {
System.out.println("Element3 added asynchronously");
}
}).get(); // 等待异步操作完成,实际应用中应避免使用 get() 阻塞主线程
// 检查元素是否存在
boolean contains = enhancedRSet.contains("element1");
System.out.println("Set contains element1: " + contains);
// 获取集合大小
int size = enhancedRSet.size();
System.out.println("Size of set: " + size);
// 移除元素
enhancedRSet.remove("element2");
// 清空集合
enhancedRSet.clear();
// 添加元素并设置过期时间
enhancedRSet.addWithExpiration("element4", 10, TimeUnit.SECONDS);
// 关闭 EnhancedRSet 实例并释放资源
enhancedRSet.shutdown();
// 关闭 Redisson 客户端
redissonClient.shutdown();
}
}
使用总结:
这个示例中,首先配置了 Redisson 客户端,然后创建了 EnhancedRSet
实例。执行了一系列的操作,包括添加、异步添加、检查存在性、获取大小、移除和清空集合。还使用了 addWithExpiration
方法添加了一个带过期时间的元素。
对于异步添加操作,使用了 CompletableFuture
的 thenAccept
方法来处理异步结果。在实际应用中,通常不会在异步操作后立即调用 get()
方法,因为这会阻塞调用线程,而是根据程序的需求处理异步结果。
最后,调用 shutdown
方法来关闭 EnhancedRSet
实例并释放资源,然后关闭 Redisson 客户端。
请注意,这个示例假设 Redis 服务器已经在本地运行并且监听默认端口 6379。在实际部署中,需要根据实际的 Redis 服务器配置来设置 Config
对象。此外,由于异步操作可能会抛出异常,在 get()
方法后添加了异常处理(ExecutionException
和 InterruptedException
)。
转载自:https://juejin.cn/post/7365771381677965346