likes
comments
collection
share

Springboot 使用Websocket的4种方式

作者站长头像
站长
· 阅读数 50

Websocket概念

Websocket是一个基于TCP协议的全双工的通信协议。该技术的目的是提供基于浏览器机制的需要与服务器进行双向通信的应用程序,而不需要依赖的多个HTTP连接。

历史上,客户机与服务器客户机和服务器之间的通信(例如,即时消息传递) 和游戏应用程序)需要滥用HTTP来轮询服务器的更新,同时发送不同的上游通知HTTP调用。这也是Websocket技术出现的原因

Websocket与HTTP的唯一关系是:它的握手由HTTP服务器作为升级请求,默认情况下,WebSocket协议使用80端口作为普通的WebSocket连接和端口443用于WebSocket连接SSL。

websocket协议如下所示:

  • 来自客户端的发送的请求头包含内容
     GET /chat HTTP/1.1
     Host: server.example.com
     Upgrade: websocket
     Connection: Upgrade
     Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
     Origin: http://example.com
     Sec-WebSocket-Protocol: chat, superchat
     Sec-WebSocket-Version: 13
  • 来自服务器端的响应包含内容
      HTTP/1.1 101 Switching Protocols
      Upgrade: websocket
      Connection: Upgrade
      Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
      Sec-WebSocket-Protocol: chat

在请求头内容中有个Connection参数的值为 Upgrade,表示这是一个http转ws的升级请求。 目的是80端口和443端口可同时给两个协议使用。

集成使用

配置文件

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.13</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.94.Final</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

application.yml

server:
  port: 8899
netty:
  port: 8900
  websocket-path: netty-websocket
stomp:
  # 用户订阅的目标尾缀
  user-destination: /notice

启动类

@SpringBootApplication
@EnableWebSocket
@EnableAsync
public class WebsocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebsocketApplication.class, args);
    }

    @Resource
    private NettyWebsocketServer nettyWebsocketServer;

    @PostConstruct
    public void init() {
        nettyWebsocketServer.init();
    }
}

前端websocket代码

非Stomp

<script>
// 非Stomp方式执行以下代码
    let websocket = new WebSocket("ws://localhost:8899/{连接的端点}");
    websocket.onopen = () => {
        // 成功打开连接的时候,需要做的事情   
    }
    websocket.onmessage = (result) => {
        // 接收服务端消息
        console.log(result.data)
    }
    websocket.onerror = (res) => {
        // 接收错误的消息
    }
    websocket.onclose = (res) => {
        // 连接关闭需要做的事情
    }
</script>
<script>
// 发送代码
websocket.send(JSON.stringify("内容支持JSON,使用JSON.stringify"));
</script>
<script>
// 关闭的代码
websocket.close()
</script>

Stomp方式:

<script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.js"></script>
// stomp的方式
<script>
// 给后端的参数
let headers = {
   // 登录的用户名称
   'login': username,
   // 登录通行码
   // 'passcode': 'mypasscode',
   };
   
let socket = new WebSocket("ws://localhost:8899/{连接的端点}");
let stompClient = Stomp.over(socket);
stompClient.connect(headers, function (frame) {
    // 成功打开连接的时候,需要做的事情
    // 订阅属于自己的主题
    stompClient.subscribe('/sliver-gravel/'+username+"/notice", function (greeting) {
        console.log(greeting)
        parseProtocol(greeting.body);
        },headers);
    },function (error){
         // 连接关闭需要做的事情
        console.log('stomp连接关闭>>>' + error);
    });
 // 
</script>
<script>
// 发送代码
stompClient.send("/app/hello", {}, JSON.stringify("内容支持JSON,使用JSON.stringify"));
</script>
<script>
// 关闭的代码
if(stompClient!==null){
   stompClient.disconnect(()=>{
     // 连接关闭之后需要做的事情
   })                 
}
</script>

Servlet方式

配置

