英文:
NullPointerException + org.apache.kafka.common.header.internals.RecordHeader.key
问题
我在我的微服务上自上次发布以来一直遇到以下异常。由于异常不完整(省略号...),我无法理解其中的含义。
根据我的理解,Kafka记录的键可以为null(因为此时使用RoundRobinStragey来选择正确的分区)。
我已经没有更多的想法了!有关可能引起此问题的任何想法吗?
{"@timestamp":"2023-02-21 15:58:16.947","@version":"1","message":"stream-client [Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b]在处理过程中遇到了以下异常,Kafka Streams选择了SHUTDOWN_CLIENT。流客户端现在将关闭。","logger":"org.apache.kafka.streams.KafkaStreams","thread":"Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b-StreamThread-5","level":"ERROR","stacktrace":"org.apache.kafka.streams.errors.StreamsException: 在处理中捕获异常。任务ID=0_16,处理器=KSTREAM-SOURCE-0000000000,主题=my-topic-name,分区=16,偏移=263041154,堆栈跟踪=java.lang.NullPointerException\n\t在 org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)\n\t在 org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.lambda$null$6(AbstractKafkaStreamsBinderProcessor.java:498)\n\t在 java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\t在 org.springframework.cloud.st..."}
英文:
I am getting the following exception on my microservice since my last release. As the exception is not complete (elipise ...), I am not able to make sense out it.
As per my understanding kafka records can have null for key (as then RoundRobinStragey is used to choose the correct partition).
I am out of ideas! Any idea as to what could be causing it?
{"@timestamp":"2023-02-21 15:58:16.947","@version":"1","message":"stream-client [Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. ","logger":"org.apache.kafka.streams.KafkaStreams","thread":"Servicename-954725e4-a291-4fce-8f7e-77f8b5eeab6b-StreamThread-5","level":"ERROR","stacktrace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=my-topic-name, partition=16, offset=263041154, stacktrace=java.lang.NullPointerException\n\tat org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)\n\tat org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.lambda$null$6(AbstractKafkaStreamsBinderProcessor.java:498)\n\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat org.springframework.cloud.st...
答案1
得分: 1
根据我的理解,Kafka记录的键可以为null...
对于记录是正确的,但对于记录头不是(每个记录头都有键和值)。
您在某种程度上有一个RecordHeader
(在record.headers()
中),其键为null,这应该是不可能的:
public RecordHeader(String key, byte[] value) {
Objects.requireNonNull(key, "不允许空标头键");
this.key = key;
this.value = value;
}
public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
this.keyBuffer = Objects.requireNonNull(keyBuffer, "不允许空标头键");
this.valueBuffer = valueBuffer;
}
在这里,尝试从缓冲区提取键时出现了NPE:
public String key() {
if (key == null) {
key = Utils.utf8(keyBuffer, keyBuffer.remaining());
keyBuffer = null;
}
return key;
}
英文:
>As per my understanding kafka records can have null for key...
That is true for records, but not for record headers (each of which has a key and value).
You somehow have a RecordHeader
(in record.headers()
) with a null key, which should be impossible:
public RecordHeader(String key, byte[] value) {
Objects.requireNonNull(key, "Null header keys are not permitted");
this.key = key;
this.value = value;
}
public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
this.keyBuffer = Objects.requireNonNull(keyBuffer, "Null header keys are not permitted");
this.valueBuffer = valueBuffer;
}
The NPE is on the keyBuffer
here, while trying to extract the key from the buffer:
public String key() {
if (key == null) {
key = Utils.utf8(keyBuffer, keyBuffer.remaining());
keyBuffer = null;
}
return key;
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论