How to handle Timeout exceptions during Offset Commit in a Kafka Streams application

Question

If there is a timeout exception while committing offsets in a Kafka Streams application, does the ProductionExceptionHandler come into play?

Timeout of 60000ms expired before successfully committing offsets

I'm using a Spring Cloud Stream-based Kafka Streams application, and I do have a ProductionExceptionHandler implementation configured to log and continue. But it seems like control doesn't reach the exception handler for this timeout exception, and all stream threads die anyway.
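(For context, a log-and-continue ProductionExceptionHandler of this kind is usually shaped like the minimal sketch below, implementing the standard Kafka Streams interface; the class name and log message are illustrative, not the actual handler from the application.)

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative log-and-continue handler; registered with Kafka Streams through the
// "default.production.exception.handler" property.
public class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {

    private static final Logger log =
            LoggerFactory.getLogger(LogAndContinueProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Log the failed record's destination and keep the stream thread running.
        log.error("Failed to produce record to {}-{}", record.topic(), record.partition(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No additional configuration needed for this sketch.
    }
}
```

With Spring Cloud Stream, a handler like this is typically passed through to Kafka Streams as a `default.production.exception.handler` entry in the Streams binder configuration.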

What's the correct way to handle this? Here is the full error.

[-StreamThread-1] o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread [stream-app-0d77ace1-4d20-413b-a71a-94cca3e620e0-StreamThread-1] Failed to commit stream task 0_3 due to the following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {example-topic-3=OffsetAndMetadata{offset=139191, leaderEpoch=null, metadata='AQAAAXU0ut1W'}}
2020-10-17 04:05:23.410 ERROR [ratelimit-transformer,,,] 19 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [stream-app-0d77ace1-4d20-413b-a71a-94cca3e620e0-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {example-topic-3=OffsetAndMetadata{offset=139191, leaderEpoch=null, metadata='AQAAAXU0ut1W'}}
2020-10-17 04:05:24.149 ERROR [ratelimit-transformer,,,] 19 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [stream-app-0d77ace1-4d20-413b-a71a-94cca3e620e0] All stream threads have died. The instance will be in error state and should be closed.
Exception in thread "stream-app-0d77ace1-4d20-413b-a71a-94cca3e620e0-StreamThread-1" org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {example-topic-3=OffsetAndMetadata{offset=139191, leaderEpoch=null, metadata='AQAAAXU0ut1W'}}

Answer 1

Score: 2

Currently, TimeoutException is not handled very well by Kafka Streams and will eventually cause your StreamThread to crash. Most of the time, timeouts happen due to network issues or Kafka unavailability.

The error message logged by Kafka Streams is explicit: "All stream threads have died. The instance will be in error state and should be closed." So you can stop and restart your KafkaStreams application, but if it's a network issue, restarting your app will not help.

Note that there is some work in progress:

You can configure a java.lang.Thread.UncaughtExceptionHandler on your KafkaStreams instance to handle that exception (see the KafkaStreams JavaDoc).
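A minimal sketch of that wiring, assuming the plain Kafka Streams API (with Spring Cloud Stream the KafkaStreams instance would come from the binder's StreamsBuilderFactoryBean instead; topic names and config values here are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class UncaughtExceptionHandlerExample {

    public static void main(String[] args) {
        // Placeholder topology and config; a real application would build its own.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("example-topic").to("example-output");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Called after a StreamThread has already died (e.g. from an unhandled
        // TimeoutException); use it to log/alert and trigger a clean shutdown.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.printf("Stream thread %s died: %s%n", thread.getName(), throwable));

        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

The handler cannot resurrect a dead StreamThread; it only gives you a hook to log, alert, or close the instance so that an external supervisor can restart the application.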
