org.apache.kafka.common.errors.TimeoutException: 在 60000 毫秒后,元数据中不存在主题

huangapple go评论80阅读模式
英文:

org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms

问题

我遇到了错误:

org.apache.kafka.common.errors.TimeoutException: 在 60000 毫秒后从元数据中找不到主题 testtopic2。
尝试在本地 Windows 上使用 Java 向我的本地 kafka 实例生成主题。请注意,主题 testtopic2 存在,并且我可以使用 Windows 控制台生产者将消息正常生成到它。

以下是我正在使用的代码:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kafka_Producer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        TestCallback callback = new TestCallback();
        for (long i = 0; i < 100 ; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                    "testtopic2", "key-" + i, "message-"+i );
            producer.send(data, callback);
        }

        producer.close();
    }


    private static class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("向主题生成消息时出错:" + recordMetadata);
                e.printStackTrace();
            } else {
                String message = String.format("已将消息生成到主题:%s 分区:%s 偏移量:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }

}

Pom 依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

list 和 describe 的输出:

org.apache.kafka.common.errors.TimeoutException: 在 60000 毫秒后,元数据中不存在主题

org.apache.kafka.common.errors.TimeoutException: 在 60000 毫秒后,元数据中不存在主题

英文:

I'm getting the error:

 org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.

When trying to produce to the topic in my local kafka instance on windows using Java. Note that the topic testtopic2 exists and I'm able produce messages to it using the windows console producer just fine.

Below the code that I'm using:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Kafka_Producer {
public static void main(String[] args){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;);
props.put(ProducerConfig.ACKS_CONFIG, &quot;all&quot;);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, &quot;org.apache.kafka.common.serialization.StringSerializer&quot;);
Producer&lt;String, String&gt; producer = new KafkaProducer&lt;String, String&gt;(props);
TestCallback callback = new TestCallback();
for (long i = 0; i &lt; 100 ; i++) {
ProducerRecord&lt;String, String&gt; data = new ProducerRecord&lt;String, String&gt;(
&quot;testtopic2&quot;, &quot;key-&quot; + i, &quot;message-&quot;+i );
producer.send(data, callback);
}
producer.close();
}
private static class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println(&quot;Error while producing message to topic :&quot; + recordMetadata);
e.printStackTrace();
} else {
String message = String.format(&quot;sent message to topic:%s partition:%s  offset:%s&quot;, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
}
}
}
}

Pom dependency:

&lt;dependency&gt;
&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
&lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
&lt;version&gt;2.6.0&lt;/version&gt;
&lt;/dependency&gt;

Output of list and describe:
org.apache.kafka.common.errors.TimeoutException: 在 60000 毫秒后,元数据中不存在主题

org.apache.kafka.common.errors.TimeoutException: 在 60000 毫秒后,元数据中不存在主题

答案1

得分: 20

我今天也遇到了同样的问题。我是Kafka的新手,只是想尝试运行一个简单的Java生产者和消费者示例。我成功让消费者工作起来了,但是在生产者方面却一直遇到了与你一样的“元数据中不存在该主题”错误。

最后,出于绝望,我在生产者中添加了一些代码来输出主题。当我这样做时,由于jackson-databind和jackson-core包中缺少类,我得到了运行时错误。在添加了这些类之后,我不再遇到“主题不存在”的错误。我删除了我临时添加的输出主题的代码,它仍然正常工作。

英文:

I was having this same problem today. I'm a newbie at Kafka and was simply trying to get a sample Java producer and consumer running. I was able to get the consumer working, but kept getting the same "topic not present in metadata" error as you, with the producer.

Finally, out of desperation, I added some code to my producer to dump the topics. When I did this, I then got runtime errors because of missing classes in packages jackson-databind and jackson-core. After adding them, I no longer got the "topic not present" error. I removed the topic-dumping code I temporarily added, an it still worked.

答案2

得分: 15

