Spring Integration Ip 一个好用的TCP/UDP开发框架
前言
因此本文中,着重讲解Spring Integration Ip的实际应用。
导入依赖
如果是在POM中,则导入以下依赖(由于spring-integration-ip实际会引入spring-integration的依赖,因此无需再添加相关的依赖了)
<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-ip</artifactId> 
    <version>5.5.18</version> 
</dependency>
如果是在gradle中,则导入以下依赖
compile "org.springframework.integration:spring-integration-ip:5.5.18"
配置之前
Spring Integration Ip提供了对TCP和UDP的支持。
其中TcpSendingMessageHandler类用于发送TCP消息,TcpReceivingChannelAdapter类用于配置接收TCP消息的相关参数。
而UnicastSendingMessageHandler类用于发送UDP单播消息,UnicastReceivingChannelAdapter类用于配置接收UDP单播消息的相关参数。
另外,MulticastSendingMessageHandler类用于发送UDP组播消息,MulticastReceivingChannelAdapter类用于配置接收UDP组播消息的相关参数。
开始配置
UDP
以UDP为例,官方支持三种配置方式,XML配置方式、Java配置方式、Java DSL配置方式
UDP Outbound
UDP Outbound(UDP 出站)就是udp发送消息。以单播为例。
XML方式
<int-ip:udp-outbound-channel-adapter id="udpOut" 
    host="somehost" 
    port="11111" 
    multicast="false" 
    socket-customizer="udpCustomizer" 
    channel="exampleChannel"/>
Java方式
@Bean
public UnicastSendingMessageHandler handler() {
    return new UnicastSendingMessageHandler("localhost", 11111); 
}
Java DSL方式
@Bean
public IntegrationFlow udpOutFlow() {
    return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
                    .configureSocket(socket -> socket.setTrafficClass(0x10)))
                .get();
}
UDP Inbound
UDP Inbound(UDP 入站)就是udp接收消息。以独播为例。
XML方式
<int-ip:udp-inbound-channel-adapter id="udpReceiver" 
    channel="udpOutChannel" 
    port="11111" 
    receive-buffer-size="500" 
    multicast="false" 
    socket-customizer="udpCustomizer" 
    check-length="true"/>
Java方式
@Bean
public UnicastReceivingChannelAdapter udpIn() {
    UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
    adapter.setOutputChannelName("udpChannel");
    return adapter;
}
Java DSL方式
@Bean
public IntegrationFlow udpIn() {
    return IntegrationFlows.from(Udp.inboundAdapter(11111))
            .channel("udpChannel")
            .get();
}
项目实战
本文以UDP为例,分别配置客户端与服务端
客户端
定义用于配置UDP参数的类型
定义基础UDP参数类型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class BaseUdpProp {
    private String host;
    private Integer port;
}
定义用于配置UDP单播入站参数的类型
@Component
@ConfigurationProperties(prefix = "custom.udp.unicast.inbound")
public class UnicastInboundUdpProp extends BaseUdpProp {
}
定义用于配置UDP单播出站参数的类型
@Component
@ConfigurationProperties(prefix = "custom.udp.unicast.outbound")
public class UnicastOutboundUdpProp extends BaseUdpProp {
}
定义用于配置UDP组播出站参数的类型
@Component
@ConfigurationProperties(prefix = "custom.udp.multicast.outbound")
public class MulticastOutboundUdpProp extends BaseUdpProp {
}
自定义的命令
public class ClusterCommand {
    public static final String FINDING_COMMAND = "Finding cluster";
    public static final String JOINING_COMMAND = "Joining cluster";
    public static final String RESET_COMMAND = "Reset node";
}
配置UDP单播入站
配置了Message Router和Transformer消息组件,用来实现按内容进行不同通道分发的业务逻辑
@Configuration
public class UnicastInboundUdpConfig {
    @Resource
    private UnicastInboundUdpProp prop;
    @Resource
    private ObjectMapper objectMapper;
    
