Spring Integration IP: A Handy Framework for TCP/UDP Development

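Preface

This article focuses on the practical use of Spring Integration IP.

Importing the dependency

With Maven, add the following dependency to the POM. spring-integration-ip pulls in the core Spring Integration dependency transitively, so no other Spring Integration dependency needs to be declared.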

<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-ip</artifactId> 
    <version>5.5.18</version> 
</dependency>

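With Gradle, add the following dependency (the implementation configuration replaces the deprecated compile configuration)

implementation "org.springframework.integration:spring-integration-ip:5.5.18"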

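Before configuring

Spring Integration IP supports both TCP and UDP.

TcpSendingMessageHandler sends TCP messages, and TcpReceivingChannelAdapter configures how TCP messages are received.

UnicastSendingMessageHandler sends UDP unicast messages, and UnicastReceivingChannelAdapter configures how UDP unicast messages are received.

MulticastSendingMessageHandler sends UDP multicast messages, and MulticastReceivingChannelAdapter configures how UDP multicast messages are received.

Configuration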

UDP

Taking UDP as the example, three configuration styles are officially supported: XML, Java, and the Java DSL.

UDP Outbound

UDP Outbound means sending messages over UDP. The examples below use unicast.

XML configuration

<int-ip:udp-outbound-channel-adapter id="udpOut" 
    host="somehost" 
    port="11111" 
    multicast="false" 
    socket-customizer="udpCustomizer" 
    channel="exampleChannel"/>

Java configuration

@Bean
public UnicastSendingMessageHandler handler() {
    return new UnicastSendingMessageHandler("localhost", 11111); 
}

Java DSL configuration

@Bean
public IntegrationFlow udpOutFlow() {
    // A lambda-style flow is completed by the framework, so no trailing get() call is needed.
    return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
                    .configureSocket(socket -> socket.setTrafficClass(0x10)));
}

UDP Inbound

UDP Inbound means receiving messages over UDP. The examples below use unicast.

XML configuration

<int-ip:udp-inbound-channel-adapter id="udpReceiver" 
    channel="udpOutChannel" 
    port="11111" 
    receive-buffer-size="500" 
    multicast="false" 
    socket-customizer="udpCustomizer" 
    check-length="true"/>

Java configuration

@Bean
public UnicastReceivingChannelAdapter udpIn() {
    UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
    adapter.setOutputChannelName("udpChannel");
    return adapter;
}

Java DSL configuration

@Bean
public IntegrationFlow udpIn() {
    return IntegrationFlows.from(Udp.inboundAdapter(11111))
            .channel("udpChannel")
            .get();
}
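
To check that an inbound adapter actually receives data, one option is to send it a datagram from plain java.net code. The sketch below is only an illustration and does not use Spring Integration: the class name UdpLoopbackDemo and its methods are hypothetical, and so that it runs on its own it both sends and receives on a loopback port chosen by the operating system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for manual testing; not part of Spring Integration.
class UdpLoopbackDemo {

    /** Sends the text as a single UTF-8 encoded datagram to host:port. */
    static void send(String host, int port, String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(data, data.length, InetAddress.getByName(host), port));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Binds a free loopback port, sends the text to it, and returns what arrived. */
    static String roundTrip(String text) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback)) {
            receiver.setSoTimeout(2000); // fail instead of blocking forever
            send(loopback.getHostAddress(), receiver.getLocalPort(), text);
            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            receiver.receive(packet);
            return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Finding cluster"));
        // To target a running adapter instead, call for example: send("localhost", 11111, "hello")
    }
}
```

Calling send with the adapter's host and port pushes one message into the flow, which is usually enough to see whether the downstream channel is wired correctly.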

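Hands-on project

Using UDP as the example, this section configures a client and a server.

Client

Defining the types that hold the UDP settings

Define the base UDP properties type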

@Data
@NoArgsConstructor
@AllArgsConstructor
public class BaseUdpProp {
    private String host;
    private Integer port;
}

Define the properties type for UDP unicast inbound

@Component
@ConfigurationProperties(prefix = "custom.udp.unicast.inbound")
public class UnicastInboundUdpProp extends BaseUdpProp {

}

Define the properties type for UDP unicast outbound

@Component
@ConfigurationProperties(prefix = "custom.udp.unicast.outbound")
public class UnicastOutboundUdpProp extends BaseUdpProp {

}

Define the properties type for UDP multicast outbound

@Component
@ConfigurationProperties(prefix = "custom.udp.multicast.outbound")
public class MulticastOutboundUdpProp extends BaseUdpProp {

}

Custom commands

public class ClusterCommand {
    public static final String FINDING_COMMAND = "Finding cluster";
    public static final String JOINING_COMMAND = "Joining cluster";
    public static final String RESET_COMMAND = "Reset node";
}

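Configuring UDP unicast inbound

This configuration declares a Message Router and Transformer components, which dispatch each message to a different channel according to its content.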

@Configuration
public class UnicastInboundUdpConfig {

    @Resource
    private UnicastInboundUdpProp prop;
    @Resource
    private ObjectMapper objectMapper;
    
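    /**
     * Configures the UDP unicast adapter, which corresponds to the Channel Adapter concept.
     * Once the adapter is started, it automatically creates a thread that listens on the target port.
     * Only the channel name is set here and no channel is created explicitly,
     * so a default DirectChannel is used.
     * To use a custom channel, call adapter.setOutputChannel instead.
     *
     * @author huangji
     */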
    @Bean
    public UnicastReceivingChannelAdapter unicastAdapter() {
        UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(prop.getPort());
        adapter.setOutputChannelName("UnicastInboundUdpChannel");

        return adapter;
    }

    /**
     * The inputChannel receives the data emitted by the adapter.
     * The method then applies our own rules to pick the target channel.
     *
     * @param payload the message payload, injected automatically from the channel
     * @param headers the message headers, injected because of the @Headers annotation
     * @return the name of the channel the message is finally routed to
     * @author huangji
     */
    @Router(inputChannel = "UnicastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received unicast inbound from " + host);
        System.out.println("content is " + payload);
        try {
            objectMapper.readValue(payload, new TypeReference<List<String>>() {
            });
            return "NodeNameListChannel";
        } catch (JsonProcessingException ignored) {
            // Not a JSON list of node names; try the next format.
        }
        try {
            objectMapper.readValue(payload, NodeJoiningParameter.class);
            return "JoiningParameterChannel";
        } catch (JsonProcessingException ignored) {
            // Not a NodeJoiningParameter either; fall through to the plain-text commands.
        }
        if (payload.equals(ClusterCommand.RESET_COMMAND)) {
            return "JoinedListeningChannel";
        }

        return "OtherChannel";
    }

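    /**
     * A transformer component, declared with @Transformer, that converts the payload to the target type.
     *
     * @param payload the message payload, injected automatically from the channel
     * @return the converted result, a List<String>
     * @throws JsonProcessingException if the payload cannot be parsed
     * @author huangji
     */
    @Transformer(inputChannel = "NodeNameListChannel", outputChannel = "NodeNameTransformerChannel")
    public List<String> nodeNameListTransformer(String payload) throws JsonProcessingException {
        // The explicit type argument keeps this compatible with Java 8.
        return objectMapper.readValue(payload, new TypeReference<List<String>>() {
        });
    }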

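    /**
     * A transformer component, declared with @Transformer, that converts the payload to the target type.
     *
     * @param payload the message payload, injected automatically from the channel
     * @return the converted result
     * @throws JsonProcessingException if the payload cannot be parsed
     * @author huangji
     */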
    @Transformer(inputChannel = "JoiningParameterChannel", outputChannel = "ParameterTransformerChannel")
    public NodeJoiningParameter parameterTransformer(String payload) throws JsonProcessingException {
        return objectMapper.readValue(payload, NodeJoiningParameter.class);
    }
}
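
The router above is an ordered, first-match-wins decision with a fallback channel. The sketch below isolates that idea in plain Java so the ordering can be tested without Spring or Jackson. Everything in it is hypothetical: the class ContentRouterSketch is made up for illustration, and the startsWith checks are crude stand-ins for the real "does this parse as a List<String> or as a NodeJoiningParameter" attempts.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration of ordered content-based routing; not Spring Integration API.
class ContentRouterSketch {

    // LinkedHashMap keeps the rules in the order they were registered.
    private final Map<String, Predicate<String>> rules = new LinkedHashMap<>();
    private final String defaultChannel;

    ContentRouterSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    /** Registers a rule; earlier rules take precedence over later ones. */
    ContentRouterSketch rule(String channel, Predicate<String> matches) {
        rules.put(channel, matches);
        return this;
    }

    /** Returns the channel of the first matching rule, or the default channel. */
    String route(String payload) {
        for (Map.Entry<String, Predicate<String>> rule : rules.entrySet()) {
            if (rule.getValue().test(payload)) {
                return rule.getKey();
            }
        }
        return defaultChannel;
    }

    /** Mirrors the channel names used by the client router, with simplified predicates. */
    static ContentRouterSketch clientRouter() {
        return new ContentRouterSketch("OtherChannel")
                .rule("NodeNameListChannel", p -> p.startsWith("["))      // stand-in for a JSON array
                .rule("JoiningParameterChannel", p -> p.startsWith("{"))  // stand-in for a JSON object
                .rule("JoinedListeningChannel", "Reset node"::equals);
    }

    public static void main(String[] args) {
        ContentRouterSketch router = clientRouter();
        System.out.println(router.route("[\"node-a\",\"node-b\"]"));
        System.out.println(router.route("Reset node"));
        System.out.println(router.route("anything else"));
    }
}
```

Keeping the rules in an ordered structure makes the precedence explicit, which matters here because a payload is only compared against the plain-text commands after both JSON formats have been ruled out.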

Configuring the channels that finally receive the messages

This is where your own business logic goes.

    @ServiceActivator(inputChannel = "NodeNameTransformerChannel")
    public void nodeInfoListHandler(List<String> list, @Headers Map<String, Object> headers) {
          ...
    }

    @ServiceActivator(inputChannel = "ParameterTransformerChannel")
    public void joiningParameterHandler(NodeJoiningParameter parameter, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        String udpHost = udpConfig.getMessageHandler().getHost();
        if (host.equals(udpHost)) {
           ...
        }
    }

    @ServiceActivator(inputChannel = "JoinedListeningChannel")
    public void joinedListeningHandler(@Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        String udpHost = udpConfig.getMessageHandler().getHost();
        if (host.equals(udpHost)) {
           ...
        }
    }

    @ServiceActivator(inputChannel = "OtherChannel")
    public void otherHandler(String payload) {
        System.out.println("other payload: " + payload);
    }

Sending messages

In code, the payload must be a String or a byte[]; any other type causes an exception when the message is sent.
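
One way to avoid that exception is to check or convert the payload before building the message. The helper below is a hypothetical sketch in plain Java and does not call Spring Integration; the class name UdpPayloads and the choice of UTF-8 are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical pre-send check; the names here are not part of any library.
class UdpPayloads {

    /** True when the value is one of the two payload types accepted for sending. */
    static boolean isSupported(Object payload) {
        return payload instanceof String || payload instanceof byte[];
    }

    /** Returns the bytes to put on the wire, or fails early with a descriptive error. */
    static byte[] toBytes(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        if (payload instanceof String) {
            return ((String) payload).getBytes(StandardCharsets.UTF_8);
        }
        String type = payload == null ? "null" : payload.getClass().getName();
        throw new IllegalArgumentException(
                "Serialize " + type + " to a String or byte[] before sending");
    }

    public static void main(String[] args) {
        System.out.println(isSupported("Joining cluster")); // String payloads are accepted
        System.out.println(isSupported(42));                // other types need serializing first
        System.out.println(toBytes("Joining cluster").length);
    }
}
```

Objects such as lists or parameter classes would first be serialized, for example to a JSON string, and only then passed on as the payload.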

Sending a UDP multicast message

MulticastSendingMessageHandler multicastHandler= SpringUtil.getBean("multicastHandler");

multicastHandler.handleMessage(MessageBuilder.withPayload(ClusterCommand.FINDING_COMMAND).build());

Sending a UDP unicast message

UnicastSendingMessageHandler messageHandler = outboundUdp.getMessageHandler();

messageHandler.handleMessage(MessageBuilder.withPayload(ClusterCommand.JOINING_COMMAND).build());

Server

Configuring UDP multicast inbound

@Configuration
public class MulticastInboundUdpConfig {
    @Value("${custom.udp.multicast.inbound.host}")
    private String host;
    @Value("${custom.udp.multicast.inbound.port}")
    private Integer port;

    /**
     * As with unicast, this configures the Channel Adapter, here for multicast.
     */
    @Bean
    public MulticastReceivingChannelAdapter multicastAdapter() {
        MulticastReceivingChannelAdapter adapter = new MulticastReceivingChannelAdapter(host, port);
        adapter.setOutputChannelName("MulticastInboundUdpChannel");

        return adapter;
    }

    /**
     * This configures the message Router component as well.
     */
    @Router(inputChannel = "MulticastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String remoteHost = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received multicast inbound from " + remoteHost);
        System.out.println("content is " + payload);
        if (payload.equals(ClusterCommand.FINDING_COMMAND)) {
            return "FindingChannel";
        }

        return "OtherChannel";
    }
}

Configuring UDP unicast inbound

@Configuration
public class UnicastInboundUdpConfig {
    @Value("${custom.udp.unicast.inbound.port}")
    private Integer port;

    @Bean
    public UnicastReceivingChannelAdapter unicastAdapter() {
        UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(port);
        adapter.setOutputChannelName("UnicastInboundUdpChannel");

        return adapter;
    }

    @Router(inputChannel = "UnicastInboundUdpChannel")
    public String router(String payload, @Headers Map<String, Object> headers) {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        System.out.println("received unicast inbound from " + host);
        if (payload.equals(ClusterCommand.JOINING_COMMAND)) {
            return "JoiningChannel";
        }

        return "OtherChannel";
    }
}

Configuring the channels that finally receive the messages

    @ServiceActivator(inputChannel = "FindingChannel")
    public void listenFindingHandler(@Headers Map<String, Object> headers) throws JsonProcessingException {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(host, udpOutboundPort);
        String payload = objectMapper.writeValueAsString(fetchNodeNameList());
        messageHandler.handleMessage(MessageBuilder.withPayload(payload).build());
    }

    @ServiceActivator(inputChannel = "JoiningChannel")
    public void listenJoiningHandler(@Headers Map<String, Object> headers) throws IOException, InterruptedException {
        String host = headers.get(IpHeaders.IP_ADDRESS).toString();
        UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(host, udpOutboundPort);
       ...
    }

    @ServiceActivator(inputChannel = "OtherChannel")
    public void otherHandler(String payload) {
        System.out.println("other payload: " + payload);
    }

Appendix

Message headers

Spring Integration IP provides the following message headers; read whichever ones you need.

| Header Name | IpHeaders Constant | Description |
| --- | --- | --- |
| ip_hostname | HOSTNAME | The host name from which a TCP message or UDP packet was received. If lookupHost is false, this contains the IP address. |
| ip_address | IP_ADDRESS | The IP address from which a TCP message or UDP packet was received. |
| ip_port | PORT | The remote port for a UDP packet. |
| ip_localInetAddress | IP_LOCAL_ADDRESS | The local InetAddress to which the socket is connected (since version 4.2.5). |
| ip_ackTo | ACK_ADDRESS | The remote IP address to which UDP application-level acknowledgments are sent. The framework includes acknowledgment information in the data packet. |
| ip_ackId | ACK_ID | A correlation ID for UDP application-level acknowledgments. The framework includes acknowledgment information in the data packet. |
| ip_tcp_remotePort | REMOTE_PORT | The remote port for a TCP connection. |
| ip_connectionId | CONNECTION_ID | A unique identifier for a TCP connection. Set by the framework for inbound messages. When sending to a server-side inbound channel adapter or replying to an inbound gateway, this header is required so that the endpoint can determine the connection to which to send the message. |
| ip_actualConnectionId | ACTUAL_CONNECTION_ID | For information only. When using a cached or failover client connection factory, it contains the actual underlying connection ID. |
| contentType | MessageHeaders.CONTENT_TYPE | An optional content type for inbound messages. Note that, unlike the other header constants, this constant is in the MessageHeaders class, not the IpHeaders class. |
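
Because not every header is present on every message, reading one with headers.get(...).toString() can fail with a NullPointerException. The sketch below shows a null-safe lookup over a plain Map; the class HeaderLookup and the method headerAsString are hypothetical names, and the sample map only imitates what an inbound message might carry, using the header names from the table above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; it works on any Map and does not depend on Spring Integration.
class HeaderLookup {

    /** Returns the header value as text, or the fallback when the header is absent. */
    static String headerAsString(Map<String, Object> headers, String name, String fallback) {
        Object value = headers.get(name);
        return value != null ? value.toString() : fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("ip_address", "192.168.1.20"); // header name listed for IP_ADDRESS
        headers.put("ip_port", 11111);             // header name listed for PORT

        System.out.println(headerAsString(headers, "ip_address", "unknown"));
        System.out.println(headerAsString(headers, "ip_hostname", "unknown"));
    }
}
```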
Reposted from: https://juejin.cn/post/7291477529778913320