likes
comments
collection
share

基于Redission高级应用21-RScheduledExecutorService 分布式任务执行的解决方案

作者站长头像
站长
· 阅读数 302

概述:

写作原由

最近在学习分布式一些知识,想想之前在学习Redisson 时并没有使用其实现一个分布式任务执行的方案,故此通过学习发现RScheduledExecutorService 是其提供的一个好的工具。

引言

在构建分布式系统时,任务调度是一个不可或缺的功能,它允许在多个计算节点上安排、执行和管理定时任务。随着微服务和云基础设施的兴起,对于能够跨服务边界调度任务的需求日益增长。Redisson 提供的 RScheduledExecutorService 接口正是为了满足这种需求而设计的,它使得基于 Redis 的分布式任务调度变得简单而高效。然而,直接使用这个接口可能需要处理一些底层的细节,例如任务的重试逻辑和失败处理。为了简化这一过程,DistributedScheduledExecutorTool 应运而生,它是一个基于 RScheduledExecutorService 的高级工具,提供了易于使用的 API 和额外的功能,如自动重试和监听器回调,从而使得分布式任务调度更加健壮和灵活。本文将详细介绍如何利用这两个强大的工具,在分布式环境中有效地调度和管理任务。

原理

RScheduledExecutorService 是 Redission 提供的一个分布式的定时任务执行器,它实现了 Java 的 ScheduledExecutorService 接口。它允许在一个分布式的环境中安排任务在将来某个时间执行,或者周期性地执行。

RScheduledExecutorService 的工作原理基于 Redis 的发布/订阅机制和延时队列。以下是其工作流程的简化描述:

  1. 任务调度:当调度一个任务时,任务的信息和执行时间被保存在 Redis 中的一个延时队列里。

  2. 任务存储:任务的状态和数据被序列化并存储在 Redis 中,以便可以在任何节点上执行。

  3. 任务执行:Redission 节点订阅了任务执行的相关信息。当任务到达执行时间时,Redis 会发布一个消息给所有订阅的节点。

  4. 任务分配:其中一个节点(或者根据配置,可能是多个节点)接收到消息并从 Redis 中提取任务信息,然后开始执行任务。

  5. 任务完成:任务执行完成后,执行结果可以被存储回 Redis,并通知任务的提交者(如果需要)。

优点

  • 分布式执行RScheduledExecutorService 允许在多个节点上分布式地执行定时任务,这有助于负载均衡和高可用性。

  • 容错性:如果执行任务的节点失败,其他节点可以接管并执行该任务,从而提供更好的容错性。

  • 动态扩展:可以动态添加或移除执行任务的节点,不需要停止服务或重新调度现有的任务。

  • 持久化:任务信息存储在 Redis 中,即使所有应用节点都宕机,任务信息也不会丢失。

  • 灵活调度:支持一次性执行、固定延迟执行和固定速率执行等多种调度方式。

缺点

  • 依赖于RedisRScheduledExecutorService 的工作依赖于 Redis,如果 Redis 不可用,那么任务调度和执行也会受到影响。

  • 网络延迟:在分布式环境中,网络延迟可能会影响任务执行的精确时间。

  • 资源消耗:维护定时任务和执行任务可能会增加 Redis 的资源消耗,特别是在有大量任务时。

  • 序列化开销:任务状态和数据需要序列化和反序列化,这可能会引入额外的开销。

  • 复杂性:与本地的 ScheduledExecutorService 相比,分布式定时任务执行器引入了更多的复杂性,需要更多的配置和管理。

RScheduledExecutorServiceScheduledExecutorService总结:

RScheduledExecutorServiceScheduledExecutorService 都用于在将来的某个时间点执行任务,或者以固定的频率重复执行任务。它们的主要区别在于它们的运行环境和分布式能力。

ScheduledExecutorService 使用场景

ScheduledExecutorService 是 Java 标准库提供的一个接口,适用于以下场景:

  • 单机环境:适用于不需要跨多个JVM或服务器分布式执行任务的情况。
  • 简单定时任务:在单个应用程序内部安排一些简单的定时任务,如清理缓存、检查配置更新等。
  • 非持久化任务:如果应用程序重启,定时任务不需要持久化或重新调度。
  • 低容错要求:如果任务执行的节点发生故障,任务可以容忍丢失或手动干预重新调度。

RScheduledExecutorService使用场景

