使用WebSocket和RabbitMQ实现任务进度推送
1、WebSocket简述
WebSocket 是 HTML5 新增的 API,是一种基于 TCP 连接的持久化双向通信协议。
WebSocket 默认连接端口是80,运行端口是443。
WebSocket 连接地址示例(以 ws 或者 wss 开头):ws://text.com:8080 或 wss://text.com:8080(加密)。
Springboot项目导入WebSocket依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
注册WebSocket节点
@Slf4j
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter 作用
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
log.info("serverEndpointExporter被注入了");
return new ServerEndpointExporter();
}
}
参考文章:# WebSocket 简述
2、RabbitMQ简述
RabbitMQ是一款开源的消息中间件,实现了高级消息队列协议(AMQP),提供了可靠的消息传递机制,用于在分布式系统中传递和存储消息。
Springboot项目导入RabbitMQ依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml文件中配置信息:
spring:
rabbitmq:
host: 172.16.10.XXX
port: 5672
virtual-host: /
username: XXX
password: XXXXXX
生产者生产消息,并向消息队列发送消息简单示例,这里以接口的形式呈现。
若要生产进度消息,还需要再任务模型里进行设计,或按分页计算进度,或预估计算时间
@Slf4j
@RestController
@RequestMapping("/inform")
public class MQProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
ObjectMapper mapper = new ObjectMapper();
@RequestMapping("/test")
public Result<String> index(String msgType, String userId, String sessionId, Float result) throws JsonProcessingException {
AnalysisTaskProgressMessage<Float> progressMsg = new AnalysisTaskProgressMessage<>(msgType, userId, sessionId, result);
String progressMsgJson = mapper.writeValueAsString(progressMsg);
rabbitTemplate.convertAndSend("simple.queue", progressMsgJson);
log.info(progressMsgJson + "消息发送成功!");
return Result.OK("消息发送成功!");
}
}
3、注册WebSocket端点
@Slf4j
@ServerEndpoint(value = "/webSocket/analysisTask/{userId}", encoders = {BaseModelEncoder.class})
@Component
public class AnalysisTaskWSSvr {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static ConcurrentHashMap<AnalysisTaskCompositeId, AnalysisTaskWSSvr> webSocketMap = new ConcurrentHashMap<>();
/**
* 记录当前在线用户的ID
*/
private static Set<String> onlineUserSet = new CopyOnWriteArraySet<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收 AnalysisTaskCompositeId
*/
private AnalysisTaskCompositeId analysisTaskCompositeId;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("requestId") String requestId) {
// 设置长连接时间
// session.setMaxIdleTimeout(3600000);
this.session = session;
AnalysisTaskCompositeId analysisTaskId = new AnalysisTaskCompositeId(userId, this.session.getId());
this.analysisTaskCompositeId = analysisTaskId;
if (webSocketMap.containsKey(analysisTaskId)) {
webSocketMap.remove(analysisTaskId);
webSocketMap.put(analysisTaskId, this);
} else {
// 加入set中
webSocketMap.put(analysisTaskId, this);
addOnlineCount(analysisTaskId.getUserId());
}
try {
AnalysisTaskWSSvr.sendSuccessMessage(analysisTaskId);
} catch (IOException e) {
throw new RuntimeException(e);
}
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(analysisTaskCompositeId)) {
webSocketMap.remove(analysisTaskCompositeId);
//从set中删除
subOnlineCount(analysisTaskCompositeId.getUserId());
}
log.info("用户退出:" + analysisTaskCompositeId.getUserId() + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:" + analysisTaskCompositeId.getUserId() + ", 报文:" + message);
if (StringUtils.isEmpty(message)) return;
// TODO: 接收信息,调用相关服务
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.warn("用户错误:" + this.analysisTaskCompositeId.getUserId() + ",原因:" + error.getMessage());
error.printStackTrace();
}
public static void sendSuccessMessage(AnalysisTaskCompositeId compositeId) throws IOException {
AnalysisTaskWSSvr webSocketServer = webSocketMap.get(compositeId);
webSocketServer.session.getBasicRemote().sendText(String.valueOf(compositeId));
}
public static int getOnlineCount() {
return onlineUserSet.size();
}
public static void addOnlineCount(String userId) {
if (!onlineUserSet.contains(userId)) {
onlineUserSet.add(userId);
}
}
public static void subOnlineCount(String userId) {
if (onlineUserSet.contains(userId)) {
onlineUserSet.remove(userId);
}
}
}
4、监听队列,消费消息
构建消息监听器,同时调用相关功能处理消息,这里有多种方式可以选择:
1、采用Direct订阅模型,不同的消息被不同的队列进行消费;
2、采用简单队列模式,生产者生产的消息采用同一个方式进行封装,并标注好消息类型,监听器接收到消息后会根据消息类型分发给不同的websocket连接。
@Slf4j
@Configuration
@RequiredArgsConstructor
public class AnalysisTaskProgressListener {
/**
* 监听任务进度状态
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "analysis.queue1", durable = "true"),
exchange = @Exchange(name = "analysisTask", type = ExchangeTypes.DIRECT),
key = "task1"
))
public void listenTaskProgress(String msg) {
executeTask(msg);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
executeTask(msg);
}
```
private static void executeTask(String msg) {
log.info("监听到消息" + msg);
JSONObject jsonMsg = JSON.parseObject(msg);
// 对消息类型做判断
AnalysisTaskWSSvr.sendProgressMessage(
jsonMsg.getString("userId"),
jsonMsg.getString("sessionId"),
jsonMsg.getString("taskId"),
jsonMsg.getString("taskType"),
jsonMsg.getFloatValue("progress")
);
}
}
5、使用vue3构建WebSocket连接
这里使用vue3构建前端界面,并建立WebSocket连接
WebSocket连接:ws://localhost:8889/webSocket/analysisTask/#{userId}
export default class SocketService {
static instance = null;
static get Instance() {
if (!this.instance) {
this.instance = new SocketService();
}
return this.instance;
}
// 和服务端连接的socket对象
ws = null;
// 存储回调函数
callBackMapping = {};
// 标识是否连接成功
connected = false;
// 记录重试的次数
sendRetryCount = 0;
// 重新连接尝试的次数
connectRetryCount = 0;
// 定义连接服务器的方法
connect() {
// 连接服务器
if (!window.WebSocket) {
return console.log('您的浏览器不支持WebSocket');
}
// let token = $.cookie('123');
// let token = '4E6EF539AAF119D82AC4C2BC84FBA21F';
let url = 'ws://localhost:8889/webSocket/analysisTask/001';
this.ws = new WebSocket(url);
// 连接成功的事件
this.ws.onopen = () => {
console.log('连接服务端成功了');
this.connected = true;
// 重置重新连接的次数
this.connectRetryCount = 0;
};
// 1.连接服务端失败
// 2.当连接成功之后, 服务器关闭的情况
this.ws.onclose = () => {
console.log('连接服务端失败');
this.connected = false;
this.connectRetryCount++;
setTimeout(() => {
this.connect();
}, 500 * this.connectRetryCount);
};
// 得到服务端发送过来的数据
this.ws.onmessage = msg => {
console.log(msg.data, '从服务端获取到了数据');
};
}
// 回调函数的注册
registerCallBack(socketType, callBack) {
this.callBackMapping[socketType] = callBack;
}
// 取消某一个回调函数
unRegisterCallBack(socketType) {
this.callBackMapping[socketType] = null;
}
// 发送数据的方法
send(data) {
// 判断此时此刻有没有连接成功
if (this.connected) {
this.sendRetryCount = 0;
try {
this.ws.send(JSON.stringify(data));
} catch (e) {
this.ws.send(data);
}
} else {
this.sendRetryCount++;
setTimeout(() => {
this.send(data);
}, this.sendRetryCount * 500);
}
}
}
<script setup lang="js">
import SocketService from "../websocket/SocketService.js"
import { reactive } from "vue";
const data = reactive({
socketServe: SocketService.Instance,
});
const sendData = () => {
data.socketServe.send({
"type": "SoilFertilityAnalysisTask",
"taskId": "002"
});
console.log('发送数据');
};
const buildWS = () => {
SocketService.Instance.connect();
data.socketServe = SocketService.Instance;
data.socketServe.registerCallBack('callback1', data.socketServe);
setTimeout(() => {
sendData()
}, 1000)
}
</script>
<template>
<div>
<button @click="buildWS">运行</button>
</div>
</template>
<style scoped>
.read-the-docs {
color: #888;
}
</style>
效果展示如下:
转载自:https://juejin.cn/post/7383561807178514432