Spring Integration: handle a connection close event with an event listener and re-establish the connection while using dynamic TCP routing
Question
I am using Spring Integration to build a flow for a request/response architecture that also receives arbitrary data from the server. So far I have followed the examples in the spring-integration GitHub repository and advice from @Gary Russell and @Artem Bilan.
Here is my gateway interface:
@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface ToTCP {

    byte[] send(String data, @Header("host") String host, @Header("port") int port, @Header("irregularMessageChannelName") String channelName);

    byte[] send(String data, @Header("host") String host, @Header("port") int port);

}
Here is my TcpClientConfig:
@Component
public class TcpClientConfig {

    @Bean
    public IntegrationFlow toTcp() {
        return f -> f.route(new TcpRouter());
    }

}
Here is my TcpRouter, which extends AbstractMessageRouter:
public class TcpRouter extends AbstractMessageRouter {

    private final Logger log = LoggerFactory.getLogger(TcpRouter.class);

    private final static int MAX_CACHED = 100; // When this is exceeded, we remove the LRU.

    private HashMap<String, Message<?>> connectionRegistery = new HashMap<>();

    private final LinkedHashMap<String, MessageChannel> subFlows =
            new LinkedHashMap<String, MessageChannel>(MAX_CACHED, .75f, true) {

                @Override
                protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
                    if (size() > MAX_CACHED) {
                        removeSubFlow(eldest);
                        return true;
                    } else {
                        return false;
                    }
                }

            };

    @Autowired
    private IntegrationFlowContext flowContext;

    @Override
    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        MessageChannel channel;
        boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
        if (hasThisConnectionIrregularChannel) {
            channel = this.subFlows.get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port") + ".extended");
        } else {
            channel = this.subFlows.get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port"));
        }
        if (channel == null) {
            channel = createNewSubflow(message);
        }
        return Collections.singletonList(channel);
    }

    private MessageChannel createNewSubflow(Message<?> message) {
        String host = (String) message.getHeaders().get("host");
        Integer port = (Integer) message.getHeaders().get("port");
        boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
        Assert.state(host != null && port != null, "host and/or port header missing");

        String flowRegisterKey;
        if (hasThisConnectionIrregularChannel) {
            flowRegisterKey = host + port + ".extended";
        } else {
            flowRegisterKey = host + port;
        }

        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        cf.setSoTimeout(0);
        cf.setSoKeepAlive(true);

        ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
        byteArrayCrLfSerializer.setMaxMessageSize(1048576);
        cf.setSerializer(byteArrayCrLfSerializer);
        cf.setDeserializer(byteArrayCrLfSerializer);

        TcpOutboundGateway tcpOutboundGateway;
        if (hasThisConnectionIrregularChannel) {
            log.info("TcpRouter # createNewSubflow extended TcpOutboundGateway will be created");
            String irregularMessageChannelName = (String) message.getHeaders().get("irregularMessageChannelName");
            DirectChannel directChannel = getBeanFactory().getBean(irregularMessageChannelName, DirectChannel.class);
            tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
        } else {
            // Plain gateway: no channel for irregular (unsolicited) messages.
            log.info("TcpRouter # createNewSubflow TcpOutboundGateway will be created");
            tcpOutboundGateway = new TcpOutboundGateway();
        }
        tcpOutboundGateway.setConnectionFactory(cf);
        tcpOutboundGateway.setAdviceChain(Arrays.asList(new Advice[]{tcpRetryAdvice()}));

        IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                this.flowContext.registration(flow)
                        //.addBean(cf)
                        .addBean("client_connection_" + flowRegisterKey, cf)
                        .id(flowRegisterKey + ".flow")
                        .register();
        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.subFlows.put(flowRegisterKey, inputChannel);
        // Remember the first message so the close-event listener can resend it.
        this.connectionRegistery.put("client_connection_" + flowRegisterKey, message);
        return inputChannel;
    }

    private void removeSubFlow(Map.Entry<String, MessageChannel> eldest) {
        String hostPort = eldest.getKey();
        this.flowContext.remove(hostPort + ".flow");
    }

    @Bean
    public RequestHandlerRetryAdvice tcpRetryAdvice() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100);
        backOffPolicy.setMaxInterval(1000);
        backOffPolicy.setMultiplier(2);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        RequestHandlerRetryAdvice tcpRetryAdvice = new RequestHandlerRetryAdvice();
        tcpRetryAdvice.setRetryTemplate(retryTemplate);

        // This allows fail-controlling
        tcpRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(failMessageChannel()));

        return tcpRetryAdvice;
    }

    @Bean
    public MessageChannel failMessageChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "failMessageChannel")
    public void messageAggregation(String in) {
        log.error("TcpRouter # connection retry failed with message : " + in);
    }

    @Autowired
    private ToTCP toTCP;

    @EventListener
    public void listen(TcpConnectionCloseEvent event) {
        String connectionFactoryName = event.getConnectionFactoryName();
        boolean isConnectionRegistered = this.connectionRegistery.containsKey(connectionFactoryName);
        if (isConnectionRegistered) {
            Message<?> message = this.connectionRegistery.get(connectionFactoryName);
            String host = (String) message.getHeaders().get("host");
            Integer port = (Integer) message.getHeaders().get("port");
            boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
            if (hasThisConnectionIrregularChannel) {
                log.info("TcpRouter # listen # registered tcp connection with arbitrary message channel closed for host {} and port {}, it will open again !!", host, port);
                String unsolicitedMessageChannelName = (String) message.getHeaders().get("irregularMessageChannelName");
                toTCP.send(message.getPayload().toString(), host, port, unsolicitedMessageChannelName);
            } else {
                log.info("TcpRouter # listen # registered tcp connection closed for host {} and port {}, it will open again !!", host, port);
                toTCP.send(message.getPayload().toString(), host, port);
            }
        } else {
            log.info("TcpRouter # listen # unregistered tcp connection closed, no action required.");
        }
    }

}
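For readers unfamiliar with the subFlows cache in the router, the following is a minimal standalone sketch of how an access-ordered LinkedHashMap evicts its least recently used entry. The class name LruSubFlowCache and the evictedKeys() helper are hypothetical and only illustrate the idea; in the router, the eviction hook calls removeSubFlow(eldest) instead of recording the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone class; not part of the question's code.
class LruSubFlowCache<V> extends LinkedHashMap<String, V> {

    private final int maxCached;
    private final List<String> evicted = new ArrayList<>();

    LruSubFlowCache(int maxCached) {
        // accessOrder = true: iteration runs from least to most recently accessed.
        super(16, 0.75f, true);
        this.maxCached = maxCached;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        if (size() > maxCached) {
            evicted.add(eldest.getKey()); // stand-in for removeSubFlow(eldest)
            return true;                  // tells the map to drop the eldest entry
        }
        return false;
    }

    List<String> evictedKeys() {
        return evicted;
    }

    public static void main(String[] args) {
        LruSubFlowCache<String> cache = new LruSubFlowCache<>(2);
        cache.put("hostA1234", "channelA");
        cache.put("hostB1234", "channelB");
        cache.get("hostA1234");             // touching A makes B the eldest entry
        cache.put("hostC1234", "channelC"); // size exceeds 2, so B is evicted
        System.out.println(cache.keySet());
        System.out.println(cache.evictedKeys());
    }
}
```

Because get() counts as an access, a host that is still being used stays in the cache even if it was registered first.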
When a connection closes, I can handle the event with an event listener. In the listener I can identify the connection from its connectionFactoryName, which is the bean name registered with addBean("client_connection_" + flowRegisterKey, cf). Here is the solution for that part.
Once I know which connection was closed, I need to open it again, either to keep receiving arbitrary data or to have a connection to the TCP server ready for the next request. But I am not sure how to re-establish the connection by sending data.
Should I use
@Autowired
private ToTCP toTCP;
in the TcpRouter class to send the message again,
or
should I send the message directly to the
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message)
method? I am confused about how the two behave. Which is the more convenient way for the EventListener to re-establish the connection?
Answer 1
Score: 1
>Actually you are right, the reconnection request is the same as the one I sent the first time.
>Should I use determineTargetChannels in that case?
No; do exactly the same in the event listener as whatever calls ToTCP in the first place (send a new request and handle the reply).
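A minimal sketch of that advice, modeled in plain Java so that it runs without Spring. Every type here (Gateway, Request, ReconnectListener) is a hypothetical stand-in: Gateway plays the role of the ToTCP gateway, Request the stored Message<?>, and onConnectionClosed the @EventListener method. It is an illustration under those assumptions, not the Spring Integration API.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the advice; every type here is hypothetical.
class ReconnectSketch {

    /** Stand-in for the ToTCP messaging gateway. */
    interface Gateway {
        byte[] send(String data, String host, int port);
    }

    /** Stand-in for the Message<?> stored per connection factory name. */
    static final class Request {
        final String data;
        final String host;
        final int port;

        Request(String data, String host, int port) {
            this.data = data;
            this.host = host;
            this.port = port;
        }
    }

    /** Stand-in for the bean that holds the @EventListener method. */
    static final class ReconnectListener {
        private final Gateway gateway;
        private final Map<String, Request> registry = new HashMap<>();

        ReconnectListener(Gateway gateway) {
            this.gateway = gateway;
        }

        void register(String connectionFactoryName, Request request) {
            registry.put(connectionFactoryName, request);
        }

        /** Returns the reply, or null when the closed connection was never registered. */
        byte[] onConnectionClosed(String connectionFactoryName) {
            Request request = registry.get(connectionFactoryName);
            if (request == null) {
                return null; // unregistered connection, nothing to do
            }
            // Same call the original caller makes; routing stays behind the gateway.
            return gateway.send(request.data, request.host, request.port);
        }
    }

    public static void main(String[] args) {
        // Fake gateway that just echoes what it was asked to send.
        Gateway echo = (data, host, port) ->
                (host + ":" + port + " <- " + data).getBytes(StandardCharsets.UTF_8);
        ReconnectListener listener = new ReconnectListener(echo);
        listener.register("client_connection_localhost1234",
                new Request("hello", "localhost", 1234));

        byte[] reply = listener.onConnectionClosed("client_connection_localhost1234");
        System.out.println(new String(reply, StandardCharsets.UTF_8));
    }
}
```

The listener never touches the router's determineTargetChannels; it goes through the same entry point as any other caller, so sub-flow lookup and creation happen exactly as they did for the first request.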