Spring Kafka transaction – No transaction is in process, run the template operation within the scope of a template.executeInTransaction()

Question

I'm trying to publish messages to Kafka from within a transaction using `KafkaTemplate`:

    @Autowired
    KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    @Transactional
    @RabbitListener(queues = "queueName")
    void input(final List<Message> messages) {
        for (Message msg : messages) {
            PublishRequest request = prepareRequest(msg);
            kafkaTemplate.sendDefault(request.getKey(), request.getValue());
        }
        transactionalDatabaseInserts();
    }

But when I do, I get this exception:

> Caused by: java.lang.IllegalStateException: No transaction is in
> process; possible solutions: run the template operation within the
> scope of a template.executeInTransaction() operation, start a
> transaction with @Transactional before invoking the template method,
> run in a transaction started by a listener container when consuming a
> record

Configuration for `KafkaTemplate`:

    @EnableTransactionManagement
    @Configuration
    public class KafkaConfig {
        @Bean
        KafkaTransactionManager<GenericRecord, GenericRecord> kafkaTransactionManager(final ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
            return new KafkaTransactionManager<>(producerFactory);
        }

        @Bean
        KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate(final ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
            return new KafkaTemplate<>(producerFactory);
        }
    }

In my `application.yaml` I have included:

    spring.kafka.producer.transaction-id-prefix: tx-

I want my method to work with `@Transactional` and not `kafkaTemplate.executeInTransaction()`. Why am I getting that exception?
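
As a rough mental model of what the exception message is saying, here is a toy sketch in plain Java. Every name in it (`ToyTransactionalTemplate`, `inTransaction`, `committed`) is hypothetical, and it is not Spring Kafka's actual implementation. It only illustrates the idea that a transactional template refuses to send unless some caller has already bound a transaction to the current thread, which is the role played by `@Transactional`, `executeInTransaction()`, or a listener container in the three suggestions the message lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model only: all names are hypothetical; this is not Spring Kafka's code.
class ToyTransactionalTemplate {
    // Transaction state is bound to the calling thread.
    private static final ThreadLocal<List<String>> CURRENT_TX = new ThreadLocal<>();
    private final List<String> committed = new ArrayList<>();

    // Refuses to send unless a transaction is bound to this thread.
    void send(String record) {
        List<String> pending = CURRENT_TX.get();
        if (pending == null) {
            throw new IllegalStateException("No transaction is in process");
        }
        pending.add(record);
    }

    // Stands in for whatever opens the transaction (an interceptor, a container,
    // or an explicit callback): bind, run the work, then commit or discard.
    <T> T inTransaction(Supplier<T> work) {
        List<String> pending = new ArrayList<>();
        CURRENT_TX.set(pending);
        try {
            T result = work.get();
            committed.addAll(pending); // reached only if the work completed normally
            return result;
        } finally {
            CURRENT_TX.remove(); // always unbind from the thread
        }
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        ToyTransactionalTemplate template = new ToyTransactionalTemplate();
        try {
            template.send("outside"); // no transaction bound: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        template.inTransaction(() -> {
            template.send("inside"); // transaction bound: accepted
            return true;
        });
        System.out.println(template.committed());
    }
}
```

Seen this way, the exception suggests that no transaction was bound to the thread when `sendDefault` ran, so the thing to investigate is whether the `@Transactional` annotation on the listener method is actually taking effect.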

Answer 1

Score: 1

You must have something misconfigured; this works as expected:

    @SpringBootApplication
    @EnableTransactionManagement
    public class So63596919Application {

        public static void main(String[] args) {
            SpringApplication.run(So63596919Application.class, args);
        }

        @Autowired
        private KafkaTemplate<String, String> template;

        private final CountDownLatch latch = new CountDownLatch(1);

        @Transactional
        @RabbitListener(queues = "so63596919")
        public void listen(List<String> in) throws InterruptedException {
            System.out.println(in);
            in.forEach(str -> this.template.send("so63596919", str));
            System.out.println("Hit enter to exit listener and commit transaction");
            this.latch.await();
        }

        @KafkaListener(id = "so63596919", topics = "so63596919")
        public void listen(String in) {
            System.out.println(in);
        }

        @Bean
        public Queue queue() {
            return new Queue("so63596919");
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so63596919").partitions(1).replicas(1).build();
        }

        @Bean
        public ApplicationRunner runner(RabbitTemplate template, AbstractRabbitListenerContainerFactory<?> factory) {
            factory.setBatchListener(true);
            factory.setContainerCustomizer(container -> {
                ((SimpleMessageListenerContainer) container).setConsumerBatchEnabled(true);
                container.setDeBatchingEnabled(true);
            });
            return args -> {
                template.convertAndSend("so63596919", "foo");
                template.convertAndSend("so63596919", "bar");
                System.in.read();
                this.latch.countDown();
            };
        }

    }

And the properties:

    spring.kafka.producer.transaction-id-prefix=tx-
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.rabbitmq.listener.simple.batch-size=2

If you can trim down your project to a small example like this, I can take a look to see what's wrong.
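
One difference between the two snippets that may be worth checking, offered as a guess rather than a diagnosis: the listener in the question is package-private (`void input(...)`), while the one in this answer is `public`. With Spring's proxy-based transaction support in Spring Framework versions before 6.0, `@Transactional` is applied only to public methods by default, and annotating a non-public method raises no error. The sketch below is plain Java with a hypothetical `@Tx` annotation standing in for `@Transactional`, and hypothetical class names, so that it compiles without Spring. It only shows how such methods could be spotted with reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class TransactionalVisibilityCheck {
    // Returns the names of methods that carry @Tx but are not public.
    static List<String> nonPublicAnnotatedMethods(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Tx.class) && !Modifier.isPublic(m.getModifiers())) {
                names.add(m.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(nonPublicAnnotatedMethods(QuestionStyleListener.class)); // [input]
        System.out.println(nonPublicAnnotatedMethods(AnswerStyleListener.class));   // []
    }
}

// Hypothetical stand-in for Spring's @Transactional (assumption, not the real annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Tx {}

// Same shape as the listener in the question: annotated but package-private.
class QuestionStyleListener {
    @Tx
    void input(List<String> messages) {}
}

// Same shape as the listener in this answer: annotated and public.
class AnswerStyleListener {
    @Tx
    public void listen(List<String> in) {}
}
```

If that turns out to be the situation, making the listener method `public` is a cheap experiment to run before reaching for `executeInTransaction()`.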

huangapple
  • Posted on August 26, 2020, 19:43:17
  • Source link: https://go.coder-hub.com/63596919.html