@Configuration
public class ServerWebsocketConfiguration {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

主要代码

@ServerEndpoint(value = "/servlet/{username}")
@Component
public class WebsocketEndpointServer {
    private final static ConcurrentHashMap<Long, Session> ID_SESSION_MAP = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(@PathParam("username") String username, Session session) 
        throws Exception {
      // 完成连接需要做的事情,如向用户发送相关的信息
      
    }

    @OnMessage
    public void onMessage(@PathParam("username") Long username, String message
        ,Session session)  throws Exception {
        // 接收客户端消息,解析消息,以及其他操作
        System.out.println(username+": "+message);
        session.getBasicRemote().sendText("将文本内容原路发送给客户端"+message);
    }

    @OnClose
    public void onClose(@PathParam("username") Long username, Session session)
        throws Exception {
        // 连接关闭需要做的事情
    }

    @OnError
    public void onError(@PathParam("username") Long username, Session session,Throwable throwable)
        throws Exception {
        // 发生错误需要做的事情
        System.err.println(throwable.getMessage());
    }

}

Spring方式

Spring的方式需要使用@EnableWebSocket注解才能生效

配置

@Configuration
public class SpringWebsocketConfiguration implements WebSocketConfigurer {
    /**
    * 配置websocket的路径前缀
    */
    private final String websocketPrefix = "/spring";

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler(), websocketPrefix + "/**")
                .addInterceptors(handshakeInterceptor())
                // springboot 2.4以上版本
                .setAllowedOriginPatterns("*");
    }

    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new ChatHandshakeInterceptor(websocketPrefix);
    }

    @Bean
    public WebSocketHandler webSocketHandler() {
        return new ChatWebsocketHandler(websocketPrefix);
    }
    
    /**
     * 自定义服务器属性容器配置
     *
     * @return 自定义服务器容器属性配置
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }
}

拦截器

public class ChatHandshakeInterceptor implements HandshakeInterceptor {

    private String websocketPrefix;

    public ChatHandshakeInterceptor() {
    }

    public ChatHandshakeInterceptor(String websocketPrefix) {
        this.websocketPrefix = websocketPrefix;
    }

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, 
        WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 建立连接前,需要做的事情
        // 如判断用户是否合法、密码是否正确等。
        String path = request.getURI().getPath();
        // 判断路径是否符合要求
        boolean matches = path.replace(websocketPrefix, "")
            .matches("^/[0-9A-z\u4e00-\u9af5]{1,10}");
        if (matches) {
            // 继续握手
            return true;
        }
        System.err.println("参数不合法");
        // 终止握手
        return false;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response
        , WebSocketHandler wsHandler, Exception exception) {
        System.out.println(exception);
    }
}

websocket处理器

public class ChatWebsocketHandler implements WebSocketHandler {


    private final static ConcurrentHashMap<String, WebSocketSession> USERNAME_SESSION_MAP 
        = new ConcurrentHashMap<>();

    private final String websocketPrefix;

    private final ObjectMapper mapper = new ObjectMapper();

    public ChatWebsocketHandler(String websocketPrefix) {
        this.websocketPrefix = websocketPrefix;
    }


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 建立连接之后需要完成的事情,如:
        URI uri = session.getUri();
        if (uri != null) {
            String path = uri.getPath();
            String username = path.replace(websocketPrefix + "/", "");
            WebSocketSession webSocketSession = USERNAME_SESSION_MAP.get(username);
            // 先发送挤下线通知
            if (webSocketSession != null) {
                // chatProtocol为自定义实体类
                WebSocketMessage<String> message = new TextMessage(mapper.writeValueAsBytes(chatProtocol));
                webSocketSession.sendMessage(message);
                // 关闭close
                webSocketSession.close();
            }
            return;
        }
        throw new RuntimeException("没有用户数据");
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        Object payload = message.getPayload();
        // 处理客户发送过来的消息
        System.out.println(payload);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        // 发生错误需要做的事情
        session.close();
        exception.printStackTrace();
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        // 连接关闭需要做的事情,如删除用户和session的映射
        System.out.println(closeStatus.getCode());
        System.out.println("session.getId() = " + session.getId() + "断开");
    }
}

Netty方式

@Configuration
public class NettyConfiguration {
    @Value("${netty.port}")
    private Integer nettyPort;

    @Value("${netty.websocket-path}")
    private String websocketPath;


    @Bean
    public NettyWebsocketServer nettyWebsocketServer() {
        return new NettyWebsocketServer(nettyPort,websocketPath);
    }
}
public class NettyWebsocketServer {


    private final int port;

    private final String websocketPath;

    /**
     * bossGroup 用于接收传入连接
     */
    private final EventLoopGroup bossGroup;
    /**
     * workGroup 用于处理接受连接的流量
     * bossGroup 接受连接成注册到workGroup
     */
    private final EventLoopGroup workGroup;


    public NettyWebsocketServer(int nettyPort,String websocketPath) {
        this.port = nettyPort;
        this.websocketPath = websocketPath;
        // NioEventLoopGroup 是一个处理I/O操作的多线程事件循环
        // 为不同类型的传输提供各种实现
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
    }

    @Async
    public void init() {
        // Bootstrap是一个设置服务器的助手类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        ChannelFuture channelFuture;
        try {
            channelFuture = serverBootstrap.group(bossGroup, workGroup)
                    // 指定NioServerSocketChannel类用于实例化一个新的
                    // Channel实例用于接受连接
                    .channel(NioServerSocketChannel.class)
                    // 是一个特殊的处理类,自定义添加管道,处理更多的程序
                    .childHandler(new WebsocketServerInitializer(websocketPath))
                    // 服务端接受连接队列的长度,若果队列已满,客户端连接将被拒绝
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Socket 参数,连接保活,默认值为false,启用该功能时
                    // TCP会主动探测空闲连接的有效性
                    // 可以将此功能视为TCP的心跳机制
                    // 默认的心跳间隔是7200s,Netty默认关闭该功能
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .bind(port).sync();
            // 等待直到服务器关闭套接字
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    /**
     * 销毁容器前,停止Netty
     */
    @PreDestroy
    public void destroy() {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }


}

通过new WebSocketServerProtocolHandler(websocketPath)实现与Spring方式相似的设置websocket端点,在添加WebSocketServerProtocolHandler类之前,加入自定义的BeforeWebsocketHandler类实现一些功能如:用户名与channel的映射。

public class WebsocketServerInitializer extends ChannelInitializer<SocketChannel> {

    private final String websocketPath;

    public WebsocketServerInitializer(String websocketPath) {
        this.websocketPath = "/"+websocketPath;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // 使用http 解码器和编码器
                .addLast(new HttpServerCodec())
                // 使用块方式读写
                .addLast(new ChunkedWriteHandler())
                // 使用http 累加器
                .addLast(new HttpObjectAggregator(8192))
                // 在转换协议之前处理路径,通过路径绑定用
                .addLast(new BeforeWebsocketHandler(websocketPath))
                // 转换ws协议
                .addLast(new WebSocketServerProtocolHandler(websocketPath))
                // 添加自定义处理器
                .addLast(new WebsocketServerHandler());
    }
}

BeforeWebsocketHandler类重要的一步是设置request.setUri(websocketPrefix);,使得路径与上述的WebSocketServerProtocolHandler的路径参数一致。

public class BeforeWebsocketHandler extends ChannelInboundHandlerAdapter {
    private final String websocketPrefix;

    public BeforeWebsocketHandler(String prefix) {
        this.websocketPrefix = prefix;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 判断 msg 是否为 http request
        String connection = "Upgrade";
        // msg instanceof FullHttpRequest request 为 JDK17 版本写法
        // FullHttpRequest request = (FullHttpRequest) msg; // JDK8 强转
        if (msg instanceof FullHttpRequest request) {
            if (connection.equals(request.headers().get("Connection"))) {
                String regex = "^[0-9A-z\\u4e00-\\u9af5]{2,10}";
                String path = request.uri().replace(websocketPrefix + "/", "");
                String username = URLDecoder.decode(path, StandardCharsets.UTF_8);
                if (username.matches(regex)) {
                    // 添加 建立channel映射
                    NettyWebsocketService.addUser(username, ctx);
                    // 重新设置 request URI 重要
                    request.setUri(websocketPrefix);
                } else {
                    System.err.println("用户名不合法!关闭连接");
                    ctx.close();
                }
            }
        }
        super.channelRead(ctx, msg);
    }
}

最后继承SimpleChannelInboundHandler类,泛型参数为文本的TextWebSocketFrame类即可

public class WebsocketServerHandler 
    extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private final ExecutorService executorService = new ThreadPoolExecutor(8, 8, 10,
        TimeUnit.SECONDS, new ArrayBlockingQueue<>(500),new DefaultThreadFactory("WebsocketServerHandler"),
        new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
        TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println(textWebSocketFrame.text());
        // 服务器端接收客户端信息,需要做的事情。
        String msg = textWebSocketFrame.text();
        NettyWebsocketService.transformMessage(msg, channelHandlerContext);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 等待一会,推送列表消息
        executorService.execute(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            NettyWebsocketService.pushUserList(ctx);
        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyWebsocketService.removeUser(ctx);
        ctx.close();
    }
}

注意: NettyWebsocketService是自定义实现的工具类。

Stomp方式

stomp方式需要@EnableWebSocketMessageBroker以及实现WebSocketMessageBrokerConfigurer接口,同样的这里需要一个注册的端点。并且重写了configureClientInboundChannel方法,接着自定义一个实现的ChatInterceptor类,对相关的stomp的消息进行拦截做一些额外的操作。

@EnableWebSocketMessageBroker
@Configuration
public class StompWebsocketConfiguration implements WebSocketMessageBrokerConfigurer {


    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp").setAllowedOriginPatterns("*").addInterceptors();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/chat");
        config.enableSimpleBroker("/topic","/sliver-gravel");
        config.setPreservePublishOrder(true);
        // 用户主题的前缀:默认是/user
        config.setUserDestinationPrefix("/sliver-gravel");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChatInterceptor());
    }
}

