通过WebSocket实现日志打印功能
概述
项目中有这样一个需求:需要在浏览器前端页面输出执行日志,也就是说将后台log中的info、error日志输出到页面上(如下图),帮助快速定位问题。
那么该如何实现呢?
整体设计
该功能的实现主要采用下面的技术:
- 通过websocket实现日志消息的传输
- 通过redis的发布、订阅模式实现消息的流转
上图是指标测试的整个时序流程图。
- 用户点击指标测试按钮,会同时建立两个请求(上图中的1和4):一个是http请求 rs/sd/feature/test/run,用于执行功能;另一个是websocket长连接请求 sockjs/webSocketServer/info。它们有一个共同的参数websocketKey,值必须保持一致,这样才能识别此次指标测试的日志应该推送到哪个webSocket中。
- rs/sd/feature/test/run接口执行过程中的info、warn、error日志都会被推送到redis中,这些日志中包含了websocketKey,用于标记该日志属于哪一次指标测试,见上图中的2。
- 应用订阅redis主题,获取日志消息并解析,再根据websocketKey字段找到对应的websocket连接,由sdm将数据推送回浏览器,见上图中的7。
代码实现
建立websocket请求
- websocket配置类
/**
 * Spring WebSocket configuration that exposes the log-push handler on two endpoints:
 * a plain WebSocket endpoint and a SockJS endpoint.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /** Value of the {@code websocket.num} property (100 when unset); passed to the handler's constructor. */
    @Value("${websocket.num:100}")
    private int sessionNum;

    /**
     * Registers the shared handler under {@code /webSocketServer} and, with SockJS enabled,
     * under {@code /sockjs/webSocketServer}. Both registrations accept any origin.
     *
     * @param registry registry supplied by Spring
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        SpringWebSocketHandler handler = webSocketHandler();

        // NOTE(review): the handshake interceptor is presumably what provides the
        // HTTP.SESSION.ID attribute the handler falls back to — confirm.
        registry.addHandler(handler, "/webSocketServer")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*");

        registry.addHandler(handler, "/sockjs/webSocketServer")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*")
                .withSockJS();
    }

    /**
     * Creates the handler bean that keeps track of connected sessions.
     *
     * @return handler constructed with the configured {@code sessionNum}
     */
    @Bean
    public SpringWebSocketHandler webSocketHandler() {
        return new SpringWebSocketHandler(sessionNum);
    }

    /**
     * Exposes a Tomcat request-upgrade strategy as a bean.
     *
     * @return a new {@code TomcatRequestUpgradeStrategy}
     */
    @Bean
    public RequestUpgradeStrategy strategy() {
        return new TomcatRequestUpgradeStrategy();
    }
}
public class SpringWebSocketHandler extends TextWebSocketHandler {
private static final String SESSION_ID = "HTTP.SESSION.ID";
private Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class);
private Map<String, WebSocketSession> sessions;
private Pattern URL_PATTERN = Pattern.compile("(\?|&+)(.+?)=([^&]*)");
public SpringWebSocketHandler(int maxCacheSzie) {
sessions = Collections.synchronizedMap(new LRUCache<String, WebSocketSession>(maxCacheSzie));
}
/**
* 连接建立之后回调
*
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String key = getSessionKey(session);
sessions.put(key, session);
logger.info("session[{}] connected, current cached websocket session size: {}", key, sessions.size());
}
/**
* 连接关闭后回调
*
* @param session
* @param closeStatus
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
String key = getSessionKey(session);
sessions.remove(key);
logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", key,
sessions.size());
}
private String getSessionKey(WebSocketSession session) {
Matcher matcher = URL_PATTERN.matcher(session.getUri().toString());
String key = null;
while (matcher.find()) {
if(Objects.equals(matcher.group(2), "websocketKey")) {
key = matcher.group(3);
}
}
if (StringUtils.isEmpty(key)) {
key = String.valueOf(session.getAttributes().get(SESSION_ID));
}
return key;
}
/**
* 发送消息处理
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
}
/**
* 传输错误处理
*
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
logger.warn("websocket handleTransportError: {}", exception.getMessage());
if (session.isOpen()) {
session.close();
}
String sessionId = (String)session.getAttributes().get("HTTP.SESSION.ID");
sessions.remove(sessionId);
logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", sessionId,
sessions.size());
}
/**
* 是否支持处理部分的消息
*
* @return
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 对指定session发送消息
*
* @param sessionId
* @param message
*/
public void sendMessageToSession(String sessionId, TextMessage message) {
WebSocketSession session = sessions.get(sessionId);
if (session == null) {
return;
}
try {
if (session.isOpen()) {
session.sendMessage(message);
} else {
sessions.remove(sessionId);
}
} catch (IOException e) {
logger.error("sendMessageToSession error", e);
}
}
/**
* 群发消息
*
* @param message
*/
public void sendMessageToSessions(TextMessage message) {
Iterator<Map.Entry<String, WebSocketSession>> it = sessions.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, WebSocketSession> entry = it.next();
String sessionId = entry.getKey();
WebSocketSession session = entry.getValue();
try {
if (session.isOpen()) {
session.sendMessage(message);
} else {
sessions.remove(sessionId);
}
} catch (IOException e) {
logger.error("sendMessageToSessions error", e);
}
}
}
}
添加日志的appender
- 定义webconsole appender
/**
 * Log4j2 appender plugin ({@code WebConsoleAppender}) that forwards formatted log events
 * to {@link WebConsoleLogMsgManager} whenever the current context carries a websocket key.
 */
