SpringBoot + WebSocket 入门(-)
前言:最近业务存在一个新需求:当用户支付成功后,服务端需要向用户推送一条新订单信息,类似于美团外卖的新订单提醒。基于这个需求,首先想到的便是使用 Websocket 进行消息推送
业务模型
编码
代码结构
maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
注意:websocket 版本需要和 SpringBoot 版本保持一致
WebSocket 配置
Websocket 属性配置 WebSocketConfig.java
@Configuration
//使用 broker 形式使用 WS
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private WebSocketDecoratorFactory webSocketDecoratorFactory;
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic"); //设置消息头
registry.setUserDestinationPrefix("/user"); //设置单点用户头
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//设置WS连接地址 @1
registry.addEndpoint("/push")
// 握手拦截处理 @2
.addInterceptors(new HandShakeInterceptor())
// 握手信息处理,这一步主要存储 sessionTemple 的用户会话
// @3
.setHandshakeHandler(new DefaultHandshakeHandler() {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
//从 attributes 中取出预先设置的用户信息
Integer mmId = (Integer) attributes.get("mmId");
return () -> mmId + "";
}
})
//设置跨域
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
//设置会话管理
//@4
registration.addDecoratorFactory(webSocketDecoratorFactory);
}
}
代码解析
@1: 设置WS请求地址,实例: ws://127.0.0.1:8080/push @2: 处理WS连接握手过程中的鉴权操作,可在此步骤中进行用户信息验证 @3: 处理WS连接握手后的会话身份设置,主要将鉴权操作中获取到的用户信息设置到连接会话上 @4: 会话管理,处理会话的连接和断开
握手管理 HandShakeInterceptor.java
public class HandShakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
ServletServerHttpRequest req = (ServletServerHttpRequest) request;
//获取token认证
String token = req.getServletRequest().getParameter("atoken");
if (KtStr.empty(token)) {
return false;
}
/** 业务逻辑鉴权代码
JAccount account = null;
String[] tokens = KtEncry.deIdToken(token, null);
if (tokens.length == 2) {
account = new JAccount();
account.setId(KtCvt.to(tokens[0], int.class));
account = ApiAuth.accountCtx.get(KtCvt.to(tokens[0], int.class), JAccount.class, false);
if (account != null) {
if (account.getPassTime() <= UtContext.sTime() || !tokens[1].equals(account.getToken())) {
account = null;
}
}
}
**/
if (account == null) {
return false;
}
//将授权信息设置到属性中
attributes.put("mmId", account.getId());
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
上述代码比较简单,主要是从请求的url中获取到用于鉴权的 atoken,然后根据业务逻辑进行鉴权操作
会话内存管理 SocketManager.java
public class SocketManager {
private static Logger LOGGER = LoggerFactory.getLogger(SocketManager.class);
//定义 ConcurrentHashMap 存储用户编号和用户会话
private static ConcurrentHashMap<String, WebSocketSession> manager = new ConcurrentHashMap<String, WebSocketSession>();
public static void add(String key, WebSocketSession webSocketSession) {
LOGGER.info("新添加webSocket连接 {} ", key);
manager.put(key, webSocketSession);
}
public static void remove(String key) {
LOGGER.info("移除webSocket连接 {} ", key);
manager.remove(key);
}
public static WebSocketSession get(String key) {
LOGGER.info("获取webSocket连接 {}", key);
return manager.get(key);
}
}
上述代码主要是对 manager 这个map进行操作。
会话连接与销毁和向指定用户发送消息 WebSocketDecoratorFactory.java
@Component
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {
//消息发送
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
private static Logger LOGGER = LoggerFactory.getLogger(WebSocketDecoratorFactory.class);
@Override
public WebSocketHandler decorate(WebSocketHandler handler) {
return new WebSocketHandlerDecorator(handler) {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
LOGGER.info("有人连接啦 sessionId = {}", session.getId());
Principal principal = session.getPrincipal();
if (principal != null) {
LOGGER.info("key = {} 存入", principal.getName());
// 身份校验成功,缓存socket连接
SocketManager.add(principal.getName(), session);
}
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
LOGGER.info("有人退出连接啦 sessionId = {}", session.getId());
Principal principal = session.getPrincipal();
if (principal != null) {
// 身份校验成功,移除socket连接
SocketManager.remove(principal.getName());
}
super.afterConnectionClosed(session, closeStatus);
}
};
}
/**
* 给某个用户发送消息
*
* @param mmId 目标用户
* @param destination 目标topic
* @param msg 消息内容
*/
public boolean sendMessageToUser(Integer mmId, String destination, String msg) throws IOException {
WebSocketSession session = SocketManager.get(mmId + "");
if (session != null && session.isOpen()) {
simpMessagingTemplate.convertAndSendToUser(mmId + "", destination, msg);
return true;
}
return false;
}
}
上述代码中,主要是实现了 public WebSocketHandler decorate(WebSocketHandler handler) {}
方法,重写了接口中的 afterConnectionEstablished
连接
和 afterConnectionClosed
断开 两个方法,处理当前用户的连接会话。
sendMessageToUser
方法则是用来给指定用户发送指定 topic 发送消息
前端代码
<!DOCTYPE html>
<html>
<head>
<title>Hello WebSocket</title>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.4.1/css/bootstrap.min.css" rel="stylesheet">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/sockjs-client/1.4.0/sockjs.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<div id="main-content" class="container" style="margin-top: 10px;">
<div class="row">
<form class="navbar-form" style="margin-left:0px">
<div class="col-md-12">
<label>请求Token:</label>
<div class="form-group">
<input type="text" id="token" class="form-control" placeholder="请求token">
</div>
<button class="btn btn-warning" onclick="setToken();" type="button">token</button>
<div class="form-group">
<label>WebSocket 连接:</label>
<button class="btn btn-primary" type="button" onclick="connect();">进行连接</button>
<button class="btn btn-danger" type="button" onclick="disconnect();">断开连接</button>
</div>
<label>订阅地址:</label>
<div class="form-group">
<input type="text" id="subscribe" class="form-control" placeholder="订阅地址">
</div>
<button class="btn btn-warning" onclick="subscribeSocket();" type="button">订阅</button>
</div>
</form>
</div>
</br>
<div class="row">
<div class="form-group">
<label for="content">发送的消息内容:</label>
<input type="text" id="content" class="form-control" placeholder="消息内容">
</div>
<button class="btn btn-info" onclick="sendMessageNoParameter();" type="button">发送</button>
</div>
</br>
<div class="row">
<div class="col-md-12">
<h5 class="page-header" style="font-weight:bold">接收到的消息:</h5>
<table class="table table-striped">
<tbody id="information"></tbody>
</table>
</div>
</div>
</div>
</body>
</html>
<script>
// 设置 STOMP 客户端
var stompClient = null;
// 请求token
var TOKEN = "";
// 设置 WebSocket 进入端点
var SOCKET_ENDPOINT = "http://127.0.0.1:8787/push?atoken=";
// 设置订阅消息的请求前缀
var SUBSCRIBE_PREFIX = "/user"
// 设置订阅消息的请求地址
var SUBSCRIBE = "/topic/orderSub";
// 设置服务器端点,访问服务器中哪个接口
var SEND_ENDPOINT = "/app/push";
/* 进行连接 */
function connect() {
// 设置 SOCKET
if (!TOKEN) {
alert("请先配置token")
} else {
var socket = new SockJS(SOCKET_ENDPOINT + TOKEN);
// 配置 STOMP 客户端
stompClient = Stomp.over(socket);
// STOMP 客户端连接
stompClient.connect({}, function (frame) {
console.warn("---------- 连接成功 ----------");
});
}
}
function setToken() {
TOKEN = $("#token").val();
}
/* 订阅信息 */
function subscribeSocket() {
// 设置订阅地址
SUBSCRIBE = SUBSCRIBE_PREFIX + $("#subscribe").val();
// 输出订阅地址
console.warn(`---------- 设置订阅地址为: ${SUBSCRIBE}----------`);
// 执行订阅消息
stompClient.subscribe(SUBSCRIBE, function (responseBody) {
var receiveMessage = JSON.parse(responseBody.body);
$("#information").append("<tr><td>" + responseBody.body + "</td></tr>");
});
}
/* 断开连接 */
function disconnect() {
stompClient.disconnect(function () {
alert("断开连接");
});
}
/* 发送消息并指定目标地址(这里设置的目标地址为自身订阅消息的地址,当然也可以设置为其它地址) */
function sendMessageNoParameter() {
// 设置发送的内容
var sendContent = $("#content").val();
// 设置待发送的消息内容
var message = '{"destination": "' + SUBSCRIBE + '", "content": "' + sendContent + '"}';
// 发送消息
stompClient.send(SEND_ENDPOINT, {}, message);
}
</script>
前端代码中,通过地址 http://127.0.0.1:8787/push?atoken=xxx
进行连接后端 ws 地址,随后订阅了 /user/topic/orderSub
的消息,
此时调用 sendMessageToUser()
方法,向 /topic/orderSub
发送消息时,改用户便可以收到对应消息
转载自:https://juejin.cn/post/6915650050781609998