这里的自定义实现类ChatInterceptor用于在客户端的信息来到送达之前,进行用户挤退,用户拉取消息的一些逻辑操作,同样的StompChatService类为自定义的工具类。在官方文档中示例中可以作为token校验,accessor.getLogin();可以获取前端的stompClient.connect(headers,connectFunc,errorFunc)中关于login参数。具体可以查看StompHeaderAccessor源码。

public class ChatInterceptor implements ChannelInterceptor {

    private final MessageService messageService = new MessageService();

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        if (accessor == null) {
            return message;
        }
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            String sessionId = accessor.getSessionId();
            String username = accessor.getLogin();
            if (sessionId != null && username != null) {
                // 强制下线
                messageService.forcedOffline(sessionId, username, channel);
            }
        }
        return message;
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        // 生成 Message类型信息,路由到 用户在线列表
        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (accessor == null) {
            return;
        }
        Object simpHeart = message.getHeaders()
                .get(StompHeaderAccessor.HEART_BEAT_HEADER);
        List<String> heart = 
                accessor.getNativeHeader(StompHeaderAccessor.STOMP_HEARTBEAT_HEADER);
        // 客户端关闭连接会 接收到两条 DISCONNECT 消息, 有一个有心跳键值,一个没有心跳键值,故选择一个
        // 学识尚浅,不知原因。
        boolean disconnect = StompCommand.DISCONNECT.equals(accessor.getCommand()) && (heart != null || simpHeart != null);
        if (disconnect) {
            messageService.sendOffline(accessor.getSessionId(), accessor.getLogin(), channel);
            return;
        }
        // 发送上线信息的时候,自己也可能会接收到,所以前端做个兼容处理
        // 接收到自己的消息就不做处理
        // 原因可能是异步的原因
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            String sessionId = accessor.getSessionId();
            String username = accessor.getLogin();
            if (sessionId != null && username != null) {
                messageService.sendOnline(sessionId, username, channel);
            }
            return;
        }
        // Connect消息发送完毕之后,保证订阅该主题之后再发送,发送用户上线信息
        if (StompCommand.SUBSCRIBE.equals(accessor.getCommand()) && accessor.getLogin() != null) {
            messageService.sendUserList(accessor.getSessionId(), accessor.getLogin(), channel);
        }
    }


    private StompHeaderAccessor stompHeaderAccessor(StompCommand command, SimpMessageType messageType) {
        StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.create(command);
        stompHeaderAccessor.setMessageTypeIfNotSet(messageType);
        stompHeaderAccessor.setSessionAttributes(new HashMap<>(0));
        return stompHeaderAccessor;
    }

    /**
     * 消息服务
     */
    private class MessageService {

        /**
         * 强制下线
         *
         * @param sessionId 当前进行连接的sessionID
         * @param username  对应的用户
         * @param channel   管道
         */
        void forcedOffline(String sessionId, String username, MessageChannel channel) {
            // 如果之前有登陆过的账号,则将之前的账号关闭
            String lastSessionId = StompChatService.addUser(username, sessionId);
            if (lastSessionId != null) {
                // 发送错误的信息
                sendErrorMessage(lastSessionId, username, channel);
                // 暂停一会,保证错误信息在关闭之前发送
                try {
                    TimeUnit.MILLISECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // 该lastSessionId的连接关闭
                StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.DISCONNECT, SimpMessageType.DISCONNECT);
                accessor.setSessionId(lastSessionId);
                // 设置心跳用于关闭连接信息,因为有两条关闭信息,所以用作区分
                accessor.setHeartbeat(0, 0);
                GenericMessage<byte[]> genericMessage = new GenericMessage<>(new byte[0], accessor.getMessageHeaders());
                channel.send(genericMessage);
            }
        }

        void sendOffline(String sessionId, String username, MessageChannel channel) {
            if (username == null) {
                username = StompChatService.removeUser(sessionId);
                if (username == null) {
                    System.err.println("sessionId:" + sessionId + " 不存在");
                    return;
                }
            }
            // 发送下线信息
            StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
            accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/offline"));
            accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.length())));
            accessor.setDestination("/chat/offline");
            accessor.setHeartbeat(1000, 1000);
            accessor.setSessionId(sessionId);
            GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
            channel.send(genericMessage);
        }

        void sendOnline(String sessionId, String username, MessageChannel channel) {
            StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
            accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/online"));
            accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.length())));
            accessor.setDestination("/chat/online");
            accessor.setHeartbeat(1000, 1000);
            accessor.setSessionId(sessionId);
            GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
            channel.send(genericMessage);
        }

        void sendUserList(String sessionId, String username, MessageChannel channel) {

            StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
            accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/user-list"));
            accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.length())));
            accessor.setDestination("/chat/user-list");
            accessor.setHeartbeat(1000, 1000);
            accessor.setSessionId(sessionId);
            GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
            channel.send(genericMessage);
        }

        void sendErrorMessage(String sessionId, String username, MessageChannel channel) {
            StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
            accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/forced-offline"));
            accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.length())));
            accessor.setDestination("/chat/forced-offline");
            accessor.setHeartbeat(1000, 1000);
            accessor.setSessionId(sessionId);
            GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
            channel.send(genericMessage);

        }

    }

}

