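Four Ways to Use WebSocket in Spring Boot
WebSocket Concepts
WebSocket is a full-duplex communication protocol layered on TCP. Its goal is to give browser-based applications that need two-way communication with a server a mechanism that does not rely on opening multiple HTTP connections.
Historically, applications that needed bidirectional communication between a client and a server (for example, instant messaging and gaming applications) had to abuse HTTP, polling the server for updates while sending upstream notifications as separate HTTP calls. That is why WebSocket was introduced.
The only relationship between WebSocket and HTTP is that the WebSocket handshake is interpreted by HTTP servers as an Upgrade request. By default, the WebSocket protocol uses port 80 for regular WebSocket connections and port 443 for WebSocket connections tunneled over TLS (SSL).
The WebSocket handshake looks like this:
- Request headers sent by the client: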
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
- Response from the server:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
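The request includes a Connection header whose value is Upgrade (together with Upgrade: websocket), which marks it as a request to upgrade from HTTP to the WebSocket (ws) protocol.
This lets ports 80 and 443 serve both protocols at the same time.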
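To make the handshake above concrete, here is a minimal Java sketch of how a server derives Sec-WebSocket-Accept from the client's Sec-WebSocket-Key, following RFC 6455: append a fixed GUID to the key, take the SHA-1 digest, and Base64-encode the result. The class name WebSocketAccept is made up for this illustration; Spring and Netty perform this step internally, so application code normally never needs it.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Illustrative helper (hypothetical name): computes Sec-WebSocket-Accept per RFC 6455. */
final class WebSocketAccept {
    // Fixed GUID that RFC 6455 defines for the opening handshake.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String compute(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            // Concatenate key and GUID, hash with SHA-1, then Base64-encode the digest.
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Key taken from the request headers shown above.
        System.out.println(compute("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

Run with the key from the request above, it prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=, the same value that appears in the server response.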
Integration
Configuration files
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.13</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.94.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
application.yml
server:
  port: 8899
netty:
  port: 8900
  websocket-path: netty-websocket
stomp:
  # Suffix of the destination that users subscribe to
  user-destination: /notice
Application class
@SpringBootApplication
@EnableWebSocket
@EnableAsync
public class WebsocketApplication {
public static void main(String[] args) {
SpringApplication.run(WebsocketApplication.class, args);
}
@Resource
private NettyWebsocketServer nettyWebsocketServer;
@PostConstruct
public void init() {
nettyWebsocketServer.init();
}
}
Front-end WebSocket code
Without STOMP
<script>
// Run the following code when not using STOMP
let websocket = new WebSocket("ws://localhost:8899/{endpoint}");
websocket.onopen = () => {
// What to do once the connection is open
}
websocket.onmessage = (result) => {
// Receive messages from the server
console.log(result.data)
}
websocket.onerror = (res) => {
// Handle errors
}
websocket.onclose = (res) => {
// What to do when the connection closes
}
</script>
<script>
// Sending
websocket.send(JSON.stringify("The payload can be JSON; serialize it with JSON.stringify"));
</script>
<script>
// Closing
websocket.close()
</script>
With STOMP:
<script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.js"></script>
// STOMP approach
<script>
// Headers passed to the back end
let headers = {
// Login username
'login': username,
// Login passcode
// 'passcode': 'mypasscode',
};
let socket = new WebSocket("ws://localhost:8899/{endpoint}");
let stompClient = Stomp.over(socket);
stompClient.connect(headers, function (frame) {
// What to do once the connection is open
// Subscribe to this user's own destination
stompClient.subscribe('/sliver-gravel/'+username+"/notice", function (greeting) {
console.log(greeting)
parseProtocol(greeting.body);
},headers);
},function (error){
// What to do when the connection fails or is closed
console.log('STOMP connection closed >>> ' + error);
});
</script>
<script>
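// Sending (the destination prefix must match setApplicationDestinationPrefixes on the server, which is "/chat" in this article)
stompClient.send("/chat/hello", {}, JSON.stringify("The payload can be JSON; serialize it with JSON.stringify"));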
</script>
<script>
// Closing
if(stompClient!==null){
stompClient.disconnect(()=>{
// What to do after the connection has closed
})
}
</script>
Servlet approach
Configuration
@Configuration
public class ServerWebsocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
Main code
@ServerEndpoint(value = "/servlet/{username}")
@Component
public class WebsocketEndpointServer {
private final static ConcurrentHashMap<Long, Session> ID_SESSION_MAP = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(@PathParam("username") String username, Session session)
throws Exception {
// What to do once connected, e.g. send the user relevant information
}
@OnMessage
public void onMessage(@PathParam("username") String username, String message
,Session session) throws Exception {
// Receive the client message, parse it, and perform other work
System.out.println(username+": "+message);
session.getBasicRemote().sendText("Echoing the text back to the client: "+message);
}
@OnClose
public void onClose(@PathParam("username") String username, Session session)
throws Exception {
// What to do when the connection closes
}
@OnError
public void onError(@PathParam("username") String username, Session session,Throwable throwable)
throws Exception {
// What to do when an error occurs
System.err.println(throwable.getMessage());
}
}
Spring approach
The Spring approach only takes effect when the @EnableWebSocket annotation is present.
Configuration
@Configuration
public class SpringWebsocketConfiguration implements WebSocketConfigurer {
/**
* Path prefix for the WebSocket endpoints
*/
private final String websocketPrefix = "/spring";
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler(), websocketPrefix + "/**")
.addInterceptors(handshakeInterceptor())
// Spring Boot 2.4 and later
.setAllowedOriginPatterns("*");
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new ChatHandshakeInterceptor(websocketPrefix);
}
@Bean
public WebSocketHandler webSocketHandler() {
return new ChatWebsocketHandler(websocketPrefix);
}
/**
* Custom configuration for the server container properties
*
* @return the customized server container configuration
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
}
Interceptor
public class ChatHandshakeInterceptor implements HandshakeInterceptor {
private String websocketPrefix;
public ChatHandshakeInterceptor() {
}
public ChatHandshakeInterceptor(String websocketPrefix) {
this.websocketPrefix = websocketPrefix;
}
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
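// What to do before the connection is established,
// e.g. check that the user is valid and the password is correct.
String path = request.getURI().getPath();
// Check that the path meets the requirements
boolean matches = path.replace(websocketPrefix, "")
.matches("^/[0-9A-Za-z\u4e00-\u9fa5]{1,10}");
if (matches) {
// Continue the handshake
return true;
}
System.err.println("Invalid parameter");
// Abort the handshake
return false;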
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response
, WebSocketHandler wsHandler, Exception exception) {
System.out.println(exception);
}
}
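When validating the username segment with a character class, it is worth checking the letter range carefully: the range A-z is not the same as A-Za-z, because in ASCII the characters [ \ ] ^ _ and the backtick sit between Z and a. The following sketch compares the two. The class name UsernamePattern and both patterns are assumptions made for illustration, assuming the intent is "digits, ASCII letters, and common CJK ideographs, 1 to 10 characters".

```java
import java.util.regex.Pattern;

/** Illustrative check (hypothetical class): strict vs. loose username character classes. */
final class UsernamePattern {
    // Letters spelled out as A-Za-z, plus digits and common CJK ideographs.
    private static final Pattern STRICT =
            Pattern.compile("[0-9A-Za-z\\u4e00-\\u9fa5]{1,10}");
    // The range A-z also covers the six punctuation characters between 'Z' and 'a'.
    private static final Pattern LOOSE = Pattern.compile("[0-9A-z]{1,10}");

    static boolean isValid(String username) {
        // matches() requires the whole input to match, so no anchors are needed.
        return STRICT.matcher(username).matches();
    }

    static boolean looseAccepts(String candidate) {
        return LOOSE.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("alice01"));   // true
        System.out.println(isValid("a_b"));       // false
        System.out.println(looseAccepts("a_b"));  // true
    }
}
```

Compiling the pattern once as a constant also avoids recompiling the regular expression on every handshake.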
WebSocket handler
public class ChatWebsocketHandler implements WebSocketHandler {
private final static ConcurrentHashMap<String, WebSocketSession> USERNAME_SESSION_MAP
= new ConcurrentHashMap<>();
private final String websocketPrefix;
private final ObjectMapper mapper = new ObjectMapper();
public ChatWebsocketHandler(String websocketPrefix) {
this.websocketPrefix = websocketPrefix;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// What to do once the connection is established, for example:
URI uri = session.getUri();
if (uri != null) {
String path = uri.getPath();
String username = path.replace(websocketPrefix + "/", "");
WebSocketSession webSocketSession = USERNAME_SESSION_MAP.get(username);
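// First notify the previous session that it is being kicked offline
if (webSocketSession != null) {
// chatProtocol is a custom entity class
WebSocketMessage<String> message = new TextMessage(mapper.writeValueAsBytes(chatProtocol));
webSocketSession.sendMessage(message);
// Close the previous session
webSocketSession.close();
}
// Record the new session so that a later login can find it
USERNAME_SESSION_MAP.put(username, session);
return;
}
throw new RuntimeException("No user data");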
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
Object payload = message.getPayload();
// Handle the message sent by the client
System.out.println(payload);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
// What to do when an error occurs
session.close();
exception.printStackTrace();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
// What to do when the connection closes, e.g. remove the user-to-session mapping
System.out.println(closeStatus.getCode());
System.out.println("session.getId() = " + session.getId() + " disconnected");
}
}
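The "one session per username, newest login wins" bookkeeping used by this handler can be sketched independently of Spring. The class below is a hypothetical helper, not part of the article's project: register relies on Map.put returning the previous value, which is the session to notify and close, and unregister removes a mapping only if it still points at the session that is closing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative registry (hypothetical class): at most one live session per username. */
final class SessionRegistry<S> {
    private final Map<String, S> sessions = new ConcurrentHashMap<>();

    /** Stores the new session and returns the one it replaced, or null if there was none. */
    S register(String username, S session) {
        return sessions.put(username, session);
    }

    /** Removes the mapping only if it still refers to the given session. */
    boolean unregister(String username, S session) {
        return sessions.remove(username, session);
    }

    int size() {
        return sessions.size();
    }

    public static void main(String[] args) {
        SessionRegistry<String> registry = new SessionRegistry<>();
        registry.register("alice", "session-1");
        String evicted = registry.register("alice", "session-2");
        System.out.println("evicted: " + evicted);
    }
}
```

The conditional unregister matters because closing the evicted session triggers its own close callback; an unconditional remove by username at that point would also delete the mapping of the session that just replaced it.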
Netty approach
@Configuration
public class NettyConfiguration {
@Value("${netty.port}")
private Integer nettyPort;
@Value("${netty.websocket-path}")
private String websocketPath;
@Bean
public NettyWebsocketServer nettyWebsocketServer() {
return new NettyWebsocketServer(nettyPort,websocketPath);
}
}
public class NettyWebsocketServer {
private final int port;
private final String websocketPath;
/**
* bossGroup accepts incoming connections
*/
private final EventLoopGroup bossGroup;
/**
* workGroup handles the traffic of accepted connections;
* bossGroup accepts a connection and registers it with workGroup
*/
private final EventLoopGroup workGroup;
public NettyWebsocketServer(int nettyPort,String websocketPath) {
this.port = nettyPort;
this.websocketPath = websocketPath;
// NioEventLoopGroup is a multithreaded event loop that handles I/O operations;
// Netty provides various implementations for different kinds of transports
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
}
@Async
public void init() {
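// ServerBootstrap is a helper class that sets up a server
ServerBootstrap serverBootstrap = new ServerBootstrap();
ChannelFuture channelFuture;
try {
channelFuture = serverBootstrap.group(bossGroup, workGroup)
// Use NioServerSocketChannel to instantiate a new
// Channel that accepts incoming connections
.channel(NioServerSocketChannel.class)
// A special handler that configures the pipeline of each new channel
.childHandler(new WebsocketServerInitializer(websocketPath))
// Length of the server's accept queue; when the queue is full, client connections are refused
.option(ChannelOption.SO_BACKLOG, 128)
// Socket option for keep-alive, false by default. When enabled,
// TCP actively probes idle connections to check that they are still valid.
// It can be seen as TCP's heartbeat mechanism.
// The default probe interval is 7200 s; Netty leaves this option off by default
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(port).sync();
// Wait until the server socket is closed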
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Stop Netty before the container is destroyed
*/
@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
new WebSocketServerProtocolHandler(websocketPath) sets up the WebSocket endpoint, much like the Spring approach. A custom BeforeWebsocketHandler is added to the pipeline before WebSocketServerProtocolHandler to implement features such as mapping usernames to channels.
public class WebsocketServerInitializer extends ChannelInitializer<SocketChannel> {
private final String websocketPath;
public WebsocketServerInitializer(String websocketPath) {
this.websocketPath = "/"+websocketPath;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// HTTP decoder and encoder
.addLast(new HttpServerCodec())
// Write large data in chunks
.addLast(new ChunkedWriteHandler())
// Aggregate HTTP message parts into a full request
.addLast(new HttpObjectAggregator(8192))
// Process the path before the protocol switch and bind the user by path
.addLast(new BeforeWebsocketHandler(websocketPath))
// Switch to the ws protocol
.addLast(new WebSocketServerProtocolHandler(websocketPath))
// Add the custom handler
.addLast(new WebsocketServerHandler());
}
}
An important step in BeforeWebsocketHandler is the call request.setUri(websocketPrefix);, which makes the request path match the path argument given to the WebSocketServerProtocolHandler above.
public class BeforeWebsocketHandler extends ChannelInboundHandlerAdapter {
private final String websocketPrefix;
public BeforeWebsocketHandler(String prefix) {
this.websocketPrefix = prefix;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Check whether msg is an HTTP request
String connection = "Upgrade";
// msg instanceof FullHttpRequest request is the JDK 17 syntax (pattern matching)
// FullHttpRequest request = (FullHttpRequest) msg; // explicit cast on JDK 8
if (msg instanceof FullHttpRequest request) {
if (connection.equals(request.headers().get("Connection"))) {
String regex = "^[0-9A-Za-z\\u4e00-\\u9fa5]{2,10}";
String path = request.uri().replace(websocketPrefix + "/", "");
String username = URLDecoder.decode(path, StandardCharsets.UTF_8);
if (username.matches(regex)) {
// Register the username-to-channel mapping
NettyWebsocketService.addUser(username, ctx);
// Reset the request URI (important)
request.setUri(websocketPrefix);
} else {
System.err.println("Invalid username! Closing the connection");
ctx.close();
}
}
}
super.channelRead(ctx, msg);
}
}
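The path handling in this handler (strip the endpoint prefix, then URL-decode what remains as the username) can be tried out on its own with plain JDK classes. The sketch below is a hypothetical helper, not the article's code; unlike a plain String.replace, it only accepts URIs that start with the prefix, drops any query string, and rejects extra path segments. Those stricter rules are assumptions made for the illustration.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Illustrative helper (hypothetical class): extracts a username from "/prefix/username". */
final class PathUsername {
    static Optional<String> extract(String uri, String prefix) {
        String head = prefix + "/";
        if (!uri.startsWith(head)) {
            return Optional.empty();            // not under the WebSocket endpoint
        }
        String raw = uri.substring(head.length());
        int query = raw.indexOf('?');
        if (query >= 0) {
            raw = raw.substring(0, query);      // ignore the query string
        }
        if (raw.isEmpty() || raw.contains("/")) {
            return Optional.empty();            // missing name or extra path segments
        }
        // Percent-encoded UTF-8 (e.g. CJK names) is decoded here; requires Java 10+.
        return Optional.of(URLDecoder.decode(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(extract("/netty-websocket/alice", "/netty-websocket"));
        System.out.println(extract("/other/alice", "/netty-websocket"));
    }
}
```

Returning an Optional keeps the "no valid username" case explicit, so the caller can decide whether to close the channel or pass the request on.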
Finally, extend SimpleChannelInboundHandler with TextWebSocketFrame (the text frame class) as the type parameter.
public class WebsocketServerHandler
extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ExecutorService executorService = new ThreadPoolExecutor(8, 8, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(500),new DefaultThreadFactory("WebsocketServerHandler"),
new ThreadPoolExecutor.CallerRunsPolicy());
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext,
TextWebSocketFrame textWebSocketFrame) throws Exception {
System.out.println(textWebSocketFrame.text());
// What the server does when it receives a message from the client.
String msg = textWebSocketFrame.text();
NettyWebsocketService.transformMessage(msg, channelHandlerContext);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Wait a moment, then push the user list
executorService.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyWebsocketService.pushUserList(ctx);
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyWebsocketService.removeUser(ctx);
ctx.close();
}
}
Note: NettyWebsocketService is a custom utility class.
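The handler above delays the user-list push by sleeping inside a pool thread. As a possible alternative (a different technique from the one the article uses), the delay can be expressed with a ScheduledExecutorService, which does not keep a thread blocked while waiting. The class and method names below are hypothetical and only illustrate the idea.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative helper (hypothetical class): runs a push after a delay without sleeping. */
final class DelayedPush {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Schedules the push to run once after the given delay. */
    ScheduledFuture<?> pushLater(Runnable push, long delayMillis) {
        return scheduler.schedule(push, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Demo helper: schedules the push and blocks until it has run (or 5 s have passed). */
    boolean pushAndAwait(Runnable push, long delayMillis) {
        try {
            pushLater(push, delayMillis).get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }

    void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) {
        DelayedPush delayed = new DelayedPush();
        delayed.pushAndAwait(() -> System.out.println("push user list"), 200);
        delayed.shutdown();
    }
}
```

Inside a Netty handler the executor returned by ctx.executor() is itself a ScheduledExecutorService, so a similar schedule call could be made there without creating an extra pool.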
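STOMP approach
The STOMP approach requires @EnableWebSocketMessageBroker and an implementation of the WebSocketMessageBrokerConfigurer interface; as before, an endpoint has to be registered. The configuration also overrides configureClientInboundChannel and registers a custom ChatInterceptor that intercepts STOMP messages to perform extra work.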
@EnableWebSocketMessageBroker
@Configuration
public class StompWebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp").setAllowedOriginPatterns("*").addInterceptors();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/chat");
config.enableSimpleBroker("/topic","/sliver-gravel");
config.setPreservePublishOrder(true);
// Prefix for user destinations; the default is /user
config.setUserDestinationPrefix("/sliver-gravel");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChatInterceptor());
}
}
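The custom ChatInterceptor runs before a client message is delivered and handles logic such as kicking out a previous login and letting users pull messages; StompChatService is likewise a custom utility class. The example in the official documentation uses such an interceptor for token validation. accessor.getLogin(); returns the login header that the front end passes in stompClient.connect(headers,connectFunc,errorFunc). See the StompHeaderAccessor source code for details.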
public class ChatInterceptor implements ChannelInterceptor {
private final MessageService messageService = new MessageService();
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor == null) {
return message;
}
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String sessionId = accessor.getSessionId();
String username = accessor.getLogin();
if (sessionId != null && username != null) {
// Force the previous session offline
messageService.forcedOffline(sessionId, username, channel);
}
}
return message;
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
// Build a Message and route it to the online user list
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor == null) {
return;
}
Object simpHeart = message.getHeaders()
.get(StompHeaderAccessor.HEART_BEAT_HEADER);
List<String> heart =
accessor.getNativeHeader(StompHeaderAccessor.STOMP_HEARTBEAT_HEADER);
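// When a client closes the connection, two DISCONNECT messages arrive: one carries a heartbeat header and the other does not, so only one of them is handled.
// I have not worked out why this happens.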
boolean disconnect = StompCommand.DISCONNECT.equals(accessor.getCommand()) && (heart != null || simpHeart != null);
if (disconnect) {
messageService.sendOffline(accessor.getSessionId(), accessor.getLogin(), channel);
return;
}
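// The sender may also receive its own online notification, so the front end should handle that case
// and ignore messages about itself.
// This is possibly because the sending is asynchronous.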
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String sessionId = accessor.getSessionId();
String username = accessor.getLogin();
if (sessionId != null && username != null) {
messageService.sendOnline(sessionId, username, channel);
}
return;
}
// After CONNECT completes, wait until the client has subscribed before sending the list of online users
if (StompCommand.SUBSCRIBE.equals(accessor.getCommand()) && accessor.getLogin() != null) {
messageService.sendUserList(accessor.getSessionId(), accessor.getLogin(), channel);
}
}
private StompHeaderAccessor stompHeaderAccessor(StompCommand command, SimpMessageType messageType) {
StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.create(command);
stompHeaderAccessor.setMessageTypeIfNotSet(messageType);
stompHeaderAccessor.setSessionAttributes(new HashMap<>(0));
return stompHeaderAccessor;
}
/**
* Message service
*/
private class MessageService {
/**
* Force a user offline
*
* @param sessionId session ID of the connection being established
* @param username the corresponding user
* @param channel the message channel
*/
void forcedOffline(String sessionId, String username, MessageChannel channel) {
// If the account is already logged in elsewhere, close that earlier session
String lastSessionId = StompChatService.addUser(username, sessionId);
if (lastSessionId != null) {
// Send the error message
sendErrorMessage(lastSessionId, username, channel);
// Pause briefly so the error message is sent before the connection closes
try {
TimeUnit.MILLISECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Close the connection that belongs to lastSessionId
StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.DISCONNECT, SimpMessageType.DISCONNECT);
accessor.setSessionId(lastSessionId);
// Set a heartbeat on this disconnect message; two disconnect messages arrive, so the header tells them apart
accessor.setHeartbeat(0, 0);
GenericMessage<byte[]> genericMessage = new GenericMessage<>(new byte[0], accessor.getMessageHeaders());
channel.send(genericMessage);
}
}
void sendOffline(String sessionId, String username, MessageChannel channel) {
if (username == null) {
username = StompChatService.removeUser(sessionId);
if (username == null) {
System.err.println("sessionId:" + sessionId + " does not exist");
return;
}
}
// Send the offline notification
StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/offline"));
accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.getBytes(StandardCharsets.UTF_8).length)));
accessor.setDestination("/chat/offline");
accessor.setHeartbeat(1000, 1000);
accessor.setSessionId(sessionId);
GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
channel.send(genericMessage);
}
void sendOnline(String sessionId, String username, MessageChannel channel) {
StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/online"));
accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.getBytes(StandardCharsets.UTF_8).length)));
accessor.setDestination("/chat/online");
accessor.setHeartbeat(1000, 1000);
accessor.setSessionId(sessionId);
GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
channel.send(genericMessage);
}
void sendUserList(String sessionId, String username, MessageChannel channel) {
StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/user-list"));
accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.getBytes(StandardCharsets.UTF_8).length)));
accessor.setDestination("/chat/user-list");
accessor.setHeartbeat(1000, 1000);
accessor.setSessionId(sessionId);
GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
channel.send(genericMessage);
}
void sendErrorMessage(String sessionId, String username, MessageChannel channel) {
StompHeaderAccessor accessor = stompHeaderAccessor(StompCommand.SEND, SimpMessageType.MESSAGE);
accessor.setNativeHeaderValues("destination", Collections.singletonList("/chat/forced-offline"));
accessor.setNativeHeaderValues("content-length", Collections.singletonList(String.valueOf(username.getBytes(StandardCharsets.UTF_8).length)));
accessor.setDestination("/chat/forced-offline");
accessor.setHeartbeat(1000, 1000);
accessor.setSessionId(sessionId);
GenericMessage<byte[]> genericMessage = new GenericMessage<>(username.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
channel.send(genericMessage);
}
}
}
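To see where the login value read by accessor.getLogin() comes from, it helps to look at the raw STOMP frame. A frame is a command line, then name:value header lines, a blank line, an optional body, and a terminating NUL byte. The sketch below builds a CONNECT frame and reads its headers back using plain JDK classes; the class name StompFrameSketch is hypothetical, and header escaping and body handling are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch (hypothetical class): builds and parses a minimal STOMP CONNECT frame. */
final class StompFrameSketch {
    /** Command line, header lines, blank line, empty body, NUL terminator. */
    static String connectFrame(String login) {
        return "CONNECT\n"
                + "accept-version:1.1,1.2\n"
                + "login:" + login + "\n"
                + "\n"
                + "\0";
    }

    /** Reads the header lines that follow the command, stopping at the blank line. */
    static Map<String, String> headers(String frame) {
        Map<String, String> result = new LinkedHashMap<>();
        String[] lines = frame.split("\n");
        for (int i = 1; i < lines.length && !lines[i].isEmpty(); i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                // If a header is repeated, the first occurrence wins.
                result.putIfAbsent(lines[i].substring(0, colon),
                        lines[i].substring(colon + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(headers(connectFrame("alice")).get("login"));
    }
}
```

The headers object passed to stompClient.connect on the front end ends up as these header lines, which is why the login entry is visible to the interceptor on the CONNECT command.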
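Finally comes the controller that handles the messages. StompController must be annotated with @Controller or @RestController. Its methods are annotated with @MessageMapping("/forced-offline"), which works like @RequestMapping: each path handles the corresponding request. The path carries a prefix, which is set by config.setApplicationDestinationPrefixes("/chat"); in the configureMessageBroker(MessageBrokerRegistry config) method of the StompWebsocketConfiguration class above.
In other words, the full destination here has the form "/chat/forced-offline".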
@Controller
@Slf4j
public class StompController {
@Resource
private SimpMessagingTemplate messagingTemplate;
@Value("${stomp.user-destination}")
private String userDestination;
private final ObjectMapper mapper = new ObjectMapper();
@MessageMapping("/forced-offline")
public void forcedOffline(String username) throws JsonProcessingException {
log.warn("STOMP user {} was kicked offline", username);
ChatProtocol<ChatProtocol.ErrorMessage> chatProtocol = MessageUtil.errorMessageChatProtocol("-1", "You have been kicked offline");
messagingTemplate.convertAndSendToUser(username, userDestination, mapper.writeValueAsString(chatProtocol));
}
/**
* Send other users the notification that the given user is online
*
* @param username the username
*/
@MessageMapping("/online")
public void onlineState(String username) throws JsonProcessingException {
log.info("STOMP connection: {} online", username);
ChatProtocol<ChatProtocol.OnlineState> chatProtocol = MessageUtil.onlineStateProtocol(username, true);
messagingTemplate.convertAndSend("/topic" + userDestination, mapper.writeValueAsString(chatProtocol));
}
/**
* Send other users the notification that the given user is offline
*
* @param username the username
* @throws JsonProcessingException if the message cannot be serialized
*/
@MessageMapping("/offline")
public void offlineState(String username) throws JsonProcessingException {
log.info("STOMP connection: {} offline", username);
ChatProtocol<ChatProtocol.OnlineState> chatProtocol = MessageUtil.onlineStateProtocol(username, false);
messagingTemplate.convertAndSend("/topic" + userDestination, mapper.writeValueAsString(chatProtocol));
}
/**
* Get all online users
*
* @param username the requesting user
*/
@MessageMapping("/user-list")
public void sendUserList(String username) {
List<String> users = StompChatService.getUsersExceptUser(username);
ChatProtocol<ChatProtocol.User> userChatProtocol = MessageUtil.userChatProtocol(users);
try {
messagingTemplate.convertAndSendToUser(username, userDestination, mapper.writeValueAsString(userChatProtocol));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@MessageMapping("/message")
public void message(String payload) throws JsonProcessingException {
// ChatProtocol<ChatProtocol.Message> chatProtocol = mapper.readValue(String.valueOf(payload), new TypeReference<ChatProtocol<ChatProtocol.Message>>() { }); // JDK8
ChatProtocol<ChatProtocol.Message> chatProtocol = mapper.readValue(String.valueOf(payload), new TypeReference<>() {
});
ChatProtocol.Message message = chatProtocol.getData();
String chatType = "group";
if (chatType.equals(message.getType())) {
messagingTemplate.convertAndSend("/topic" + userDestination, payload);
return;
}
String toUser = message.getToUser();
messagingTemplate.convertAndSendToUser(toUser, userDestination, payload);
}
/**
* Front ends subscribed to /topic/greetings receive each message twice (once via @SendTo and once via messagingTemplate)
*
* @param message
* @return
* @throws Exception
*/
@MessageMapping("/hello")
@SendTo("/topic/greetings")
public String greeting(String message) throws Exception {
System.err.println(message);
messagingTemplate.convertAndSend("/topic/greetings", message);
return message;
}
}
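The relationship between the application destination prefix and the @MessageMapping paths in this controller can be modelled in a few lines. This is a simplified illustration with a hypothetical class name, not Spring's actual routing code: a destination is handled by the controller only if it starts with the configured prefix, and the remainder is what gets compared with the @MessageMapping value.

```java
import java.util.Optional;

/** Simplified model (hypothetical class) of application-prefix handling for STOMP destinations. */
final class AppDestination {
    /** Returns the part after the application prefix, or empty if the prefix does not apply. */
    static Optional<String> mappingFor(String destination, String appPrefix) {
        if (!destination.startsWith(appPrefix + "/")) {
            return Optional.empty();   // not an application destination
        }
        return Optional.of(destination.substring(appPrefix.length()));
    }

    public static void main(String[] args) {
        // With the prefix "/chat", this yields "/forced-offline".
        System.out.println(mappingFor("/chat/forced-offline", "/chat"));
        // Destinations under other prefixes are not routed to the controller.
        System.out.println(mappingFor("/topic/greetings", "/chat"));
    }
}
```

Under this model, a client that sends to a destination with a different prefix, such as /app/hello, would not reach any method of the controller while the configured prefix is /chat.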
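Annotations such as @SendToUser and @SendTo generally only send to one specific user or to all clients subscribed to the same destination. The SimpMessagingTemplate template class offers more APIs for more specialized scenarios. The framework already registers a SimpMessagingTemplate bean, so it only needs to be injected with @Resource or @Autowired.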
GitHub
Full example: DawnSilverGravel Websocket Study
References
Reposted from: https://juejin.cn/post/7268558840265408572