这个错误也可能是因为目标 Kafka 实例“挂掉”或其URL错误。

在这种情况下,向 Kafka 发送消息的线程将会被阻塞在 max.block.ms 时间上,其默认值恰好为 60000 毫秒。

您可以通过传递修改后的值来检查是否是因为上述属性:

Properties props = new Properties();
...(其他属性)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); # 30 秒,或您选择的其他值

如果在您指定的时间后抛出了 TimeoutException,则应检查您的 Kafka URL 是否正确,或者 Kafka 实例是否存活。

英文:

This error also can appear because of destination Kafka instance "died" or URL to it is wrong.

In such a case a thread that sends message to Kafka will be blocked on max.block.ms time which defaults exactly to 60000 ms.

You can check whether it is because of above property by passing changed value:

Properties props = new Properties();
...(among others)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); # 30 sec or any other value of your choice 

If TimeoutException is thrown after your specified time, then you should check whether your URL to Kafka is correct or Kafka instance is alive.

答案3

得分: 11

这也可能是由一个不存在的 分区 引起的。

例如,如果你只有一个分区 [0],而你的生产者尝试发送到分区 [1],你会得到相同的错误。在这种情况下,主题是存在的,但分区不存在。

英文:

It might also be caused by an nonexistent partition.

e.g. If you have a single partition [0] and your producer tries to send to partition [1] you'll get the same error. The topic in this case exists, but not the partition.

答案4

得分: 5

首先,我想感谢Bobb Dobbs的回答,我今天也在这个问题上苦苦挣扎了一阵子。我只想补充一下,我需要添加的唯一依赖是jackson-databind。除了kafka-clients,这是我项目中唯一的依赖。

更新:我对正在发生的情况有了更多了解。kafka-clients将其jackson-databind依赖项的范围设置为“provided”,这意味着它希望在运行时由JDK或容器提供。有关提供的Maven范围的更多详细信息,请参阅此文章

> 此范围用于标记应由JDK或容器在运行时提供的依赖关系,因此得名。
> 此范围的一个很好的用例是在某个容器中部署的Web应用程序,在这种情况下,容器已经提供了一些库。

我不确定将其范围设置为提供的确切理由,除非可能这个库是人们通常希望自己提供的,以便保持最新版本以进行安全修复等。

英文:

First off I want to say thanks to Bobb Dobbs for his answer, I was also struggling with this for a while today. I just want to add that the only dependency I had to add is jackson-databind. This is the only dependency I have in my project, besides kafka-clients.

Update: I've learned a bit more about what's going on. kafka-clients sets the scope of its jackson-databind dependency as "provided," which means it expects it to be provided at runtime by the JDK or a container. See this article for more details on the provided maven scope.

> This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name.
> A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.

I'm not sure the exact reasoning on setting its scope to provided, except that maybe this library is something people normally would want to provide themselves to keep it up to the latest version for security fixes, etc.

答案5

得分: 5

我在团队中的某个成员更改了 spring.kafka.security.protocol 配置的值时遇到了这个问题(我们在项目中使用了 Spring)。之前在我们的配置中它是“SSL”,但现在已经更新为 PLAINTEXT。在连接到使用SSL的集群的更高环境中,我们看到了 OP 遇到的错误。

为什么我们会看到这个错误,而不是SSL错误或身份验证错误,我不太清楚,但如果你遇到了这个错误,也许值得仔细检查一下你连接到Kafka集群的客户端身份验证配置。

英文:

I saw this issue when someone on my team had changed the value for the spring.kafka.security.protocol config (we are using Spring on my project). Previously it had been "SSL" in our config, but it was updated to be PLAINTEXT. In higher environments where we connect to a cluster that uses SSL, we saw the error OP ran into.

Why we saw this error as opposed to an SSL error or authentication error is beyond me, but if you run into this error it may be worth double checking your client authentication configs to your Kafka cluster.

答案6

得分: 4

