怎样在 @KafkaListener 中使用 @Transactional?

huangapple go评论94阅读模式
英文:

How to use @Transactional with @KafkaListener?

问题

有没有可能在使用@KafkaListener注解的方法中使用声明性事务管理(通过@Transactional)?
我想要使用它,以便例如可以为每个监听器定义单独的事务超时时间。
我的设置如下:

事务管理器:

@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager<>(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}

Kafka监听器:

@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}

问题是 - KafkaMessageListenerContainer在调用此类方法之前会创建自己的事务 - 它使用自己的TransactionTemplate:

@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
	: null;
}

并不使用TransactionInterceptor。那么如何为特定的@KafkaListener方法设置特定的事务超时时间?

英文:

Is there any possibility to use declarative tx management (via @Transactional) with @KafkaListener annotated method ?
I would like to use it in order to, for example, define separate tx timeout per listener.
My setup is as follows:

TransactionManager:

@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager&lt;Object, Object&gt; chainedHibernateTm(KafkaTransactionManager&lt;String, String&gt; kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager&lt;&gt;(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}

KafkaListener:

@KafkaListener(topic = &quot;my_topic&quot;)
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}

The problem is - KafkaMessageListenerContainer creates it's own transaction before such method is invoked - it uses its own TransactionTemplate:

@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
	: null;
}

TransactionInterceptor is not used. So how to set specific tx timeout for concrete @KafkaListener method ?

答案1

得分: 3

事务管理的各个方面通过普通的@Transactional无法与Kafka监听器一起使用,就像您的意图一样 - 没有内置的交互。

有一个专门的章节介绍如何在事务内部使用应用程序事件:事务绑定事件,使用注解@TransactionalEventListener有助于确保事务已成功完成。

> 您可以使用@EventListener注解注册常规事件监听器。如果您需要将其绑定到事务,请使用@TransactionalEventListener。这样做时,默认情况下,监听器绑定到事务的提交阶段。

英文:

The aspects of transaction management through plain @Transactional don't work with Kafka listeners as you intent - there is no in-built interaction.

There is a dedicated chapter how to use application events with within a transaction: Transaction Bound Event using the annotation @TransactionalEventListener helps to assure that the transaction has completed successfully.

> You can register a regular event listener by using the @EventListener annotation. If you need to bind it to the transaction, use @TransactionalEventListener. When you do so, the listener is bound to the commit phase of the transaction by default.

答案2

得分: 2

可以做到,但稍微有些复杂,因为你需要将已消费的偏移量发送到Kafka事务中。

你可以不使用ChainedKafkaTransactionManager,而是为容器使用KafkaTransactionManager,并为HibernateTransactionManager使用@Transactional

这将产生类似的结果,因为 Hibernate 事务将在 Kafka 事务之前提交(如果 Hibernate 提交失败,则 Kafka 事务将回滚)。

编辑

要为每个监听器容器配置不同的链式事务管理器,你可以像这样做:

@Component
class ContainerFactoryCustomizer {

	ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
			ChainedKafkaTransactionManager<?, ?> chainedOne,
			ChainedKafkaTransactionManager<?, ?> chainedTwo) {
		factory.setContainerCustomizer(
				container -> {
					String groupId = container.getContainerProperties().getGroupId();
					if (groupId.equals("foo")) {
						container.getContainerProperties().setTransactionManager(chainedOne);
					}
					else {
						container.getContainerProperties().setTransactionManager(chainedTwo);
					}
				});
	}

}

其中每个链式事务管理器都有一个具有不同默认超时时间的 Hibernate 事务管理器。

groupid 是从 @KafkaListeneridgroupId 属性中获取的。

英文:

It can be done, but it's a bit complicated because you have to send the consumed offset(s) to the Kafka transaction.

Instead of using a ChainedKafkaTransactionManager, you can use a KafkaTransactionManager for the container and @Transactional for the HibernateTransactionManager.

This will give similar results, since the hibernate TX will commit just before the Kafka transaction (and if the hibernate commit fails, the Kafka TX will roll back).

EDIT

To configure a different chained TM into each listener container you can do something like this.

@Component
class ContainerFactoryCustomizer {

	ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory&lt;?, ?, ?&gt; factory,
			ChainedKafkaTransactionManager&lt;?, ?&gt; chainedOne,
			ChainedKafkaTransactionManager&lt;?, ?&gt; chainedTwo) {
		factory.setContainerCustomizer(
				container -&gt; {
					String groupId = container.getContainerProperties().getGroupId();
					if (groupId.equals(&quot;foo&quot;)) {
						container.getContainerProperties().setTransactionManager(chainedOne);
					}
					else {
						container.getContainerProperties().setTransactionManager(chainedTwo);
					}
				});
	}

}

Where each chained TM has a Hibernate TM with a different default timeout.

The groupid is populated from the @KafkaListener id or groupId property.

huangapple
  • 本文由 发表于 2020年4月10日 22:28:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/61142419.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定