Stream from MSK to RDS PostgreSQL with MSK Connector

Question

I've been going in circles with this for a few days now. I'm sending data to Kafka using kafkajs. Each time I produce a message, I assign a UUID to the message.key value, and the message.value is set to an event like this and then stringified:

// the producer is written in typescript
const event = {
    eventtype: "event1",
    eventversion: "1.0.1",
    sourceurl: "https://some-url.com/source"
};
// stringified because the kafkajs producer only accepts `string` or `Buffer`
const stringifiedEvent = JSON.stringify(event);

I start my connect-standalone JDBC Sink Connector with the following configurations:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://postgres:5432/eventservice
connection.password=postgres
connection.user=postgres

auto.create=true
auto.evolve=true
topics=topic1
tasks.max=1
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
value.converter.schema.registry.url=http://schema-registry:8081 

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=localhost:9092
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1

When I start the connector with connect-standalone worker.properties connect-standalone.properties, it spins up and connects to PostgreSQL with no issue. However, when I produce an event, it fails with this error message:

WorkerSinkTask{id=local-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted. Error: Sink connector 'local-jdbc-sink-
connector' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records 
with a non-null Struct value and non-null Struct schema, but found record at (topic='topic1',partition=0,offset=0,timestamp=1676309784254) with a HashMap value and null value schema. 
(org.apache.kafka.connect.runtime.WorkerSinkTask:609)

With this stack trace:

org.apache.kafka.connect.errors.ConnectException: Sink connector 'local-jdbc-sink-connector' is configured with 
'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and 
non-null Struct schema, but found record at (topic='txningestion2',partition=0,offset=0,timestamp=1676309784254) 
with a HashMap value and null value schema.
	at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
	at io.confluent.connect.jdbc.sink.RecordValidator.lambda$and$1(RecordValidator.java:41)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:81)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

I've been going back and forth trying to get it to read my messages, but I'm not sure what is going wrong. One solution just leads to another error, and the solution for the new error leads back to the previous error. What is the correct configuration? How do I resolve this?

Answer 1

Score: 0

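The JDBC sink connector needs a schema for every record value. With value.converter.schemas.enable=false, JsonConverter hands the connector a schemaless map (the HashMap in your error), but the sink needs a Struct with a schema to know which columns to write. So first, change your Kafka message value so that it carries the schema alongside the data: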

{
  "schema": { ... },
  "payload": { ... }
}

Here, schema is not a JSON Schema document but Kafka Connect's own schema format, which JsonConverter uses to turn the payload into a typed Struct. For example, the top-level schema field could contain the following:

{
  "type": "struct",
  "name": "schema-name",
  "fields": [
    { "type": "string", "optional": false, "field": "source" },
    { "type": "string", "optional": false, "field": "message" }
  ]
}

The payload top-level field would then contain something similar to the following:

{
  "source": "https://example.com/",
  "message": "establishing connection..."
}
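Rather than writing the schema by hand, the schema for a flat payload could be derived from its values. This is a hypothetical helper (inferStructSchema and connectTypeOf are illustrative names, not part of kafkajs or Kafka Connect); it assumes a flat object of strings, numbers, and booleans, maps integral numbers to int64 and other numbers to float64, and marks every field as required.

```typescript
// Connect primitive types this sketch can infer from JavaScript values.
type ConnectPrimitive = "string" | "boolean" | "int64" | "float64";

interface ConnectField {
  type: ConnectPrimitive;
  optional: boolean;
  field: string;
}

interface ConnectStructSchema {
  type: "struct";
  name: string;
  fields: ConnectField[];
}

type FlatValue = string | number | boolean | null;

// Map a single JavaScript value to a Connect type.
// Assumption: integral numbers become int64, all other numbers float64.
function connectTypeOf(value: FlatValue): ConnectPrimitive {
  if (typeof value === "string") return "string";
  if (typeof value === "boolean") return "boolean";
  if (typeof value === "number") return Number.isInteger(value) ? "int64" : "float64";
  // null carries no type information, so such fields must be declared by hand
  throw new Error("cannot infer a Connect type from null");
}

// Build a struct schema with required fields for a flat (non-nested) payload.
function inferStructSchema(name: string, payload: Record<string, FlatValue>): ConnectStructSchema {
  const fields = Object.keys(payload).map((key) => ({
    type: connectTypeOf(payload[key]),
    optional: false,
    field: key,
  }));
  return { type: "struct", name, fields };
}

// Usage with the event from the question (hypothetical schema name "event").
const event = {
  eventtype: "event1",
  eventversion: "1.0.1",
  sourceurl: "https://some-url.com/source",
};
const envelope = { schema: inferStructSchema("event", event), payload: event };
```

Because JavaScript cannot tell 2.0 from 2, declare numeric fields explicitly whenever the column type matters.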

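You can then pass the envelope to your kafkajs producer. Because kafkajs only accepts a string or Buffer as the message value, stringify the whole envelope first: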

await producer.send({
  topic: "topic1",
  messages: [
    {
      key: "key",
      // kafkajs only accepts a string or Buffer value, so stringify the envelope
      value: JSON.stringify({
        schema: {
          type: "struct",
          name: "schema-name",
          fields: [
            { type: "string", optional: false, field: "source" },
            { type: "string", optional: false, field: "message" }
          ]
        },
        payload: {
          source: "https://example.com/",
          message: "establishing connection..."
        }
      })
    }
  ]
});
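If several producers send this kind of message, the wrapping and stringifying could live in one place. A minimal sketch, assuming the same source/message schema as above; toConnectMessage, LogPayload, and logSchema are hypothetical names, and OutgoingMessage only mirrors the key and value fields of a kafkajs message.

```typescript
// Payload fields that match the "source"/"message" schema above.
interface LogPayload {
  source: string;
  message: string;
}

// Only the parts of a kafkajs message used here.
interface OutgoingMessage {
  key: string;
  value: string;
}

// Kafka Connect schema for LogPayload, as read by JsonConverter.
const logSchema = {
  type: "struct",
  name: "schema-name",
  fields: [
    { type: "string", optional: false, field: "source" },
    { type: "string", optional: false, field: "message" },
  ],
};

// Wrap the payload in the { schema, payload } envelope and stringify it,
// since kafkajs accepts only a string or Buffer as the message value.
function toConnectMessage(key: string, payload: LogPayload): OutgoingMessage {
  return { key, value: JSON.stringify({ schema: logSchema, payload }) };
}
```

With a connected producer this would be sent as producer.send({ topic: "topic1", messages: [toConnectMessage(key, payload)] }), where the key could be a UUID from randomUUID() in Node's crypto module (Node 14.17 or later).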

Now that the message is configured, you need to make these changes to your worker.properties file:

# key converters are just for strings, because the Kafka message key is a string
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

# values are in JSON, and a schema is passed, so "schemas.enable" must be "true"
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
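With schemas.enable=true, JsonConverter rejects any value that is not an object with exactly the two fields schema and payload, which is one way to end up bouncing between errors. A rough pre-flight check for flat structs could look like this; envelopeProblems is a hypothetical helper that only approximates the converter's own validation (it does not check nested structs, arrays, maps, or bytes).

```typescript
// One field entry of a Connect struct schema.
interface FieldSpec {
  type: string;
  optional?: boolean;
  field: string;
}

interface Envelope {
  schema?: { type?: string; fields?: FieldSpec[] } | null;
  payload?: Record<string, unknown> | null;
}

// typeof result expected for a Connect primitive, or undefined if not checked.
function jsTypeFor(connectType: string): string | undefined {
  switch (connectType) {
    case "string":
      return "string";
    case "boolean":
      return "boolean";
    case "int8":
    case "int16":
    case "int32":
    case "int64":
    case "float32":
    case "float64":
      return "number";
    default:
      return undefined; // struct, array, map, bytes: not checked here
  }
}

// Return the problems found; an empty list means the value passed these checks.
function envelopeProblems(rawValue: string): string[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawValue);
  } catch (e) {
    return ["value is not valid JSON"];
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return ["value is not a JSON object"];
  }
  const problems: string[] = [];
  if (Object.keys(parsed).length !== 2) {
    problems.push('envelope must contain exactly "schema" and "payload"');
  }
  const { schema, payload } = parsed as Envelope;
  if (!schema) problems.push('missing top-level "schema"');
  if (!payload) problems.push('missing top-level "payload"');
  if (!schema || !payload) return problems;
  if (schema.type !== "struct") problems.push('schema.type should be "struct"');
  for (const f of schema.fields || []) {
    const value = payload[f.field];
    if (value === undefined || value === null) {
      // a field without "optional": true is treated as required
      if (!f.optional) problems.push(`required field "${f.field}" is missing`);
      continue;
    }
    const expected = jsTypeFor(f.type);
    if (expected !== undefined && typeof value !== expected) {
      problems.push(`field "${f.field}" should be ${f.type}`);
    }
  }
  return problems;
}
```

Running it on the stringified event from the question reports the missing envelope, while the stringified envelope from the producer example passes.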

And you need to make these changes to your connect-standalone.properties file:

# this must be insert, because "upsert" requires a primary key
# taken from the message, either from the Kafka message value or key,
# and pk.mode=none below provides none
insert.mode=insert

# pk.mode=none because the sink connector does not write a primary key;
# if you create the table yourself, the database generates one per record
# (for example with a SERIAL column)
pk.mode=none

# delete.enabled=false because null record values (tombstones)
# are not treated as deletes
delete.enabled=false

Make those changes, and your configuration files will look like the following:

# connect-standalone.properties
name=local-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
dialect.name=PostgreSqlDatabaseDialect
connection.url=jdbc:postgresql://<host>:<port>/<database>
connection.password=<password>
connection.user=<user>
auto.create=true
auto.evolve=true
topics=<topics>
tasks.max=1
insert.mode=insert
delete.enabled=false
pk.mode=none
consumer.auto.offset.reset=latest
# worker.properties
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

bootstrap.servers=<brokers>
group.id=jdbc-sink-connector-worker
worker.id=jdbc-sink-worker-1
consumer.auto.offset.reset=latest

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
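To catch the setting combinations discussed here before starting the worker, a small check over the merged connector and worker properties could look like this. It is a hypothetical sketch: parseProperties handles only simple key=value lines (no line continuations, escapes, or ':' separators), and sinkConfigProblems encodes just three rules (upsert needs key columns, delete.enabled=true requires pk.mode=record_key, and schemaless JSON values cannot be written by the JDBC sink). It assumes the defaults insert.mode=insert, pk.mode=none, delete.enabled=false, and schemas.enable=true for JsonConverter.

```typescript
// Parse simple key=value lines, skipping blanks and '#' or '!' comments.
function parseProperties(text: string): Map<string, string> {
  const props = new Map<string, string>();
  for (const rawLine of text.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (line === "" || line.startsWith("#") || line.startsWith("!")) continue;
    const eq = line.indexOf("=");
    if (eq < 0) continue;
    props.set(line.slice(0, eq).trim(), line.slice(eq + 1).trim());
  }
  return props;
}

// Check a few JDBC sink setting combinations; an empty result means none were found.
function sinkConfigProblems(props: Map<string, string>): string[] {
  const get = (key: string, fallback: string): string => {
    const value = props.get(key);
    return value === undefined ? fallback : value;
  };
  const problems: string[] = [];
  const insertMode = get("insert.mode", "insert");
  const pkMode = get("pk.mode", "none");
  if (insertMode === "upsert" && pkMode === "none") {
    problems.push("insert.mode=upsert needs key columns, but pk.mode=none provides none");
  }
  if (get("delete.enabled", "false") === "true" && pkMode !== "record_key") {
    problems.push("delete.enabled=true requires pk.mode=record_key");
  }
  const jsonValues = get("value.converter", "") === "org.apache.kafka.connect.json.JsonConverter";
  if (jsonValues && get("value.converter.schemas.enable", "true") === "false") {
    problems.push("schemaless JSON values reach the sink as maps, not Structs");
  }
  return problems;
}
```

Run over the question's properties, it would flag only value.converter.schemas.enable=false, which matches the HashMap value in the reported error; the final configuration above passes all three checks.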

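These configurations will enable your producer to send to your cluster, and the sink connector will be able to read and parse your JSON values and write them to Postgres. For reference, if you create the table yourself instead of relying on auto.create, it could look like the following. By default the sink writes to a table named after the topic (table.name.format defaults to ${topic}), so give the table your topic's name (table1 below is a stand-in) or set table.name.format, and make the column names match the payload field names: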

CREATE TABLE IF NOT EXISTS table1(
  id SERIAL PRIMARY KEY,
  source varchar(132),
  message text
);
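Generating the table definition from the same schema object keeps the column names from drifting away from the payload field names. This sketch is hypothetical: createTableSql is an illustrative name, the type mapping is an assumption approximating the Connect-to-PostgreSQL types of the sink's PostgreSQL dialect, identifiers are not quoted (so field names should be plain lowercase SQL identifiers), and the SERIAL id column mirrors the table above, so a payload field named id would clash with it.

```typescript
interface ConnectField {
  type: string;
  optional: boolean;
  field: string;
}

interface ConnectStructSchema {
  type: "struct";
  name?: string;
  fields: ConnectField[];
}

// Assumed mapping from Connect primitive types to PostgreSQL column types.
const PG_TYPES: Record<string, string> = {
  int8: "SMALLINT",
  int16: "SMALLINT",
  int32: "INT",
  int64: "BIGINT",
  float32: "REAL",
  float64: "DOUBLE PRECISION",
  boolean: "BOOLEAN",
  string: "TEXT",
  bytes: "BYTEA",
};

// Build a CREATE TABLE statement with a database-generated SERIAL key
// (matching pk.mode=none) plus one column per schema field.
function createTableSql(table: string, schema: ConnectStructSchema): string {
  const columns = ["  id SERIAL PRIMARY KEY"];
  for (const f of schema.fields) {
    const pgType = PG_TYPES[f.type];
    if (!pgType) throw new Error(`no column type for Connect type "${f.type}"`);
    // required fields become NOT NULL columns
    columns.push(`  ${f.field} ${pgType}${f.optional ? "" : " NOT NULL"}`);
  }
  return `CREATE TABLE IF NOT EXISTS ${table}(\n${columns.join(",\n")}\n);`;
}
```

For the source/message schema with both fields required, this would produce source TEXT NOT NULL and message TEXT NOT NULL columns after the id column.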

huangapple
  • Posted on 14 February 2023 04:11:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75440735.html