最后就是处理消息的控制器,StompController要使用@Controller@RestController标注。方法使用@MessageMapping("/forced-offline")进行标注,就像@RequsetMapping一样,对应的路径处理的对应的请求,这里的路径有一个前缀,这个前缀在上述的StompWebsocketConfiguration类中的configureMessageBroker(MessageBrokerRegistry config)方法使用config.setApplicationDestinationPrefixes("/chat");进行设置。 也是说这里的路径是:"/chat/forced-offline" 的格式

@Controller
@Slf4j
public class StompController {

    @Resource
    private SimpMessagingTemplate messagingTemplate;


    @Value("${stomp.user-destination}")
    private String userDestination;

    private final ObjectMapper mapper = new ObjectMapper();

    @MessageMapping("/forced-offline")
    public void forcedOffline(String username) throws JsonProcessingException {
        log.warn("stomp用户:{} 被挤下线", username);
        ChatProtocol<ChatProtocol.ErrorMessage> chatProtocol = MessageUtil.errorMessageChatProtocol("-1", "您被挤下线");
        messagingTemplate.convertAndSendToUser(username, userDestination, mapper.writeValueAsString(chatProtocol));
    }


    /**
     * 向其他用户发送 指定用户上线信息
     *
     * @param username 用户名称
     */
    @MessageMapping("/online")
    public void onlineState(String username) throws JsonProcessingException {
        log.info("stomp连接:{} 上线", username);
        ChatProtocol<ChatProtocol.OnlineState> chatProtocol = MessageUtil.onlineStateProtocol(username, true);
        messagingTemplate.convertAndSend("/topic" + userDestination, mapper.writeValueAsString(chatProtocol));
    }

