Why does my Spring Kafka unit test run into ProducerFencedException almost every time?

Question


The test consists of two test cases:

@Nested
inner class Test1 {
   @Test
   fun test1() {
     mypublisher.publishInTransaction(topic1)  // see log1
     // check listener1  // see log2
   }
}

@Nested
inner class Test2 {
   @Test
   fun test2() {
     mypublisher.publishInTransaction(topic2)  // see log3
     // check listener2
   }
}

fun MyPublisher.publishInTransaction(topic: String) {
  // producerOnlyKafkaTemplate is a KafkaTemplate instance
  producerOnlyKafkaTemplate.executeInTransaction {
    producerOnlyKafkaTemplate.send(xxxx)
  }
}

test1 runs before test2. The logs below are in chronological order.

log1

[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord())
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
 2023-05-30 12:14:26.913  [Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []

log2

[Processor-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@671afc6b] []
xxx (this happened inside the listener, and the verification passed)

log3

test2 reuses the producer created in log1; the gap between the two is around 32 seconds.

 2023-05-30 12:14:58.206  [Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord(xx))

[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []


[kafka-network-thread-test-txn-TX-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Enqueuing transactional request AddPartitionsToTxnRequestData(transactionalId='ruis-MacBook-Pro-2.local-test-txn-TX-0', producerId=0, producerEpoch=0, topics=[AddPartitionsToTxnTopic(name="*", partitions=[0])]) []

Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader

Received ADD_PARTITIONS_TO_TXN response from node 0 for request with xxx results=[AddPartitionsToTxnPartitionResult(partitionIndex=0, errorCode=90)])]) []

o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. []


[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []

It looks to me as though producer@58bb6ba7 from log1 committed and got a newer epoch, but was then reused by test2 (log3) with a lower epoch. What could be the problem?

The environment is:

spring-kafka-test 3.0.6

Update (2023-05-31)

In this case I see that the MyProducerOnlyFactory bean is instantiated twice (it should be a singleton), so myproducer-txnid is linked to two MyProducerOnlyFactory instances.

With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.

Later on, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 uses a producer with the same transactional id, commits a transaction, and bumps the producer epoch to 1.

After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back from its cache. That producer still believes its epoch is 0, so when it tries to commit the transaction it gets a ProducerFencedException, as expected.
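
The sequence described above can be replayed with a toy model of epoch fencing. This is only a sketch in plain Kotlin: ToyCoordinator, ToyProducer, FencedException and firstProducerIsFenced are hypothetical names invented for illustration, not Kafka or Spring Kafka APIs, and the model assumes that registering a new producer under an existing transactional id bumps the epoch (which is what the "Created new Producer" line in log2 would imply for the second factory).

```kotlin
// Toy model only: these classes are hypothetical and are not Kafka or Spring Kafka APIs.
class FencedException(message: String) : RuntimeException(message)

/** Tracks the latest epoch handed out per transactional id. */
class ToyCoordinator {
    private val latestEpoch = mutableMapOf<String, Int>()

    /** Models producer initialisation: each new producer for the same id gets a higher epoch. */
    fun register(transactionalId: String): Int {
        val epoch = (latestEpoch[transactionalId] ?: -1) + 1
        latestEpoch[transactionalId] = epoch
        return epoch
    }

    /** Models a transactional request: an epoch older than the latest one is rejected. */
    fun commit(transactionalId: String, epoch: Int) {
        val latest = latestEpoch[transactionalId] ?: throw IllegalStateException("unknown id")
        if (epoch < latest) throw FencedException("$transactionalId: epoch $epoch < $latest")
    }
}

/** A producer remembers the epoch it was given when it was created. */
class ToyProducer(private val coordinator: ToyCoordinator, val transactionalId: String) {
    val epoch: Int = coordinator.register(transactionalId)
    fun commit() = coordinator.commit(transactionalId, epoch)
}

/** Replays log1 -> log2 -> log3; returns true if the first producer ends up fenced. */
fun firstProducerIsFenced(idForFactory1: String, idForFactory2: String): Boolean {
    val coordinator = ToyCoordinator()
    val fromFactory1 = ToyProducer(coordinator, idForFactory1) // log1: created with epoch 0
    fromFactory1.commit()                                      // test1 commits successfully
    val fromFactory2 = ToyProducer(coordinator, idForFactory2) // log2: second factory's producer
    fromFactory2.commit()
    return try {
        fromFactory1.commit()                                  // log3: cached producer is reused
        false
    } catch (e: FencedException) {
        true
    }
}

fun main() {
    println(firstProducerIsFenced("test-txn-TX-0", "test-txn-TX-0")) // same id in both factories
    println(firstProducerIsFenced("test-txn-TX-0", "other-txn-TX-0")) // distinct ids
}
```

In this model the first call prints true (the cached producer is fenced) and the second prints false, which matches the observation that the failure only appears when two factory instances share one transactional id.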

I fixed the issue with:

@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
  //xxx
}

With that, I can see that the MyProducerOnlyFactory bean is initialised only once.

Answer 1

Score: 0

In this case I see that the MyProducerOnlyFactory bean is instantiated twice (it should be a singleton), so myproducer-txnid is linked to two MyProducerOnlyFactory instances.

With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.

Later on, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 uses a producer with the same transactional id, commits a transaction, and bumps the producer epoch to 1.

After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back from its cache. That producer still believes its epoch is 0, so when it tries to commit the transaction it gets a ProducerFencedException, as expected.

I fixed the issue with NestedTestConfiguration:

@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
  //xxx
}

With that, I can see that the MyProducerOnlyFactory bean is initialized only once.
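
A different way to avoid the clash, not what was done above, would be to make sure two factory instances can never share a transactional id. The following is a rough sketch under several assumptions: the class name ProducerOnlyTestConfig, the bean names, the String key/value types, the bootstrap address and the "test-txn-" prefix are all made up for illustration, and it only makes sense for producer-initiated transactions like the ones in this test.

```kotlin
import java.util.UUID
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

// Hypothetical configuration; names and values are assumptions, not the original setup.
@Configuration(proxyBeanMethods = false)
class ProducerOnlyTestConfig {

    @Bean
    fun producerOnlyFactory(): ProducerFactory<String, String> {
        val props = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092", // assumed broker address
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
        )
        val factory = DefaultKafkaProducerFactory<String, String>(props)
        // A random suffix per factory instance: if the bean is created twice,
        // the two instances end up with different transactional ids.
        factory.setTransactionIdPrefix("test-txn-${UUID.randomUUID()}-")
        return factory
    }

    @Bean
    fun producerOnlyKafkaTemplate(
        producerOnlyFactory: ProducerFactory<String, String>
    ): KafkaTemplate<String, String> = KafkaTemplate(producerOnlyFactory)
}
```

This does not address why the bean was created twice; it only keeps a second instance from fencing the first, so the NestedTestConfiguration fix above remains the more direct remedy.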


Posted by huangapple on 2023-05-30 12:52:53. Source: https://go.coder-hub.com/76361701.html