英文:
Spring transaction synchronisation not working (TransactionalEventListener)
问题
我知道这个问题在这个网站上以略有不同的格式已经被提出,但是按照那些帖子上给出的建议对我毫无帮助。我已经花了将近两天的时间在这个问题上,已经没有了思路。
我们有一个Spring Boot微服务,它只是监听IBM MQ队列中的消息,进行一些转换,然后将其转发到一个Kafka主题。我们希望这个过程是事务性的,这样就不会丢失消息(对我们的业务至关重要)。我们还希望能够在事务提交和回滚事件上做出反应,以进行监控和支持。
我只是在互联网上找了一些“如何做”的地方,可以很容易地通过使用@Transactional
注解来以声明性的方式实现事务行为,如下所示:
@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// 一些工作在这里,包括转发到Kafka主题:
// ...
// ...
// 然后发布一个事件,应该对其进行操作:
applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));
// 取消下面的异常注释以创建回滚场景
// 或者将其注释掉以完成处理
throw new RuntimeException("No good Pal!");
}
正如预期的那样,当在处理中引发异常的消息时,处理将因事务管理器不断回滚而无限循环。这对我们来说很有用。
现在我们希望在我们的监听器方法中发布的MqConsumedEvent能够被下面的onRollback
方法拦截:
@Component
@Slf4j
public class MqConsumedEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
public void onCommit(MqConsumedEvent event) {
log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
public void onRollback(MqConsumedEvent event) {
log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
}
}
但这并没有发生。类似地,将监听器中引发异常的部分注释掉后,我们的MQ消息将传递到Kafka。但是onCommit
方法没有执行。
经过进一步研究和Spring的调试,我认为这不会执行是因为Spring认为在发布事件时没有活动的事务,因此我的事件被忽略了。在日志中评估TransactionSynchronizationManager.isActualTransactionActive()
并打印它显示false
,这很难解释,因为正如我所说,当有意引发异常时,事务会按预期回滚。
提前感谢您的意见。
更新:
我设置的断点将我带到了执行此ApplicationListenerMethodTransactionalAdapter
类的地方:
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
出于我不理解的原因,第一个if条件为false。然后,由于我在@TransactionalEventListener
中没有将fallbackExecution设置为true
,它将进入else分支,只是跳过事件。
英文:
I am aware this question has been asked in slightly different formats on this site but following the advices given on those posts took me nowhere. I already spent close to two days on this and I am out of ideas.
We have a spring boot micro service which does nothing more than listening for a message coming into an IBM MQ queue do a little bit of transformation and forwarding it to a Kafka topic. We want this to be transactional so there would be no message lost (critical to our business). We also want to be able to react on transaction commit and rollback events for the purpose of monitoring and support.
I just followed a few "how to" places on the internet and I can easily achieve transactional behaviour in a declarative way using @Transactional
annotation like below:
@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
// Some work here including forward to Kafka topic:
// ...
// ...
// Then publish an event which is supposed to be acted on:
applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));
// Uncommented exception below to create a rollback scenario
// or comment it out to have the processing completed
throw new RuntimeException("No good Pal!");
}
As expected when playing a message with the exception in place the processing will spin forever because of the transaction manager rollbacking again and again. This is good for us.
Now we expect the MqConsumedEvent being published inside our listener method to be intercepted by the onRollback
method below:
@Component
@Slf4j
public class MqConsumedEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
public void onCommit(MqConsumedEvent event) {
log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
public void onRollback(MqConsumedEvent event) {
log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
}
}
This is not happening. Similar commenting out the Exception throwing in the listener makes our MQ message being passed to Kafka. However the onCommit
method is not executed.
From further research and spring debug I believe this is not executing because spring thinks there is no active transaction when publishing the event and such my event it is just ignored. Evaluating TransactionSynchronizationManager.isActualTransactionActive()
and printing it in the logs shows false
which is hard to explain because as I said the transaction rollbacks as expected when an exception is thrown on purpose.
Thank you in advance for your inputs.
UPDATE:
The breakpoints I put brought me to the execution of this ApplicationListenerMethodTransactionalAdapter
class:
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
For reason I am not understanding the first if condition is false. Then fallback execution is false
as I haven't set it true
in my @TransactionalEventListener
usage it will end up on the else branch and just skip the event.
答案1
得分: 0
我遇到了同样的问题。在我的情况下,原来是我在项目中定义了一个 ApplicationEventMulticaster
。
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
var eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
这导致 ApplicationListenerMethodTransactionalAdapter
在不同的线程中执行(而不是事件发布的线程)。这就是为什么 TransactionSynchronizationManager.isActualTransactionActive()
最终为 false,事件没有被执行。
对我来说,移除 ApplicationEventMulticaster
的定义就解决了问题。
英文:
I had the same problem. In my case it turns out that I had defined an ApplicationEventMulticaster
in my project.
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
var eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
That make the ApplicationListenerMethodTransactionalAdapter
to be executed in a different thread (not the one where the event was published). That's why TransactionSynchronizationManager.isActualTransactionActive()
ends up to be false and the event do not get executed.
Removing the definition of the ApplicationEventMulticaster
worked fine for me.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论