likes
comments
collection
share

基于Redission高级应用17-RBatch原理及工具类封装及实战应用

作者站长头像
站长
· 阅读数 56

RBatch 是 Redisson 提供的一个功能,它允许在 Redis 服务器上以批量的方式执行多个操作。 这种方式可以显著减少客户端与服务器之间的往返次数(round-trip time, RTT),从而提高性能,特别是在执行大量操作时。

原理

RBatch 的工作原理主要基于两种 Redis 提供的特性:PIPELININGTRANSACTIONS

Pipelining

  • Pipelining 是一种网络优化技术,它允许客户端一次性发送多个命令到服务器,而不需要等待每个命令的响应。
  • Redis 服务器在接收到这些命令后,会将它们排队并顺序执行,然后将所有命令的响应一次性返回给客户端。
  • 这种方式可以显著减少等待每个命令响应所产生的网络延迟,特别是在执行大量命令时。

Transactions

  • Redis 事务使用 MULTI, EXEC, DISCARDWATCH 命令来实现。
  • 当一个事务开始时(MULTI),Redis 会将后续的所有命令放入一个队列,然后在 EXEC 命令被调用时一次性执行所有命令。
  • 如果在执行 EXEC 之前调用 DISCARD,则会取消事务中的所有命令。
  • WATCH 命令可以用来提供乐观锁功能,它会监视一个或多个键,如果在事务执行之前这些键被其他客户端改变了,那么事务将不会执行。

RBatch 结合了这两种特性,允许开发者以两种模式使用批量操作:

1. 执行模式(Execution Mode)

  • IN_MEMORY_ATOMIC:使用 Redis 事务特性,确保所有命令作为一个原子操作执行。如果任何命令在执行时失败,整个事务都会回滚。
  • IN_MEMORY_NON_ATOMIC:使用 Pipelining,但不保证原子性。命令会被快速执行,即使有命令失败,其他命令也会继续执行。

2. 响应模式(Response Mode)

  • SYNCHRONIZED:等待所有命令执行完毕并返回响应。
  • ASYNCHRONIZED:不等待命令执行完毕,不获取响应。

RBatch 通过收集客户端的操作请求,并在客户端本地缓存这些请求。当调用 execute()executeAsync() 方法时,RBatch 会将所有缓存的操作请求发送到 Redis 服务器,然后根据选择的模式(事务或管道)执行。

RBatch流程图:

Create RBatch
Add Commands
Execute or ExecuteAsync
Synchronous
Asynchronous
Wait for Response
Process Result
Return Future
Attach Callback
Future Completes
Process Result
Start
RBatch
Add Commands to Batch
Execute Mode
batch.execute
batch.executeAsync
Wait for BatchResult
Process BatchResult
End
Return CompletableFuture
Attach Callback to Future
Future Completes
Process BatchResult

在这个流程图中,我们可以看到以下步骤:

  1. 启动:
    • 流程开始时,首先创建 RBatch 实例。
  2. 添加命令:
    • RBatch 实例添加一系列要批量执行的 Redis 命令。
  3. 执行模式决策:
    • 根据需要执行的模式,决定是同步执行还是异步执行。
  4. 同步执行:
    • 如果选择同步执行,调用 batch.execute() 方法。
    • 等待响应并接收 BatchResult
  5. 处理同步结果:
    • 处理接收到的同步 BatchResult
  6. 异步执行:
    • 如果选择异步执行,调用 batch.executeAsync() 方法。
    • 立即返回 CompletableFuture
  7. 附加回调:
    • 为异步执行结果的 CompletableFuture 附加回调函数。
  8. 异步执行完成:
    • CompletableFuture 完成时,触发回调函数。
  9. 处理异步结果:
    • 处理异步执行的结果。
  10. 结束:
  • 批量操作执行完成,无论是同步还是异步,都会到达流程的结束点。
Client ApplicationRedissonClientRedis Serveralt[Synchronous Execution][Asynchronous Execution]Create RBatchRBatch instanceAdd commands to RBatchCommands addedExecute batch (sync or async)Send batch commandsExecute commands and return resultsReturn BatchResultSend batch commands asynchronouslyExecute commandsReturn CompletableFutureAttach callback to CompletableFutureReturn results upon completionClient ApplicationRedissonClientRedis Server

在这个时序图中,我们可以看到以下步骤:

  1. 创建 RBatch:
    • 客户端应用请求 Redisson 客户端创建一个 RBatch 实例。
  2. 添加命令:
    • 客户端应用向 RBatch 实例添加一系列要执行的 Redis 命令。
  3. 执行批量操作:
    • 客户端应用请求 Redisson 客户端执行批量操作,这可以是同步或异步。
  4. 同步执行:
    • 对于同步执行,Redisson 客户端将批量命令发送到 Redis 服务器。
    • Redis 服务器执行这些命令并返回结果。
    • Redisson 客户端将结果作为 BatchResult 返回给客户端应用。
  5. 异步执行:
    • 对于异步执行,Redisson 客户端异步地将批量命令发送到 Redis 服务器。
    • 客户端应用接收到一个 CompletableFuture
    • 客户端应用可以选择附加一个回调函数到 CompletableFuture
    • 一旦批量操作完成,Redis 服务器返回结果,如果有回调函数,它将被触发。

