Kafka SSL handshake failed in custom Java producer

Question


I'm trying to produce some data with my Kafka producer application, but the broker logs the following error:
> [SocketServer brokerId=0] Failed authentication with localhost/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

I use the SASL_SSL protocol with the PLAIN mechanism to communicate with Kafka. When I use kafka-console-producer
sh kafka-console-producer.sh --broker-list localhost:9093 --topic kafka-topic --producer.config ../config/producer.properties

and kafka-console-consumer
sh kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic kafka-topic --consumer.config ../config/consumer.properties

everything works fine. Here's a part of my server.properties:

listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093

listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret";

sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

allow.everyone.if.no.acl.found=true

ssl.keystore.location=/mnt/data/kafka/config/keystore/kafka.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/mnt/data/kafka/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG

producer.properties

bootstrap.servers=localhost:9093
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret";
ssl.truststore.location=/mnt/data/kafka/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

consumer.properties

bootstrap.servers=localhost:9093
group.id=test-consumer-group
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret";

ssl.truststore.location=/mnt/data/kafka/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

And here's my Java Kafka producer application:

    private KafkaProducer<String, String> producer;
    private String address;
    private final int BATCH_SIZE = 16384 * 4;

    private Properties setProperties() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 200);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put("acks", "all");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";");
        properties.put("ssl.truststore.location", "/mnt/data/kafka/config/truststore/kafka.truststore.jks");
        properties.put("ssl.truststore.password", "password");
        return properties;
    }

    public void createTopicWithPartitions(String topicName, int partitionsCount) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        AdminClient adminClient = AdminClient.create(properties);

        boolean isTopicExists = adminClient.listTopics().names().get().stream()
                .anyMatch(name -> name.equals(topicName));

        if (isTopicExists) {
            System.out.println("Topic already exists");
        } else {
            NewTopic newTopic = new NewTopic(topicName, partitionsCount, (short) 1);
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }

        adminClient.close();
    }

    public void sendMessages(String topicName, String payload, int messagesCount) {
        for (int i = 0; i < messagesCount; i++) {
            String partitionKey = DataUtils.generateSourceDeviceId(15).toUpperCase();
            producer.send(new ProducerRecord<>(topicName, partitionKey, payload));
        }
    }

    public KafkaMessagesProducer(String address) {
        this.address = address;
        this.producer = new KafkaProducer<>(setProperties());
    }

    public int getBATCH_SIZE() {
        return BATCH_SIZE;
    }

As described above, the console producer and consumer work fine, while my Java application gets the SSL handshake error. With SASL_SSL switched off, the Java app works fine too.

UPD: The certificates were generated with this script:
https://github.com/confluentinc/confluent-platform-security-tools/blob/master/kafka-generate-ssl.sh

Answer 1

Score: 1


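The problem was in my createTopicWithPartitions method. Instead of reusing the properties created in setProperties(), it built a new Properties object containing only (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address), so the AdminClient connected to the SASL_SSL listener without any of the security settings.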
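
A minimal sketch of one way to avoid this, assuming the same settings as in the question: the security settings live in a single helper, and both the producer and the admin client start from it. The class and method names (KafkaClientProps, securityProps, adminProps, producerProps) are hypothetical, and plain string config keys are used so that the snippet compiles without the Kafka client library on the classpath. In the real application the resulting Properties would be passed to AdminClient.create(...) and new KafkaProducer<>(...).

```java
import java.util.Properties;

// Hypothetical helper: builds client configs that all share the same security settings.
final class KafkaClientProps {

    // Settings every client needs in order to talk to the SASL_SSL listener.
    static Properties securityProps(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("security.protocol", "SASL_SSL");
        p.put("sasl.mechanism", "PLAIN");
        // Same JAAS string as in the question.
        p.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";");
        p.put("ssl.truststore.location", "/mnt/data/kafka/config/truststore/kafka.truststore.jks");
        p.put("ssl.truststore.password", "password");
        return p;
    }

    // Producer config: security settings plus producer-only options.
    static Properties producerProps(String bootstrapServers) {
        Properties p = securityProps(bootstrapServers);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks", "all");
        return p;
    }

    // Admin client config: it needs the security settings too, not just bootstrap.servers.
    static Properties adminProps(String bootstrapServers) {
        return securityProps(bootstrapServers);
    }

    public static void main(String[] args) {
        Properties admin = adminProps("localhost:9093");
        // Show that the admin config carries the security protocol.
        System.out.println("admin security.protocol = " + admin.getProperty("security.protocol"));
    }
}
```

With this layout, createTopicWithPartitions would call AdminClient.create(KafkaClientProps.adminProps(address)) rather than building a bare Properties object, so the admin client and the producer cannot drift apart in their security configuration.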
