🚀💥最新面试题,JVM突然崩溃了,线程池队列的任务会丢失,设计持久化方案
周一闲来无事,如期接到了面试电话,面试官小哥哥收了收困意,试探性地确定了下,是否是要面试的我。我言辞犀利,准确肯定地回答,确保了本次面试没有什么差错。
面试官: 看你说你对JAVA很熟悉,常规基础问题,我们先来问一下线程池的问题吧。
我: 好的,这个我最拿手了,你说这个,我可一点都不困... 😎
面试官: 比如线程池内存池,当JVM挂了,队列内的任务丢了没有?
我(思考): 卧槽,这些内存池,老大JVM都GG了,还玩个毛,毛都没有了啊。。。😱
我: 会丢失,这种情况,我们一般直接就使用MQ这种可以持久化消息的方案,不再使用线程池了。
面试官: 我们现在就讨论针对线程池的方案,如何做持久化方案。
我(思考): 我擦,搁以前,我说个MQ替代就完事了,还能这么墨迹。。。😅
我: 这个是优雅挂的,还是暴力挂的?
我(思考): 这个时候,我试图想通过一些JVM提供的钩子函数,做一些事情,所以想确定下挂的方式。。。
面试官: 就是挂了,啥也没有了,你怎么弄吗。
我(思考): 不讲武德啊,上来就挂挂挂,我这使用的啥服务器,总是挂。。。😩
我: 好的,如果JVM挂了,队列里的任务确实会丢失。为了避免这种情况,我们可以使用数据库来持久化任务。当任务被提交时,我们先将任务信息存储到数据库中。一旦任务执行完成,我们更新数据库中的任务状态。这样,即使JVM挂了,我们也可以从数据库中恢复未完成的任务。具体的实现步骤如下...
当JVM停止时,线程池中的任务会丢失,未执行的任务将不会被继续执行
。具体行为取决于JVM的关闭过程以及线程池的状态管理。以下是一些详细说明:
1. 正常关闭与强制关闭
正常关闭
在正常关闭过程中,JVM会尝试完成正在执行的任务并处理一些清理工作。对于线程池,可以使用ExecutorService
的shutdown()
方法来优雅地关闭线程池。这种方法会拒绝新的任务提交,但会继续执行已提交的任务。
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
强制关闭
如果使用shutdownNow()
方法,线程池将立即尝试停止所有活动任务并停止处理等待队列中的任务:
List<Runnable> pendingTasks = executorService.shutdownNow();
shutdownNow()
方法会返回一个未处理任务的列表,这些任务被丢弃,不会被执行。
2. JVM 意外停止
如果JVM由于某些原因(如进程被杀死、系统崩溃等)意外停止,那么线程池中的所有任务将立即停止,未完成的任务将被丢失。这种情况无法通过编程方式控制或恢复。
3. 数据持久化
为了避免任务丢失,可以采用以下措施:
- 任务持久化:将任务存储在数据库、消息队列或持久化存储中,以便在系统重启后可以重新获取和执行。
- 使用分布式任务调度系统:如Quartz、Spring Batch等,这些框架提供了任务调度和持久化支持。
我来开始设计持久化方案了
要实现一个可靠的任务持久化方案,可以使用数据库和一个持久化任务队列。以下是一个详细的实现方案,示例使用MySQL数据库和Java。
1. 数据库设计
首先,需要设计一个数据库表来存储任务信息。假设我们有一个名为tasks
的表,结构如下:
CREATE TABLE tasks (
task_id INT AUTO_INCREMENT PRIMARY KEY,
task_data VARCHAR(255) NOT NULL,
status ENUM('PENDING', 'COMPLETED') DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2. 任务持久化和执行逻辑
任务持久化
任务提交时,将任务信息存储到数据库中,状态为PENDING
。
任务执行
从数据库中加载PENDING
状态的任务并提交到线程池执行。任务执行完成后,更新任务状态为COMPLETED
。
3. 示例代码
连接数据库的工具类
首先,创建一个工具类来管理数据库连接:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DatabaseUtil {
private static final String URL = "jdbc:mysql://localhost:3306/mydb";
private static final String USER = "user";
private static final String PASSWORD = "password";
public static Connection getConnection() throws SQLException {
return DriverManager.getConnection(URL, USER, PASSWORD);
}
}
任务管理类
任务管理类负责任务的持久化和加载:
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class TaskManager {
// 保存任务到数据库
public void saveTask(String taskData) {
String sql = "INSERT INTO tasks (task_data) VALUES (?)";
try (Connection conn = DatabaseUtil.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, taskData);
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
// 从数据库中加载待处理任务
public List<Task> loadPendingTasks() {
String sql = "SELECT * FROM tasks WHERE status = 'PENDING'";
List<Task> tasks = new ArrayList<>();
try (Connection conn = DatabaseUtil.getConnection();
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
int taskId = rs.getInt("task_id");
String taskData = rs.getString("task_data");
tasks.add(new Task(taskId, taskData));
}
} catch (SQLException e) {
e.printStackTrace();
}
return tasks;
}
// 更新任务状态
public void updateTaskStatus(int taskId, String status) {
String sql = "UPDATE tasks SET status = ? WHERE task_id = ?";
try (Connection conn = DatabaseUtil.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, status);
ps.setInt(2, taskId);
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
任务类
任务类表示一个任务对象:
public class Task {
private int taskId;
private String taskData;
public Task(int taskId, String taskData) {
this.taskId = taskId;
this.taskData = taskData;
}
public int getTaskId() {
return taskId;
}
public String getTaskData() {
return taskData;
}
}
任务执行类
任务执行类实现Runnable
接口,并在任务完成后更新任务状态:
public class TaskExecutor implements Runnable {
private Task task;
private TaskManager taskManager;
public TaskExecutor(Task task, TaskManager taskManager) {
this.task = task;
this.taskManager = taskManager;
}
@Override
public void run() {
System.out.println("Executing task: " + task.getTaskData());
try {
// 模拟任务执行
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 更新任务状态为 COMPLETED
taskManager.updateTaskStatus(task.getTaskId(), "COMPLETED");
System.out.println("Task " + task.getTaskData() + " completed.");
}
}
主类
主类负责初始化线程池、加载任务并提交执行:
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TaskScheduler {
public static void main(String[] args) {
TaskManager taskManager = new TaskManager();
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 加载未完成的任务
List<Task> pendingTasks = taskManager.loadPendingTasks();
for (Task task : pendingTasks) {
executorService.submit(new TaskExecutor(task, taskManager));
}
// 示例:提交新任务
taskManager.saveTask("New Task 1");
taskManager.saveTask("New Task 2");
// 加载并执行新任务
List<Task> newTasks = taskManager.loadPendingTasks();
for (Task task : newTasks) {
executorService.submit(new TaskExecutor(task, taskManager));
}
executorService.shutdown();
}
}
通过上述步骤,您可以实现一个可靠的任务持久化和执行系统,即使在JVM停止或崩溃后,也能重新加载并执行未完成的任务。
转载自:https://juejin.cn/post/7384623060199047187