我也遇到了类似的问题,在我的MacBook本地环境中尝试这个。这相当令人沮丧,我尝试了几种方法:

  1. 停止了Zookeeper,停止了Kafka,重新启动了ZK和Kafka。(没有帮助)
  2. 停止了ZK。删除了ZK数据目录。删除了Kafka日志目录,然后重新启动了Kafka。(没有帮助)
  3. 重新启动了我的MacBook - 这解决了问题。

我在生产环境中使用Kafka已经超过3年了,但在集群上没有遇到过这个问题,只在我的本地环境中出现过。不过,重新启动对我有用。

英文:

I also had similar issue, where I was trying this on my local environment on my macbook. It was quite frustrating and I tried a few approaches

  1. Stopped Zookeeper, Stopped Kafka, restarted ZK and Kafka. (Didn't help)
  2. Stopped ZK. Deleted ZK data directory. Deleted Kafka logs.dirs and restarted Kafka (Didn't help)
  3. Restarted my macbook - This did the trick.

I have used Kafka in production for more than 3 years, but didn't face this problem on the cluster, happened only on my local environment. However, restarting fixes it for me.

答案7

得分: 2

这个错误是一个明显的错误,可能是由以下深层条件触发的。

  1. 首先且最常见的情况是您的Kafka生产者配置有误,请检查您的Kafka属性BOOTSTRAP_SERVERS_CONFIG是否为正确的服务器地址。
  2. 在Docker环境中,您可能需要检查端口映射情况。
  3. 检查防火墙是否已经打开了位于Broker所在服务器上的9092端口。
  4. 如果您的Broker在SSL下运行,请检查您的生产者配置,包括SSL_TRUSTSTORE_LOCATION_CONFIGSECURITY_PROTOCOL_CONFIGSSL_TRUSTSTORE_TYPE_CONFIG
    而且,一些Broker配置同时在SSL和PLAINTEXT下运行,请确保您需要哪个端口。
英文:

This error is an apparent error, and it may be triggered by the following deep conditions.

  1. First and the most situation is your kafka producer config is wrong, check your kafka properties BOOTSTRAP_SERVERS_CONFIG weather is correct server address.
  2. In docker environment, you might check your port mapping.
  3. Check whether the firewall has opened port 9092 of the server where the broker is located.
  4. If your broker run in ssl, check your producer config about SSL_TRUSTSTROE_LOCATION_CONFIG, SECURITY_PROTOCOL_CONFIG, SSL_TRUSTSTORE_TYPE_CONFIG.
    And, some broker config both run in ssl and PLAINTEXT, make sure which port is your need.

答案8

得分: 1

  1. 我创建了一个只有一个分区的主题,并尝试将该主题填充到10个分区中。然后我遇到了这个问题。

  2. 我使用 kafka-topics.sh 脚本删除了该主题,但没有等待清理完成。我开始填充该主题。当我查看主题元数据时,它只有一个分区,并且我遇到了与上面回答中提到的第一部分完全相同的问题。

英文:
  1. I created a topic with single partition and tried to populate the topic into 10 partitions. And I got this issue.

  2. I deleted the topic using kafka-topics.sh script, but didn't wait long to finish the clean up. I started populating the topic. When I was looking at topic metadata, it has one partition and I am getting exactly same issue as mentioned in first part of this answer.

答案9

得分: 0

你可能需要检查生产者的属性 metadata.max.idle.ms。

生产者缓存元数据的时间与上述配置的值一样长。在经纪人端对元数据进行的任何更改不会立即在客户端(生产者)上生效。然而,重新启动生产者应在启动时读取元数据。

更新:在这里查看默认值... https://kafka.apache.org/documentation.html#metadata.max.idle.ms

英文:

You may want to check your producer properties for metadata.max.idle.ms

The metadata a producer caches for as long as above configured value. Any changes to the meta on the broker end will not be available on the client (producer) immediately. Restarting a producer should however, read the metadata at startup.

Update: check default values here.. https://kafka.apache.org/documentation.html#metadata.max.idle.ms

答案10

得分: 0

请注意,这也可能发生,因为kafka-client和Spring的版本不兼容。

更多信息请参阅https://spring.io/projects/spring-kafka中的"Kafka客户端兼容性"矩阵。

英文:

Note that this could happen as well because the versions of kafka-client and Spring are not compatible

More info in https://spring.io/projects/spring-kafka "Kafka Client Compatibility" matrix

答案11

得分: 0

kafka-topic --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3

使用上述命令首次尝试通过Kafka流将主题插入流中

此处的my_first是主题名称。

英文:

kafka-topic --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3

First try to insert the topic with in the Kafka stream using the above command

here my_first is the topic name.

答案12

得分: 0

  1. 在pom文件中的两个依赖项: kafka-streams 和 spring-kafka
  2. 在 application.yml(或 properties)文件中:
spring:
  kafka:
    bootstrap-servers: <service_url/bootstrap_server_url>
    producer:
      bootstrap-servers: <service_url/bootstrap_server_url>
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      group-id: <your_consumer_id>
  1. @SpringBootApplication 类中另一个注解: @EnableKafka

这样将可以使其正常运行,没有任何错误。

英文:
  1. The two dependencies in pom : kafka-streams and spring-kafka
  2. in application.yml (or properties) :
    spring:
kafka:
bootstrap-servers: &lt;service_url/bootstrap_server_ur&gt;
producer:
bootstrap-servers: &lt;service_url/bootstrap_server_url&gt;
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
group-id: &lt;your_consumer_id&gt;
  1. @SpringBootApplication class another annotation : @EnableKafka

This will make it work without any errors.

答案13

得分: 0

我遇到了相同的问题。
当您的引导程序或注册表URL错误或无法访问时,可能会出现这种情况。

英文:

I was facing same issue.
It could happen when your bootstrap or registry URL are wrong or unreachable

答案14

得分: 0

如果您在使用 testcontainers 设置集成测试时遇到相同的错误,可能是因为容器内部使用的 Kafka 端口与外部暴露的端口冲突。因此,请确保启动的引导服务器端口正确映射到您在测试中使用的暴露端口。

在我的情况下,我在 Kafka 容器启动后只是替换了属性文件条目:

KafkaContainer kafka = new KafkaContainer(...);
kafka.start();
String brokers = kafka.getBootstrapServers()
TestPropertySourceUtils.addInlinedPropertiesToEnvironment(context,
"spring.kafka.bootstrap-servers=" + brokers,
"spring.kafka.producer.bootstrap-servers=" + brokers,
"spring.kafka.consumer.bootstrap-servers=" + brokers
);

在找到解决方法之前,我花了很多时间,希望这能帮助到其他人。

英文:

in case if you came here with same error while setting up your integration tests using testcontainers, this could happen because of used port by kafka inside container and exposed port outside. So, make sure that started bootstrap server port is correctly mapped to exposed port that you are using in your tests.

In my case i just replaced properties file entries after kafka container started:

KafkaContainer kafka = new KafkaContainer(...);
kafka.start();
String brokers = kafka.getBootstrapServers()
TestPropertySourceUtils.addInlinedPropertiesToEnvironment(context,
&quot;spring.kafka.bootstrap-servers=&quot; + brokers,
&quot;spring.kafka.producer.bootstrap-servers=&quot; + brokers,
&quot;spring.kafka.consumer.bootstrap-servers=&quot; + brokers
);

Spent quite some time before I figured it out, hope this helps someone.

答案15

得分: 0

我遇到了相同的问题,原因是配置错误。以下是有效的生产者配置。请将${}属性更改为您的配置。不要忘记设置所有属性:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ${servers});
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "${user}:${pass}");
props.put("sasl.kerberos.service.name", "kafka");
props.put("auto.register.schemas", "false");
props.put("schema.registry.url", "${https://your_url}");
props.put("schema.registry.ssl.truststore.location", "client_truststore.jks");
props.put("schema.registry.ssl.truststore.password", "${password}");

