基于Redission高级应用17-RBatch原理及工具类封装及实战应用
RBatch 是 Redisson 提供的一个功能,它允许在 Redis 服务器上以批量的方式执行多个操作。 这种方式可以显著减少客户端与服务器之间的往返次数(round-trip time, RTT),从而提高性能,特别是在执行大量操作时。
原理
RBatch
的工作原理主要基于两种 Redis 提供的特性:PIPELINING
和 TRANSACTIONS
。
Pipelining:
- Pipelining 是一种网络优化技术,它允许客户端一次性发送多个命令到服务器,而不需要等待每个命令的响应。
- Redis 服务器在接收到这些命令后,会将它们排队并顺序执行,然后将所有命令的响应一次性返回给客户端。
- 这种方式可以显著减少等待每个命令响应所产生的网络延迟,特别是在执行大量命令时。
Transactions:
- Redis 事务使用
MULTI
,EXEC
,DISCARD
和WATCH
命令来实现。 - 当一个事务开始时(
MULTI
),Redis 会将后续的所有命令放入一个队列,然后在EXEC
命令被调用时一次性执行所有命令。 - 如果在执行
EXEC
之前调用DISCARD
,则会取消事务中的所有命令。 WATCH
命令可以用来提供乐观锁功能,它会监视一个或多个键,如果在事务执行之前这些键被其他客户端改变了,那么事务将不会执行。
RBatch
结合了这两种特性,允许开发者以两种模式使用批量操作:
1. 执行模式(Execution Mode):
- IN_MEMORY_ATOMIC:使用 Redis 事务特性,确保所有命令作为一个原子操作执行。如果任何命令在执行时失败,整个事务都会回滚。
- IN_MEMORY_NON_ATOMIC:使用 Pipelining,但不保证原子性。命令会被快速执行,即使有命令失败,其他命令也会继续执行。
2. 响应模式(Response Mode):
- SYNCHRONIZED:等待所有命令执行完毕并返回响应。
- ASYNCHRONIZED:不等待命令执行完毕,不获取响应。
RBatch 通过收集客户端的操作请求,并在客户端本地缓存这些请求。当调用 execute()
或 executeAsync()
方法时,RBatch 会将所有缓存的操作请求发送到 Redis 服务器,然后根据选择的模式(事务或管道)执行。
RBatch流程图:
在这个流程图中,我们可以看到以下步骤:
- 启动:
- 流程开始时,首先创建
RBatch
实例。
- 流程开始时,首先创建
- 添加命令:
- 向
RBatch
实例添加一系列要批量执行的 Redis 命令。
- 向
- 执行模式决策:
- 根据需要执行的模式,决定是同步执行还是异步执行。
- 同步执行:
- 如果选择同步执行,调用
batch.execute()
方法。 - 等待响应并接收
BatchResult
。
- 如果选择同步执行,调用
- 处理同步结果:
- 处理接收到的同步
BatchResult
。
- 处理接收到的同步
- 异步执行:
- 如果选择异步执行,调用
batch.executeAsync()
方法。 - 立即返回
CompletableFuture
。
- 如果选择异步执行,调用
- 附加回调:
- 为异步执行结果的
CompletableFuture
附加回调函数。
- 为异步执行结果的
- 异步执行完成:
- 当
CompletableFuture
完成时,触发回调函数。
- 当
- 处理异步结果:
- 处理异步执行的结果。
- 结束:
- 批量操作执行完成,无论是同步还是异步,都会到达流程的结束点。
在这个时序图中,我们可以看到以下步骤:
- 创建 RBatch:
- 客户端应用请求 Redisson 客户端创建一个
RBatch
实例。
- 客户端应用请求 Redisson 客户端创建一个
- 添加命令:
- 客户端应用向
RBatch
实例添加一系列要执行的 Redis 命令。
- 客户端应用向
- 执行批量操作:
- 客户端应用请求 Redisson 客户端执行批量操作,这可以是同步或异步。
- 同步执行:
- 对于同步执行,Redisson 客户端将批量命令发送到 Redis 服务器。
- Redis 服务器执行这些命令并返回结果。
- Redisson 客户端将结果作为
BatchResult
返回给客户端应用。
- 异步执行:
- 对于异步执行,Redisson 客户端异步地将批量命令发送到 Redis 服务器。
- 客户端应用接收到一个
CompletableFuture
。 - 客户端应用可以选择附加一个回调函数到
CompletableFuture
。 - 一旦批量操作完成,Redis 服务器返回结果,如果有回调函数,它将被触发。
优点
- 减少网络延迟:
- 通过将多个操作合并为单个网络请求,RBatch 可以减少网络往返延迟,特别是在网络延迟较高的环境中效果显著。
- 提高吞吐量:
- 批量执行操作可以提高应用程序和 Redis 服务器的吞吐量。
- 灵活性:
- RBatch 支持同步和异步执行,为不同的应用场景提供灵活性。
- 原子性操作:
- 在事务模式下,RBatch 可以保证一组操作的原子性。
缺点
- 内存占用:
- 当批量操作很多时,客户端和服务器可能需要更多的内存来缓存这些操作。
- 复杂性增加:
- 管理批量操作的逻辑可能会增加代码复杂性。
- 事务模式的性能开销:
- 在事务模式下,如果批量操作中包含大量的写操作,可能会阻塞 Redis 服务器,因为 Redis 是单线程的,其他客户端的请求需要等待事务完成。
- 错误处理:
- 在管道模式中,如果某个操作失败,其他操作仍然会继续执行,可能需要额外的逻辑来处理这种部分失败的情况。
- 资源管理:
- 如果不正确管理 RBatch 对象,可能会导致资源泄露。 综上所述,RBatch 是一个强大的工具,可以在合适的场景下显著提高性能。然而,开发者需要根据具体的应用场景和需求来权衡使用 RBatch 的利弊。
工具类:
源码:
import org.redisson.api.RBatch;
import org.redisson.api.RedissonClient;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.client.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* @Author derek_smart
* @Date 202/5/14 8:18
* @Description RBatch 工具类
*/
public class BatchExecutor {
private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
private final RedissonClient redissonClient;
public BatchExecutor(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public <T> T execute(BatchOperation operation, Function<BatchResult<?>, T> resultProcessor, BatchOptions options) {
RBatch batch = redissonClient.createBatch(options);
operation.apply(batch);
try {
BatchResult<?> result = batch.execute();
return resultProcessor.apply(result);
} catch (RedisException e) {
logger.error("Batch operation failed", e);
throw e;
}
}
public CompletableFuture<Void> executeAsync(BatchOperation operation, Consumer<BatchResult<?>> resultConsumer, BatchOptions options) {
RBatch batch = redissonClient.createBatch(options);
operation.apply(batch);
CompletableFuture<BatchResult<?>> future = (CompletableFuture<BatchResult<?>>) batch.executeAsync();
return future.thenAcceptAsync(result -> {
try {
resultConsumer.accept(result);
} catch (Exception e) {
logger.error("Async batch operation failed", e);
}
});
}
@FunctionalInterface
public interface BatchOperation {
void apply(RBatch batch);
}
}
时序图:
在这个时序图中,我们展示了两个不同的流程:同步批量执行和异步批量执行。
- 同步批量执行:
- 客户端应用调用
BatchExecutor
的execute
方法,并传入批量操作、结果处理函数和批量选项。 BatchExecutor
调用 Redisson 客户端创建一个RBatch
实例。- 批量操作命令被添加到
RBatch
中。 BatchExecutor
调用RBatch
的execute
方法,该方法将所有命令发送到 Redis 服务器执行。- Redis 服务器执行这些命令并返回结果给 Redisson 客户端。
- Redisson 客户端将结果作为
BatchResult
返回给BatchExecutor
。 BatchExecutor
使用结果处理函数处理BatchResult
,并将处理后的结果返回给客户端应用。
- 客户端应用调用
- 异步批量执行:
- 客户端应用调用
BatchExecutor
的executeAsync
方法,并传入批量操作、结果消费者函数和批量选项。 BatchExecutor
调用 Redisson 客户端创建一个RBatch
实例。 批量操作命令被添加到RBatch
中。BatchExecutor
调用RBatch
的executeAsync
方法,该方法将所有命令异步发送到 Redis 服务器执行。- Redis 服务器执行这些命令并异步返回结果给 Redisson 客户端。
- Redisson 客户端返回一个
CompletableFuture<BatchResult>
给BatchExecutor
。 BatchExecutor
将一个CompletableFuture<Void>
返回给客户端应用。- 客户端应用将结果消费者函数附加到
CompletableFuture
上。 - 客户端应用可以继续执行其他工作,当批量操作完成时,结果消费者函数会被调用以异步处理结果。
- 客户端应用调用
流程图:
在这个流程图中,我们可以看到以下步骤:
- 启动:
- 流程开始时,首先创建
BatchExecutor
实例。
- 流程开始时,首先创建
- 设置批量操作:
- 定义
BatchOperation
,其中包含将要批量执行的 Redis 命令。
- 定义
- 同步执行:
- 如果选择同步执行 (
Sync Execute
),则调用BatchExecutor
的execute
方法。 - 执行批量操作,并等待结果 (
Execute Batch
)。 - 处理并返回批量执行的结果 (
Process Result
)。
- 如果选择同步执行 (
- 异步执行:
- 如果选择异步执行 (
Async Execute
),则调用BatchExecutor
的executeAsync
方法。 - 异步执行批量操作,立即返回
CompletableFuture
(Execute Batch Async
)。 - 附加回调函数以处理异步结果 (
Attach Callback
)。 - 当批量操作完成时,回调函数被触发,处理结果 (
Result Consumer
)。
- 如果选择异步执行 (
- 结束:
- 最终,同步或异步流程都会结束,返回处理后的结果。
功能总结
1. 批量操作的封装:
- BatchExecutor
提供了一个简洁的接口来收集和执行多个 Redis 命令。
2. 同步和异步执行支持:
- 支持同步 (execute
) 和异步 (executeAsync
) 执行批量操作,以适应不同的应用场景和性能需求。
3. 结果处理:
- 允许用户定义如何处理批量操作的结果,通过传递一个处理函数或消费者函数。
4. 自定义批量操作配置:
- 用户可以通过 BatchOptions
参数自定义批量操作的行为(如执行模式、响应模式等)。
5. 异常处理和日志记录:
- 提供了异常处理机制,并通过日志记录来帮助开发者诊断问题。
6. 资源管理:
- 确保在操作完成后释放资源,避免潜在的资源泄露。
测试类:
import org.redisson.Redisson;
import org.redisson.api.BatchResult;
import org.redisson.api.RedissonClient;
import org.redisson.api.BatchOptions;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @Author derek_smart
* @Date 202/5/14 8:51
* @Description RBatch 测试类
*/
public class BatchExecutorExample {
private static final Logger logger = LoggerFactory.getLogger(BatchExecutorExample.class);
public static void main(String[] args) {
// 初始化 Redisson 客户端
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
// 创建 BatchExecutor 实例
BatchExecutor batchExecutor = new BatchExecutor(redissonClient);
// 自定义 BatchOptions
BatchOptions options = BatchOptions.defaults()
.executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC);
// 执行同步批量操作并处理结果
try {
List<?> responses = batchExecutor.execute(batch -> {
batch.getMap("derekMap").fastPutAsync("key1", "value1");
batch.getMap("derekMap").fastPutAsync("key2", "value2");
batch.getSet("derekSet").addAsync("value1");
batch.getSet("derekSet").addAsync("value2");
}, BatchResult::getResponses, options);
// 打印批量操作的结果
responses.forEach(response -> logger.info("Sync batch response: {}", response));
} catch (Exception e) {
logger.error("Synchronous batch operation failed", e);
}
// 执行异步批量操作并处理结果
CompletableFuture<Void> future = batchExecutor.executeAsync(batch -> {
batch.getMap("derekMap").fastPutAsync("key3", "value3");
batch.getMap("derekMap").fastPutAsync("key4", "value4");
batch.getSet("derekSet").addAsync("value3");
batch.getSet("derekSet").addAsync("value4");
}, result -> {
result.getResponses().forEach(response -> logger.info("Async batch response: {}", response));
}, options);
// 等待异步批量操作完成
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Asynchronous batch operation failed", e);
}
// 关闭 Redisson 客户端
redissonClient.shutdown();
}
}
总结
RBatch
提供了一种高效的方式来执行大量的 Redis 命令,它通过减少网络延迟和利用 Redis 的事务或管道特性来提高性能。可以根据具体的需求选择合适的执行模式和响应模式。
转载自:https://juejin.cn/post/7369534111076220979