优点

  1. 减少网络延迟:
    • 通过将多个操作合并为单个网络请求,RBatch 可以减少网络往返延迟,特别是在网络延迟较高的环境中效果显著。
  2. 提高吞吐量:
    • 批量执行操作可以提高应用程序和 Redis 服务器的吞吐量。
  3. 灵活性:
    • RBatch 支持同步和异步执行,为不同的应用场景提供灵活性。
  4. 原子性操作:
    • 在事务模式下,RBatch 可以保证一组操作的原子性。

缺点

  1. 内存占用:
    • 当批量操作很多时,客户端和服务器可能需要更多的内存来缓存这些操作。
  2. 复杂性增加:
    • 管理批量操作的逻辑可能会增加代码复杂性。
  3. 事务模式的性能开销:
    • 在事务模式下,如果批量操作中包含大量的写操作,可能会阻塞 Redis 服务器,因为 Redis 是单线程的,其他客户端的请求需要等待事务完成。
  4. 错误处理:
    • 在管道模式中,如果某个操作失败,其他操作仍然会继续执行,可能需要额外的逻辑来处理这种部分失败的情况。
  5. 资源管理:
    • 如果不正确管理 RBatch 对象,可能会导致资源泄露。 综上所述,RBatch 是一个强大的工具,可以在合适的场景下显著提高性能。然而,开发者需要根据具体的应用场景和需求来权衡使用 RBatch 的利弊。

工具类:

源码:

import org.redisson.api.RBatch;
import org.redisson.api.RedissonClient;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.client.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
/**
 * @Author derek_smart
 * @Date 202/5/14 8:18
 * @Description RBatch 工具类
 */
public class BatchExecutor {

    private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);

    private final RedissonClient redissonClient;

    public BatchExecutor(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    public <T> T execute(BatchOperation operation, Function<BatchResult<?>, T> resultProcessor, BatchOptions options) {
        RBatch batch = redissonClient.createBatch(options);
        operation.apply(batch);
        try {
            BatchResult<?> result = batch.execute();
            return resultProcessor.apply(result);
        } catch (RedisException e) {
            logger.error("Batch operation failed", e);
            throw e;
        }
    }

    public CompletableFuture<Void> executeAsync(BatchOperation operation, Consumer<BatchResult<?>> resultConsumer, BatchOptions options) {
        RBatch batch = redissonClient.createBatch(options);
        operation.apply(batch);
        CompletableFuture<BatchResult<?>> future = (CompletableFuture<BatchResult<?>>) batch.executeAsync();
        return future.thenAcceptAsync(result -> {
            try {
                resultConsumer.accept(result);
            } catch (Exception e) {
                logger.error("Async batch operation failed", e);
            }
        });
    }

    @FunctionalInterface
    public interface BatchOperation {
        void apply(RBatch batch);
    }
}

基于Redission高级应用17-RBatch原理及工具类封装及实战应用

时序图:

Client ApplicationBatchExecutorRedissonClientRedis ServerSynchronous Batch ExecutionResult processed and returned to ClientAsynchronous Batch ExecutionResult consumed and processed asynchronouslyalt[Batch Execution Complete]execute(batchOperation, resultProcessor, options)createBatch(options)RBatchadd batchOperation commands to RBatchexecute()Send batch commandsExecute commands and return resultsBatchResultresultProcessor.apply(BatchResult)executeAsync(batchOperation, resultConsumer, options)createBatch(options)RBatchadd batchOperation commands to RBatchexecuteAsync()Send batch commandsExecute commands and return resultsCompletableFuture<BatchResult>CompletableFuture<Void>Attach resultConsumer to CompletableFutureContinue other workresultConsumer.accept(BatchResult)Client ApplicationBatchExecutorRedissonClientRedis Server

在这个时序图中,我们展示了两个不同的流程:同步批量执行和异步批量执行。

  1. 同步批量执行
    • 客户端应用调用 BatchExecutorexecute 方法,并传入批量操作、结果处理函数和批量选项。
    • BatchExecutor 调用 Redisson 客户端创建一个 RBatch 实例。
    • 批量操作命令被添加到 RBatch 中。
    • BatchExecutor 调用 RBatchexecute 方法,该方法将所有命令发送到 Redis 服务器执行。
    • Redis 服务器执行这些命令并返回结果给 Redisson 客户端。
    • Redisson 客户端将结果作为 BatchResult 返回给 BatchExecutor
    • BatchExecutor 使用结果处理函数处理 BatchResult,并将处理后的结果返回给客户端应用。
  2. 异步批量执行
    • 客户端应用调用 BatchExecutorexecuteAsync 方法,并传入批量操作、结果消费者函数和批量选项。
    • BatchExecutor 调用 Redisson 客户端创建一个 RBatch 实例。 批量操作命令被添加到 RBatch 中。
    • BatchExecutor 调用 RBatchexecuteAsync 方法,该方法将所有命令异步发送到 Redis 服务器执行。
    • Redis 服务器执行这些命令并异步返回结果给 Redisson 客户端。
    • Redisson 客户端返回一个 CompletableFuture<BatchResult>BatchExecutor
    • BatchExecutor 将一个 CompletableFuture<Void> 返回给客户端应用。
    • 客户端应用将结果消费者函数附加到 CompletableFuture 上。
    • 客户端应用可以继续执行其他工作,当批量操作完成时,结果消费者函数会被调用以异步处理结果。

