RecordTooLargeException in Kafka
Question
The following Kafka publishing code throws a RecordTooLargeException.
I have tried all of the solutions suggested on Stack Overflow that involve producer properties such as max.request.size, but none of them worked. The exact stack trace is:
Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1696090 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration
@SuppressWarnings("unchecked")
@Override
public void run(String... args) throws Exception {
    // Build 8000 identical records; pretty-printed, the resulting JSON is
    // roughly 1.7 MB, well over the producer's 1 MiB default max.request.size.
    JSONArray array = new JSONArray();
    for (int i = 0; i < 8000; i++) {
        JSONObject object = new JSONObject();
        object.put("no", 1);
        object.put("name", "Kella Vivek");
        object.put("salary", 1000);
        object.put("address", "2-143");
        object.put("city", "gpm");
        object.put("pin", 534316);
        object.put("dist", "west");
        object.put("state", "ap");
        object.put("username", "mff");
        object.put("password", "mff");
        array.add(object);
    }
    // Serialize the whole array as one message and publish it in a single send.
    ObjectMapper mapper = new ObjectMapper();
    String string = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(array);
    template.send("consume", string);
}
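For reference, since the producer here is Spring's KafkaTemplate, the client-side limit is normally raised through producer properties rather than in code. Below is a minimal sketch of the kind of configuration attempted, assuming Spring Boot's spring.kafka.* property keys (the actual configuration file is not shown in the question):

# Hypothetical application.properties sketch: raise the producer's
# maximum request size to fit the payload (values in bytes).
spring.kafka.producer.properties.max.request.size=104857600
spring.kafka.producer.properties.buffer.memory=104857600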
Answer 1
Score: 2
This is not a Spring problem. You need to tweak a number of Kafka producer parameters to make this work.
To answer your question, here is what I did to enable sending 100 MB messages.
Create the properties and set buffer.memory, message.max.bytes, and max.request.size according to your requirements.
Properties producerProperties = new Properties();
// Client-side limits: raise these so a single large record fits in one request.
producerProperties.put("buffer.memory", 104857600);
// Note: message.max.bytes is a broker/topic-level setting; the producer client
// does not use it, so it must also be raised on the server (see below).
producerProperties.put("message.max.bytes", 104857600);
producerProperties.put("max.request.size", 104857600);
producerProperties.put("bootstrap.servers", kafkaBootstrapServers);
producerProperties.put("acks", "all");
producerProperties.put("retries", 0);
producerProperties.put("batch.size", 16384);
producerProperties.put("linger.ms", 1);
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Create the producer using the above properties:
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
And now send:
private static void sendKafkaMessage(String payload,
                                     KafkaProducer<String, String> producer,
                                     String topic) {
    logger.info("Sending Kafka message: " + payload);
    producer.send(new ProducerRecord<>(topic, payload));
}
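A minimal sketch of how these pieces fit together (hypothetical wiring; the topic name "consume" and the large JSON payload are taken from the question):

// Hypothetical wiring of the snippets above: build the producer,
// send one large payload, then flush and close.
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
try {
    sendKafkaMessage(largeJsonPayload, producer, "consume");
    producer.flush(); // block until buffered records are actually transmitted
} finally {
    producer.close();
}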
You also need to ensure that the target server supports huge messages too. I configured the following in the server to support them:
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
replica.fetch.max.bytes=104857600
message.max.bytes=104857600
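If the topic already exists, the broker-wide message.max.bytes can also be overridden per topic (the topic-level key is max.message.bytes). A sketch using the standard kafka-configs tool, assuming the topic name "consume" from the question and a broker at localhost:9092:

# Hypothetical example: raise the per-topic message size limit to 100 MB.
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name consume \
  --add-config max.message.bytes=104857600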