Stream from MSK to RDS PostgreSQL with MSK Connector

Question

I've been going in circles with this for a few days now. I'm sending data to Kafka using kafkajs. Each time I produce a message, I assign a UUID to the message key and set the message value to an event like the one below, which I then stringify:

    // the producer is written in TypeScript
    const event = {
      eventtype: "event1",
      eventversion: "1.0.1",
      sourceurl: "https://some-url.com/source"
    };
    // stringified because the kafkajs producer only accepts `string` or `Buffer`
    const stringifiedEvent = JSON.stringify(event);

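For reference, the produce call described above might look roughly like this with kafkajs, continuing from the snippet; the client id and the use of randomUUID for the key are assumptions:

    import { Kafka } from "kafkajs";
    import { randomUUID } from "crypto";

    const kafka = new Kafka({
      clientId: "event-producer",   // client id is an assumption
      brokers: ["localhost:9092"]   // broker address assumed from the local setup
    });
    const producer = kafka.producer();

    async function produceEvent(): Promise<void> {
      await producer.connect();
      await producer.send({
        topic: "topic1",
        messages: [
          {
            key: randomUUID(),      // UUID assigned to the message key
            value: stringifiedEvent // the stringified event from the snippet above
          }
        ]
      });
      await producer.disconnect();
    }

    produceEvent().catch(console.error);
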
I start my connect-standalone JDBC Sink Connector with the following configurations:

    # connect-standalone.properties
    name=local-jdbc-sink-connector
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    dialect.name=PostgreSqlDatabaseDialect
    connection.url=jdbc:postgresql://postgres:5432/eventservice
    connection.password=postgres
    connection.user=postgres
    auto.create=true
    auto.evolve=true
    topics=topic1
    tasks.max=1
    insert.mode=upsert
    pk.mode=record_key
    pk.fields=id

    # worker.properties
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    value.converter.schema.registry.url=http://schema-registry:8081
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    bootstrap.servers=localhost:9092
    group.id=jdbc-sink-connector-worker
    worker.id=jdbc-sink-worker-1
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    status.storage.topic=connect-status
    status.storage.replication.factor=1

When I start the connector with connect-standalone worker.properties connect-standalone.properties, it spins up and connects to PostgreSQL with no issue. However, when I produce an event, it fails with this error message:

    WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception.
    Task is being killed and will not recover until manually restarted. Error: Sink connector 'local-jdbc-sink-
    connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records
    with a non-null Struct value and non-null Struct schema, but found record at (topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema.
    (org.apache.kafka.connect.runtime.WorkerSinkTask:609)

With this stack trace:

    org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with
    'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and
    non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254)
    with a HashMap value and null value schema.
        at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
        at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

I've been going back and forth trying to get it to read my messages, but I'm not sure what is going wrong. One solution just leads to another error, and the fix for the new error leads back to the previous one. What is the correct configuration? How do I resolve this?

Answer 1

Score: 0

In order to read and parse JSON data and then write it to PostgreSQL, you need to pass a schema. First of all, you need to change your Kafka message value structure:

    {
      "schema": { "..." },
      "payload": { "..." }
    }

Where the schema is a JSON schema that defines what the payload will contain. For example, the top-level schema field would contain the following:

    {
      "type": "struct",
      "name": "schema-name",
      "fields": [
        { "type": "string", "optional": false, "field": "source" },
        { "type": "string", "optional": false, "field": "message" }
      ]
    }

The top-level payload field would then contain something like the following:

    {
      "source": "https://example.com/",
      "message": "establishing connection..."
    }

You can then pass this to your kafkajs producer:

    producer.send({
      topic: "topic1",
      messages: [
        {
          key: "key",
          // kafkajs only accepts a string or Buffer value, so stringify the envelope
          value: JSON.stringify({
            schema: {
              type: "struct",
              name: "schema-name",
              fields: [
                { type: "string", optional: false, field: "source" },
                { type: "string", optional: false, field: "message" }
              ]
            },
            payload: {
              source: "https://example.com/",
              message: "establishing connection..."
            }
          })
        }
      ]
    });

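If you produce more than one event type, a small helper that builds this schema/payload envelope can keep producer code tidy. This is only a sketch; the helper name and the field type list are hypothetical, not part of the connector's API:

    // Hypothetical helper: wraps a flat payload in the envelope expected by
    // JsonConverter when value.converter.schemas.enable=true.
    type ConnectField = { field: string; type: "string" | "int32" | "int64" | "boolean"; optional?: boolean };

    function toConnectEnvelope(
      schemaName: string,
      fields: ConnectField[],
      payload: Record<string, unknown>
    ): string {
      return JSON.stringify({
        schema: {
          type: "struct",
          name: schemaName,
          fields: fields.map(f => ({ type: f.type, optional: f.optional ?? false, field: f.field }))
        },
        payload
      });
    }

    // Usage with the example fields above:
    const value = toConnectEnvelope(
      "schema-name",
      [
        { field: "source", type: "string" },
        { field: "message", type: "string" }
      ],
      { source: "https://example.com/", message: "establishing connection..." }
    );
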
Now that the message is configured, you need to make these changes to your worker.properties file:

    # key converters are just for strings, because the Kafka message key is a string
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    # values are in JSON, and a schema is passed, so "schemas.enable" must be "true"
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true

And you need to make these changes to your connect-standalone.properties file:

    # this must be insert, because "upsert" requires that a
    # primary key be provided by the message, either by the Kafka
    # message value or key
    insert.mode=insert
    # pk.mode=none because you are not writing a primary key
    # using the sink connector - each record generates a serial PK value
    pk.mode=none
    # delete.enabled=false because we are not treating null fields as deletes
    delete.enabled=false

With those changes made, your configuration files will look like the following:

    # connect-standalone.properties
    name=local-jdbc-sink-connector
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    dialect.name=PostgreSqlDatabaseDialect
    connection.url=jdbc:postgresql://<host>:<port>/<database>
    connection.password=<password>
    connection.user=<user>
    auto.create=true
    auto.evolve=true
    topics=<topics>
    tasks.max=1
    insert.mode=insert
    delete.enabled=false
    pk.mode=none
    consumer.auto.offset.reset=latest

    # worker.properties
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    bootstrap.servers=<brokers>
    group.id=jdbc-sink-connector-worker
    worker.id=jdbc-sink-worker-1
    consumer.auto.offset.reset=latest
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    status.storage.topic=connect-status
    status.storage.replication.factor=1

This will allow your producer to send to the cluster, and the sink connector will be able to read and parse your JSON values and write them to Postgres. For added detail, your database table would look like this:

    CREATE TABLE IF NOT EXISTS table1(
      id SERIAL PRIMARY KEY,
      source varchar(132),
      message text
    );
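
To sanity-check that rows are landing in the table, a quick query with node-postgres might look like the following; the connection details are placeholders, not values from the setup above:

    import { Client } from "pg";

    // Placeholder connection details: substitute your RDS endpoint and credentials.
    const client = new Client({
      host: "<host>",
      port: 5432,
      user: "<user>",
      password: "<password>",
      database: "<database>"
    });

    async function checkRows(): Promise<void> {
      await client.connect();
      const result = await client.query("SELECT * FROM table1 ORDER BY id DESC LIMIT 5");
      console.log(result.rows); // the most recent records written by the sink connector
      await client.end();
    }

    checkRows().catch(console.error);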