@Slf4j
@Plugin(name = "WebConsoleAppender", category = "core", elementType = "appender", printObject = true)
public class WebConsoleLogAppender extends AbstractAppender {

    /**
     * @param name             appender name
     * @param filter           optional filter
     * @param layout           layout used to format events
     * @param ignoreExceptions passed through to {@link AbstractAppender}
     * @param properties       passed through to {@link AbstractAppender}
     */
    public WebConsoleLogAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
        final boolean ignoreExceptions, Property[] properties) {
        super(name, filter, layout, ignoreExceptions, properties);
    }

    /**
     * Formats the event with the configured layout and hands it to the web console sender.
     * Events are skipped when no websocket key is set, or when the key starts with
     * {@code SL} or {@code PKG} (case-insensitive).
     *
     * @param logEvent the event to forward
     */
    @Override
    public void append(LogEvent logEvent) {
        String websocketkey = ServiceContext.getWebsocketKey();
        if (!StrUtil.isNotEmpty(websocketkey) || hasPrefixIgnoreCase(websocketkey, "SL")
            || hasPrefixIgnoreCase(websocketkey, "PKG")) {
            return;
        }
        // NOTE(review): the meaning of the trailing 50000000 argument is not visible here — confirm.
        LoggerEvent loggerEvent =
            new LoggerEvent(websocketkey, logEvent.getLevel().toString(), "webconsole", render(logEvent), 50000000);
        WebConsoleLogMsgManager.sendMsg(loggerEvent);
    }

    /** Case-insensitive, locale-independent prefix test that does not allocate an upper-cased copy. */
    private static boolean hasPrefixIgnoreCase(String value, String prefix) {
        return value.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /**
     * Renders the event as text. String-producing layouts are used directly, which avoids
     * decoding the layout's bytes with the platform default charset; other layouts fall
     * back to the byte representation.
     */
    private String render(LogEvent logEvent) {
        Serializable formatted = getLayout().toSerializable(logEvent);
        if (formatted instanceof String) {
            return (String) formatted;
        }
        return new String(getLayout().toByteArray(logEvent));
    }

    /**
     * Plugin factory used when the appender is declared in the logging configuration.
     *
     * @param name             appender name; required
     * @param filter           optional filter
     * @param layout           optional layout; the default pattern layout is used when absent
     * @param ignoreExceptions passed through to the appender
     * @return the appender, or {@code null} when no name was provided
     */
    @PluginFactory
    public static WebConsoleLogAppender createAppender(@PluginAttribute("name") String name,
        @PluginElement("Filter") final Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout,
        @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
        if (name == null) {
            log.warn("No name provided for WebConsoleAppender");
            return null;
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        // An empty array instead of null keeps the appender's property array safe to read.
        return new WebConsoleLogAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY);
    }
}
任何日志打印都会进入该类的append方法;当当前上下文中带有websocketKey时,日志会被推送到redis中,详见append方法。
- 将appender添加到log4j.xml或者logback.xml中
通过redis的消费订阅模式推送日志消息
- redis发布订阅配置
/**
 * Redis publish/subscribe wiring for web console log messages. Only active when a
 * {@code RedisConfig} bean is present.
 */
@Configuration
@ConditionalOnBean(RedisConfig.class)
public class RedisSubcrbieConfig {

    /** Topic that log messages are published to and subscribed from. */
    public static final String SUB_KEY = "WEB_CONSOLE_LOG";

    /**
     * Builds a template whose keys and values are both serialized as plain strings.
     *
     * @param lettuceConnectionFactory connection factory the template is bound to
     * @return the template, registered under the bean name {@code stringRedisTemplate}
     */
    @Bean(name = "stringRedisTemplate")
    public RedisTemplate stringRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate template = new RedisTemplate();
        template.setConnectionFactory(lettuceConnectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        return template;
    }