    /**
     * 用于配置UDP单播的适配器,对应前文中提到的Channel Adapter的概念。
     * 该实例创建后,会自动产生对于目标端口的监听线程
     * 这里我们只定义了通道的名称,没有创建通道,所以会使用默认的DirectChannel,
     * 如果希望自定义通道,则可以调用adapter.setOutputChannel方法
     *
     * @author huangji
    */
    @Bean
    public UnicastReceivingChannelAdapter unicastAdapter() {
        UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(prop.getPort());
        adapter.setOutputChannelName("UnicastInboundUdpChannel");
        return adapter;
    }
    /**
     * 填入inputChannel,用来接收从apdater输出的数据,
     * 之后根据我们定义的判断逻辑,来分发给对应的目标通道
     * 
     * @param payload 负载消息,会被通道自动注入到变量中
     * @param headers 加了@Headers注解,会被通道自动注入消息头
     * @return 返回通道名称,即最终分发的目标通道的名称
     * @author huangji
     */
    @Router(inputChannel = "UnicastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received unicast inbound from " + host);
        System.out.println("content is " + payload);
        try {
            objectMapper.readValue(payload, new TypeReference<List<String>>() {
            });
            return "NodeNameListChannel";
        } catch (JsonProcessingException ignored) {
        }
        try {
            objectMapper.readValue(payload, NodeJoiningParameter.class);
            return "JoiningParameterChannel";
        } catch (JsonProcessingException ignored) {
        }
        if (payload.equals(ClusterCommand.RESET_COMMAND)) {
            return "JoinedListeningChannel";
        }
        return "OtherChannel";
    }
    /**
     * 这里用@Transformer注解定义了一个转换器组件,将负载类型转化成了目标类型
     * 
     * @param payload 负载消息,会被通道自动注入到变量中
     * @return 返回转化后的结果List<String>
     * @throws JsonProcessingException
     * @author huangji
     */
    @Transformer(inputChannel = "NodeNameListChannel", outputChannel = "NodeNameTransformerChannel")
    public List<String> nodeNameListTransformer(String payload) throws JsonProcessingException {
        return objectMapper.readValue(payload, new TypeReference<>() {
        });
    }
    /**
     * 这里用@Transformer注解定义了一个转换器组件,将负载类型转化成了目标类型
     * 
     * @param payload 负载消息,会被通道自动注入到变量中
     * @return 返回转化后的结果
     * @throws JsonProcessingException
     * @author huangji
     */
    @Transformer(inputChannel = "JoiningParameterChannel", outputChannel = "ParameterTransformerChannel")
    public NodeJoiningParameter parameterTransformer(String payload) throws JsonProcessingException {
        return objectMapper.readValue(payload, NodeJoiningParameter.class);
    }
}
配置消息最终接收的通道
在这里进行自身业务逻辑的处理
    @ServiceActivator(inputChannel = "NodeNameTransformerChannel")
    public void nodeInfoListHandler(List<String> list, @Headers Map<String, Object> headers) {
          ...
    }
    @ServiceActivator(inputChannel = "ParameterTransformerChannel")
    public void joiningParameterHandler(NodeJoiningParameter parameter, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        String udpHost = udpConfig.getMessageHandler().getHost();
        if (host.equals(udpHost)) {
           ...
        }
    }
    @ServiceActivator(inputChannel = "JoinedListeningChannel")
    public void joinedListeningHandler(@Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        String udpHost = udpConfig.getMessageHandler().getHost();
        if (host.equals(udpHost)) {
           ...
        }
    }
    @ServiceActivator(inputChannel = "OtherChannel")
    public void otherHandler(String payload) {
        System.out.println("other payload: " + payload);
    }
