为什么在Kafka集群未运行时使用AdminClient创建主题时不会失败?

huangapple go评论70阅读模式
英文:

Why is AdminClient not failing when creating a topic with Kafka cluster not running?

问题

为什么在我运行以下生产者代码且Kafka甚至未运行时,我没有收到错误消息?

我本来期望createTopics方法会抛出异常,但实际上并未发生。为什么?

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events", 1, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic));
adminClient.close();
英文:

Why don't I get an error message when I run the following producer code and Kafka is not even running?

I would expect the createTopics method to throw an exception, but it doesn't happen. Why?

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events", 1, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic));
adminClient.close();

答案1

得分: 2

createTopics方法返回一个带有KafkaFutures作为值的CreateTopicsResult。由于您当前的代码未阻塞以等待此操作完成(使用get),并且未捕获任何异常,您的代码将在代理不可用时正常运行,但没有任何通知。

以下代码在代理不可用时将抛出ExecutionException

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events-test", 1, (short) 1);
CreateTopicsResult topicResult = adminClient.createTopics(Arrays.asList(newTopic));
KafkaFuture<Void> resultFuture = topicResult.all();
try {
    resultFuture.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

我使用Kafka客户端2.5.0进行了测试,以下是异常信息:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at org.michael.big.data.kafka.java.BasicAdminClient.main(BasicAdminClient.java:27)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

请注意,类AdminAdminClient的超类)被注释为Evolving,可能会在未来的版本中进行更改。

英文:

The method createTopics returns a CreateTopicsResult with KafkaFutures as values. As you are currently not blocking your code for this action to be finished (using get) and not catching any Exception, your code will just run fine without any notification that the broker is not available.

The following code will throw an ExecutionException when your broker is not available:

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, &quot;1000&quot;);
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, &quot;5000&quot;);

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic(&quot;events-test&quot;, 1, (short) 1);
CreateTopicsResult topicResult = adminClient.createTopics(Arrays.asList(newTopic));
KafkaFuture&lt;Void&gt; resultFuture = topicResult.all();
try {
    resultFuture.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

I tested it using Kafka client 2.5.0 and here is the Excpetion:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at org.michael.big.data.kafka.java.BasicAdminClient.main(BasicAdminClient.java:27)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

Be aware that the class Admin (the super class of AdminClient) is annotated as Evolving and might be changed in future releases.

huangapple
  • 本文由 发表于 2020年9月26日 19:56:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/64077406.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定