Spring integration Handle Connection Close event with Event Listener and Re establish it while using Dynamic TCP Routing

huangapple go评论103阅读模式

Spring integration Handle Connection Close event with Event Listener and Re establish it while using Dynamic TCP Routing


你可以在TcpRouter类中使用@Autowired注入ToTCP bean来发送消息,这将允许你在事件监听器中重新建立连接并发送数据。这是一个方便的方式来实现连接的重新建立和数据的发送。


private ToTCP toTCP;



I am using spring integration to create flow for request / response architecture and also receiving arbitrary data from server. Until this stage, i checked examples from spring-integration github and advices from @Gary Russell and @Artem Bilan.

Here is my gateway interface

@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface ToTCP {
    byte[] send(String data, @Header("host") String host, @Header("port") int port, @Header("irregularMessageChannelName") String channelName);
    byte[] send(String data, @Header("host") String host, @Header("port") int port);

Here is my my TcpClientConfig

public class TcpClientConfig {
    public IntegrationFlow toTcp() {
        return f -> f.route(new TcpRouter());

Here is my TcpRouter That Extends AbstractMessageRouter

public class TcpRouter extends AbstractMessageRouter {

    private final Logger log = LoggerFactory.getLogger(TcpRouter.class);

    private final static int MAX_CACHED = 100; // When this is exceeded, we remove the LRU.

    private HashMap<String, Message<?>> connectionRegistery = new HashMap<>();

    private final LinkedHashMap<String, MessageChannel> subFlows =
        new LinkedHashMap<String, MessageChannel>(MAX_CACHED, .75f, true) {

            protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
                if (size() > MAX_CACHED) {
                    return true;
                } else {
                    return false;


    private IntegrationFlowContext flowContext;

    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        MessageChannel channel;
        boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
        if (hasThisConnectionIrregularChannel) {
            channel = this.subFlows.get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port") + ".extended");
        } else {
            channel = this.subFlows.get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port"));

        if (channel == null) {
            channel = createNewSubflow(message);
        return Collections.singletonList(channel);

    private MessageChannel createNewSubflow(Message<?> message) {
        String host = (String) message.getHeaders().get("host");
        Integer port = (Integer) message.getHeaders().get("port");

        boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");

        Assert.state(host != null && port != null, "host and/or port header missing");
        String flowRegisterKey;

        if (hasThisConnectionIrregularChannel) {
            flowRegisterKey = host + port + ".extended";
        } else {
            flowRegisterKey = host + port;

        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);

        ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();


        TcpOutboundGateway tcpOutboundGateway;
        if (hasThisConnectionIrregularChannel) {
            log.info("TcpRouter # createNewSubflow extended TcpOutboundGateway will be created");
            String irregularMessageChannelName = (String) message.getHeaders().get("irregularMessageChannelName");
            DirectChannel directChannel = getBeanFactory().getBean(irregularMessageChannelName, DirectChannel.class);
            tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
        } else {
            log.info("TcpRouter # createNewSubflow extended TcpOutboundGateway will be created");
            tcpOutboundGateway = new TcpOutboundGateway();


        tcpOutboundGateway.setAdviceChain(Arrays.asList(new Advice[]{tcpRetryAdvice()}));

        IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);

        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                .addBean("client_connection_" + flowRegisterKey, cf)
                .id(flowRegisterKey + ".flow")

        MessageChannel inputChannel = flowRegistration.getInputChannel();

        this.subFlows.put(flowRegisterKey, inputChannel);
        this.connectionRegistery.put("client_connection_" + flowRegisterKey, message);

        return inputChannel;

    private void removeSubFlow(Map.Entry<String, MessageChannel> eldest) {
        String hostPort = eldest.getKey();
        this.flowContext.remove(hostPort + ".flow");

    public RequestHandlerRetryAdvice tcpRetryAdvice() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();

        RetryTemplate retryTemplate = new RetryTemplate();

        RequestHandlerRetryAdvice tcpRetryAdvice = new RequestHandlerRetryAdvice();

        // This allows fail-controlling
        tcpRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(failMessageChannel()));

        return tcpRetryAdvice;

    public MessageChannel failMessageChannel() {
        return new DirectChannel();

    @ServiceActivator(inputChannel = "failMessageChannel")
    public void messageAggregation(String in) {
        log.error("TcpRouter # connection retry failed with message : " + in);

    private ToTCP toTCP;

    public void listen(TcpConnectionCloseEvent event) {
        String connectionFactoryName = event.getConnectionFactoryName();
        boolean isConnectionRegistered = this.connectionRegistery.containsKey(connectionFactoryName);
        if (isConnectionRegistered) {
            Message<?> message = this.connectionRegistery.get(connectionFactoryName);
            String host = (String) message.getHeaders().get("host");
            Integer port = (Integer) message.getHeaders().get("port");
            boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
            if (hasThisConnectionIrregularChannel) {
                log.info("TcpRouter # listen # registered tcp connection with arbitrary message channel closed for host {} and port {}, it will open again !!", host, port);
                String unsolicitedMessageChannelName = (String) message.getHeaders().get("irregularMessageChannelName");
                toTCP.send(message.getPayload().toString(), host, port, unsolicitedMessageChannelName);
            } else {
                log.info("TcpRouter # listen # registered tcp connection closed for host {} and port {}, it will open again !!", host, port);
                toTCP.send(message.getPayload().toString(), host, port);
        } else {
            log.info("TcpRouter # listen # unregistered tcp connection closed, no action required.");

In case of any connection close event, I can handle it with event listener. In event listener i can understand from connectionFactoryName that was registered in addBean("client_connection_" + flowRegisterKey, cf). Here is solution for that part

After handle which connection is closed, i should open it again to continue to receive arbitrary data OR make ready connection between TCP server to send any request... But i am not sure the way that i re establish connection with sending data.

Should i use

private ToTCP toTCP;

in TcpRouter class to send message again


Should i send message directly to

protected Collection<MessageChannel> determineTargetChannels(Message<?> message)

Method. I am confused about their working behaviour... Can you give me the correct idea that helps me to use more convenient way for EventListener to reestablish connection ?


得分: 1

"Actually you are right, reconnection request is same with initial time I called it."

"Should I use determineTargetChannels in that case?"

"No; do exactly the same in the event listener as whatever calls ToTCP in the first place (send a new request and handle the reply)."


>Actually you are right, reconnection request is same with initial time i called it.

>Should i use determineTargetChannels in that case ?

No; do exactly the same in the event listener as whatever calls ToTCP in the first place (send a new request and handle the reply).

  • 本文由 发表于 2020年7月22日 23:47:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/63038187.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
