NPE when encrypting fields for SQLServer databases using debezium

Question


I am using Debezium to do CDC from SQL Server to Kafka, and per the business needs, some of the columns must be encrypted.

From the environment point of view, I have 2 Kafka Connect instances running on K8s, and in total around 50 connectors streaming data from SQL Server to Kafka.

Here is a snippet of the connector JSON file:

{
  "name": "live.sql.users",
  ...
  "transforms.unwrap.delete.handling.mode": "drop",
  "transforms": "unwrap,cipher",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.cipher.predicate": "isTombstone",
  "transforms.cipher.negate": "true",
  "transforms.cipher.cipher_data_keys": "[ { \"identifier\": \"my-key\", \"material\": { \"primaryKeyId\": 1000000001, \"key\": [ { \"keyData\": { \"typeUrl\": \"type.googleapis.com/google.crypto.tink.AesGcmKey\", \"value\": \"GhDLeulEJRDC8/19NMUXqw2jK\", \"keyMaterialType\": \"SYMMETRIC\" }, \"status\": \"ENABLED\", \"keyId\": 2000000002, \"outputPrefixType\": \"TINK\" } ] } } ]",
  "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "predicates": "isTombstone",
  "transforms.cipher.field_config": "[{\"name\":\"Password\"},{\"name\":\"MobNumber\"},{\"name\":\"UserName\"}]",
  "transforms.cipher.cipher_data_key_identifier": "my-key"
  ...
}

When I applied it, after a few seconds I got the error below when calling the /connectors/<connector_name>/status API:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:346)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: error: ENCRYPT of field path 'UserName' having data 'deleted605' failed unexpectedly
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:90)
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.lambda$matchFields$0(SchemaawareRecordHandler.java:73)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.matchFields(SchemaawareRecordHandler.java:50)
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:163)
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:140)
	at org.apache.kafka.connect.runtime.PredicatedTransformation.apply(PredicatedTransformation.java:56)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 11 more
Caused by: java.lang.NullPointerException
	at com.esotericsoftware.kryo.util.DefaultGenerics.nextGenericTypes(DefaultGenerics.java:77)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.pushTypeVariables(FieldSerializer.java:144)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:102)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:627)
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:75)
	... 21 more

Note that the same configuration works in other connectors with no problems.

Answer 1

Score: 1


After further debugging and looking into the Kryo library, it turns out that the Kryo class is not thread-safe. Per the Kryo documentation:

> Kryo is not thread safe. Each thread should have its own Kryo, Input, and Output instances.

I opened a thread on the kryptonite repo, and the main committer confirmed that it doesn't support multiple threads; the only ways around this are to run a separate connector instance or to use pooling (the full thread), which is not feasible as I have more than 50 connectors running at the same time.

Regarding the pooling option for the Kryo instance, here is a guide on how to do it, though I didn't try it out.
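For reference, Kryo's own documentation describes two per-thread strategies: a `ThreadLocal` holding one instance per thread, and a bounded `com.esotericsoftware.kryo.util.Pool`. A minimal sketch of both (assuming Kryo 5.x on the classpath; the class name and configuration comments here are illustrative, not from kryptonite):

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.util.Pool;

public class KryoInstances {

    // Option 1: one Kryo instance per thread via ThreadLocal.
    static final ThreadLocal<Kryo> KRYOS = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        // configure the instance here (class registrations, etc.)
        return kryo;
    });

    // Option 2: a thread-safe pool of up to 8 Kryo instances
    // (constructor args: threadSafe, softReferences, maximumCapacity).
    static final Pool<Kryo> KRYO_POOL = new Pool<Kryo>(true, false, 8) {
        @Override
        protected Kryo create() {
            Kryo kryo = new Kryo();
            // configure the instance here as well
            return kryo;
        }
    };

    static void serializeSomething() {
        Kryo kryo = KRYO_POOL.obtain();
        try {
            // use the borrowed instance for serialization
        } finally {
            KRYO_POOL.free(kryo); // always return it to the pool
        }
    }
}
```

Either approach ensures each thread only ever touches a Kryo instance it exclusively holds, which is exactly the constraint the documentation quote above imposes.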

Hope this helps anyone who has the same problem or runs into it in the future.

[Update]

Now kryptonite-for-kafka supports the pooling option for the Kryo instance. Thanks to Hans for the quick turnaround; I tested the change and it has been working fine for 4 consecutive days.

huangapple
  • Published on 2023-02-10 05:06:20
  • Please retain this link when reposting: https://go.coder-hub.com/75404421.html