发布-订阅通道与单个执行器

huangapple go评论74阅读模式
英文:

Publish-subscribe-channel with single executor

问题

我有一个集成流程,绑定到从Kafka接收的云流输入通道。

然后消息进入发布订阅通道,使用单线程执行器。

然后从那里它们进入一个处理订阅者。处理可能需要一些时间。

据我了解,处理是在单线程执行器上完成的。云流的线程被释放以从Kafka中获取另一条消息。

如果新消息到达,但处理线程仍然忙碌,会发生什么情况?
云流的线程会等待,还是消息将被丢弃?
如果会等待,那么等待多久?是否有一些默认的超时值?

我猜在这种情况下我们会丢失一些消息,因为我可以在Kafka中看到消息,但在数据库中没有相应的更新...

但是大多数消息都会如预期地进行处理。

public interface PointChannelProcessor {
    @Input("point-channel")
    MessageChannel pointChannel();
}

@Bean
public MessageChannel routeByKeyMessageChannel() {
    return MessageChannels.publishSubscribe(Executors.newSingleThreadExecutor()).get();
}

@Bean
public IntegrationFlow retrievePointListFromKafka() {
    return IntegrationFlows.from(pointChannelProcessor.pointChannel())
        .transform(new JsonToObjectTransformer(PointKafkaImport.class))
        .channel(ROUTE_BY_KEY_MESSAGE_CHANNEL)
        .get();
}

@Bean
public IntegrationFlow routePointsByKey(){
    return IntegrationFlows.from(ROUTE_BY_KEY_MESSAGE_CHANNEL)
        .enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, failedPointsChannel(), true))
        .route((GenericHandler<PointKafkaImport>) (payload, headers) -> {
              //一些路由逻辑
              if (...) {
                  return ONE_CHANNEL;
              } else if (...){
                  return NULL_CHANNEL;
              } else {
                  return ANOTHER_CHANNEL;
              }
        })
        .get();
}

//从路由中处理载荷的其他直接通道
@Bean
public IntegrationFlow savePoint(){
    return IntegrationFlows.from(ONE_CHANNEL)
        .handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
        .get();
}

@Bean
public IntegrationFlow updatePoint(){
    return IntegrationFlows.from(ANOTHER_CHANNEL)
        .handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
        .get();
}
英文:

I have an integration flow bound to cloud stream input channel from
Kafka.

Then messages go to publishSubscribe channel with singleThreadExecutor.

And from there they go to one subscriber which handles them. Handling can take some time.

As far as I understand handling is done on the singleThreadExecutor. And the cloud streaming's thread is freed to pickup another message from Kafka.

What will happen if new message arrives but the handling thread is still busy?
Will the cloud streaming's thread wait or message will be discarded?
If it would wait then for how long? Is there some default timeout value?

I have a guess that we are loosing some messages in such scenario as I can see messages in Kafka but there is no corresponding updates in the DB ...

But most of the messages are processed as expected.

public interface PointChannelProcessor {
@Input(&quot;point-channel&quot;)
MessageChannel pointChannel();
}
@Bean
public MessageChannel routeByKeyMessageChannel() {
return MessageChannels.publishSubscribe(Executors.newSingleThreadExecutor()).get();
}
@Bean
public IntegrationFlow retrievePointListFromKafka() {
return IntegrationFlows.from(pointChannelProcessor.pointChannel())
.transform(new JsonToObjectTransformer(PointKafkaImport.class))
.channel(ROUTE_BY_KEY_MESSAGE_CHANNEL)
.get();
}
@Bean
public IntegrationFlow routePointsByKey(){
return IntegrationFlows.from(ROUTE_BY_KEY_MESSAGE_CHANNEL)
.enrichHeaders(e -&gt; e.header(MessageHeaders.ERROR_CHANNEL, failedPointsChannel(), true))
.route((GenericHandler&lt;PointKafkaImport&gt;) (payload, headers) -&gt; {
//some routing logic
if (...) {
return ONE_CHANNEL;
} else if (...){
return NULL_CHANNEL;
} else {
return ANOTHER_CHANNEL;
}
})
.get();
}
//Other direct channels from routing which handles payload
@Bean
public IntegrationFlow savePoint(){
return IntegrationFlows.from(ONE_CHANNEL)
.handle((GenericHandler&lt;PointKafkaImport&gt;) (payload, headers) -&gt; ...)
.get();
}
@Bean
public IntegrationFlow updatePoint(){
return IntegrationFlows.from(ANOTHER_CHANNEL)
.handle((GenericHandler&lt;PointKafkaImport&gt;) (payload, headers) -&gt; ...)
.get();
}

答案1

得分: 1

请确保您理解以下问题,这样我才能更清楚地回答。根据我理解,您需要明白 发布-订阅(Publish-Subscribe)生产者-消费者(Producer-Consumer) 之间的区别。在 Kafka 中,您的代码负责从队列中获取消息,而消息并不是发送到您的代码中。因此,您的代码在获取队列中的消息时不能保持繁忙状态。

此外,有许多策略可以确保您从队列中读取和处理数据,而且不会丢失任何内容。在消费者端,您只需读取消息并增加偏移量,如果在处理过程中出现错误,可以重新读取消息。消息将会根据您在设置 Kafka 时所设定的策略进行移除。

英文:

You need be more clear about your problem, but as I understand, please make sure you find the difference between publish-subscribe and Producer-Consumer. In Kafka your code is responsible to pickup message from the queue and the message is not send to your code. So your code must NOT busy to pickup a message from the Queue.

Furthermore, there are many strategy to ensure that you read and process the data from queue and nothing lost. In consumer, you just read the message and increase offset, if you fail in processing you can read the message again. The messages will be removed by an strategy which you set when setup Kafka.

huangapple
  • 本文由 发表于 2020年7月23日 16:18:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/63049851.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定