流程图:

Create BatchExecutor
Define BatchOperation
Execute BatchOperation Synchronously
Execute
Return BatchResult
Return Processed Result
Execute BatchOperation Asynchronously
ExecuteAsync
Return CompletableFuture
Process Result Asynchronously
Complete Future
Start
BatchExecutor
BatchOperation
Sync Execute
Execute Batch
Process Result
End
Async Execute
Execute Batch Async
Attach Callback
Result Consumer

在这个流程图中,我们可以看到以下步骤:

  1. 启动:
    • 流程开始时,首先创建 BatchExecutor 实例。
  2. 设置批量操作:
    • 定义 BatchOperation,其中包含将要批量执行的 Redis 命令。
  3. 同步执行:
    • 如果选择同步执行 (Sync Execute),则调用 BatchExecutorexecute 方法。
    • 执行批量操作,并等待结果 (Execute Batch)。
    • 处理并返回批量执行的结果 (Process Result)。
  4. 异步执行:
    • 如果选择异步执行 (Async Execute),则调用 BatchExecutorexecuteAsync 方法。
    • 异步执行批量操作,立即返回 CompletableFuture (Execute Batch Async)。
    • 附加回调函数以处理异步结果 (Attach Callback)。
    • 当批量操作完成时,回调函数被触发,处理结果 (Result Consumer)。
  5. 结束:
    • 最终,同步或异步流程都会结束,返回处理后的结果。

功能总结

1. 批量操作的封装: - BatchExecutor 提供了一个简洁的接口来收集和执行多个 Redis 命令。 2. 同步和异步执行支持: - 支持同步 (execute) 和异步 (executeAsync) 执行批量操作,以适应不同的应用场景和性能需求。 3. 结果处理: - 允许用户定义如何处理批量操作的结果,通过传递一个处理函数或消费者函数。 4. 自定义批量操作配置: - 用户可以通过 BatchOptions 参数自定义批量操作的行为(如执行模式、响应模式等)。 5. 异常处理和日志记录: - 提供了异常处理机制,并通过日志记录来帮助开发者诊断问题。 6. 资源管理: - 确保在操作完成后释放资源,避免潜在的资源泄露。

测试类:

import org.redisson.Redisson;
import org.redisson.api.BatchResult;
import org.redisson.api.RedissonClient;
import org.redisson.api.BatchOptions;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * @Author derek_smart
 * @Date 202/5/14 8:51
 * @Description RBatch 测试类
 */
public class BatchExecutorExample {

    private static final Logger logger = LoggerFactory.getLogger(BatchExecutorExample.class);

    public static void main(String[] args) {
        // 初始化 Redisson 客户端
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient = Redisson.create(config);

        // 创建 BatchExecutor 实例
        BatchExecutor batchExecutor = new BatchExecutor(redissonClient);

        // 自定义 BatchOptions
        BatchOptions options = BatchOptions.defaults()
                .executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC);

        // 执行同步批量操作并处理结果
        try {
            List<?> responses = batchExecutor.execute(batch -> {
                batch.getMap("derekMap").fastPutAsync("key1", "value1");
                batch.getMap("derekMap").fastPutAsync("key2", "value2");
                batch.getSet("derekSet").addAsync("value1");
                batch.getSet("derekSet").addAsync("value2");
            }, BatchResult::getResponses, options);

            // 打印批量操作的结果
            responses.forEach(response -> logger.info("Sync batch response: {}", response));
        } catch (Exception e) {
            logger.error("Synchronous batch operation failed", e);
        }

        // 执行异步批量操作并处理结果
        CompletableFuture<Void> future = batchExecutor.executeAsync(batch -> {
            batch.getMap("derekMap").fastPutAsync("key3", "value3");
            batch.getMap("derekMap").fastPutAsync("key4", "value4");
            batch.getSet("derekSet").addAsync("value3");
            batch.getSet("derekSet").addAsync("value4");
        }, result -> {
            result.getResponses().forEach(response -> logger.info("Async batch response: {}", response));
        }, options);

        // 等待异步批量操作完成
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            logger.error("Asynchronous batch operation failed", e);
        }

        // 关闭 Redisson 客户端
        redissonClient.shutdown();
    }
}

基于Redission高级应用17-RBatch原理及工具类封装及实战应用

总结

RBatch 提供了一种高效的方式来执行大量的 Redis 命令,它通过减少网络延迟和利用 Redis 的事务或管道特性来提高性能。可以根据具体的需求选择合适的执行模式和响应模式。

转载自:https://juejin.cn/post/7369534111076220979
评论
请登录