Kafka生产者失败 – 意外的错误代码:87

huangapple go评论69阅读模式
英文:

Kafka Producer Failed - Unexpected error code: 87

问题

Trying to sent Avro format message to Kafka and consume it. It was not sending a message until after some research added Thread.sleep(16000) in order for producer to wait on the message. However it stopped working again. It is `org.apache.kafka.common.protocol.Errors - Unexpected error code: 87. Failed to produce messages to topic`.

Any advise? My code below

public class AvroAutomationTest3IT {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroAutomationTest3IT.class);
    private static Properties producerProps;
    private final String topic = "topic.one";

    String schemaPath = "src/test/resources/automation-tests/sample-avro.avsc";

    // subject convention is "<topic-name>-value"
    String subject = topic + "-value";

    // avsc json string.
    String schema = null;

    // kafka broker list.
    private static String brokers = "xxx:9093";
    // schema registry url.
    private static String registry = "xxx:8081";

    private static Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

    @BeforeAll
    public static void setUp() throws IOException {
        producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        producerProps.put("acks", "1");
        producerProps.put("reconnect.backoff.ms", "5000");
        producerProps.put("retry.backoff.ms", "1000");
        producerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
        //producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.put(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        // producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        //  producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(CLIENT_ID_CONFIG, "AvroProducer");
        // producerProps.put(ProducerConfig.ACKS_CONFIG, "0");
        // producerProps.put(ProducerConfig.RETRIES_CONFIG, "0");

        //configure the KafkaAvroSerializer
        producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        //consumer properties
        producerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        producerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");

        producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);

        //sslConfig
        producerProps.put("security.protocol", "SSL");
        producerProps.put("ssl.truststore.location", "C:/Users/xx/truststore.jks");
        producerProps.put("ssl.truststore.password", "expeditors");
        producerProps.put("ssl.keystore.location", "C:/Users/xx/xx.jks");
        producerProps.put("ssl.keystore.password", "xxx");
        producerProps.put("ssl.key.password", "xxx");
    }

    @Test
    public void avroTest() throws Exception {
        sendMessage();
        Thread.sleep(16000);

        readMessage();
    }

    public void readMessage() {
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(producerProps);
        consumer.subscribe(Collections.singletonList(topic));
        try {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(15000));

            // assertEquals(2, records.count(), "Expected 2 record");
            for (ConsumerRecord<String, byte[]> record : records) {
                try {
                    JsonElement el = this.parseAvroMessage(topic, record.value());

                    System.out.printf("offset = %d, value = %s\n", record.offset(), el);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.commitAsync();
            consumer.close(Duration.ofMillis(3000));
        }
    }

    private JsonElement parseAvroMessage(String topic, byte[] value) {
        HashMap<String, String> configs = new HashMap<>();
        configs.put("schema.registry.url", registry);

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(configs, true);
        Object obj = deserializer.deserialize(topic, value);
        return gson.fromJson(obj.toString(), JsonElement.class);
    }

    public void sendMessage() throws IOException {
        // construct kafka producer.
        Producer<Integer, GenericRecord> producer = new KafkaProducer<>(producerProps);

        // message key.
        // int userIdInt = 1;
        // message value, avro generic record.
        GenericRecord record = this.buildRecord();

        // send avro message to the topic page-view-event.
        producer.send(new ProducerRecord<>("visibility.avro.topic.source.one", null, record));
        // producer.flush();
    }

    public GenericRecord buildRecord() throws IOException {
        // avsc json string.
        String schemaString = null;

        FileInputStream inputStream = new FileInputStream(schemaPath);
        try {
            schemaString = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        } finally {
            inputStream.close();
        }

        // avro schema.
        Schema schema = new Schema.Parser().parse(schemaString);

        GenericRecord metadata = new GenericData.Record(schema.getField("metadata").schema());
        metadata.put("version", "1");
        metadata.put("eventName", "event.name");

        GenericRecord data = new GenericData.Record(schema.getField("data").schema());

        data.put("name", "Bob");
        data.put("age", 25);

        GenericRecord record = new GenericData.Record(schema);
        record.put("metadata", metadata);
        record.put("data", data);

        return record;
    }
}
英文:

Trying to sent Avro format message to Kafka and consume it. It was not sending a message until after some research added Thread.sleep(16000) in order for producer to wait on the message. However it stopped working again. It is org.apache.kafka.common.protocol.Errors - Unexpected error code: 87. Failed to produce messages to topic.

Any advise? My code below

