基于Redission高级应用21-RScheduledExecutorService 分布式任务执行的解决方案
概述:
写作原由
最近在学习分布式一些知识,想想之前在学习Redisson 时并没有使用其实现一个分布式任务执行的方案,故此通过学习发现RScheduledExecutorService 是其提供的一个好的工具。
引言
在构建分布式系统时,任务调度是一个不可或缺的功能,它允许在多个计算节点上安排、执行和管理定时任务。随着微服务和云基础设施的兴起,对于能够跨服务边界调度任务的需求日益增长。Redisson 提供的 RScheduledExecutorService
接口正是为了满足这种需求而设计的,它使得基于 Redis 的分布式任务调度变得简单而高效。然而,直接使用这个接口可能需要处理一些底层的细节,例如任务的重试逻辑和失败处理。为了简化这一过程,DistributedScheduledExecutorTool
应运而生,它是一个基于 RScheduledExecutorService
的高级工具,提供了易于使用的 API 和额外的功能,如自动重试和监听器回调,从而使得分布式任务调度更加健壮和灵活。本文将详细介绍如何利用这两个强大的工具,在分布式环境中有效地调度和管理任务。
原理
RScheduledExecutorService
是 Redission 提供的一个分布式的定时任务执行器,它实现了 Java 的 ScheduledExecutorService
接口。它允许在一个分布式的环境中安排任务在将来某个时间执行,或者周期性地执行。
RScheduledExecutorService
的工作原理基于 Redis 的发布/订阅机制和延时队列。以下是其工作流程的简化描述:
-
任务调度:当调度一个任务时,任务的信息和执行时间被保存在 Redis 中的一个延时队列里。
-
任务存储:任务的状态和数据被序列化并存储在 Redis 中,以便可以在任何节点上执行。
-
任务执行:Redission 节点订阅了任务执行的相关信息。当任务到达执行时间时,Redis 会发布一个消息给所有订阅的节点。
-
任务分配:其中一个节点(或者根据配置,可能是多个节点)接收到消息并从 Redis 中提取任务信息,然后开始执行任务。
-
任务完成:任务执行完成后,执行结果可以被存储回 Redis,并通知任务的提交者(如果需要)。
优点
-
分布式执行:
RScheduledExecutorService
允许在多个节点上分布式地执行定时任务,这有助于负载均衡和高可用性。 -
容错性:如果执行任务的节点失败,其他节点可以接管并执行该任务,从而提供更好的容错性。
-
动态扩展:可以动态添加或移除执行任务的节点,不需要停止服务或重新调度现有的任务。
-
持久化:任务信息存储在 Redis 中,即使所有应用节点都宕机,任务信息也不会丢失。
-
灵活调度:支持一次性执行、固定延迟执行和固定速率执行等多种调度方式。
缺点
-
依赖于Redis:
RScheduledExecutorService
的工作依赖于 Redis,如果 Redis 不可用,那么任务调度和执行也会受到影响。 -
网络延迟:在分布式环境中,网络延迟可能会影响任务执行的精确时间。
-
资源消耗:维护定时任务和执行任务可能会增加 Redis 的资源消耗,特别是在有大量任务时。
-
序列化开销:任务状态和数据需要序列化和反序列化,这可能会引入额外的开销。
-
复杂性:与本地的
ScheduledExecutorService
相比,分布式定时任务执行器引入了更多的复杂性,需要更多的配置和管理。
RScheduledExecutorService
和 ScheduledExecutorService
总结:
RScheduledExecutorService
和 ScheduledExecutorService
都用于在将来的某个时间点执行任务,或者以固定的频率重复执行任务。它们的主要区别在于它们的运行环境和分布式能力。
ScheduledExecutorService
使用场景
ScheduledExecutorService
是 Java 标准库提供的一个接口,适用于以下场景:
- 单机环境:适用于不需要跨多个JVM或服务器分布式执行任务的情况。
- 简单定时任务:在单个应用程序内部安排一些简单的定时任务,如清理缓存、检查配置更新等。
- 非持久化任务:如果应用程序重启,定时任务不需要持久化或重新调度。
- 低容错要求:如果任务执行的节点发生故障,任务可以容忍丢失或手动干预重新调度。
RScheduledExecutorService使用场景
RScheduledExecutorService
是 Redission 提供的一个接口,它扩展了 ScheduledExecutorService
的功能,适用于以下场景:
- 分布式环境:适用于需要跨多个JVM或服务器执行定时任务的分布式应用程序。
- 高可用性:如果一个节点失败,其他节点可以接管并执行该节点的任务,从而提高了系统的可用性。
- 任务持久化:任务即使在应用程序重启后也能够被保留和重新调度,因为它们被保存��� Redis 中。
- 动态扩展:可以动态地添加或移除执行任务的节点,而不需要重新调度现有任务。
- 负载均衡:任务可以在多个节点之间进行分配,从而实现负载均衡。
- 集群监控和管理:可以通过 Redis 监控和管理集群中的任务执行情况。
和ScheduledExecutorService区别使用场景
基于上述特点,以下是两者区别使用场景的一些示例:
- 如果应用程序是单体架构,只需要在单个服务器上执行定时任务,并且不需要任务的持久化或高可用性,那么
ScheduledExecutorService
就足够了。 - 如果应用程序是微服务架构,需要在多个服务实例之间均衡执行定时任务,并且要求任务即使在服务重启后也能继续执行,那么
RScheduledExecutorService
就是更好的选择。 - 如果需要在多个地理位置分布的服务器上执行定时任务,并且需要统一的任务管理和监控,
RScheduledExecutorService
提供的分布式功能会很有帮助。
解决方案实现
工具类实现:
DistributedScheduledExecutorTool:分布式任务调度的利器在分布式系统中,任务调度是一项核心功能,它涉及到在多个节点上安排和执行定时任务。DistributedScheduledExecutorTool
是一个基于 Redisson 的分布式任务调度工具,它提供了一种简单而强大的方式来在 Java 应用程序中实现这一功能。本文档将详细介绍如何使用 DistributedScheduledExecutorTool
来调度、执行和管理分布式定时任务。
import org.redisson.Redisson;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.redisson.RedissonNode;
import org.redisson.config.RedissonNodeConfig;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 202/6/14 8:08
* 为分布式环境设计的定时任务工具类,它基于 Redisson 提供的 `RScheduledExecutorService`。
* 提供了定时任务的调度、执行、取消以及优雅关闭等功能,并且可以处理任务执行失败的情况
*/
public class DistributedScheduledExecutorTool {
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduledExecutorTool.class);
private final RScheduledExecutorService scheduledExecutorService;
private final RedissonClient redissonClient;
private RedissonNode redissonNode;
/**
* 构造函数,初始化 Redisson 客户端并启动一个 Redisson Node 用于执行定时任务。
*
* @param redisUrl Redis 服务器的 URL。
* @param workers 用于执行任务的工作线程数量。
*/
public DistributedScheduledExecutorTool(String redisUrl, int workers) {
Config config = new Config();
config.useSingleServer().setAddress(redisUrl);
this.redissonClient = Redisson.create(config);
// Configure the node for RScheduledExecutorService
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("myScheduledExecutor", workers));
this.redissonNode = RedissonNode.create(nodeConfig);
this.redissonNode.start();
this.scheduledExecutorService = redissonClient.getExecutorService("myScheduledExecutor");
}
/**
* 调度一个 Callable 任务在指定的延迟后执行,并支持重试机制和任务监听器。
*
* @param task 待执行的任务。
* @param delay 延迟时间。
* @param timeUnit 延迟时间的单位。
* @param maxRetries 最大重试次数。
* @param listener 任务监听器。
* @return 返回任务的 ScheduledFuture。
*/
public <V> ScheduledFuture<?> schedule(Callable<V> task, long delay, TimeUnit timeUnit, int maxRetries, TaskListener<V> listener) {
Callable<V> retryingTask = () -> {
int retries = 0;
while (true) {
try {
V result = task.call();
if (listener != null) {
listener.onSuccess(result);
}
return result;
} catch (Exception e) {
if (retries >= maxRetries) {
if (listener != null) {
listener.onFailure(e);
}
throw e;
}
logger.warn("Task failed, attempt " + retries + " of " + maxRetries);
retries++;
TimeUnit.SECONDS.sleep(1); // Wait before retry
}
}
};
return schedule(retryingTask, delay, timeUnit);
}
/**
* 辅助方法,用于实际调度一个任务而不带重试逻辑。
*
* @param task 待执行的任务。
* @param delay 延迟时间。
* @param timeUnit 延迟时间的单位。
* @return 返回任务的 ScheduledFuture。
*/
private <V> ScheduledFuture<?> schedule(Callable<V> task, long delay, TimeUnit timeUnit) {
try {
return scheduledExecutorService.schedule(task, delay, timeUnit);
} catch (Exception e) {
handleTaskFailure(e);
return null;
}
}
/**
* 取消一个尚未执行的定时任务。
*
* @param future 任务的 ScheduledFuture。
* @return 如果任务被成功取消,则返回 true;否则返回 false。
*/
public boolean cancelScheduledTask(ScheduledFuture<?> future) {
if (future != null && !future.isDone()) {
return future.cancel(true);
}
return false;
}
/**
* 优雅地关闭调度执行器服务,等待已经调度的任务完成。
*/
public void shutdownGracefully() {
scheduledExecutorService.shutdown();
try {
if (!scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 调度一个 Runnable 任务以固定的频率执行。
*
* @param command 待执行的任务。
* @param initialDelay 初始延迟时间。
* @param period 两次执行之间的时间间隔。
* @param unit 时间的单位。
* @return 返回任务的 ScheduledFuture。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
}
/**
* 调度一个 Runnable 任务在每次执行完成后,等待指定的延迟时间再次执行。
*
* @param command 待执行的任务。
* @param initialDelay 初始延迟时间。
* @param delay 两次执行之间的延迟时间。
* @param unit 时间的单位。
* @return 返回任务的 ScheduledFuture。
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
/**
* 处理任务调度失败的情况,记录错误信息。
*
* @param e 异常对象。
*/
private void handleTaskFailure(Exception e) {
logger.error("Task scheduling failed", e);
}
/**
* 关闭 Redisson Node 和 Redisson 客户端,释放相关资源。
*/
public void shutdown() {
if (redissonNode != null) {
redissonNode.shutdown();
}
if (redissonClient != null) {
redissonClient.shutdown();
}
}
/**
* 任务监听器接口,用于提供任务执行成功或失败时的回调。
*
* @param <V> 任务执行的返回类型。
*/
public interface TaskListener<V> {
void onSuccess(V result);
void onFailure(Throwable t);
}
}
时序图:
在这个时序图中,展示了客户端应用程序如何通过 DistributedScheduledExecutorTool
初始化、调度任务、取消任务、优雅关闭和完全关闭的过程。时序图还展示了任务执行成功和失败的处理,以及重试逻辑的工作方式。
流程图:
流程图,可以看到:
- 开始时,需要初始化
DistributedScheduledExecutorTool
,包括配置 Redis URL 和工作线程数量。 - 然后检查是否有任务需要调度。
- 如果有任务,它会被调度,包括设置延迟和重试逻辑。
- 一旦任务执行,会检查是否成功执行。
- 如果任务成功,将触发
onSuccess
回调。 - 如果任务失败,将检查是否达到最大重试次数。
- 如果达到最大重试次数,将触发
onFailure
回调。 - 如果没有达到最大重试次数,任务将在指定的延迟后重试。
- 最后,检查是否需要关闭工具类。
- 如果需要优雅关闭,将等待所有任务完成。
- 如果需要立即关闭,将强制关闭执行器服务。
- 无论是优雅关闭还是立即关闭,最终都会关闭 Redisson Node 和 Redisson 客户端。
- 最后,流程结束。
DistributedScheduledExecutorTool总结:
DistributedScheduledExecutorTool
是一个功能丰富的分布式任务调度工具,它利用 Redisson 提供了一个可靠的解决方案,适合于需要跨多个节点执行定时任务的复杂应用程序。通过该类,应该能够开始在 Java 应用程序中使用这个工具来调度和管理定时任务。
测试类实现:
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 202/6/14 8:08
* DistributedScheduledExecutorTool 测试类
*/
public class DistributedScheduledExecutorToolTest {
public static void main(String[] args) {
// 初始化工具类
DistributedScheduledExecutorTool executorTool = new DistributedScheduledExecutorTool("redis://127.0.0.1:6379", 5);
// 场景:调度一个可能需要重试的任务
ScheduledFuture<?> future = executorTool.schedule(
() -> {
// 模拟任务逻辑
if (Math.random() > 0.7) {
throw new Exception("Random failure");
}
return "Task completed";
},
5, // 5秒后执行
TimeUnit.SECONDS,
3, // 最多重试3次
new DistributedScheduledExecutorTool.TaskListener<String>() {
@Override
public void onSuccess(String result) {
System.out.println("Task succeeded with result: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("Task failed: " + t.getMessage());
}
}
);
// 场景:取消一个已经调度的任务
boolean cancelled = executorTool.cancelScheduledTask(future);
if (cancelled) {
System.out.println("Task was successfully cancelled.");
} else {
System.out.println("Task could not be cancelled.");
}
// 场景:以固定频率执行任务
ScheduledFuture<?> fixedRateFuture = executorTool.scheduleAtFixedRate(
() -> System.out.println("Fixed rate task is running"),
0, // 立即开始
1, // 每1秒执行一次
TimeUnit.SECONDS
);
// 场景:在每次执行完成后,等待指定的延迟时间再执行任务
ScheduledFuture<?> fixedDelayFuture = executorTool.scheduleWithFixedDelay(
() -> System.out.println("Fixed delay task is running"),
0, // 立即开始
1, // 任务完成后等待1秒
TimeUnit.SECONDS
);
// 模拟应用运行一段时间
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 场景:优雅关闭所有正在执行的任务
executorTool.shutdownGracefully();
// 场景:关闭 Redisson 客户端和所有相关资源
executorTool.shutdown();
}
}
总结:
在分布式系统中,任务调度是一个关键组件,它需要跨多个服务器和应用实例协调工作。Redisson 的 RScheduledExecutorService
为这一需求提供了解决方案,使得开发者能够利用 Redis 的能力来实现分布式定时任务。
RScheduledExecutorService
提供了一系列方法,包括延迟执行任务、定期执行任务等,这些方法都是分布式环境下的任务调度的基础。然而,直接使用 RScheduledExecutorService
可能会涉及到一些复杂的设置和错误处理。
DistributedScheduledExecutorTool
应运而生,它封装了 RScheduledExecutorService
的复杂性,提供了一个更友好的 API。它允许开发者调度任务,并自动处理任务重试机制,同时通过任务监听器提供了任务成功或失败的回调。这样,开发者可以专注于任务的逻辑,而不是调度的细节。
此外,DistributedScheduledExecutorTool
还提供了优雅关闭和立即关闭的方法,确保在应用程序关闭时可以正确地处理正在执行的任务。这些特性使得 DistributedScheduledExecutorTool
成为在需要分布式任务调度时的理想选择。
总的来说,RScheduledExecutorService
和 DistributedScheduledExecutorTool
为 Java 开发者提供了一种简单且强大的方法,以实现在分布式环境中的任务调度。不仅提高了代码的可维护性,也确保了任务执行的可靠性和高效性。
转载自:https://juejin.cn/post/7380649024933986344