likes
comments
collection
share

SpringBoot + WebSocket 入门(-)

作者站长头像
站长
· 阅读数 3

前言:最近业务存在一个新需求:当用户支付成功后,服务端需要向用户推送一条新订单信息,类似于美团外卖的新订单提醒。基于这个需求,首先想到的便是使用 Websocket 进行消息推送

业务模型

SpringBoot + WebSocket 入门(-)

编码

代码结构

SpringBoot + WebSocket 入门(-)

maven 依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

注意:websocket 版本需要和 SpringBoot 版本保持一致

WebSocket 配置

Websocket 属性配置 WebSocketConfig.java

@Configuration
//使用 broker 形式使用 WS
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private WebSocketDecoratorFactory webSocketDecoratorFactory;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic"); //设置消息头
        registry.setUserDestinationPrefix("/user"); //设置单点用户头
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //设置WS连接地址 @1
        registry.addEndpoint("/push")
                // 握手拦截处理 @2
                .addInterceptors(new HandShakeInterceptor())
                // 握手信息处理,这一步主要存储 sessionTemple 的用户会话
                // @3
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        //从 attributes 中取出预先设置的用户信息
                        Integer mmId = (Integer) attributes.get("mmId");
                        return () -> mmId + "";
                    }
                })
                //设置跨域
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        //设置会话管理
        //@4
        registration.addDecoratorFactory(webSocketDecoratorFactory);
    }
}

代码解析

@1: 设置WS请求地址,实例: ws://127.0.0.1:8080/push @2: 处理WS连接握手过程中的鉴权操作,可在此步骤中进行用户信息验证 @3: 处理WS连接握手后的会话身份设置,主要将鉴权操作中获取到的用户信息设置到连接会话上 @4: 会话管理,处理会话的连接和断开

握手管理 HandShakeInterceptor.java

public class HandShakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        ServletServerHttpRequest req = (ServletServerHttpRequest) request;
        //获取token认证
        String token = req.getServletRequest().getParameter("atoken");
        if (KtStr.empty(token)) {
            return false;
        }

        /** 业务逻辑鉴权代码
        JAccount account = null;
        String[] tokens = KtEncry.deIdToken(token, null);
        if (tokens.length == 2) {
            account = new JAccount();
            account.setId(KtCvt.to(tokens[0], int.class));
            account = ApiAuth.accountCtx.get(KtCvt.to(tokens[0], int.class), JAccount.class, false);
            if (account != null) {
                if (account.getPassTime() <= UtContext.sTime() || !tokens[1].equals(account.getToken())) {
                    account = null;
                }
            }
        }
        **/

        if (account == null) {
            return false;
        }

        //将授权信息设置到属性中
        attributes.put("mmId", account.getId());
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
    }
}

上述代码比较简单,主要是从请求的url中获取到用于鉴权的 atoken,然后根据业务逻辑进行鉴权操作

会话内存管理 SocketManager.java

public class SocketManager {

    private static Logger LOGGER = LoggerFactory.getLogger(SocketManager.class);

    //定义 ConcurrentHashMap 存储用户编号和用户会话
    private static ConcurrentHashMap<String, WebSocketSession> manager = new ConcurrentHashMap<String, WebSocketSession>();

    public static void add(String key, WebSocketSession webSocketSession) {
        LOGGER.info("新添加webSocket连接 {} ", key);
        manager.put(key, webSocketSession);
    }

    public static void remove(String key) {
        LOGGER.info("移除webSocket连接 {} ", key);
        manager.remove(key);
    }

    public static WebSocketSession get(String key) {
        LOGGER.info("获取webSocket连接 {}", key);
        return manager.get(key);
    }
}

上述代码主要是对 manager 这个map进行操作。

会话连接与销毁和向指定用户发送消息 WebSocketDecoratorFactory.java

@Component
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {

    //消息发送
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(WebSocketDecoratorFactory.class);

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                LOGGER.info("有人连接啦  sessionId = {}", session.getId());
                Principal principal = session.getPrincipal();

                if (principal != null) {
                    LOGGER.info("key = {} 存入", principal.getName());
                    // 身份校验成功,缓存socket连接
                    SocketManager.add(principal.getName(), session);
                }


                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                LOGGER.info("有人退出连接啦  sessionId = {}", session.getId());
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    // 身份校验成功,移除socket连接
                    SocketManager.remove(principal.getName());
                }

                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }

    /**
     * 给某个用户发送消息
     *
     * @param mmId        目标用户
     * @param destination 目标topic
     * @param msg         消息内容
     */
    public boolean sendMessageToUser(Integer mmId, String destination, String msg) throws IOException {
        WebSocketSession session = SocketManager.get(mmId + "");

        if (session != null && session.isOpen()) {
            simpMessagingTemplate.convertAndSendToUser(mmId + "", destination, msg);
            return true;
        }

        return false;
    }
}

