likes
comments
collection
share

sonic-agent是如何整合通信的

作者站长头像
站长
· 阅读数 25

业务背景

agent作为sonic云真机的核心功能工具,担负着接收来自sonic-server端发来的业务消息以及自身状态、设备状态等信息的回传功能。具体主要体现在如下的核心功能上

  • sonic-server端发送的任务请求消息:创建任务,agent管理等
  • agent自身的状态信息回传,比如agent启动时上报自身信息
  • 设备信息回传,Android和iOS设备接入时回传设备信息
  • 云真机启动时设备各项数据流的回传:画面流、音频流、性能数据、设备自身的动态参数(电量等)

其中,云真机部分的数据流传送需要实时性和高效性,来保证用户在体验云真机时有不错的体感。那么结合整体来看,就需要一种长连接的通信机制,保证通信的高效率。那么肯定不必多说,在这个需求下,websocket肯定是业务的首选,而sonic也是采用了websocket的这套机制来进行实时性、高效性的双端通信。

逻辑结构

sonic-agent是如何整合通信的

上手实践

本次的实践是基于纯后端,也就是server和client之间用websocket实现通信,借助http接口请求,实现通信效果。

Server端

  1. pom引入相关依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
  1. 定义websocket连接自定义功能的配置类WsEndpointConfigure
public class WsEndpointConfigure extends ServerEndpointConfig.Configurator implements ApplicationContextAware {

    private static volatile BeanFactory beanFactory;

    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
        return beanFactory.getBean(clazz);
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        WsEndpointConfigure.beanFactory = applicationContext;
    }
}

关于ServerEndpointConfig.Configurator类,它主要的作用是在服务端配置websocket的端点行为。后续会讲到。

但实际上如果自身对websocket连接没有自定义需求的话,可以不用定义这个类。

  1. 配置ServerEndpointExporter
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

    @Bean
    public WsEndpointConfigure wsEndpointConfigure(){
        return new WsEndpointConfigure();
    }
}
  1. 实现端点类,注意serverEndpoint的用法
@Component
@Slf4j
@ServerEndpoint(value = "/agent/conn", configurator = WsEndpointConfigure.class)
public class TransportServer {

    @OnOpen
    public void onOpen(Session session) throws IOException {
        log.info("Session: " + session.getId() + " is connected!");
        session.getBasicRemote().sendText("Hello client!");
    }

    @OnMessage
    public void onMessage(String message, Session session) throws IOException{
        log.info("Session: " + session.getId() + " received msg from client: " + message);
        session.getBasicRemote().sendText("Server received msg: " + message);
    }

    @OnClose
    public void onClose(Session session){
        log.info("Session: " + session.getId() + " closed!");
    }

    @OnError
    public void onError(Session session, Throwable error){
        log.info("Session: " + session.getId() + " has error: " + error.getMessage());
    }
}

那么服务端的endpoint就完成了,等待客户端的连接,连接上了就先会走onOpen方法,表明有客户端连接。

Client端

client端则更为简易,因为客户端所要做的就是两个事

  • 确定服务端URI,并发起连接
  • 发送业务消息

在这里,文章中对sonic-agent的client做了一个改动,客户端改为了依赖注入,并确保注入完成的后,client是一个即时可用的对象。之所以这么做的原因,是因为client在发起connect方法连接的时候,会等待服务端响应,若不处理,则有可能发送出现client为null导致的NPE问题。

  1. pom依赖配置
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.java-websocket</groupId>
        <artifactId>Java-Websocket</artifactId>
        <version>1.5.3</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
  1. 定义客户端连接类TransportClient,实现四个抽象方法
@Slf4j
public class TransportClient extends WebSocketClient {
    public TransportClient(URI serverUri) {
        super(serverUri);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("start connect to server");
    }

    @Override
    public void onMessage(String s) {
        log.info("message from server: " + s);
    }

    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("client closing......");
    }

    @Override
    public void onError(Exception e) {
        log.info(e.getMessage());
    }
}
  1. 使用Configuration注解注入bean,这里取了个巧,在初始化bean的时候校验了client是否连接server成功。
@Slf4j
@Configuration
public class TransportClientConfig {
    @Bean
    public TransportClient transportClient() throws InterruptedException {
        String url = "ws://localhost:8091/agent/conn";
        URI uri = URI.create(url);
        TransportClient client = new TransportClient(uri);
        client.connect();
        while (!client.isOpen()){
            log.info("connecting....");
            Thread.sleep(1000);
        }
        return client;
    }
}
  1. 利用http接口发送数据消息
