Spring Integration DSL IntegrationFlow filter() produces no reply and the gateway waits indefinitely for one

huangapple go评论85阅读模式
英文:

Spring Integration DSL IntegrationFlow filter() produces no reply and the gateway waits indefinitely for one

问题

import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
 * Demo application that pushes an {@code Order} through a Spring Integration DSL pipeline:
 * the order is split into items, each item is sent through the {@code businessLogic} flow via a
 * retrying gateway, and the result is routed either to the error-logging flow or to
 * {@code nullChannel}.
 */
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {

    /**
     * Stand-alone retry experiment; it is not called from {@link #main(String[])}.
     * Runs a callback that always throws under a {@code MaxAttemptsRetryPolicy(200)}.
     *
     * @param args ignored
     */
    public static void main2(String[] args) {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
        template.setRetryPolicy(policy);
        template.execute(context -> {
            System.out.println("Trying");
            throw new RuntimeException("problem");
        });
    }

    /**
     * Starts the application context, places one order (LATTE, ESPRESSO and MOCHA entries) through
     * the {@link Cafe} gateway, then waits for Enter before shutting down.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws Exception if reading from standard input fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even when placing the order or reading stdin fails.
        try (ConfigurableApplicationContext ctx =
                     SpringApplication.run(SpringIntegrationTestApplication.class, args)) {
            Cafe cafe = ctx.getBean(Cafe.class);

            Order entirelyGoodOrder = new Order(1);
            entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
            entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
            entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);

            cafe.placeOrder(entirelyGoodOrder);

            System.out.println("Hit 'Enter' to terminate");
            System.in.read();
        }
    }

    /** One-way entry point that sends an order to the input channel of the {@code orders} flow. */
    @MessagingGateway
    public interface Cafe {

        /**
         * Sends the order to {@code orders.input}; no reply is returned to the caller.
         *
         * @param order the order to process
         */
        @Gateway(requestChannel = "orders.input")
        void placeOrder(Order order);
    }

    /**
     * Registers the default poller with a fixed delay of 1000.
     *
     * @return the poller specification
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedDelay(1000);
    }

    /**
     * Main flow: logs the order, splits it into items, sends each item through
     * {@code businessLogic} behind a retrying gateway, and routes the outcome.
     *
     * @param businessLogic     flow invoked through the gateway for every item
     * @param errorHandler123   flow receiving {@code ExceptionalMessage} payloads
     * @param publishingGateway injected but not referenced in this flow definition
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
        return f -> f
                .log(LoggingHandler.Level.INFO)
                .split(Order.class, Order::getItems)
                .gateway(businessLogic, gatewayEndpointSpec -> gatewayEndpointSpec
                        .advice(retry())
                        // An item discarded by the downstream filter never produces a reply.
                        // Without a timeout the gateway would wait for that reply forever.
                        // Zero is sufficient while the flow uses direct channels, because any
                        // reply is then produced on the calling thread before the wait starts.
                        .replyTimeout(0L))
                .route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
                        router -> router
                                .subFlowMapping(true, errorHandler123)
                                .channelMapping(false, "nullChannel"));
    }

    /**
     * Builds a retry advice that allows three attempts and, once they are exhausted, recovers by
     * returning an {@code ExceptionalMessage} wrapping the last throwable.
     *
     * @return the configured advice
     */
    public static RequestHandlerRetryAdvice retry() {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
        template.setRetryPolicy(policy);
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRetryTemplate(template);
        requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
        return requestHandlerRetryAdvice;
    }

    /**
     * First processing stage: fails for LATTE items, passes every other item on to
     * {@code publishingChannel}.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow businessLogic() {
        return f -> f
                .handle((payload, headers) -> processOrFail(payload, DrinkType.LATTE, "first"))
                .channel("publishingChannel");
    }

    /**
     * Second processing stage fed from {@code publishingChannel}: filters the item, then fails
     * for ESPRESSO items and returns every other item as the reply.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow publishingGateway() {
        return IntegrationFlows.from("publishingChannel")
                // Discarding is a normal outcome here, so the filter must not require a reply;
                // requiresReply(true) raised ReplyRequiredException for every discarded item.
                // NOTE(review): the payload is cast to OrderItem below, so equals(1) presumably
                // never matches and every item is discarded -- confirm the intended predicate.
                .filter(source -> source.equals(1))
                .handle((payload, header) -> {
                    System.out.println("publishingGateway CALLED");
                    return processOrFail(payload, DrinkType.ESPRESSO, "publishingGateway");
                })
                .get();
    }

    /**
     * Logs every message it receives at ERROR level as a dropped message.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow errorHandler123() {
        return f -> f
                .log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
    }

    /**
     * Shared handler body of both processing stages: prints what happens to the item and either
     * returns it or throws when its drink type is the one that stage rejects.
     *
     * @param payload     message payload, cast to {@code OrderItem}
     * @param failingType drink type for which this stage throws
     * @param stage       stage name inserted into the printed and exception messages
     * @return the unchanged item when it is accepted
     * @throws RuntimeException when the item's drink type equals {@code failingType}
     */
    private static OrderItem processOrFail(Object payload, DrinkType failingType, String stage) {
        OrderItem orderItem = (OrderItem) payload;
        if (orderItem.getDrinkType() == failingType) {
            String message = String.format("throwing exception from %s channel logic orderId=%s, drinkType=%s", stage, orderItem.getOrder().getNumber(), orderItem.getDrinkType());
            System.out.println(message);

            throw new RuntimeException("Broken order " + stage + " channel");
        }
        String message = String.format("Processed inside %s channel. orderId=%s, drinkType=%s", stage, orderItem.getOrder().getNumber(), orderItem.getDrinkType());
        System.out.println(message);
        return orderItem;
    }
}

