Can't successfully override my connectors in Kafka Connect 2.4

Question

Hi, I'm looking at how to use, through Java code, the new client override policy that was released in 2.3.

I want to build an example along these lines:

  • Create a topic with 10 messages

  • Create a consumer that consumes the messages and then sends them to a default FileSink

  • Create a sink with an override that should not take the data from that consumer (it is configured as earliest)

  • Produce one message that is consumed and picked up by both sinks

Here are the configurations of my sink (file) connectors; first the default one:

// Default sink: its consumer override is set to "latest"
connectorOut = new FileStreamSinkConnector();
taskOut = new FileStreamSinkTask();
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_LATEST);
sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
// "consumer.override." prefix + "auto.offset.reset"
sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
connectorOut.start(sinkProperties);
taskOut.initialize(createMock(SinkTaskContext.class)); // mocked SinkTaskContext
taskOut.start(connectorOut.taskConfigs(1).get(0));
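
For reference, the snippet above is built against roughly these imports; ConstantSettingsBehavior is my own test helper, and createMock(...) is assumed here to come from a mocking library such as EasyMock:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.connect.file.FileStreamSinkConnector;
import org.apache.kafka.connect.file.FileStreamSinkTask;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.sink.SinkTaskContext;

// the commit interval key lives in the worker config
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
// assumed mocking helper
import static org.easymock.EasyMock.createMock;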

And here is the earliest one (only what changes):

sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_EARLY);
sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Next, I create a consumer that reads the messages from the topic as a List<SinkRecord>, and I hand that list to the task of each connector:

myLatestOne.getTaskOut().put(data);
myEarlyOne.getTaskOut().put(data);

But it looks like I'm not doing this the right way, because every message is picked up by both connectors.

I'm basing this on the pull request that added the override code.

If I'm missing something, don't hesitate to tell me! (This is my first question.)

Thanks!

Answer 1

Score: 2

So I gave up doing it through Java. I found a way to do it from the terminal that is pretty easy:

Commands to run

We first launch our ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties

Next we start our Kafka server:

bin/kafka-server-start.sh config/server.properties

We need to create a topic:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Now we need to produce some messages:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [your message]

Now we can launch our worker with one connector attached. Here are the worker properties (config/connect-standalone.properties):

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.client.config.override.policy=All 

connector.client.config.override.policy=All allows the client configuration to be overridden per connector.
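
For context, this worker-level policy is what makes the per-connector consumer.override.* (and producer.override.*) keys acceptable at all. A minimal sketch of the built-in policy values shipped since Kafka 2.3, with their documented intent in the comments:

# None      - (default) reject any connector that declares client overrides
# Principal - only allow overriding the client's security principal settings
# All       - allow overriding any client configuration
connector.client.config.override.policy=All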

Here is our connector with the earliest option (if there is no saved offset, it starts from the first record):

name=local-file-earliest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.earliest.txt
topics=test
consumer.override.auto.offset.reset=earliest
value.converter=org.apache.kafka.connect.storage.StringConverter

We launch it in standalone mode:

sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties

We stop it a few seconds later (you can look at /tmp/test.sink.earliest.txt).
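
At this point the earliest sink should already have written the 10 pre-existing messages; something like this shows it (the path is the one from the connector config above):

cat /tmp/test.sink.earliest.txt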

This time we add a new connector:

name=local-file-latest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.latest.txt
topics=test
consumer.override.auto.offset.reset=latest
value.converter=org.apache.kafka.connect.storage.StringConverter

We can launch both of them together:

sudo ./bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-sink-early.properties config/connect-file-sink-latest.properties 

We can then add new messages and check that /tmp/test.sink.latest.txt is filled only with those messages.
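
For example, with both connectors running (the message text below is arbitrary):

# produce one more message while both sinks are running
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> one more message

# the earliest file should now contain the old messages plus the new one,
# the latest file only the new one
cat /tmp/test.sink.earliest.txt
cat /tmp/test.sink.latest.txt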

Explanation

The main idea here is to be able to reconfigure the default client settings of each connector in a different way; that is what the override policy gives us.
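
The same per-connector override also works when the connector is submitted to a distributed worker through the Connect REST API. A minimal sketch, assuming a worker listening on localhost:8083 and the same topic and file as above:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "local-file-earliest-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "/tmp/test.sink.earliest.txt",
    "topics": "test",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "consumer.override.auto.offset.reset": "earliest"
  }
}'

The distributed worker still needs connector.client.config.override.policy=All for the override to be accepted.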

Answer 2

Score: 0

Each connector will create a new consumer group ID. If they both read from the same topics, then they will both get all of the messages.

Also, consumer and producer overrides have already been possible at the worker level, and I've not seen anyone wire up their own connector like this, since you could just use connect-standalone.
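
You can see this by listing the consumer groups while both connectors are running. Each sink connector gets its own group; by default the group id is derived from the connector name, so with the configs above the groups would look something like the ones in the comments:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# connect-local-file-earliest-sink
# connect-local-file-latest-sink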