发送消息
有效负载的内容在代码中必须是String类型或者byte[]类型,否则会在发送消息时抛出异常
发送UDP广播消息
MulticastSendingMessageHandler multicastHandler= SpringUtil.getBean("multicastHandler");
multicastHandler.handleMessage(MessageBuilder.withPayload(ClusterCommand.FINDING_COMMAND).build());
发送UDP单播消息
UnicastSendingMessageHandler messageHandler = outboundUdp.getMessageHandler();
messageHandler.handleMessage(MessageBuilder.withPayload(ClusterCommand.JOINING_COMMAND).build());
服务端
配置UDP组播入站
@Configuration
public class MulticastInboundUdpConfig {
    @Value("${custom.udp.multicast.inbound.host}")
    private String host;
    @Value("${custom.udp.multicast.inbound.port}")
    private Integer port;
    /**
     * 与单播的配置类似,这里也是配置组播的ChannelAdapter
     *
    */
    @Bean
    public MulticastReceivingChannelAdapter multicastAdapter() {
        MulticastReceivingChannelAdapter adapter = new MulticastReceivingChannelAdapter(host, port);
        adapter.setOutputChannelName("MulticastInboundUdpChannel");
        return adapter;
    }
    /**
     * 这里也是配置消息的Router组件
     *
    */
    @Router(inputChannel = "MulticastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String remoteHost = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received multicast inbound from " + remoteHost);
        System.out.println("content is " + payload);
        if (payload.equals(ClusterCommand.FINDING_COMMAND)) {
            return "FindingChannel";
        }
        return "OtherChannel";
    }
}
配置UDP单播入站
@Configuration
public class UnicastInboundUdpConfig {
    @Value("${custom.udp.unicast.inbound.port}")
    private Integer port;
    @Bean
    public UnicastReceivingChannelAdapter unicastAdapter() {
        UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(port);
        adapter.setOutputChannelName("UnicastInboundUdpChannel");
        return adapter;
    }
    @Router(inputChannel = "UnicastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received unicast inbound from " + host);
        if (payload.equals(ClusterCommand.JOINING_COMMAND)) {
            return "JoiningChannel";
        }
        return "OtherChannel";
    }
}
配置消息最终接收的通道
    @ServiceActivator(inputChannel = "FindingChannel")
    public void listenFindingHandler(@Headers Map<String, Object> headers) throws JsonProcessingException {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(host, udpOutboundPort);
        String payload = objectMapper.writeValueAsString(fetchNodeNameList());
        messageHandler.handleMessage(MessageBuilder.withPayload(payload).build());
    }
    @ServiceActivator(inputChannel = "JoiningChannel")
    public void listenJoiningHandler(@Headers Map<String, Object> headers) throws IOException, InterruptedException {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(host, udpOutboundPort);
       ...
    }
    @ServiceActivator(inputChannel = "OtherChannel")
    public void otherHandler(String payload) {
        System.out.println("other payload: " + payload);
    }
附录
消息头
Spring Integration Ip包含以下的头部信息,可以根据需要去获取自己想要的头部信息。
| Header Name | IpHeaders Constant | Description | 
|---|---|---|
| ip_hostname | HOSTNAME | The host name from which a TCP message or UDP packet was received. If lookupHostisfalse, this contains the IP address. | 
| ip_address | IP_ADDRESS | The IP address from which a TCP message or UDP packet was received. | 
| ip_port | PORT | The remote port for a UDP packet. | 
| ip_localInetAddress | IP_LOCAL_ADDRESS | The local InetAddressto which the socket is connected (since version 4.2.5). | 
| ip_ackTo | ACKADDRESS | The remote IP address to which UDP application-level acknowledgments are sent. The framework includes acknowledgment information in the data packet. | 
| ip_ackId | ACK_ID | A correlation ID for UDP application-level acknowledgments. The framework includes acknowledgment information in the data packet. | 
| ip_tcp_remotePort | REMOTE_PORT | The remote port for a TCP connection. | 
| ip_connectionId | CONNECTION_ID | A unique identifier for a TCP connection. Set by the framework for inbound messages. When sending to a server-side inbound channel adapter or replying to an inbound gateway, this header is required so that the endpoint can determine the connection to which to send the message. | 
| ip_actualConnectionId | ACTUAL_CONNECTION_ID | For information only. When using a cached or failover client connection factory, it contains the actual underlying connection ID. | 
| contentType | MessageHeaders.CONTENT_TYPE | An optional content type for inbound messages Described after this table. Note that, unlike the other header constants, this constant is in the MessageHeadersclass, not theIpHeadersclass. | 
转载自:https://juejin.cn/post/7291477529778913320