public class AvroAutomationTest3IT {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroAutomationTest3IT.class);
private static Properties producerProps;
private final String topic = &quot;topic.one&quot;;
String schemaPath = &quot;src/test/resources/automation-tests/sample-avro.avsc&quot;;
// subject convention is &quot;&lt;topic-name&gt;-value&quot;
String subject = topic + &quot;-value&quot;;
// avsc json string.
String schema = null;
// kafka broker list.
private static String brokers = &quot;xxx:9093&quot;;
// schema registry url.
private static String registry = &quot;xxx:8081&quot;;
private static Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
@BeforeAll
public static void setUp() throws IOException {
producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
producerProps.put(&quot;acks&quot;, &quot;1&quot;);
producerProps.put(&quot;reconnect.backoff.ms&quot;, &quot;5000&quot;);
producerProps.put(&quot;retry.backoff.ms&quot;, &quot;1000&quot;);
producerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
//producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.IntegerSerializer&quot;);
producerProps.put(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, &quot;io.confluent.kafka.serializers.KafkaAvroSerializer&quot;);
//  producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(CLIENT_ID_CONFIG, &quot;AvroProducer&quot;);
// producerProps.put(ProducerConfig.ACKS_CONFIG, &quot;0&quot;);
// producerProps.put(ProducerConfig.RETRIES_CONFIG, &quot;0&quot;);
//configure the KafkaAvroSerializer
producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, &quot;16384&quot;);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, &quot;1&quot;);
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, &quot;33554432&quot;);
//consumer properties
producerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
producerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
producerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
producerProps.put(ConsumerConfig.GROUP_ID_CONFIG, &quot;NewConsumer&quot;);
producerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);
producerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
//sslConfig
producerProps.put(&quot;security.protocol&quot;, &quot;SSL&quot;);
producerProps.put(&quot;ssl.truststore.location&quot;, &quot;C:/Users/xx/truststore.jks&quot;);
producerProps.put(&quot;ssl.truststore.password&quot;, &quot;expeditors&quot;);
producerProps.put(&quot;ssl.keystore.location&quot;, &quot;C:/Users/xx/xx.jks&quot;);
producerProps.put(&quot;ssl.keystore.password&quot;, &quot;xxx&quot;);
producerProps.put(&quot;ssl.key.password&quot;, &quot;xxx&quot;);
}
@Test
public void avroTest() throws Exception {
sendMessage();
Thread.sleep(16000);
readMessage();
}
public void readMessage() {
KafkaConsumer&lt;String, byte[]&gt; consumer = new KafkaConsumer&lt;&gt;(producerProps);
consumer.subscribe(Collections.singletonList(topic));
try {
ConsumerRecords&lt;String, byte[]&gt; records = consumer.poll(Duration.ofMillis(15000));
// assertEquals(2, records.count(), &quot;Expected 2 record&quot;);
for (ConsumerRecord&lt;String, byte[]&gt; record : records) {
try {
JsonElement el = this.parseAvroMessage(topic, record.value());
System.out.printf(&quot;offset = %d, value = %s\n&quot;, record.offset(), el);
} catch (Exception ex) {
ex.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.commitAsync();
consumer.close(Duration.ofMillis(3000));
}
}
private JsonElement parseAvroMessage(String topic, byte[] value) {
HashMap&lt;String, String&gt; configs = new HashMap&lt;&gt;();
configs.put(&quot;schema.registry.url&quot;, registry);
KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(configs, true);
Object obj = deserializer.deserialize(topic, value);
return gson.fromJson(obj.toString(), JsonElement.class);
}
public void sendMessage() throws IOException {
// construct kafka producer.
Producer&lt;Integer, GenericRecord&gt; producer = new KafkaProducer&lt;Integer, GenericRecord&gt;(producerProps);
// message key.
// int userIdInt = 1;
// message value, avro generic record.
GenericRecord record = this.buildRecord();
// send avro message to the topic page-view-event.
producer.send(new ProducerRecord&lt;Integer, GenericRecord&gt;(&quot;visibility.avro.topic.source.one&quot;, null, record));
// producer.flush();
}
public GenericRecord buildRecord() throws IOException {
// avsc json string.
String schemaString = null;
FileInputStream inputStream = new FileInputStream(schemaPath);
try {
schemaString = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} finally {
inputStream.close();
}
// avro schema.
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecord metadata = new GenericData.Record(schema.getField(&quot;metadata&quot;).schema());
metadata.put(&quot;version&quot;, &quot;1&quot;);
metadata.put(&quot;eventName&quot;, &quot;event.name&quot;);
GenericRecord data = new GenericData.Record(schema.getField(&quot;data&quot;).schema());
data.put(&quot;name&quot;, &quot;Bob&quot;);
data.put(&quot;age&quot;, 25);
GenericRecord record = new GenericData.Record(schema);
record.put(&quot;metadata&quot;, metadata);
record.put(&quot;data&quot;, data);
return record;
}
}

答案1

得分: 0

我解决了这个问题。我需要取消注释 // int userIdInt = 1; 并且在这行代码中使用 userIdInt 替代 null:

// 将 Avro 消息发送到主题 page-view-event。
producer.send(new ProducerRecord<Integer, GenericRecord>("visibility.avro.topic.source.one", userIdInt, record));

英文:

I solved the issue. I needed to un-comment // int userIdInt = 1; and use userIdInt instead of null in this line of code:

// send avro message to the topic page-view-event.
producer.send(new ProducerRecord<Integer, GenericRecord>("visibility.avro.topic.source.one", null, record));

huangapple
  • 本文由 发表于 2020年9月2日 03:49:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/63694512.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定