How to enable database + kafka transaction in Spring Cloud Stream for producer-only transactions (db + kafka)?

We have an event-driven distributed architecture with separate producer and consumer microservices built on Spring Cloud Stream. In the producer, the application needs to perform a database insert/update followed by publishing a message to Kafka. However, transactions only work for the database, not for Kafka: the DB transaction gets rolled back on error, but the Kafka message still gets sent and read by the consuming microservice.

Versions used: spring-kafka 2.8.11, spring-boot 2.7.7, spring-cloud version 2021.0.5

To enable transactions, the @EnableTransactionManagement annotation is used on the Spring Boot application class. For the producer-only transaction, I have tried @Transactional and some other alternatives from the documentation, but none of them work. When testing the transaction, I manually throw a RuntimeException after the Kafka message is sent.
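
For reference, the annotation is placed on the application class roughly like this (the class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication
@EnableTransactionManagement
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}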

Sample code (producer-only transaction needed):

@Autowired
private StreamBridge streamBridge;

@Transactional
public void sendDbAndKafkaUpdate() {
    // db write here...
    
    // publish kafka message
    sendKafkaMessage();
}

private void sendKafkaMessage() {
    streamBridge.send("topic-name", messageEvent);

    // throw a RuntimeException here.
}

The application YAML configuration for enabling the producer transaction:


spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: ${kafka.unique.tx.id.per.instance}  # this is set per service instance
            producer:
              configuration:
                retries: 1
                acks: all
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
                schema.registry.url: ${kafka.schema.registry.url}

I have searched the documentation, but it is not very clear what the recommended approach is. Reference documentation (see the section on producer-only transactions): https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-transactional-binder

The documentation proposes the following code to enable a producer-only transaction:

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${kafka.unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionIdPrefix(txId);
    return tm;
}

I have tried this, but it doesn't work if I manually throw a RuntimeException after publishing a message to Kafka. The DB transaction gets rolled back, but the Kafka message is still sent (and consumed by the consuming application).

Questions

  1. What should the binder name be when StreamBridge is used to send a message to a topic? Does it refer to the apache-kafka-binder itself, which would mean null is fine if only that binder is used? Or is it related to the bindings configured in the application yaml (note: no output binding is used in this case where StreamBridge is used)?

  2. More importantly, how can I synchronize a producer-only transaction where a database update is followed by publishing a Kafka message, taking into consideration the following points:

  • The documentation referenced above suggests using a ChainedTransactionManager to synchronize transactions ("If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager."). But note that ChainedTransactionManager has been deprecated.
  • Also note that KafkaTemplate is not used directly in the application (since Spring Cloud Stream provides the abstraction).

[EDIT] Solution

Instead of setting isolation.level at the consumer binding or default level, define it at the Kafka binder configuration level as follows:

spring.cloud.stream.kafka.binder.configuration.isolation.level: read_committed

Note that in the documentation, the value is sometimes mentioned as "read-committed" (instead of "read_committed"), but this didn't work for me.
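
For reference, the same setting in the nested YAML style used above (applied in the consuming application) looks like this:

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            isolation.level: read_committed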

Answer 1

Score: 0


For a setup like the following you don't need to define a custom Kafka transaction manager.

@Autowired
private StreamBridge streamBridge;

@Transactional
public void sendDbAndKafkaUpdate() {
    // db write here...
    
    // publish kafka message
    sendKafkaMessage();
}

private void sendKafkaMessage() {
    streamBridge.send("topic-name", messageEvent);

    // throw a RuntimeException here.
}

It should be transactional end to end. The @Transactional annotation will use the database transaction manager as the primary one (e.g. JpaTransactionManager); I am assuming the DB transaction manager is auto-configured by Spring Boot in your case. When the transaction interceptor intercepts the call, it starts a new DB transaction and the method executes under that transaction. Since you are providing the transaction-id-prefix, the operation is done transactionally when StreamBridge#send is called, and the internal KafkaTemplate that StreamBridge uses synchronizes the Kafka transaction with the existing JPA transaction. Upon exiting the method, the primary transaction commits first, followed by the synchronized transactions. If an exception is thrown after the Kafka send, both transactions will be rolled back.

Are you sure that the Kafka transaction is not rolled back? How did you verify that? In your downstream consumer, did you use an isolation.level of read_committed? (spring.cloud.stream.kafka.binder.configuration.isolation.level)

Another thing to keep in mind is that if you have an auto-configured TransactionManager in the application, you do not need to add @EnableTransactionManagement on the application, as Spring Boot already applies that.

You don't need to use any chained transaction manager with your scenario. That is only needed if you want to change the order of transaction commits. For e.g., if you want the Kafka transaction to commit first instead of the DB one, you can use a chained TM or nest the @Transactional method calls. But, by looking at your explanation, your application does not warrant those advanced setups.

If things still don't work, feel free to create a small sample application where we can reproduce the issue.
