Testcontainers SchemaRegistry can't connect to Kafka container

Question

I want to run integration tests that exercise my Kafka listener and Avro serialization. This requires a Kafka broker and a Schema Registry (and, transitively, a ZooKeeper).

When testing, I currently have to use a docker-compose.yml, but I want to reduce user error by building the required containers via Testcontainers. The Kafka and ZooKeeper instances start cleanly and seem to work just fine: my application can create the required topics, and the listener is subscribed as well. I can even send messages via the Kafka console producer.

What does not work is the Schema Registry. The container starts and apparently connects to ZooKeeper, but it cannot establish a connection to the broker. It retries for some time until it times out, and the container is then stopped. I therefore cannot register and read my Avro schemas for (de)serialization in my tests, which fail as a result.

I can't find the reason why the Schema Registry can apparently connect to the Zookeeper but can't find my broker.

Has anyone run into this problem as well? Did you manage to get this running? If so, how? I need both the Kafka and the Schema Registry test containers to be fully available for my tests, so omitting either of them is not an option.

I could also keep using the docker-compose.yml but I would really like to set up my test environment fully programmatically.

The Schema Registry container logs the following:

2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO AdminClientConfig values:
/*  Omitted for brevity  */
(org.apache.kafka.clients.admin.AdminClientConfig)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka version: 7.3.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka commitId: 8628b0341c3c4676 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka startTimeMs: 1675871770281 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,308] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:10 [2023-02-08 15:56:10,313] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
/*  These lines repeat a few times until the container times out and exits.  */
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,298] ERROR Error while getting broker list. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:50 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:50     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-02-08 16:56:50     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-02-08 16:56:50     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-02-08 16:56:50     at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-02-08 16:56:50     at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-02-08 16:56:50 Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] INFO Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ... (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] ERROR Expected 1 brokers but found only 0. Brokers found []. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 Using log4j config /etc/schema-registry/log4j.properties
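
One way to read the log above is to look at the address in the repeated WARN lines, since that is the address the readiness check is actually dialing. The following is a minimal sketch for pulling that address out of a line. The class and method names are hypothetical, and the pattern is only matched against the log format shown above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the host:port a Kafka client failed to reach from a WARN line. */
final class BrokerTargetProbe {

    // Matches "Connection to node <id> (<host>/<ip>:<port>) could not be established"
    private static final Pattern TARGET = Pattern.compile(
            "Connection to node (-?\\d+) \\(([^/]+)/([^:]+):(\\d+)\\) could not be established");

    /** Returns "host:port" if the line is a failed-connection warning, otherwise empty. */
    static Optional<String> extractTarget(String logLine) {
        Matcher m = TARGET.matcher(logLine);
        return m.find() ? Optional.of(m.group(2) + ":" + m.group(4)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "[2023-02-08 15:56:10,313] WARN [AdminClient clientId=adminclient-1] "
                + "Connection to node -1 (localhost/127.0.0.1:54776) could not be established. "
                + "Broker may not be available. (org.apache.kafka.clients.NetworkClient)";
        System.out.println(extractTarget(line).orElse("no target found"));
    }
}
```

For the line above this yields `localhost:54776`. Kafka clients label bootstrap addresses with negative node ids, and inside the registry container `localhost` is the registry container itself, so it may be worth checking which bootstrap value the container actually received.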


<details>
<summary>English:</summary>

My base test class. ITs that need Kafka extend this class:

    @Testcontainers
    @SpringBootTest
    @Slf4j
    public class AbstractIT {

        private static final Network network = Network.newNetwork();

        protected static GenericContainer ZOOKEEPER = new GenericContainer<>(
                DockerImageName.parse("confluentinc/cp-zookeeper:7.2.0"))
                .withNetwork(network)
                .withNetworkAliases("zookeeper")
                .withEnv(Map.of(
                        "ZOOKEEPER_CLIENT_PORT", "2181",
                        "ZOOKEEPER_TICK_TIME", "2000"));

        protected static final KafkaContainer KAFKA = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka"))
                .withExternalZookeeper("zookeeper:2181")
                .dependsOn(ZOOKEEPER)
                .withNetwork(network)
                .withNetworkAliases("broker");

        protected static final GenericContainer SCHEMAREGSISTRY = new GenericContainer<>(
                DockerImageName.parse("confluentinc/cp-schema-registry"))
                .dependsOn(ZOOKEEPER, KAFKA)
                .withEnv(Map.of(
                        "SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", "zookeeper:2181",
                        "SCHEMA_REGISTRY_HOST_NAME", "schemaregistry",
                        "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085",
                        "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092"))
                .withNetwork(network)
                .withNetworkAliases("schemaregistry");

        @DynamicPropertySource
        static void registerPgProperties(DynamicPropertyRegistry registry) {
            registry.add("bootstrap.servers", KAFKA::getBootstrapServers);
            registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
            registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
            registry.add("spring.data.mongodb.uri", MONGODB::getConnectionString);
            registry.add("spring.data.mongodb.database", () -> "test");
        }

        // container startup, shutdown as well as topic creation omitted for brevity

    }


My docker-compose.yml that I want to replicate with Testcontainers:

    version: "3.5"
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.2.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000

      broker:
        image: confluentinc/cp-kafka:7.2.0
        hostname: broker
        container_name: broker
        restart: always
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085"

      schemaregistry:
        container_name: schemaregistry
        hostname: schemaregistry
        image: confluentinc/cp-schema-registry:5.1.2
        restart: always
        depends_on:
          - zookeeper
        environment:
          SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
          SCHEMA_REGISTRY_HOST_NAME: schemaregistry
          SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
        ports:
          - "8085:8085"
        volumes:
          - "./src/main/avro/:/etc/schema"

</details>
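
The compose file advertises two broker listeners in `KAFKA_ADVERTISED_LISTENERS`, and which one a client is handed depends on where it connects from. As a rough illustration, the sketch below splits that value into listener name and address. The class name is hypothetical, and the parsing covers only the simple `NAME://host:port` form used in the compose file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits a KAFKA_ADVERTISED_LISTENERS value into name -> "host:port". */
final class AdvertisedListeners {

    static Map<String, String> parse(String value) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            // Each entry looks like "PLAINTEXT://broker:9092"
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    public static void main(String[] args) {
        // Value copied from the docker-compose.yml in the question
        Map<String, String> listeners =
                parse("PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092");
        listeners.forEach((name, address) -> System.out.println(name + " -> " + address));
    }
}
```

In the compose setup the intent appears to be that containers on the same Docker network use `broker:9092`, while processes on the host use `localhost:29092`. A Schema Registry running in its own container would need the first kind of address.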


# Answer 1
**Score**: 5

Here is a working setup of Kafka and Schema Registry for Testcontainers. It may help you find the issue in your setup.

**Schema Registry**
```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;

public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
    public static final String SCHEMA_REGISTRY_IMAGE =
            "confluentinc/cp-schema-registry";
    public static final int SCHEMA_REGISTRY_PORT = 8081;

    public SchemaRegistryContainer() {
        // CONFLUENT_PLATFORM_VERSION is the constant declared in the test class below
        this(CONFLUENT_PLATFORM_VERSION);
    }

    public SchemaRegistryContainer(String version) {
        super(SCHEMA_REGISTRY_IMAGE + ":" + version);

        // Consider the container started once the REST API answers
        waitingFor(Wait.forHttp("/subjects").forStatusCode(200));
        withExposedPorts(SCHEMA_REGISTRY_PORT);
    }

    public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
        // Address the broker by its network alias and in-network port, not by the host-mapped port
        return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
    }

    public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
        withNetwork(network);
        withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
        withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081");
        withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
        return self();
    }
}
```

**Kafka + Schema Registry**
```java
public static final String CONFLUENT_PLATFORM_VERSION = "5.5.1";

private static final Network KAFKA_NETWORK = Network.newNetwork();
private static final DockerImageName KAFKA_IMAGE = DockerImageName.parse("confluentinc/cp-kafka")
        .withTag(CONFLUENT_PLATFORM_VERSION);
private static final KafkaContainer KAFKA = new KafkaContainer(KAFKA_IMAGE)
        .withNetwork(KAFKA_NETWORK)
        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");

private static final SchemaRegistryContainer SCHEMA_REGISTRY =
        new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION);

@BeforeAll
static void startKafkaContainer() {
    // Start the broker first so the registry can reach it on the shared network
    KAFKA.start();
    SCHEMA_REGISTRY.withKafka(KAFKA).start();

    // init Kafka properties for consumer or producer
    // ....
    kafkaProperties.setBootstrapServers(KAFKA.getBootstrapServers());
    kafkaProperties.setSchemaRegistryUrl("http://" + SCHEMA_REGISTRY.getHost() + ":" + SCHEMA_REGISTRY.getFirstMappedPort());
}
```
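
The setup above uses two different kinds of address: the registry is given the broker's in-network address, while the test JVM reaches the registry through the host and a mapped port. The sketch below shows only those two string constructions, with no Testcontainers calls. The class, method names, and sample values (`tc-abc123`, `49153`) are hypothetical.

```java
/** Hypothetical helper showing the two address forms used in the setup above. */
final class RegistryAddresses {

    /** URL used by the test JVM: Docker host plus the host-mapped port. */
    static String externalUrl(String host, int mappedPort) {
        return "http://" + host + ":" + mappedPort;
    }

    /** Bootstrap value passed to the registry: network alias and port as seen inside the Docker network. */
    static String internalBootstrap(String networkAlias, int internalPort) {
        return "PLAINTEXT://" + networkAlias + ":" + internalPort;
    }

    public static void main(String[] args) {
        // Sample values only; real ones come from the running containers
        System.out.println(externalUrl("localhost", 49153));
        System.out.println(internalBootstrap("tc-abc123", 9092));
    }
}
```

Keeping the two forms apart is the point of the design. A host-mapped address such as the one returned by `getBootstrapServers()` is meant for the test JVM, and a container that tries to use it would be dialing its own `localhost`.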

huangapple
  • Published on February 9, 2023, 00:14:39
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75388650.html