英文:
Kafka Producer in eclipse does not send messages to topic
问题
以下是翻译好的内容:
无法使用Java从Eclipse(在Windows主机操作系统上)通过KafkaProducer发送消息到运行在Hortonworks Sandbox上的Kafka主题。我的Java代码如下:
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "sandbox-hdp.hortonworks.com:6667");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Future<RecordMetadata> ck = null;
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
try {
for (int i = 0; i < 1; i++) {
System.out.println(i);
ck = kafkaProducer.send(
new ProducerRecord<String, String>("kafkatopic", Integer.toString(i), "test message - " + i));
kafkaProducer.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(ck.toString());
// System.out.println(ck.get().toString()); -> gives null
kafkaProducer.close();
}
}
}
这是您的POM文件中的依赖项:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
您已将本地主机名添加到指向Sandbox的位置,以下是您的Windows(主机操作系统)上的hosts文件:
127.0.0.1 sandbox-hdp.hortonworks.com
您使用以下命令创建了一个名为"kafkatopic"的主题:
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
您可以从生产者发送消息:
[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-producer.sh --broker-list sandbox-hdp.hortonworks.com:6667 --topic kafkatopic
> statement1
> statement2
> statement3
> statement4
> statement5
并且还可以在另一个标签中同时从消费者看到消息:
[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is statement1
this is statement2
this is statement3
this is statement4
this is statement5
尽管在cmd界面上的消费者中能够看到来自生产者的消息,但您无法从Windows上的Java代码外部将消息发送到运行在Hortonworks Sandbox上的Kafka主题。
这是consumer.properties:
bootstrap.servers=localhost:9092
group.id=test-consumer-group
auto.offset.reset=
这是producer.properties:
bootstrap.servers=localhost:9092
compression.type=none
partitioner.class=
request.timeout.ms=
max.block.ms=
linger.ms=
max.request.size=
batch.size=
buffer.memory=
这是server.properties:
auto.create.topics.enable=true
listeners=PLAINTEXT://sandbox-hdp.hortonworks.com:6667
...
这是zookeeper.properties:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
您还提供了有关Kafka端口、配置和属性的截图,如下所示:
英文:
I am unable send the messages from KafkaProducer using java from eclipse on Windows(Host OS) to kafka topic running on Hortonworks Sandbox. My java code is below
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "sandbox-hdp.hortonworks.com:6667");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Future<RecordMetadata> ck = null;
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
try {
for (int i = 0; i < 1; i++) {
System.out.println(i);
ck = kafkaProducer.send(
new ProducerRecord<String, String>("kafkatopic", Integer.toString(i), "test message - " + i));
kafkaProducer.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(ck.toString());
// System.out.println(ck.get().toString()); ->gives null
kafkaProducer.close();
}
}
}
There are no errors when i run this java code, It just prints the index of the message, in this case just 0 and then terminates and i am unable to see the 0 in the console-consumer on cmd interface of hortonworks sandbox.
This is pom.xml dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
I have added localhost to point to sandbox as follows. This is my hosts file on windows(Host OS)
# Copyright (c) 1993-2009 Microsoft Corp.
#
# This is a sample HOSTS file used by Microsoft TCP/IP for Windows.
#
# This file contains the mappings of IP addresses to host names. Each
# entry should be kept on an individual line. The IP address should
# be placed in the first column followed by the corresponding host name.
# The IP address and the host name should be separated by at least one
# space.
#
# Additionally, comments (such as these) may be inserted on individual
# lines or following the machine name denoted by a '#' symbol.
#
# For example:
#
# 102.54.94.97 rhino.acme.com # source server
# 38.25.63.10 x.acme.com # x client host
# localhost name resolution is handled within DNS itself.
# 127.0.0.1 localhost
# ::1 localhost
127.0.0.1 sandbox-hdp.hortonworks.com
I created a topic called kafkatopic using this command below
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
I am able to send messages from producer
[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-producer.sh --broker-list sandbox-hdp.hortonworks.com:6667 --topic kafkatopic
>statement1
>statement2
>statement3
>statement4
>statement5
And also able to see messages from consumer in other tab as well simultaneously
[root@sandbox-hdp ~]# /usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is statement1
this is statement2
this is statement3
this is statement4
this is statement5
I can see the messages are sent to topic from producer to consumer on cmd interface but i am unable to send messages externally from Java on Windows(host OS) to kafka topic on hortonworks sandbox.
This is consumer.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
This is producer.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
This is server.properties
# Generated by Apache Ambari. Sun May 3 19:25:08 2020
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
compression.type=producer
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.message.queue.size=10
controller.socket.timeout.ms=30000
default.replication.factor=1
delete.topic.enable=true
external.kafka.metrics.exclude.prefix=kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec
external.kafka.metrics.include.prefix=kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile,kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile,kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile,kafka.network.RequestMetrics.RequestsPerSec.request
fetch.purgatory.purge.interval.requests=10000
kafka.ganglia.metrics.group=kafka
kafka.ganglia.metrics.host=localhost
kafka.ganglia.metrics.port=8671
kafka.ganglia.metrics.reporter.enabled=true
kafka.metrics.reporters=
kafka.timeline.metrics.host_in_memory_aggregation=
kafka.timeline.metrics.host_in_memory_aggregation_port=
kafka.timeline.metrics.host_in_memory_aggregation_protocol=
kafka.timeline.metrics.hosts=
kafka.timeline.metrics.maxRowCacheSize=10000
kafka.timeline.metrics.port=
kafka.timeline.metrics.protocol=
kafka.timeline.metrics.reporter.enabled=true
kafka.timeline.metrics.reporter.sendInterval=5900
kafka.timeline.metrics.truststore.password=
kafka.timeline.metrics.truststore.path=
kafka.timeline.metrics.truststore.type=
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
listeners=PLAINTEXT://sandbox-hdp.hortonworks.com:6667
log.cleanup.interval.mins=10
log.dirs=/kafka-logs
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.check.interval.ms=600000
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
message.max.bytes=1000000
min.insync.replicas=1
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1
offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880
offsets.retention.check.interval.ms=600000
offsets.retention.minutes=86400000
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=1
offsets.topic.segment.bytes=104857600
port=6667
producer.metrics.enable=false
producer.purgatory.purge.interval.requests=10000
queued.max.requests=500
replica.fetch.max.bytes=1048576
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=4000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=PLAINTEXT
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.client.auth=none
ssl.key.password=
ssl.keystore.location=
ssl.keystore.password=
ssl.truststore.location=
ssl.truststore.password=
zookeeper.connect=sandbox-hdp.hortonworks.com:2181
zookeeper.connection.timeout.ms=25000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
This is zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
I am also attaching kafka ports, configurations and properties below.
答案1
得分: 3
kafkaProducer.send实际上将消息添加到缓冲内存中并立即返回,稍后生产者会批量发送消息以提高效率。
> 生产者由一个缓冲区空间池组成,该空间池保存尚未传输到服务器的记录,以及一个后台I/O线程,负责将这些记录转换为请求并将它们传输到集群。在使用后未关闭生产者将会泄漏这些资源。
>
> send() 方法是异步的。调用时,它将记录添加到待发送记录的缓冲区中并立即返回。这允许生产者将单个记录一起批处理以提高效率。
kafkaProducer.flush 可以使用 flush
立即发送缓冲内存中的记录。
> 调用此方法会使所有缓冲的记录立即可用于发送(即使 linger.ms 大于0),并会在与这些记录关联的请求完成时阻塞。flush() 的后置条件是任何先前发送的记录都将已完成(例如,Future.isDone() == true)。当请求根据您指定的 acks 配置被成功确认,否则会导致错误时,请求被视为已完成。
英文:
kafkaProducer.send actually adds the message to buffer memory and immediately returns, and later the producer sends messages in batches for efficiency
>The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
>
> The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
kafkaProducer.flush you can use flush
records immediately available in buffer memory
>Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论