RScheduledExecutorService 是 Redission 提供的一个接口,它扩展了 ScheduledExecutorService 的功能,适用于以下场景:

  • 分布式环境:适用于需要跨多个JVM或服务器执行定时任务的分布式应用程序。
  • 高可用性:如果一个节点失败,其他节点可以接管并执行该节点的任务,从而提高了系统的可用性。
  • 任务持久化:任务即使在应用程序重启后也能够被保留和重新调度,因为它们被保存��� Redis 中。
  • 动态扩展:可以动态地添加或移除执行任务的节点,而不需要重新调度现有任务。
  • 负载均衡:任务可以在多个节点之间进行分配,从而实现负载均衡。
  • 集群监控和管理:可以通过 Redis 监控和管理集群中的任务执行情况。

和ScheduledExecutorService区别使用场景

基于上述特点,以下是两者区别使用场景的一些示例:

  • 如果应用程序是单体架构,只需要在单个服务器上执行定时任务,并且不需要任务的持久化或高可用性,那么 ScheduledExecutorService 就足够了。
  • 如果应用程序是微服务架构,需要在多个服务实例之间均衡执行定时任务,并且要求任务即使在服务重启后也能继续执行,那么 RScheduledExecutorService 就是更好的选择。
  • 如果需要在多个地理位置分布的服务器上执行定时任务,并且需要统一的任务管理和监控,RScheduledExecutorService 提供的分布式功能会很有帮助。

解决方案实现

工具类实现:

DistributedScheduledExecutorTool:分布式任务调度的利器在分布式系统中,任务调度是一项核心功能,它涉及到在多个节点上安排和执行定时任务。DistributedScheduledExecutorTool 是一个基于 Redisson 的分布式任务调度工具,它提供了一种简单而强大的方式来在 Java 应用程序中实现这一功能。本文档将详细介绍如何使用 DistributedScheduledExecutorTool 来调度、执行和管理分布式定时任务。

import org.redisson.Redisson;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.redisson.RedissonNode;
import org.redisson.config.RedissonNodeConfig;

import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * @Author derek_smart
 * @Date 202/6/14 8:08
 * 为分布式环境设计的定时任务工具类,它基于 Redisson 提供的 `RScheduledExecutorService`。
 * 提供了定时任务的调度、执行、取消以及优雅关闭等功能,并且可以处理任务执行失败的情况
 */
