Spring Kafka transaction - No transaction is in process, run the template operation within the scope of a template.executeInTransaction()
Question
I'm trying to publish messages to Kafka from a transaction using `KafkaTemplate`:

    @Autowired
    KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    @Transactional
    @RabbitListener(queues = "queueName")
    void input(final List<Message> messages) {
        for (Message msg : messages) {
            PublishRequest request = prepareRequest(msg);
            kafkaTemplate.sendDefault(request.getKey(), request.getValue());
        }
        transactionalDatabaseInserts();
    }

But when I do, I get this exception:
> Caused by: java.lang.IllegalStateException: No transaction is in
> process; possible solutions: run the template operation within the
> scope of a template.executeInTransaction() operation, start a
> transaction with @Transactional before invoking the template method,
> run in a transaction started by a listener container when consuming a
> record
Configuration for `KafkaTemplate`:

    @EnableTransactionManagement
    @Configuration
    public class KafkaConfig {

        @Bean
        KafkaTransactionManager<GenericRecord, GenericRecord> kafkaTransactionManager(final ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
            return new KafkaTransactionManager<>(producerFactory);
        }

        @Bean
        KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate(final ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
            return new KafkaTemplate<>(producerFactory);
        }
    }

In my `application.yaml` I have included:

    spring.kafka.producer.transaction-id-prefix: tx-

I want my method to work with `@Transactional` and not `kafkaTemplate.executeInTransaction()`. Why am I getting that exception?
Answer 1
Score: 1
You must have something misconfigured; this works as expected:

    @SpringBootApplication
    @EnableTransactionManagement
    public class So63596919Application {

        public static void main(String[] args) {
            SpringApplication.run(So63596919Application.class, args);
        }

        @Autowired
        private KafkaTemplate<String, String> template;

        private final CountDownLatch latch = new CountDownLatch(1);

        @Transactional
        @RabbitListener(queues = "so63596919")
        public void listen(List<String> in) throws InterruptedException {
            System.out.println(in);
            in.forEach(str -> this.template.send("so63596919", str));
            System.out.println("Hit enter to exit listener and commit transaction");
            this.latch.await();
        }

        @KafkaListener(id = "so63596919", topics = "so63596919")
        public void listen(String in) {
            System.out.println(in);
        }

        @Bean
        public Queue queue() {
            return new Queue("so63596919");
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so63596919").partitions(1).replicas(1).build();
        }

        @Bean
        public ApplicationRunner runner(RabbitTemplate template, AbstractRabbitListenerContainerFactory<?> factory) {
            factory.setBatchListener(true);
            factory.setContainerCustomizer(container -> {
                ((SimpleMessageListenerContainer) container).setConsumerBatchEnabled(true);
                container.setDeBatchingEnabled(true);
            });
            return args -> {
                template.convertAndSend("so63596919", "foo");
                template.convertAndSend("so63596919", "bar");
                System.in.read();
                this.latch.countDown();
            };
        }
    }

    spring.kafka.producer.transaction-id-prefix=tx-
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.rabbitmq.listener.simple.batch-size=2

If you can trim down your project to a small example like this, I can take a look to see what's wrong.
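
For readers who want to see the kind of check behind the exception message, here is a toy model in plain Java. It is only a sketch and not Spring Kafka's implementation: the class `TxTemplateSketch`, its thread-local flag, and its method bodies are all hypothetical stand-ins. It assumes a template that refuses to send unless a transaction is bound to the calling thread, and an `executeInTransaction`-style method that starts a local transaction around a callback. In the real framework, `@Transactional` is meant to bind that transaction before the listener method runs, which is why the exception suggests it as an alternative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a transactional template; NOT Spring Kafka's actual code. */
public class TxTemplateSketch {

    // Hypothetical per-thread marker for "a transaction is in process".
    private static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);

    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    /** Refuses to send unless a transaction is bound to the calling thread. */
    public void send(String value) {
        if (!ACTIVE.get()) {
            throw new IllegalStateException("No transaction is in process");
        }
        pending.add(value);
    }

    /** Runs the callback in a local transaction: commit on success, discard on failure. */
    public void executeInTransaction(Consumer<TxTemplateSketch> callback) {
        ACTIVE.set(true);
        try {
            callback.accept(this);
            committed.addAll(pending); // reached only if the callback did not throw
        } finally {
            pending.clear();
            ACTIVE.set(false);
        }
    }

    /** Returns a copy of everything committed so far. */
    public List<String> committed() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        TxTemplateSketch template = new TxTemplateSketch();
        try {
            template.send("foo"); // no transaction bound to this thread
        } catch (IllegalStateException e) {
            System.out.println("outside transaction: " + e.getMessage());
        }
        template.executeInTransaction(t -> {
            t.send("foo");
            t.send("bar");
        });
        System.out.println("committed: " + template.committed());
    }
}
```

In this model the first `send` fails because nothing has marked a transaction as active on the thread, while the sends inside the callback succeed and are committed together. If the real exception appears inside a method annotated with `@Transactional`, one thing worth checking is whether the annotation is actually taking effect for that call.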