Can't successfully override my connectors in Kafka Connect 2.4
Question
Hi, I'm looking to use the new client config override policy, released in Kafka 2.3, through Java code.
I want to create an example like this:

- Create a topic with 10 messages
- Create a consumer that consumes the messages and then sends them to a default file sink
- Create an override sink that should not take the data from the consumer (it's configured as earliest)
- Produce a message that is consumed and picked up by both sinks!
Here is the configuration of my sink (file) connector (the default one):
connectorOut = new FileStreamSinkConnector();
taskOut = new FileStreamSinkTask();
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_LATEST);
sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
connectorOut.start(sinkProperties);
taskOut.initialize(createMock(SinkTaskContext.class));
taskOut.start(connectorOut.taskConfigs(1).get(0));
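(For reference, a quick sanity check added by the editor, not part of the original question: ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX is the string "consumer.override.", so the property built above is consumer.override.auto.offset.reset.)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.connect.runtime.ConnectorConfig;

// Prints the effective override key used above:
// consumer.override.auto.offset.reset
public class OverrideKeyCheck {
    public static void main(String[] args) {
        System.out.println(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX
                + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    }
}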
And here is the earliest one (only what changes):
sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_EARLY);
sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Next, I create a consumer that takes the messages from the topic as a List<SinkRecord> and give this list to the tasks of each connector:
myLatestOne.getTaskOut().put(data);
myEarlyOne.getTaskOut().put(data);
But it looks like I'm not doing it the right way, because all messages are taken by each connector.
Here is the pull request of the override code that I'm using.
If I missed something, don't hesitate to tell me! (This is my first question.)

Thanks!
Answer 1

Score: 2
So I gave up on doing it through Java. I found a way to do it from the terminal that is pretty easy:
Commands
We first launch our ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties
Next we start our Kafka server:
bin/kafka-server-start.sh config/server.properties
We need to create a topic:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Now we need to produce messages:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [Your message]
And now we can launch our worker, with one connector attached. Here are the properties from the worker config file:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.client.config.override.policy=All

The last line, connector.client.config.override.policy=All, allows the connector to override its client configs (the default policy, None, rejects any consumer.override.* or producer.override.* property).
Here is our connector with the earliest option (if there is no saved offset, it starts from the first entry):
name=local-file-earliest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.earliest.txt
topics=test
consumer.override.auto.offset.reset=earliest
value.converter=org.apache.kafka.connect.storage.StringConverter
sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties
We stop it a few seconds later (you can look at /tmp/test.sink.earliest.txt).
This time we add a new connector:
name=local-file-latest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.latest.txt
topics=test
consumer.override.auto.offset.reset=latest
value.converter=org.apache.kafka.connect.storage.StringConverter
We can launch both of them:
sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties config/connect-file-sink-latest.properties
We can add new messages and check that /tmp/test.sink.latest.txt is filled only with those new messages.
Explanation
The main idea here is to be able to reconfigure each connector's client defaults in a different way. To do that, we use the override policy.
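As an illustration, here is a simplified sketch of the mechanism (an editor's simplification, not the actual Connect runtime code): with the policy set to All, the worker takes every connector property that starts with the consumer.override. prefix, strips the prefix, and applies the rest to that connector's consumer only:

import java.util.HashMap;
import java.util.Map;

public class OverridePrefixSketch {
    public static void main(String[] args) {
        // Connector config as written in connect-file-sink-early.properties
        Map<String, String> connectorProps = Map.of(
                "consumer.override.auto.offset.reset", "earliest");

        // Base consumer config the worker builds for this connector;
        // sink connectors get a group ID of the form connect-<connector name>
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("group.id", "connect-local-file-earliest-sink");

        String prefix = "consumer.override.";
        for (Map.Entry<String, String> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // Strip the prefix: auto.offset.reset=earliest applies
                // to this connector's consumer only
                consumerProps.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        System.out.println(consumerProps);
    }
}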
Answer 2

Score: 0
Each connector will create a new consumer group ID. If they both read from the same topics, then they will both get all messages.

Also, consumer and producer overrides have already been possible at the worker level, and I've not seen anyone write their own connector like this, since you could just use connect-standalone.
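To see this in practice, here is a small sketch (an editor's addition) that lists the consumer groups on the broker with the Kafka AdminClient; each sink connector shows up with a group ID of the form connect-<connector name>:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;

public class ListConnectGroups {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Expect entries like connect-local-file-earliest-sink
            // and connect-local-file-latest-sink from the answer above
            for (ConsumerGroupListing g : admin.listConsumerGroups().all().get()) {
                System.out.println(g.groupId());
            }
        }
    }
}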