Debezium Server to PubSub: delete-operations breaks the application

Question

I want to send notifications about changes in my source databases (Postgres and SQL Server) to Pub/Sub so that I can store the data in BigQuery later. To evaluate Debezium Server, I deployed it as a Docker container for testing and created a SQL Server database and a Postgres database.

After some debugging, I received my first notifications in Pub/Sub for creates and updates. So far so good.

When I try to delete a record in the database, the server crashes without sending a message to Pub/Sub. In the logs I can see something like:

{"timestamp":"2023-03-30T17:29:13.722Z","sequence":230,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the task and engine","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:13.723Z","sequence":231,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"Stopping down connector","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.178Z","sequence":232,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-15-thread-1","threadId":59,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.181Z","sequence":233,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-16-thread-1","threadId":60,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.182Z","sequence":234,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Finished streaming","threadName":"debezium-postgresconnector-streamio23-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.183Z","sequence":235,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Connected metrics set to 'false'","threadName":"debezium-postgresconnector-streamio23-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"postgres","dbz.connectorName":"streamio23","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.199Z","sequence":236,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1}
{"timestamp":"2023-03-30T17:29:14.201Z","sequence":237,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.', error = 'io.debezium.DebeziumException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.'","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"034bf7902d51","processName":"io.debezium.server.Main","processId":1,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. 
Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"io.debezium.server.pubsub.PubSubChangeConsumer","method":"handleBatch","line":257},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":101},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":913},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":229},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":170},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}],"causedBy":{"exception":{"refId":2,"exceptionType":"java.util.concurrent.ExecutionException","message":"com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute."

And this is my application.properties file. The only non-basic thing is the topic routing, I guess, since I didn't want to create a Pub/Sub topic for every table.

debezium.sink.pravega.scope=empty
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=XXXXX
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/tmp/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=XXXXXXXX
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.topic.prefix=streamio23
debezium.source.schema.include.list=inventory
debezium.snapshot.new.tables=parallel
debezium.source.plugin.name=pgoutput
debezium.transforms=Reroute
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.Reroute.topic.regex=(.*)inventory(.*)
debezium.transforms.Reroute.topic.replacement=stream.stream.stream.inventory.orders
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=true
debezium.source.database.history.file.filename=/tmp/FileDatabaseHistory.dat
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
pk.mode=record_key

Maybe someone knows the problem and has a solution for this? The error is the same for Postgres and SQL Server, so I think it is a problem with the sink.

I have already tried multiple configurations for the application.properties file and checked the offsets, but the problem was always the same.

Thank you in advance!

Answer 1

Score: 2

I had the same issue and solved it by setting:

```debezium.source.tombstones.on.delete=false```

[Here](https://debezium.io/documentation/reference/1.2/connectors/postgresql.html#postgresql-property-tombstones-on-delete) you can see the explanation of what the tombstone event is for. Briefly, it is a feature that lets Kafka topics with log compaction actually delete the event. This is not desirable for Pub/Sub, and the tombstone's null value appears to be what triggers the INVALID_ARGUMENT error in the question's log.
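To make the failure mode concrete, here is a minimal Python sketch of what happens on a delete. It is not Debezium or Pub/Sub client code: `events_for_delete` and `is_publishable` are hypothetical helpers, and the key and payload are simplified placeholders. It assumes that a delete produces a delete event followed by a tombstone (same key, null value) when tombstones are enabled, and it mirrors the rule quoted in the error message: a message needs non-empty data or at least one attribute.

```python
from typing import Dict, List, Optional, Tuple


def events_for_delete(key: str, tombstones_on_delete: bool) -> List[Tuple[str, Optional[bytes]]]:
    """Return simplified (key, value) pairs emitted for one deleted row."""
    events: List[Tuple[str, Optional[bytes]]] = [
        (key, b'{"op": "d"}'),  # the delete event itself (payload heavily simplified)
    ]
    if tombstones_on_delete:
        events.append((key, None))  # tombstone: same key, null value
    return events


def is_publishable(data: Optional[bytes], attributes: Optional[Dict[str, str]] = None) -> bool:
    """Mirror the rule from the error: non-empty data or at least one attribute."""
    return bool(data) or bool(attributes)


for flag in (True, False):
    batch = events_for_delete("orders:42", tombstones_on_delete=flag)
    rejected = [key for key, value in batch if not is_publishable(value)]
    print(f"tombstones.on.delete={flag}: {len(batch)} event(s), {len(rejected)} rejected")
```

With tombstones enabled, the batch contains a second message with no data, which fails the check. With the setting disabled, only the delete event remains and the batch passes.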

Posted by huangapple on 2023-03-31 01:57:53
Source: https://go.coder-hub.com/75891500.html