英文:
Spring Integration DSL IntegrationFlow filter() does not return anything and waiting infinitely to return
问题
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {
public static void main2(String[] args) {
RetryTemplate template = new RetryTemplate();
RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
template.setRetryPolicy(policy);
template.execute(context -> {
System.out.println("Trying");
throw new RuntimeException("problem");
});
}
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationTestApplication.class, args);
Cafe cafe = ctx.getBean(Cafe.class);
Order entirelyGoodOrder = new Order(1);
entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);
cafe.placeOrder(entirelyGoodOrder);
System.out.println("Hit 'Enter' to terminate");
System.in.read();
ctx.close();
}
@MessagingGateway
public interface Cafe {
@Gateway(requestChannel = "orders.input")
void placeOrder(Order order);
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedDelay(1000);
}
@Bean
public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
return f -> f
.log(LoggingHandler.Level.INFO)
.split(Order.class, Order::getItems)
.gateway(businessLogic, gatewayEndpointSpec -> {
gatewayEndpointSpec.advice(retry());
})
.route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
router -> router
.subFlowMapping(true, errorHandler123)
.channelMapping(false, "nullChannel"));
}
public static RequestHandlerRetryAdvice retry() {
RetryTemplate template = new RetryTemplate();
RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
template.setRetryPolicy(policy);
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRetryTemplate(template);
requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
return requestHandlerRetryAdvice;
}
@Bean
public IntegrationFlow businessLogic() {
return f -> f
.handle((payload, headers) -> {
OrderItem orderItem = (OrderItem) payload;
if (orderItem.getDrinkType() == DrinkType.LATTE) {
String message = String.format("throwing exception from first channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
throw new RuntimeException("Broken order first channel");
} else {
String message = String.format("Processed inside first channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
return orderItem;
}
})
.channel("publishingChannel");
}
@Bean
public IntegrationFlow publishingGateway() {
return IntegrationFlows.from("publishingChannel")
.filter(source -> source.equals(1), spec -> {
spec.requiresReply(true);
})
.handle((payload, header) -> {
System.out.println("publishingGateway CALLED");
OrderItem orderItem = (OrderItem) payload;
if (orderItem.getDrinkType() == DrinkType.ESPRESSO) {
String message = String.format("throwing exception from publishingGateway channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
throw new RuntimeException("Broken order publishingGateway channel");
} else {
String message = String.format("Processed inside publishingGateway channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
return orderItem;
}
})
.get();
}
@Bean
public IntegrationFlow errorHandler123() {
return f -> f
.log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
}
}
在添加了requiresReply(true)
的过滤器后,发布网关内部的管道开始无限等待。如果我移除过滤器,它会按预期工作。尝试添加spec.requiresReply(true)
,但会抛出错误ReplyRequiredException: No reply produced by handler
。被丢弃的消息。
英文:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {
public static void main2(String[] args) {
RetryTemplate template = new RetryTemplate();
RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
template.setRetryPolicy(policy);
template.execute(context -> {
System.out.println("Trying");
throw new RuntimeException("problem");
});
}
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationTestApplication.class, args);
Cafe cafe = ctx.getBean(Cafe.class);
Order entirelyGoodOrder = new Order(1);
entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);
cafe.placeOrder(entirelyGoodOrder);
System.out.println("Hit 'Enter' to terminate");
System.in.read();
ctx.close();
}
@MessagingGateway
public interface Cafe {
@Gateway(requestChannel = "orders.input")
void placeOrder(Order order);
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedDelay(1000);
}
@Bean
public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
return f -> f
.log(LoggingHandler.Level.INFO)
.split(Order.class, Order::getItems)
.gateway(businessLogic, gatewayEndpointSpec -> {
gatewayEndpointSpec.advice(retry());
})
.route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
router -> router
.subFlowMapping(true, errorHandler123)
.channelMapping(false, "nullChannel"));
}
public static RequestHandlerRetryAdvice retry() {
RetryTemplate template = new RetryTemplate();
RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
template.setRetryPolicy(policy);
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRetryTemplate(template);
requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
return requestHandlerRetryAdvice;
}
@Bean
public IntegrationFlow businessLogic() {
return f -> f
.handle((payload, headers) -> {
OrderItem orderItem = (OrderItem) payload;
if (orderItem.getDrinkType() == DrinkType.LATTE) {
String message = String.format("throwing exception from first channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
throw new RuntimeException("Broken order first channel");
} else {
String message = String.format("Processed inside first channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
return orderItem;
}
})
.channel("publishingChannel");
}
@Bean
public IntegrationFlow publishingGateway() {
return IntegrationFlows.from("publishingChannel")
.filter(source -> source.equals(1), spec -> {
spec.requiresReply(true);
})
.handle((payload, header) -> {
System.out.println("publishingGateway CALLED");
OrderItem orderItem = (OrderItem) payload;
if (orderItem.getDrinkType() == DrinkType.ESPRESSO) {
String message = String.format("throwing exception from publishingGateway channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
throw new RuntimeException("Broken order publishingGateway channel");
} else {
String message = String.format("Processed inside publishingGateway channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
System.out.println(message);
return orderItem;
}
})
.get();
}
@Bean
public IntegrationFlow errorHandler123() {
return f -> f
.log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
}
}
Inside publishing gateway after adding a filter without requiresReply(true), pipeline started waiting infinitely.
And if I remove filter it works as expected.
Tried adding spec.requiresReply(true); but it throws error
ReplyRequiredException: No reply produced by handler
Message being dropped. ExceptionalMessage[exceptionMessage=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'publishingGateway.org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and its 'requiresReply' property is set to true.,
答案1
得分: 0
默认情况下,网关将无限期等待回复;如果你过滤掉请求,就不会有回复。
在网关上设置replyTimeout
属性;只要你使用默认的(直接)通道(所以所有操作都在调用线程上运行),你可以将超时设置为0。
英文:
By default, the gateway will wait indefinitely for a reply; there will be no reply if you filter out the request.
Set the replyTimeout
property on the gateway; as long as you use the default (direct) channels (so everything runs on the calling thread), you can safely set the timeout to 0.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论