在发布网关(publishingGateway)中添加了不带 requiresReply(true) 的过滤器后,管道开始无限等待。如果我移除过滤器,它会按预期工作。我尝试添加 spec.requiresReply(true),但会抛出错误 ReplyRequiredException: No reply produced by handler,随后消息被丢弃(Message being dropped)。

英文:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
/**
 * Demo application that pushes an {@code Order} through a Spring Integration DSL pipeline:
 * the order is split into items, each item is sent through the {@code businessLogic} flow via a
 * retrying gateway, and the result is routed either to the error-logging flow or to
 * {@code nullChannel}.
 */
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {

    /**
     * Stand-alone retry experiment; it is not called from {@link #main(String[])}.
     * Runs a callback that always throws under a {@code MaxAttemptsRetryPolicy(200)}.
     *
     * @param args ignored
     */
    public static void main2(String[] args) {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
        template.setRetryPolicy(policy);
        template.execute(context -> {
            System.out.println("Trying");
            throw new RuntimeException("problem");
        });
    }

    /**
     * Starts the application context, places one order (LATTE, ESPRESSO and MOCHA entries) through
     * the {@link Cafe} gateway, then waits for Enter before shutting down.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws Exception if reading from standard input fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even when placing the order or reading stdin fails.
        try (ConfigurableApplicationContext ctx =
                     SpringApplication.run(SpringIntegrationTestApplication.class, args)) {
            Cafe cafe = ctx.getBean(Cafe.class);

            Order entirelyGoodOrder = new Order(1);
            entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
            entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
            entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);

            cafe.placeOrder(entirelyGoodOrder);

            System.out.println("Hit 'Enter' to terminate");
            System.in.read();
        }
    }

    /** One-way entry point that sends an order to the input channel of the {@code orders} flow. */
    @MessagingGateway
    public interface Cafe {

        /**
         * Sends the order to {@code orders.input}; no reply is returned to the caller.
         *
         * @param order the order to process
         */
        @Gateway(requestChannel = "orders.input")
        void placeOrder(Order order);
    }

    /**
     * Registers the default poller with a fixed delay of 1000.
     *
     * @return the poller specification
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedDelay(1000);
    }

    /**
     * Main flow: logs the order, splits it into items, sends each item through
     * {@code businessLogic} behind a retrying gateway, and routes the outcome.
     *
     * @param businessLogic     flow invoked through the gateway for every item
     * @param errorHandler123   flow receiving {@code ExceptionalMessage} payloads
     * @param publishingGateway injected but not referenced in this flow definition
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
        return f -> f
                .log(LoggingHandler.Level.INFO)
                .split(Order.class, Order::getItems)
                .gateway(businessLogic, gatewayEndpointSpec -> gatewayEndpointSpec
                        .advice(retry())
                        // An item discarded by the downstream filter never produces a reply.
                        // Without a timeout the gateway would wait for that reply forever.
                        // Zero is sufficient while the flow uses direct channels, because any
                        // reply is then produced on the calling thread before the wait starts.
                        .replyTimeout(0L))
                .route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
                        router -> router
                                .subFlowMapping(true, errorHandler123)
                                .channelMapping(false, "nullChannel"));
    }

    /**
     * Builds a retry advice that allows three attempts and, once they are exhausted, recovers by
     * returning an {@code ExceptionalMessage} wrapping the last throwable.
     *
     * @return the configured advice
     */
    public static RequestHandlerRetryAdvice retry() {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
        template.setRetryPolicy(policy);
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRetryTemplate(template);
        requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
        return requestHandlerRetryAdvice;
    }

    /**
     * First processing stage: fails for LATTE items, passes every other item on to
     * {@code publishingChannel}.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow businessLogic() {
        return f -> f
                .handle((payload, headers) -> processOrFail(payload, DrinkType.LATTE, "first"))
                .channel("publishingChannel");
    }

    /**
     * Second processing stage fed from {@code publishingChannel}: filters the item, then fails
     * for ESPRESSO items and returns every other item as the reply.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow publishingGateway() {
        return IntegrationFlows.from("publishingChannel")
                // Discarding is a normal outcome here, so the filter must not require a reply;
                // requiresReply(true) raised ReplyRequiredException for every discarded item.
                // NOTE(review): the payload is cast to OrderItem below, so equals(1) presumably
                // never matches and every item is discarded -- confirm the intended predicate.
                .filter(source -> source.equals(1))
                .handle((payload, header) -> {
                    System.out.println("publishingGateway CALLED");
                    return processOrFail(payload, DrinkType.ESPRESSO, "publishingGateway");
                })
                .get();
    }

    /**
     * Logs every message it receives at ERROR level as a dropped message.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow errorHandler123() {
        return f -> f
                .log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
    }

    /**
     * Shared handler body of both processing stages: prints what happens to the item and either
     * returns it or throws when its drink type is the one that stage rejects.
     *
     * @param payload     message payload, cast to {@code OrderItem}
     * @param failingType drink type for which this stage throws
     * @param stage       stage name inserted into the printed and exception messages
     * @return the unchanged item when it is accepted
     * @throws RuntimeException when the item's drink type equals {@code failingType}
     */
    private static OrderItem processOrFail(Object payload, DrinkType failingType, String stage) {
        OrderItem orderItem = (OrderItem) payload;
        if (orderItem.getDrinkType() == failingType) {
            String message = String.format("throwing exception from %s channel logic orderId=%s, drinkType=%s", stage, orderItem.getOrder().getNumber(), orderItem.getDrinkType());
            System.out.println(message);

            throw new RuntimeException("Broken order " + stage + " channel");
        }
        String message = String.format("Processed inside %s channel. orderId=%s, drinkType=%s", stage, orderItem.getOrder().getNumber(), orderItem.getDrinkType());
        System.out.println(message);
        return orderItem;
    }
}

Inside the publishing gateway, after adding a filter without requiresReply(true), the pipeline started waiting indefinitely.

If I remove the filter, it works as expected.

I tried adding spec.requiresReply(true); but then it throws this error:
ReplyRequiredException: No reply produced by handler

Message being dropped. ExceptionalMessage[exceptionMessage=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'publishingGateway.org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and its 'requiresReply' property is set to true.,

答案1

得分: 0

默认情况下,网关将无限期等待回复;如果你过滤掉请求,就不会有回复。

在网关上设置replyTimeout属性;只要你使用默认的(直接)通道(所以所有操作都在调用线程上运行),你可以将超时设置为0。

英文:

By default, the gateway will wait indefinitely for a reply; there will be no reply if you filter out the request.

Set the replyTimeout property on the gateway; as long as you use the default (direct) channels (so everything runs on the calling thread), you can safely set the timeout to 0.

huangapple
  • 本文由 发表于 2023年8月9日 01:54:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/76862073.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定