@RestController
@RequestMapping("/client")
public class HelloController {

    @Resource
    private TransportClient client;

    @GetMapping("/send")
    public String sendMsg(@RequestParam String message){
        client.send(message);
        return "ok";
    }
}

client端至此也完成了,启动server和client,调用接口即可查看双端相应。

设计与优化

这样就实现了一个在server端和client端之间通信的引擎。可以看出,最后可以完全定义一个JSON对象,转化为JSON的字符串后在需要的地方调用client的send方法即可。然后在JSON对象中去定义task类型和具体的业务字段含义。而sonic-agent也正是这样子做的,将业务消息放在双端的onMessage方法里面,对JSON字符串进行字段提取,然后通过switch语句进行定位,并处理case中的业务逻辑。

那么抛开websocket,如果还需要实现这样的一套逻辑,有没有其它的方法呢?答案肯定是有的,那就是使用Netty,这个高性能的异步网络通信框架。如果使用netty,整体结构上和websocket的写法是很相似的。

  • 在服务端启动一个ServerBootstrap,并配置相关的workerGroup和bossGroup,配置非阻塞Nio通道后,绑定端口启动,等待客户端连接
  • 定义交互协议格式,并设计好相对应的编解码类,实现ByteBuf和Object之间的转化
  • 定义业务handler,实现业务交互逻辑
  • 客户端启动Bootstrap,设定Nio通道,配置服务端地址信息,然后启动。
  • 发送信息至服务端进行处理

当然在这个项目里,websocket足够使用了,但是可以作为思想拓展这样试一试。

另外,由于云真机的屏幕流数据是直接与前端交互处理的,这也是为什么会使用websocket的第二个原因,前端直接提取数据流,渲染后输出在页面上,很方便,无缝衔接。

ServerEndpointConfig.Configurator

前面提到了ServerEndpointConfig.Configurator,简而言之,它是用于配置Websocket端点行为的类。核心方法如下:

public String getNegotiatedSubprotocol(List<String> supported, List<String> requested) {
    return fetchContainerDefaultConfigurator().getNegotiatedSubprotocol(supported, requested);
}

public List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested) {
    return fetchContainerDefaultConfigurator().getNegotiatedExtensions(installed, requested);
}

public boolean checkOrigin(String originHeaderValue) {
    return fetchContainerDefaultConfigurator().checkOrigin(originHeaderValue);
}

public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    fetchContainerDefaultConfigurator().modifyHandshake(sec, request, response);
}

public <T extends Object> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
    return fetchContainerDefaultConfigurator().getEndpointInstance(clazz);
}

它的主要作用有以下几点

  1. 自定义配置,比如上面的modifyHandshake方法,可以修改过程中的Http请求和响应,也可以来重写checkOrigin来实现跨域请求
public class CustomConfigurator extends ServerEndpointConfig.Configurator {
    
    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        // 允许所有来源的跨域请求
        response.getHeaders().add("Access-Control-Allow-Origin", "*");
    }
}
  1. 允许在Endpoint中管理会话,可以重写onOpen、onMessage、onClose、onError等方法来实现自身的业务逻辑
  2. 拓展功能,可以添加自定义的日志记录等
public class LoggingConfigurator extends ServerEndpointConfig.Configurator {

    private static final Logger logger = Logger.getLogger(LoggingConfigurator.class.getName());

    @Override
    public void modifyHandshake(ServerEndpointConfig config) {
        // 在握手之前记录连接请求信息
        logger.info("WebSocket handshake initiated for endpoint: " + config.getEndpointClass().getName());
    }

    @Override
    public void destroy(ServerEndpointConfig config) {
        // 在销毁 WebSocket 端点时记录信息
        logger.info("WebSocket endpoint destroyed for: " + config.getEndpointClass().getName());
    }
}

它就是一个可以允许开发者在websocket中实现定制化功能的一个类,如果你没有定制化需求,在上面的例子中可以不需要重写这个类。

以下是chatGPT对于这个类是否需要的回答:

sonic-agent是如何整合通信的

转载自:https://juejin.cn/post/7279720181534588962
评论
请登录