    /**
     * 向其他用户范松 指定用户离线信息
     *
     * @param username 用户名称
     * @throws JsonProcessingException
     */
    @MessageMapping("/offline")
    public void offlineState(String username) throws JsonProcessingException {
        log.info("stomp连接:{} 下线", username);
        ChatProtocol<ChatProtocol.OnlineState> chatProtocol = MessageUtil.onlineStateProtocol(username, false);
        messagingTemplate.convertAndSend("/topic" + userDestination, mapper.writeValueAsString(chatProtocol));
    }

    /**
     * 获取所有在线用户
     *
     * @param username 指定用户
     */
    @MessageMapping("/user-list")
    public void sendUserList(String username) {
        List<String> users = StompChatService.getUsersExceptUser(username);
        ChatProtocol<ChatProtocol.User> userChatProtocol = MessageUtil.userChatProtocol(users);
        try {
            messagingTemplate.convertAndSendToUser(username, userDestination, mapper.writeValueAsString(userChatProtocol));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @MessageMapping("/message")
    public void message(String payload) throws JsonProcessingException {
        //  ChatProtocol<ChatProtocol.Message> chatProtocol = mapper.readValue(String.valueOf(payload), new TypeReference<ChatProtocol<ChatProtocol.Message>>() { });  // JDK8
        ChatProtocol<ChatProtocol.Message> chatProtocol = mapper.readValue(String.valueOf(payload), new TypeReference<>() {
        });
        ChatProtocol.Message message = chatProtocol.getData();
        String chatType = "group";
        if (chatType.equals(message.getType())) {
            messagingTemplate.convertAndSend("/topic" + userDestination, payload);
            return;
        }
        String toUser = message.getToUser();
        messagingTemplate.convertAndSendToUser(toUser, userDestination, payload);
    }


    /**
     * 订阅 topic/greetings 主题的前端 接受两次消息
     *
     * @param message
     * @return
     * @throws Exception
     */
    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public String greeting(String message) throws Exception {
        System.err.println(message);
        messagingTemplate.convertAndSend("/topic/greetings", message);
        return message;
    }
}

@SendToUser,@SendTo这种一般只能发送给一个指定用户或所有订阅一个相同主题的客户。SimpMessagingTemplate模版类则提供更多的api来实现更多特定场景的功能。官方已经注入了一个SimpMessagingTemplateBean,只需要通过@Resource@Autowired使用即可。

Springboot 使用Websocket的4种方式

github

完整示例: DawnSilverGravel Websocket Study

Springboot 使用Websocket的4种方式

Springboot 使用Websocket的4种方式

参考文档

Springboot 官方文档

RFC 6455 - The WebSocket Protocol (ietf.org))

stomp.js (v2.3.3) - STOMP for JavaScript apps (Web browser & node.js) | BootCDN - Bootstrap 中文网开源项目免费 CDN 加速服务

转载自:https://juejin.cn/post/7268558840265408572
评论
请登录