Spring Integration Aggregator Throttler

Question

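I have a message class, SomeMessage, that looks like this:

class SomeMessage {
    String id;     // field type not stated in the question; String assumed
    String title;  // compared against "A" and "B" below
}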

Currently, I aggregate messages based on id. Messages are released after 10 seconds.

.aggregate(a -> a
        .outputProcessor(messageProcessor())
        .messageStore(messageGroupStore())
        .correlationStrategy(correlationStrategy())
        .expireGroupsUponCompletion(true)
        .sendPartialResultOnExpiry(true)
        .groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)

What I need is a way to throttle messages based on the title property. If title == "A", the group should still wait 10 seconds for aggregation. If title == "B", it should wait 60 seconds for aggregation, and the result should not be sent to amqpOutboundEndpoint immediately; it should be throttled (e.g. 30 seconds between consecutive messages that have title == "B").

What would be the best way to do this? Is there something like throttling on AmqpOutboundEndpoint?
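To make the spacing requirement concrete, here is a minimal plain-Java sketch of the "at least 30 seconds between title B messages" rule, independent of Spring Integration. The class MinGapThrottle and its method reserveSendTime are hypothetical names invented for illustration; they are not part of any library or of the original flow.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: enforces a minimum gap between sends for one title. */
final class MinGapThrottle {
    private final Duration minGap;
    private Instant lastSend; // null until the first message has been scheduled

    MinGapThrottle(Duration minGap) {
        this.minGap = minGap;
    }

    /**
     * Returns the earliest instant at which a message arriving at {@code now}
     * may be sent, and records that instant as the most recent send.
     */
    synchronized Instant reserveSendTime(Instant now) {
        // The first message goes out immediately; later ones wait for the gap.
        Instant earliest = (lastSend == null) ? now : lastSend.plus(minGap);
        lastSend = earliest.isAfter(now) ? earliest : now;
        return lastSend;
    }
}
```

With a 30-second gap, messages arriving at 0 s, 5 s and 10 s would be assigned send times of 0 s, 30 s and 60 s. Messages with title A would simply bypass the throttle.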

UPDATE

.groupTimeout(messageGroup -> {
    if (anyMessageInGroupHasTitleB(messageGroup)) {
        return TimeUnit.SECONDS.toMillis(60);
    } else {
        return TimeUnit.SECONDS.toMillis(10);
    }
}))
.route(
        (Function<SomeMessage, Boolean>) ec -> ec.getTitle().equals("B"),
        m -> m
                .subFlowMapping(true, sf -> sf
                        .channel(channels -> channels.queue(1))
                        .bridge(e -> e.poller(Pollers
                                .fixedDelay(60, TimeUnit.SECONDS)
                                .maxMessagesPerPoll(1))))
                .subFlowMapping(false, IntegrationFlowDefinition::bridge))
.handle(amqpOutboundEndpoint)
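
The update calls anyMessageInGroupHasTitleB without showing its body. The decision it feeds can be sketched in plain Java over a collection of titles. The class GroupTimeouts and its method names are hypothetical; in the real flow the titles would presumably be read from the payloads of the messages in the group, which is an assumption about how the helper is written.

```java
import java.util.Collection;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the group-timeout decision in the update. */
final class GroupTimeouts {
    private GroupTimeouts() {
    }

    /** True if at least one title in the group equals "B". */
    static boolean anyTitleIsB(Collection<String> titles) {
        return titles.stream().anyMatch("B"::equals);
    }

    /** 60 seconds when any message has title "B", otherwise 10 seconds. */
    static long groupTimeoutMillis(Collection<String> titles) {
        return anyTitleIsB(titles)
                ? TimeUnit.SECONDS.toMillis(60)
                : TimeUnit.SECONDS.toMillis(10);
    }
}
```

Keeping the decision in a small pure function like this makes it easy to unit test separately from the integration flow.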

Answer 1

Score: 1

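Use groupTimeoutExpression() instead of a fixed timeout. The expression is evaluated with the MessageGroup as its root object, so the payload has to be reached through a message in the group, for example (adjust the values to the timeouts you need):

one.payload.title == 'A' ? 10000 : 30000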

Posted by huangapple on August 17, 2020 at 21:47:53
Source: https://go.coder-hub.com/63452197.html