    /**
     * Creates the listener container and subscribes the adapter to {@link #SUB_KEY}.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listenerAdapter   adapter that receives the published messages
     * @return the configured container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
        MessageListenerAdapter listenerAdapter) {
        PatternTopic logTopic = new PatternTopic(SUB_KEY);
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listenerAdapter, logTopic);
        return listenerContainer;
    }

    /**
     * Adapts the receiver so that incoming messages are dispatched to its
     * {@code receiveMsg} method.
     *
     * @param receiver the message receiver
     * @return adapter delegating to {@code receiver}
     */
    @Bean
    MessageListenerAdapter listenerAdapter(WebConsoleLogMsgReceiver receiver) {
        final String handlerMethod = "receiveMsg";
        return new MessageListenerAdapter(receiver, handlerMethod);
    }
}
/**
 * Receiver of web console log messages.
 */
public interface WebConsoleLogMsgReceiver {
/**
* Receives a log message.
* @param message content of the log message
*/
void receiveMsg(String message);
}
/**
 * Receiver that relays each incoming log message to the WebSocket session named in it.
 */
@Component
public class RedisLogMsgReceiver implements WebConsoleLogMsgReceiver {

    @Autowired
    private SpringWebSocketHandler handler;

    /**
     * Parses the message as a {@code LoggerEvent}, re-serializes it as pretty-printed JSON
     * and sends it to the session identified by the event's session id.
     *
     * @param message JSON text of a {@code LoggerEvent}
     */
    @Override
    public void receiveMsg(String message) {
        final LoggerEvent event = JSON.parseObject(message, LoggerEvent.class);
        final byte[] prettyJson = JSON.toJSONBytes(event, SerializerFeature.PrettyFormat);
        handler.sendMessageToSession(event.getSessionId(), new TextMessage(prettyJson));
    }
}
- 发布日志消息
/**
 * Sender of web console log messages.
 */
public interface WebConsoleLogMsgSender {
/**
* Sends web console log data.
* @param loggerEvent the log data
*/
void send(LoggerEvent loggerEvent);
}
/**
 * Sender that publishes log events, serialized as JSON, to the Redis topic
 * {@code RedisSubcrbieConfig.SUB_KEY}.
 */
@Component
@Slf4j
public class RedisLogMsgSender implements WebConsoleLogMsgSender {

    @Autowired
    @Qualifier("stringRedisTemplate")
    private RedisTemplate redisTemplate;

    /**
     * Serializes the event to a JSON string and publishes it.
     *
     * @param loggerEvent the log data to publish
     */
    @Override
    public void send(LoggerEvent loggerEvent) {
        final String channel = RedisSubcrbieConfig.SUB_KEY;
        final String payload = JSON.toJSONString(loggerEvent);
        redisTemplate.convertAndSend(channel, payload);
    }
}
/**
 * Static entry point for sending web console log events through the
 * {@code WebConsoleLogMsgSender} bean.
 */
@Slf4j
public class WebConsoleLogMsgManager {

    /**
     * Sends a log event. A {@code null} event is not sent; a warning is logged instead.
     *
     * @param loggerEvent the log event to send
     */
    public static void sendMsg(LoggerEvent loggerEvent) {
        if (loggerEvent != null) {
            // The sender is looked up on every call via the project's bean factory.
            SdBeanFactory.getBean(WebConsoleLogMsgSender.class).send(loggerEvent);
            return;
        }
        log.warn("logger event is empty");
    }
}
- 在appender中发布日志消息
总结
大家有没有想过,这里为什么采用redis的发布订阅模式,而不是kafka的消息队列呢?
因为应用为了高可用,需要部署多个节点,每个节点消费kafka时使用的group必须不一样,也就是说,sdm的每个节点都必须消费全部的日志。为什么要这样?
试想如果应用有多个节点,会存在什么问题呢?比如有两个应用节点:A节点和B节点。用户点击指标测试按钮后,如果sockjs/webSocketServer/info被转发到A节点,而AB节点共享同一个group,此时如果正好是B节点消费了kafka数据,那么A节点就消费不到,转发到A节点的websocket也就获取不到日志数据,无法展示了。所以要保证不同的节点都能消费到kafka数据,sdm各节点的group就必须不一样,但是实际情况下,group一般都是比较固定的。
大家有没有其他思路实现该功能?可以在评论区留言。
转载自:https://juejin.cn/post/7129863610150420488