上述代码中,主要是实现了 public WebSocketHandler decorate(WebSocketHandler handler) {} 方法,重写了接口中的 afterConnectionEstablished 连接 和 afterConnectionClosed 断开 两个方法,处理当前用户的连接会话。

sendMessageToUser 方法则是用来给指定用户发送指定 topic 发送消息

前端代码

<!DOCTYPE html>
<html>

<head>
    <title>Hello WebSocket</title>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.4.1/css/bootstrap.min.css" rel="stylesheet">
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/sockjs-client/1.4.0/sockjs.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>

<body>
    <div id="main-content" class="container" style="margin-top: 10px;">
        <div class="row">
            <form class="navbar-form" style="margin-left:0px">
                <div class="col-md-12">
                    <label>请求Token:</label>
                    <div class="form-group">
                        <input type="text" id="token" class="form-control" placeholder="请求token">
                    </div>
                    <button class="btn btn-warning" onclick="setToken();" type="button">token</button>
                    <div class="form-group">
                        <label>WebSocket 连接:</label>
                        <button class="btn btn-primary" type="button" onclick="connect();">进行连接</button>
                        <button class="btn btn-danger" type="button" onclick="disconnect();">断开连接</button>
                    </div>
                    <label>订阅地址:</label>
                    <div class="form-group">
                        <input type="text" id="subscribe" class="form-control" placeholder="订阅地址">
                    </div>
                    <button class="btn btn-warning" onclick="subscribeSocket();" type="button">订阅</button>
                </div>
            </form>
        </div>
        </br>
        <div class="row">
            <div class="form-group">
                <label for="content">发送的消息内容:</label>
                <input type="text" id="content" class="form-control" placeholder="消息内容">
            </div>
            <button class="btn btn-info" onclick="sendMessageNoParameter();" type="button">发送</button>
        </div>
        </br>
        <div class="row">
            <div class="col-md-12">
                <h5 class="page-header" style="font-weight:bold">接收到的消息:</h5>
                <table class="table table-striped">
                    <tbody id="information"></tbody>
                </table>
            </div>
        </div>
    </div>
</body>

</html>

<script>
    // 设置 STOMP 客户端
    var stompClient = null;
    // 请求token
    var TOKEN = "";
    // 设置 WebSocket 进入端点
    var SOCKET_ENDPOINT = "http://127.0.0.1:8787/push?atoken=";
    // 设置订阅消息的请求前缀
    var SUBSCRIBE_PREFIX = "/user"
    // 设置订阅消息的请求地址
    var SUBSCRIBE = "/topic/orderSub";
    // 设置服务器端点,访问服务器中哪个接口
    var SEND_ENDPOINT = "/app/push";

    /* 进行连接 */
    function connect() {
        // 设置 SOCKET
        if (!TOKEN) {
            alert("请先配置token")
        } else {
            var socket = new SockJS(SOCKET_ENDPOINT + TOKEN);
            // 配置 STOMP 客户端
            stompClient = Stomp.over(socket);
            // STOMP 客户端连接
            stompClient.connect({}, function (frame) {
                console.warn("---------- 连接成功 ----------");
            });
        }
    }

    function setToken() {
        TOKEN = $("#token").val();
    }

    /* 订阅信息 */
    function subscribeSocket() {
        // 设置订阅地址
        SUBSCRIBE = SUBSCRIBE_PREFIX + $("#subscribe").val();
        // 输出订阅地址
        console.warn(`---------- 设置订阅地址为: ${SUBSCRIBE}----------`);
        // 执行订阅消息
        stompClient.subscribe(SUBSCRIBE, function (responseBody) {
            var receiveMessage = JSON.parse(responseBody.body);
            $("#information").append("<tr><td>" + responseBody.body + "</td></tr>");
        });
    }

    /* 断开连接 */
    function disconnect() {
        stompClient.disconnect(function () {
            alert("断开连接");
        });
    }

    /* 发送消息并指定目标地址(这里设置的目标地址为自身订阅消息的地址,当然也可以设置为其它地址) */
    function sendMessageNoParameter() {
        // 设置发送的内容
        var sendContent = $("#content").val();
        // 设置待发送的消息内容
        var message = '{"destination": "' + SUBSCRIBE + '", "content": "' + sendContent + '"}';
        // 发送消息
        stompClient.send(SEND_ENDPOINT, {}, message);
    }
</script>

前端代码中,通过地址 http://127.0.0.1:8787/push?atoken=xxx 进行连接后端 ws 地址,随后订阅了 /user/topic/orderSub 的消息, 此时调用 sendMessageToUser() 方法,向 /topic/orderSub 发送消息时,改用户便可以收到对应消息