What is the simplest Spring Kafka @KafkaListener configuration to consume all records from a set of compacted topics?



I have the names of several compacted Kafka topics (topic1, topic2, ..., topicN) defined in my spring application.yaml file. I want to be able to consume all of the records on each topic partition on startup. The number of partitions on each topic is not known in advance.

The official Spring Kafka 2.6.1 documentation suggests the simplest way to do this is to implement a PartitionFinder and use it in a SpEL expression to dynamically look up the number of partitions for a topic, and then use a * wildcard in the partition attribute of a @PartitionOffset annotation (see Explicit Partition Assignment in the @KafkaListener annotation documentation):

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}",
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}
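For completeness, the PartitionFinder that the documentation has in mind looks roughly like the following. This is a sketch adapted from the Spring Kafka reference; it assumes a `ConsumerFactory<String, String>` bean is available, and it returns the partition numbers as strings, which is the form the partitions attribute expects:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Component
public class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    // Looks up the topic's metadata and returns its partition numbers
    // as strings, e.g. { "0", "1", "2" }, for use in the SpEL expression.
    public String[] partitions(String topic) {
        try (Consumer<String, String> consumer = this.consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                    .map(pi -> "" + pi.partition())
                    .toArray(String[]::new);
        }
    }
}
```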

Since I have several topics, the resulting code is very verbose:

@KafkaListener(topicPartitions = {
        @TopicPartition(
                topic = "${topic1}",
                partitions = "#{@finder.partitions('${topic1}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        @TopicPartition(
                topic = "${topic2}",
                partitions = "#{@finder.partitions('${topic2}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        // and many more @TopicPartitions...
        @TopicPartition(
                topic = "${topicN}",
                partitions = "#{@finder.partitions('${topicN}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        )
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}

How can I make this repetitive configuration more concise by configuring the topicPartitions attribute of the @KafkaListener annotation with a dynamically generated array of @TopicPartition annotations (one for each of my N topics)?
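Whatever mechanism ends up replacing the annotation, its core job is to expand N topic names into one (topic, partition, initial offset 0) entry per partition. Modeled in plain Java, with a hypothetical record standing in for Spring's TopicPartitionOffset and a map standing in for the partition-count lookup a PartitionFinder would perform, the expansion is a flatMap:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.IntStream;

public class ExpandTopics {

    // Hypothetical stand-in for Spring Kafka's TopicPartitionOffset.
    record TopicPartitionOffset(String topic, int partition, long offset) { }

    // partitionCounts stands in for the metadata lookup a PartitionFinder would do.
    static TopicPartitionOffset[] expand(String[] topics, Map<String, Integer> partitionCounts) {
        return Arrays.stream(topics)
                .flatMap(topic -> IntStream.range(0, partitionCounts.get(topic))
                        .mapToObj(p -> new TopicPartitionOffset(topic, p, 0L)))
                .toArray(TopicPartitionOffset[]::new);
    }

    public static void main(String[] args) {
        // topic1 has 2 partitions, topic2 has 3: expect 5 assignments in total
        TopicPartitionOffset[] assignments = expand(
                new String[] { "topic1", "topic2" },
                Map.of("topic1", 2, "topic2", 3));
        System.out.println(assignments.length); // 5
    }
}
```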

Answer 1 (score: 2)

It's not currently possible with @KafkaListener - please open a new feature issue on GitHub.

The only workaround I can think of is to programmatically create a listener container from the container factory and create a listener adapter. I can provide an example if you need it.

EDIT

Here is an example:

import java.util.Arrays;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So64022266Application {

	public static void main(String[] args) {
		SpringApplication.run(So64022266Application.class, args);
	}

	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
	}

	@Bean
	public NewTopic topic2() {
		return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
	}

	@Bean
	ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
			PartitionFinder finder,
			ConcurrentKafkaListenerContainerFactory<String, String> factory,
			MyListener listener) throws Exception {

		MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
		ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
		container.getContainerProperties().setGroupId("someGroup");
		return container;
	}

	@Bean
	MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
			MyListener listener) throws NoSuchMethodException {

		MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setBean(listener);
		endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
		endpoint.setTopicPartitions(Arrays.stream(topics)
			.flatMap(topic -> finder.partitions(topic))
			.toArray(TopicPartitionOffset[]::new));
		endpoint.setMessageHandlerMethodFactory(methodFactory());
		return endpoint;
	}

	@Bean
	DefaultMessageHandlerMethodFactory methodFactory() {
		return new DefaultMessageHandlerMethodFactory();
	}

	@Bean
	public ApplicationRunner runner(KafkaTemplate<String, String> template,
			ConcurrentMessageListenerContainer<String, String> container) {

		return args -> {
			System.out.println(container.getAssignedPartitions());
			template.send("so64022266-1", "key1", "foo");
			template.send("so64022266-2", "key2", "bar");
		};
	}

}

@Component
class MyListener {

	public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
		System.out.println(key + ":" + payload);
	}

}

@Component
class PartitionFinder {

	private final ConsumerFactory<String, String> consumerFactory;

	public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
		this.consumerFactory = consumerFactory;
	}

	public Stream<TopicPartitionOffset> partitions(String topic) {
		System.out.println("+" + topic + "+");
		try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
			return consumer.partitionsFor(topic).stream()
					.map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
		}
	}

}
and in application.properties:

topics=so64022266-1, so64022266-2

If you need to deal with tombstone records (null values) we need to enhance the handler factory; we currently don't expose the framework's handler factory.


Posted by huangapple on 2020-09-23 14:33:56. Original link: https://go.coder-hub.com/64022266.html