Why does my Spring Kafka unit test run into ProducerFencedException almost every time?

Question

The test consists of two test cases:

```kotlin
@Nested
inner class Test1 {
    @Test
    fun test1() {
        mypublisher.publishInTransaction(topic1) // see log1
        // check listener1                       // see log2
    }
}

@Nested
inner class Test2 {
    @Test
    fun test2() {
        mypublisher.publishInTransaction(topic2) // see log3
        // check listener2
    }
}

fun MyPublisher.publishInTransaction(topic: String) {
    // producerOnlyKafkaTemplate is a KafkaTemplate instance
    producerOnlyKafkaTemplate.executeInTransaction {
        producerOnlyKafkaTemplate.send(xxxx)
    }
}
```

test1 runs before test2; the logs are in chronological order.

log1

```text
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord())
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
2023-05-30 12:14:26.913 [Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
```

log2

```text
[Processor-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@671afc6b] []
xxx (this happened inside the listener, and the verification passed)
```

log3

It reuses the producer created in log1; the gap is roughly 31 seconds (12:14:26.913 to 12:14:58.206).

```text
2023-05-30 12:14:58.206 [Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord(xx))
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
[kafka-network-thread-test-txn-TX-0] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Enqueuing transactional request AddPartitionsToTxnRequestData(transactionalId='ruis-MacBook-Pro-2.local-test-txn-TX-0', producerId=0, producerEpoch=0, topics=[AddPartitionsToTxnTopic(name="*", partitions=[0])]) []
Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader
Received ADD_PARTITIONS_TO_TXN response from node 0 for request with xxx results=[AddPartitionsToTxnPartitionResult(partitionIndex=0, errorCode=90)])]) []
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
```

It looks to me as if producer@58bb6ba7 in log1 committed and got a newer epoch, yet test2 (log3) reuses it with a lower epoch (producerEpoch=0). What could be the problem?

Environment: spring-kafka-test 3.0.6

Update (2023-05-31)

In this case I can see that the MyProducerOnlyFactory bean is created twice (it should be a singleton), so the transactional ID prefix myproducer-txnid is linked to two MyProducerOnlyFactory instances.

With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.

Later, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 obtains a producer with the same transactional ID, myproducer-txnid-0 (see log2), commits a transaction, and the producer epoch is bumped to 1.

After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back from its cache. It still thinks the epoch is 0, so committing the transaction fails with a ProducerFencedException, as expected.
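To make the fencing sequence above concrete, here is a minimal sketch of a toy, in-memory model in Kotlin. It assumes a simplified picture of Kafka transactions: a coordinator hands out a new epoch for a transactional ID each time a producer initializes transactions, and a later request carrying an older epoch for that ID is rejected, which is what errorCode=90 (PRODUCER_FENCED) in log3 reports. The names ToyTxnCoordinator, ToyTransactionalProducer and ToyProducerFencedException are hypothetical and are not part of the Kafka or Spring Kafka API. The model only checks the epoch on commit, whereas in log3 the broker already rejected the earlier ADD_PARTITIONS_TO_TXN request.

```kotlin
// Toy in-memory model of Kafka transactional fencing. All names are
// hypothetical; this is NOT the Kafka client or broker API.

class ToyProducerFencedException(message: String) : RuntimeException(message)

// Stands in for the broker-side transaction coordinator.
class ToyTxnCoordinator {
    // Latest epoch handed out for each transactional ID.
    private val latestEpoch = mutableMapOf<String, Int>()

    // Mimics initTransactions(): bumps and returns the epoch for this ID,
    // which implicitly fences every older producer using the same ID.
    fun initTransactions(transactionalId: String): Int {
        val next = (latestEpoch[transactionalId] ?: -1) + 1
        latestEpoch[transactionalId] = next
        return next
    }

    // Rejects a commit that carries an epoch older than the latest one.
    fun commit(transactionalId: String, epoch: Int) {
        val latest = latestEpoch[transactionalId]
            ?: error("initTransactions() was never called for $transactionalId")
        if (epoch < latest) {
            throw ToyProducerFencedException(
                "epoch $epoch of '$transactionalId' is fenced by newer epoch $latest"
            )
        }
    }
}

// A producer keeps the epoch it received at init time for its whole life,
// even while it sits idle in a factory's cache.
class ToyTransactionalProducer(
    private val coordinator: ToyTxnCoordinator,
    val transactionalId: String,
) {
    val epoch: Int = coordinator.initTransactions(transactionalId)

    fun commitTransaction() = coordinator.commit(transactionalId, epoch)
}
```

Creating two ToyTransactionalProducer instances for "myproducer-txnid-0" (one per factory) gives them epochs 0 and 1. After that, committing through the first one throws ToyProducerFencedException, while a producer for a different ID such as "myproducer-txnid-1" is unaffected. In this model, the error disappears once only one producer per transactional ID exists, which matches the effect of making the factory bean a true singleton.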

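I fixed the issue with:

```kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.NestedTestConfiguration

@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
    //xxx
}
```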

With that, I can see that the MyProducerOnlyFactory bean is initialized only once.

Answer 1

Score: 0

In this case I can see that the MyProducerOnlyFactory bean is created twice (it should be a singleton), so the transactional ID prefix myproducer-txnid is linked to two MyProducerOnlyFactory instances.

With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.

Later, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 obtains a producer with the same transactional ID, myproducer-txnid-0, commits a transaction, and the producer epoch is bumped to 1.

After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back from its cache. It still thinks the epoch is 0, so committing the transaction fails with a ProducerFencedException, as expected.

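I fixed the issue with @NestedTestConfiguration:

```kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.NestedTestConfiguration

@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
    //xxx
}
```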

With that, I can see that the MyProducerOnlyFactory bean is initialized only once.

huangapple
  • Posted on 2023-05-30 12:52:53
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76361701.html