public class DistributedScheduledExecutorTool {

    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduledExecutorTool.class);

    private final RScheduledExecutorService scheduledExecutorService;
    private final RedissonClient redissonClient;
    private RedissonNode redissonNode;

    /**
     * 构造函数,初始化 Redisson 客户端并启动一个 Redisson Node 用于执行定时任务。
     *
     * @param redisUrl Redis 服务器的 URL。
     * @param workers  用于执行任务的工作线程数量。
     */
    public DistributedScheduledExecutorTool(String redisUrl, int workers) {
        Config config = new Config();
        config.useSingleServer().setAddress(redisUrl);
        this.redissonClient = Redisson.create(config);

        // Configure the node for RScheduledExecutorService
        RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
        nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("myScheduledExecutor", workers));
        this.redissonNode = RedissonNode.create(nodeConfig);
        this.redissonNode.start();

        this.scheduledExecutorService = redissonClient.getExecutorService("myScheduledExecutor");
    }

    /**
     * 调度一个 Callable 任务在指定的延迟后执行,并支持重试机制和任务监听器。
     *
     * @param task       待执行的任务。
     * @param delay      延迟时间。
     * @param timeUnit   延迟时间的单位。
     * @param maxRetries 最大重试次数。
     * @param listener   任务监听器。
     * @return 返回任务的 ScheduledFuture。
     */
    public <V> ScheduledFuture<?> schedule(Callable<V> task, long delay, TimeUnit timeUnit, int maxRetries, TaskListener<V> listener) {
        Callable<V> retryingTask = () -> {
            int retries = 0;
            while (true) {
                try {
                    V result = task.call();
                    if (listener != null) {
                        listener.onSuccess(result);
                    }
                    return result;
                } catch (Exception e) {
                    if (retries >= maxRetries) {
                        if (listener != null) {
                            listener.onFailure(e);
                        }
                        throw e;
                    }
                    logger.warn("Task failed, attempt " + retries + " of " + maxRetries);
                    retries++;
                    TimeUnit.SECONDS.sleep(1); // Wait before retry
                }
            }
        };
        return schedule(retryingTask, delay, timeUnit);
    }

    /**
     * 辅助方法,用于实际调度一个任务而不带重试逻辑。
     *
     * @param task     待执行的任务。
     * @param delay    延迟时间。
     * @param timeUnit 延迟时间的单位。
     * @return 返回任务的 ScheduledFuture。
     */
    private <V> ScheduledFuture<?> schedule(Callable<V> task, long delay, TimeUnit timeUnit) {
        try {
            return scheduledExecutorService.schedule(task, delay, timeUnit);
        } catch (Exception e) {
            handleTaskFailure(e);
            return null;
        }
    }

    /**
     * 取消一个尚未执行的定时任务。
     *
     * @param future 任务的 ScheduledFuture。
     * @return 如果任务被成功取消,则返回 true;否则返回 false。
     */
    public boolean cancelScheduledTask(ScheduledFuture<?> future) {
        if (future != null && !future.isDone()) {
            return future.cancel(true);
        }
        return false;
    }

    /**
     * 优雅地关闭调度执行器服务,等待已经调度的任务完成。
     */
    public void shutdownGracefully() {
        scheduledExecutorService.shutdown();
        try {
            if (!scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
                scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 调度一个 Runnable 任务以固定的频率执行。
     *
     * @param command      待执行的任务。
     * @param initialDelay 初始延迟时间。
     * @param period       两次执行之间的时间间隔。
     * @param unit         时间的单位。
     * @return 返回任务的 ScheduledFuture。
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /**
     * 调度一个 Runnable 任务在每次执行完成后,等待指定的延迟时间再次执行。
     *
     * @param command      待执行的任务。
     * @param initialDelay 初始延迟时间。
     * @param delay        两次执行之间的延迟时间。
     * @param unit         时间的单位。
     * @return 返回任务的 ScheduledFuture。
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    /**
     * 处理任务调度失败的情况,记录错误信息。
     *
     * @param e 异常对象。
     */
    private void handleTaskFailure(Exception e) {
        logger.error("Task scheduling failed", e);
    }

    /**
     * 关闭 Redisson Node 和 Redisson 客户端,释放相关资源。
     */
    public void shutdown() {
        if (redissonNode != null) {
            redissonNode.shutdown();
        }
        if (redissonClient != null) {
            redissonClient.shutdown();
        }
    }

    /**
     * 任务监听器接口,用于提供任务执行成功或失败时的回调。
     *
     * @param <V> 任务执行的返回类型。
     */
    public interface TaskListener<V> {
        void onSuccess(V result);

        void onFailure(Throwable t);
    }
}

基于Redission高级应用21-RScheduledExecutorService 分布式任务执行的解决方案 基于Redission高级应用21-RScheduledExecutorService 分布式任务执行的解决方案

时序图:

Client ApplicationDistributedScheduledExecutorToolRScheduledExecutorServiceRedisson NodeScheduled TaskListenerTool starts RedissonNodeWraps task with retry logicalt[Retrylogic]alt[Task execution successful][Task execution failed]All resources are releasedInitialize (redisUrl, workers)Get ExecutorServiceConnect to NodeNode readyschedule(task, delay, timeUnit, retries, listener)Schedule wrapped taskDispatch taskExecute taskonSuccess(result)Handle successonFailure(exception)Handle failureRetry after delaycancelScheduledTask(future)Cancel taskSignal cancellationCancel task if not startedshutdownGracefully()Shutdown executor serviceAwait terminationConfirm shutdownshutdown()Shutdown nodeNode shutdownRedisson client shutdownClient ApplicationDistributedScheduledExecutorToolRScheduledExecutorServiceRedisson NodeScheduled TaskListener

在这个时序图中,展示了客户端应用程序如何通过 DistributedScheduledExecutorTool 初始化、调度任务、取消任务、优雅关闭和完全关闭的过程。时序图还展示了任务执行成功和失败的处理,以及重试逻辑的工作方式。

流程图:

Yes
No
Yes
No
Yes
No
Yes
No
Start
Initialize DistributedScheduledExecutorTool with Redis URL and Workers
Task to Schedule?
Schedule Task with Delay and Retry Logic
Proceed to Shutdown?
Task Executed Successfully?
Trigger onSuccess Callback
Has Max Retries Reached?
Task Completed
Trigger onFailure Callback
Retry Task after Specified Delay
Shutdown Gracefully
Shutdown Immediately
Wait for Tasks to Complete
Force Shutdown
Executor Service Shutdown
Redisson Node Shutdown
Redisson Client Shutdown
End

流程图,可以看到:

  • 开始时,需要初始化 DistributedScheduledExecutorTool,包括配置 Redis URL 和工作线程数量。
  • 然后检查是否有任务需要调度。
  • 如果有任务,它会被调度,包括设置延迟和重试逻辑。
  • 一旦任务执行,会检查是否成功执行。
  • 如果任务成功,将触发 onSuccess 回调。
  • 如果任务失败,将检查是否达到最大重试次数。
  • 如果达到最大重试次数,将触发 onFailure 回调。
  • 如果没有达到最大重试次数,任务将在指定的延迟后重试。
  • 最后,检查是否需要关闭工具类。
  • 如果需要优雅关闭,将等待所有任务完成。
  • 如果需要立即关闭,将强制关闭执行器服务。
  • 无论是优雅关闭还是立即关闭,最终都会关闭 Redisson Node 和 Redisson 客户端。
  • 最后,流程结束。

DistributedScheduledExecutorTool总结:

DistributedScheduledExecutorTool 是一个功能丰富的分布式任务调度工具,它利用 Redisson 提供了一个可靠的解决方案,适合于需要跨多个节点执行定时任务的复杂应用程序。通过该类,应该能够开始在 Java 应用程序中使用这个工具来调度和管理定时任务。

测试类实现:

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 202/6/14 8:08
 * DistributedScheduledExecutorTool 测试类
 */
public class DistributedScheduledExecutorToolTest {

    public static void main(String[] args) {
        // 初始化工具类
        DistributedScheduledExecutorTool executorTool = new DistributedScheduledExecutorTool("redis://127.0.0.1:6379", 5);

        // 场景:调度一个可能需要重试的任务
        ScheduledFuture<?> future = executorTool.schedule(
                () -> {
                    // 模拟任务逻辑
                    if (Math.random() > 0.7) {
                        throw new Exception("Random failure");
                    }
                    return "Task completed";
                },
                5, // 5秒后执行
                TimeUnit.SECONDS,
                3, // 最多重试3次
                new DistributedScheduledExecutorTool.TaskListener<String>() {
                    @Override
                    public void onSuccess(String result) {
                        System.out.println("Task succeeded with result: " + result);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        System.out.println("Task failed: " + t.getMessage());
                    }
                }
        );

        // 场景:取消一个已经调度的任务
        boolean cancelled = executorTool.cancelScheduledTask(future);
        if (cancelled) {
            System.out.println("Task was successfully cancelled.");
        } else {
            System.out.println("Task could not be cancelled.");
        }

        // 场景:以固定频率执行任务
        ScheduledFuture<?> fixedRateFuture = executorTool.scheduleAtFixedRate(
                () -> System.out.println("Fixed rate task is running"),
                0, // 立即开始
                1, // 每1秒执行一次
                TimeUnit.SECONDS
        );

        // 场景:在每次执行完成后,等待指定的延迟时间再执行任务
        ScheduledFuture<?> fixedDelayFuture = executorTool.scheduleWithFixedDelay(
                () -> System.out.println("Fixed delay task is running"),
                0, // 立即开始
                1, // 任务完成后等待1秒
                TimeUnit.SECONDS
        );

        // 模拟应用运行一段时间
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 场景:优雅关闭所有正在执行的任务
        executorTool.shutdownGracefully();

        // 场景:关闭 Redisson 客户端和所有相关资源
        executorTool.shutdown();
    }
}

基于Redission高级应用21-RScheduledExecutorService 分布式任务执行的解决方案

总结:

在分布式系统中,任务调度是一个关键组件,它需要跨多个服务器和应用实例协调工作。Redisson 的 RScheduledExecutorService 为这一需求提供了解决方案,使得开发者能够利用 Redis 的能力来实现分布式定时任务。

RScheduledExecutorService 提供了一系列方法,包括延迟执行任务、定期执行任务等,这些方法都是分布式环境下的任务调度的基础。然而,直接使用 RScheduledExecutorService 可能会涉及到一些复杂的设置和错误处理。

DistributedScheduledExecutorTool 应运而生,它封装了 RScheduledExecutorService 的复杂性,提供了一个更友好的 API。它允许开发者调度任务,并自动处理任务重试机制,同时通过任务监听器提供了任务成功或失败的回调。这样,开发者可以专注于任务的逻辑,而不是调度的细节。

此外,DistributedScheduledExecutorTool 还提供了优雅关闭和立即关闭的方法,确保在应用程序关闭时可以正确地处理正在执行的任务。这些特性使得 DistributedScheduledExecutorTool 成为在需要分布式任务调度时的理想选择。

总的来说,RScheduledExecutorServiceDistributedScheduledExecutorTool 为 Java 开发者提供了一种简单且强大的方法,以实现在分布式环境中的任务调度。不仅提高了代码的可维护性,也确保了任务执行的可靠性和高效性。

转载自:https://juejin.cn/post/7380649024933986344
评论
请登录