KafkaProducer<String, ClassEvent> producer = new KafkaProducer<>(props);

ClassEvent event = getEventObjectData();

ProducerRecord<String, ClassEvent> record = new ProducerRecord<>(args[0], event);

集群中执行:

java -Djava.security.auth.login.config=${jaas.conf} -cp ${your-producer-example.jar} ${your.package.class.ClassName} ${topic}

希望能有所帮助。

英文:

I was having the same problem, and it's because of wrong config. Here's my Producer configuration that worked. Change ${} properties with your config. Don't forget to set all properties:

    Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ${servers});
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(&quot;enable.auto.commit&quot;, &quot;false&quot;);
props.put(&quot;auto.offset.reset&quot;, &quot;earliest&quot;);
props.put(&quot;security.protocol&quot;, &quot;SASL_PLAINTEXT&quot;);
props.put(&quot;basic.auth.credentials.source&quot;, &quot;USER_INFO&quot;);
props.put(&quot;basic.auth.user.info&quot;, &quot;${user}:${pass}&quot;);
props.put(&quot;sasl.kerberos.service.name&quot;, &quot;kafka&quot;);
props.put(&quot;auto.register.schemas&quot;, &quot;false&quot;);
props.put(&quot;schema.registry.url&quot;, &quot;${https://your_url}&quot;);
props.put(&quot;schema.registry.ssl.truststore.location&quot;, &quot;client_truststore.jks&quot;);
props.put(&quot;schema.registry.ssl.truststore.password&quot;, &quot;${password}&quot;);
KafkaProducer producer = new KafkaProducer(props);
ClassEvent event = getEventObjectData();
ProducerRecord&lt;String, ClassEvent&gt; record = new ProducerRecord&lt;String, ClassEvent&gt;(args[0], event);

Execution from cluster:

java -Djava.security.auth.login.config=${jaas.conf} -cp ${your-producer-example.jar} ${your.package.class.ClassName} ${topic}

Hope it helps

答案16

得分: 0

在从一个主题读取流并写入另一个主题的情况下,一个可能的解决方案是:

<被读取的数据集>.drop("partition")

解释:
从读取的数据帧中的行附带有源的分区列,如果源主题的分区数多于目标主题,那么它将尝试将行写入目标中指定的分区。如果目标主题上不存在该分区,则会出现错误。

在集群模式下部署时,我还能够获得错误的更全面版本:
主题的分区22在60000毫秒后的元数据中不存在,共有3个分区。

解决方案要么是删除分区列,让 Kafka 自己选择分区,要么是用所需的分区号替换原始分区号(使用模运算#目标分区数)。

英文:

In the case of reading stream and writing stream from a topic to another, a possible solution could be:

&lt;dataset being read&gt;.drop(&quot;partition&quot;)

Explanation:
the row in the read dataframe comes with the source's partition column, if the source topic has more partitions than the destination topic has, then it's gonna try to write the row to the specified partition in the destination. If that partition doesn't exist on the destination topic then you will get the error.

I was also able to obtain a more comprehensive version of the error when deployed in cluster mode:
Partition 22 of topic <topic name> with partition count 3 is not present in metadata after 60000 ms.

The solution would be to either drop the partition column and let kafka choose the partition itself, or replace the original partition number with a desired one (using modulo #destination partitions).

答案17

得分: 0

我遇到了相同的问题,问题很简单,生产者无法连接到引导服务器,我的问题与 SSL 配置的 JKS 信任存储有关,一旦我正确配置了它,一切都恢复正常工作。

英文:

I faced the same issue, it's simply the producer wasn't able to connect to the bootstrap server, and my problem was related to the JKS trust-store for the SSL configuration, once I configured it correctly, everything started to work as usual.

huangapple
  • 本文由 发表于 2020年9月3日 06:39:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/63714401.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定