NPE when encrypting fields for SQLServer databases using debezium
Question
I am using Debezium to do CDC from SQL Server to Kafka, and as per the business needs, some of the columns must be encrypted.
From an environment point of view, I have 2 Kafka Connect instances running on K8s, and in total around 50 connectors streaming data from SQL Server to Kafka.
Here is a snippet of the connector JSON file:
{
  "name": "live.sql.users",
  ...
  "transforms": "unwrap,cipher",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "drop",
  "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_key_identifier": "my-key",
  "transforms.cipher.cipher_data_keys": "[ { \"identifier\": \"my-key\", \"material\": { \"primaryKeyId\": 1000000001, \"key\": [ { \"keyData\": { \"typeUrl\": \"type.googleapis.com/google.crypto.tink.AesGcmKey\", \"value\": \"GhDLeulEJRDC8/19NMUXqw2jK\", \"keyMaterialType\": \"SYMMETRIC\" }, \"status\": \"ENABLED\", \"keyId\": 2000000002, \"outputPrefixType\": \"TINK\" } ] } } ]",
  "transforms.cipher.field_config": "[{\"name\":\"Password\"},{\"name\":\"MobNumber\"},{\"name\":\"UserName\"}]",
  "transforms.cipher.predicate": "isTombstone",
  "transforms.cipher.negate": "true",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
  ...
}
When I applied it, after a few seconds I got the error below when calling the /connectors/<connector_name>/status API:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: error: ENCRYPT of field path 'UserName' having data 'deleted605' failed unexpectedly
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:90)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.lambda$matchFields$0(SchemaawareRecordHandler.java:73)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.matchFields(SchemaawareRecordHandler.java:50)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:163)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:140)
    at org.apache.kafka.connect.runtime.PredicatedTransformation.apply(PredicatedTransformation.java:56)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more
Caused by: java.lang.NullPointerException
    at com.esotericsoftware.kryo.util.DefaultGenerics.nextGenericTypes(DefaultGenerics.java:77)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.pushTypeVariables(FieldSerializer.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:102)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:627)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:75)
    ... 21 more
Note that the same config works with other connectors with no problems.
Answer 1

Score: 1
After further debugging and looking into the Kryo library, it turns out that the Kryo class is not thread-safe. As per the Kryo documentation:
> Kryo is not thread safe. Each thread should have its own Kryo, Input, and Output instances.
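For context, the usual way to satisfy that requirement is to give each thread its own Kryo instance, for example via a ThreadLocal. A minimal sketch of that pattern (the class name and the configuration inside the initializer are illustrative assumptions, not kryptonite's actual code):

import com.esotericsoftware.kryo.Kryo;

// Hypothetical holder class: one Kryo instance per thread, as the Kryo docs recommend.
public class KryoPerThread {

    private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        // Illustrative setup; in real code, register the classes you serialize.
        kryo.setRegistrationRequired(false);
        return kryo;
    });

    public static Kryo get() {
        return KRYO.get();
    }
}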
I opened a thread on the kryptonite repo, and the main committer confirmed that it doesn't support multiple threads; the only way around it is to have a separate connector instance or pooling (the full thread). That is not feasible for me, as I have more than 50 connectors running at the same time.
Regarding the pooling option for the Kryo instance, here is a guide on how to do it, though I didn't try it out; a sketch of the pattern follows.
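For reference, Kryo 5 ships its own small object pool (com.esotericsoftware.kryo.util.Pool), which is what such pooling builds on. A minimal sketch of borrowing and returning pooled Kryo instances (the class name, pool size, and serializer configuration are illustrative assumptions):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.Pool;

// Hypothetical example class, not kryptonite code.
public class KryoPoolExample {

    // Thread-safe pool (first constructor argument), no soft references, at most 8 instances.
    static final Pool<Kryo> KRYO_POOL = new Pool<Kryo>(true, false, 8) {
        protected Kryo create() {
            Kryo kryo = new Kryo();
            // Illustrative setup; in real code, register the classes you serialize.
            kryo.setRegistrationRequired(false);
            return kryo;
        }
    };

    static byte[] serialize(Object value) {
        Kryo kryo = KRYO_POOL.obtain(); // borrow an instance
        try (Output output = new Output(1024, -1)) { // buffer grows as needed
            kryo.writeClassAndObject(output, value);
            return output.toBytes();
        } finally {
            KRYO_POOL.free(kryo); // always return the instance to the pool
        }
    }
}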
Hope this helps anyone who has the same problem or faces it in the future.
[Update]
kryptonite-for-kafka now supports the pooling option for the Kryo instance. Thanks to Hans for the quick turnaround; I tested the change and it has been working fine for 4 consecutive days.