英文:
Spring Integration Aggregator Throttler
问题
我有一条名为 SomeMessage
的消息,内容如下:
class SomeMessage{
id,
title
}
目前,我根据 id 聚合消息。消息在10秒后发布。
.aggregate(
a ->
a
.outputProcessor(messageProcessor())
.messageStore(messageGroupStore())
.correlationStrategy(correlationStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)
我需要一种根据 title
属性对消息进行节流的方法。如果 title=="A"
,仍应等待10秒进行聚合;如果 title=="B"
,则应等待60秒进行聚合,并且不应立即发送到 amqpOutboundEndpoint
,而应该进行一些节流(例如,每个具有 title=="B"
的消息之间应有30秒的间隔)。
最佳做法是什么?AmqpOutboundEndpoint
上有类似节流的功能吗?
更新
.groupTimeout(messageGroup -> {
if(anyMessageInGroupHasTitleB(messageGroup)){
return TimeUnit.SECONDS.toMillis(60);
}
else {
return TimeUnit.SECONDS.toMillis(10);
}
}))
.route(
(Function<SomeMessage, Boolean>) ec ->
ec.getTitle().equals("B"),
m -> m.subFlowMapping(true, sf ->
sf.channel(channels -> channels.queue(1))
.bridge(e -> e.poller(Pollers
.fixedDelay(60, TimeUnit.SECONDS)
.maxMessagesPerPoll(1)
))
).subFlowMapping(false, IntegrationFlowDefinition::bridge))
.handle(amqpOutboundEndpoint)
英文:
I have one message SomeMessage
that looks like this:
class SomeMessage{
id,
title
}
Currently, I aggregate messages based on id. Messages are released after 10 seconds.
.aggregate(
a ->
a
.outputProcessor(messageProcessor())
.messageStore(messageGroupStore())
.correlationStrategy(correlationStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10)))
.handle(amqpOutboundEndpoint)
What I need is a way to throttle messages based on title
property. If title=="A"
, it should still wait 10 seconds for aggregation; If title=="B"
it should wait 60 seconds for aggregation and it should not be immediately sent to amqpOutboundEndpoint
but it should have some throttling (eg. 30 seconds between every message that has title=="B"
).
What would be the best way to do this? Is there something like throttling on AmqpOutboundEndpoint
?
UPDATE
.groupTimeout(messageGroup -> {
if(anyMessageInGroupHasTitleB(messageGroup)){
return TimeUnit.SECONDS.toMillis(60);
}
else {
return TimeUnit.SECONDS.toMillis(10);
}
}))
.route(
(Function<SomeMessage, Boolean>) ec ->
ec.getTitle().equals("B"),
m -> m.subFlowMapping(true, sf ->
sf.channel(channels -> channels.queue(1))
.bridge(e -> e.poller(Pollers
.fixedDelay(60, TimeUnit.SECONDS)
.maxMessagesPerPoll(1)
))
).subFlowMapping(false, IntegrationFlowDefinition::bridge))
.handle(amqpOutboundEndpoint)
答案1
得分: 1
使用groupTimeoutExpression()
替代固定超时...
payload.title == 'A' ? 10000 : 30000
英文:
Use groupTimeoutExpression()
instead of a fixed timeout...
payload.title == 'A' ? 10000 : 30000
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论