Why does my Spring Kafka unit test run into ProducerFencedException almost every time?
Question
The test consists of two test cases:
@Nested
inner class Test1 {
    @Test
    fun test1() {
        mypublisher.publishInTransaction(topic1) // see log1
        // check listener1 // see log2
    }
}

@Nested
inner class Test2 {
    @Test
    fun test2() {
        mypublisher.publishInTransaction(topic2) // see log3
        // check listener2
    }
}
fun MyPublisher.publishInTransaction() {
    // producerOnlyKafkaTemplate is a KafkaTemplate instance
    producerOnlyKafkaTemplate.executeInTransaction {
        producerOnlyKafkaTemplate.send(xxxx)
    }
}
test1 runs before test2; the logs below are in chronological order.
log1
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord())
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
2023-05-30 12:14:26.913 [Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
log2
[Processor-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@671afc6b] []
xxx (this happened inside the listener, and the verification passed)
log3
It reuses the producer created in log1; the gap is around 32 seconds.
2023-05-30 12:14:58.206 [Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord(xx))
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
[kafka-network-thread-test-txn-TX-0] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Enqueuing transactional request AddPartitionsToTxnRequestData(transactionalId='ruis-MacBook-Pro-2.local-test-txn-TX-0', producerId=0, producerEpoch=0, topics=[AddPartitionsToTxnTopic(name="*", partitions=[0])]) []
Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader
Received ADD_PARTITIONS_TO_TXN response from node 0 for request with xxx results=[AddPartitionsToTxnPartitionResult(partitionIndex=0, errorCode=90)])]) []
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
It looks to me as if producer@58bb6ba7 from log1 committed and got a newer epoch, but was then reused by test2 (log3) with a lower epoch. What could be the problem?
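One way to check the epoch hypothesis is to pull the relevant fields out of the TransactionManager log lines. Below is a minimal Kotlin sketch. TxnLogFields and parseTxnLogLine are hypothetical names, not part of Spring Kafka or the Kafka client, and the regexes assume the log format shown in log3.

```kotlin
// Fields pulled out of one TransactionManager log line; null when a field is absent.
// Hypothetical helper type, not part of any Kafka library.
data class TxnLogFields(
    val transactionalId: String?,
    val producerId: Long?,
    val producerEpoch: Int?,
    val errorCode: Int?,
)

// Assumption: fields appear as key=value, as in the log3 lines above.
private val transactionalIdRegex = Regex("""transactionalId='?([^',\]]+)""")
private val producerIdRegex = Regex("""producerId=(\d+)""")
private val producerEpochRegex = Regex("""producerEpoch=(\d+)""")
private val errorCodeRegex = Regex("""errorCode=(\d+)""")

// Extracts the first occurrence of each field from a single log line.
fun parseTxnLogLine(line: String): TxnLogFields = TxnLogFields(
    transactionalId = transactionalIdRegex.find(line)?.groupValues?.get(1),
    producerId = producerIdRegex.find(line)?.groupValues?.get(1)?.toLong(),
    producerEpoch = producerEpochRegex.find(line)?.groupValues?.get(1)?.toInt(),
    errorCode = errorCodeRegex.find(line)?.groupValues?.get(1)?.toInt(),
)
```

Applied to the "Enqueuing transactional request" line in log3, this gives producerId=0 and producerEpoch=0, so the request in test2 was still sent with epoch 0. The ADD_PARTITIONS_TO_TXN response line gives errorCode=90, which is followed directly by the ProducerFencedException.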
The environment is spring-kafka-test 3.0.6.
Update (31 May 2023)
In this case I see that the MyProducerOnlyFactory bean is initialized twice (it should be a singleton), so myproducer-txnid is linked to two MyProducerOnlyFactory instances.
With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.
Later on, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 finds the above producer instance in the cache, commits a transaction, and updates the producer epoch to 1.
After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back. It thinks the epoch is still 0, tries to commit the transaction, and of course gets a ProducerFencedException.
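The sequence above can be reproduced with a toy model in plain Kotlin. This is only a sketch of the epoch rule, not Kafka's or Spring Kafka's actual code. ToyCoordinator, ToyProducer, ToyFactory and reproduceFencing are all hypothetical names. It also assumes that each factory keeps its own producer cache and that every newly created producer with a given transactional ID bumps the epoch.

```kotlin
// Toy stand-in for the broker-side transaction coordinator: it remembers the
// latest epoch handed out for each transactional id. Not Kafka's real code.
class ToyCoordinator {
    private val latestEpoch = mutableMapOf<String, Int>()

    // Models producer registration: every new registration bumps the epoch.
    fun register(transactionalId: String): Int {
        val next = (latestEpoch[transactionalId] ?: -1) + 1
        latestEpoch[transactionalId] = next
        return next
    }

    // Models a transactional request: only the latest epoch is accepted.
    fun accepts(transactionalId: String, epoch: Int): Boolean =
        latestEpoch[transactionalId] == epoch
}

class FencedException(message: String) : RuntimeException(message)

// Toy producer that keeps the epoch it received when it was created.
class ToyProducer(private val coordinator: ToyCoordinator, val transactionalId: String) {
    val epoch: Int = coordinator.register(transactionalId)

    fun commit() {
        if (!coordinator.accepts(transactionalId, epoch)) {
            throw FencedException("$transactionalId: epoch $epoch is stale")
        }
    }
}

// Toy factory with its own cache (assumption: one cache per factory instance).
class ToyFactory(private val coordinator: ToyCoordinator, private val prefix: String) {
    private val cache = mutableMapOf<String, ToyProducer>()

    fun producer(suffix: Int): ToyProducer =
        cache.getOrPut("$prefix$suffix") { ToyProducer(coordinator, "$prefix$suffix") }
}

// Replays the scenario: two factories share one transactional id prefix.
fun reproduceFencing(): List<String> {
    val coordinator = ToyCoordinator()
    val factory1 = ToyFactory(coordinator, "myproducer-txnid-")
    val factory2 = ToyFactory(coordinator, "myproducer-txnid-")
    val events = mutableListOf<String>()

    val first = factory1.producer(0) // registered with epoch 0
    first.commit()
    events += "factory1 commit ok at epoch ${first.epoch}"

    val second = factory2.producer(0) // same id, separate cache: epoch 1
    second.commit()
    events += "factory2 commit ok at epoch ${second.epoch}"

    try {
        factory1.producer(0).commit() // cached producer still holds epoch 0
        events += "factory1 commit ok again"
    } catch (e: FencedException) {
        events += "factory1 fenced: ${e.message}"
    }
    return events
}
```

In this model the third step always fails, because the producer cached by the first factory never learns that the epoch for its transactional ID has moved on. With a single factory instance there is only one cache, so the stale producer is never handed out.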
I fixed the issue with @NestedTestConfiguration:
@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
    // xxx
}
With that, I can see that the MyProducerOnlyFactory bean is initialized only once.
Answer 1
Score: 0
In this case I see that the MyProducerOnlyFactory bean is initialized twice (it should be a singleton), so myproducer-txnid is linked to two MyProducerOnlyFactory instances.
With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.
Later on, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 finds the above producer instance in the cache, commits a transaction, and updates the producer epoch to 1.
After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back. It thinks the epoch is still 0, tries to commit the transaction, and of course gets a ProducerFencedException.
I fixed the issue with @NestedTestConfiguration:
@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
    // xxx
}
With that, I can see that the MyProducerOnlyFactory bean is initialized only once.
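To confirm that a transactional ID prefix really is owned by a single factory instance, a small diagnostic can record which objects claimed each prefix. The following is only a sketch in plain Kotlin. TxIdPrefixRegistry, claim and sharedPrefixes are hypothetical names, and where claim is called from (for example, the place where the factory bean is built) is an assumption about the test setup.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical diagnostic: remembers which owner objects claimed each prefix.
object TxIdPrefixRegistry {
    // prefix -> identity hashes of the objects that claimed it
    private val owners = ConcurrentHashMap<String, MutableSet<Int>>()

    // Call once per factory instance when it is created.
    // Identity hashes can collide in rare cases, so treat the result as a hint.
    fun claim(prefix: String, owner: Any) {
        owners.computeIfAbsent(prefix) { ConcurrentHashMap.newKeySet<Int>() }
            .add(System.identityHashCode(owner))
    }

    // Prefixes that were claimed by more than one distinct instance.
    fun sharedPrefixes(): Set<String> =
        owners.filterValues { it.size > 1 }.keys
}
```

If sharedPrefixes() is non-empty after the test contexts have started, more than one factory instance is using the same prefix, which is the situation described above.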