Using Schema Registry from Confluent with Avro and Kafka in Spring Boot Applications

Question

<details>
<summary>English original:</summary>

First of all, I must say I'm not familiar with Confluent.

I was following this tutorial: https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/ and I got stuck.

I couldn't create the Kafka consumer because I received this error: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.

I couldn't find this schema property in the yml config.

Confluent is running locally:

$: confluent local start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]


After I set up the users topic in Spring, Control Center shows a different schema:

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}


These are my files:

user.avro

{"namespace": "com.example.demo.model",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "age",  "type": "int"}
 ]
}


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- <dependency>-->
        <!--     <groupId>org.apache.avro</groupId>-->
        <!--     <artifactId>avro</artifactId>-->
        <!--     <version>1.10.0</version>-->
        <!-- </dependency>-->

        <!-- other dependencies -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.2.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>5.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <repositories>
        <!-- other maven repositories the project -->
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.0</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>


DemoApplication.java

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DemoApplication {

    @Value("${topic.name}")
    private String topicName;

    @Value("${topic.partitions-num}")
    private Integer partitions;

    @Value("${topic.replication-factor}")
    private short replicationFactor;

    @Bean
    NewTopic moviesTopic() {
        return new NewTopic(topicName, partitions, replicationFactor);
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}


Consumer.java

package com.example.demo.kafka;

import com.example.demo.model.User;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {

    @Value("${topic.name}")
    private String topicName;

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(ConsumerRecord<String, User> record) {
        log.info(String.format("Consumed message -> %s", record.value()));
    }
}


KafkaController

package com.example.demo.kafka;

import com.example.demo.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/user")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) {
        this.producer.sendMessage(new User(name, age));
    }
}


Producer.java

package com.example.demo.kafka;

import com.example.demo.model.User;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {

    @Value("${topic.name}")
    private String TOPIC;

    private final KafkaTemplate<String, User> kafkaTemplate;

    @Autowired
    public Producer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    void sendMessage(User user) {
        this.kafkaTemplate.send(this.TOPIC, user.getName().toString(), user);
        log.info(String.format("Produced user -> %s", user));
    }
}


application.yml

server:
  port: 8080

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    bootstrap-servers: localhost:9092

topic:
  name: users
  partitions-num: 1
  replication-factor: 1


</details>
# Answer 1
**Score**: 2

Add the string mapping under `properties`:

```yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    # Set up the Confluent settings in the respective client
    producer:
      value-serializer: io.confluent.kafka...
    consumer:
      value-deserializer: io.confluent.kafka...
    properties:
      # Schema Registry listens on localhost:8081 by default; the client itself
      # has no default for this setting, so it must be supplied here
      "[schema.registry.url]": http://localhost:8081
```

Docs - https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties
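
For anyone who prefers to see what this boils down to on the Kafka client side, here is a minimal sketch of the consumer settings as a plain map. The class name `SchemaRegistryConfigSketch` and the method `consumerProps` are hypothetical and not part of the question's project; the keys are the standard Kafka and Confluent property names. The `specific.avro.reader` entry is an extra assumption on my part: it is what asks the Avro deserializer for generated classes such as `User` instead of `GenericRecord`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only: it just assembles the key/value
// pairs that would end up in the Kafka consumer configuration.
final class SchemaRegistryConfigSketch {

    static Map<String, Object> consumerProps(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "group_id");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // The setting the ConfigException in the question complains about.
        props.put("schema.registry.url", schemaRegistryUrl);
        // Assumption: request generated Avro classes rather than GenericRecord.
        props.put("specific.avro.reader", true);
        return props;
    }

    public static void main(String[] args) {
        // Print every entry so the resulting configuration can be inspected.
        consumerProps("localhost:9092", "http://localhost:8081")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a Spring application a map like this could be handed to `DefaultKafkaConsumerFactory`, but the `spring.kafka.properties` entry above achieves the same thing without extra code.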

huangapple
  • Published on July 23, 2020, 07:44:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63044704.html