Spring Boot与Spring Kafka的Rest API

huangapple go评论77阅读模式
英文:

Spring Boot REST API with Spring Kafka

问题

我已经设计了一个Spring Boot REST API的ADD和GET方法。

/**
 * REST controller for products. Every request is delegated to the injected
 * {@link IProductProducer}.
 */
@RestController("ProductV1Controller")
public class ProductController {

    private final IProductProducer _productProducer;

    /**
     * @param productProducer the producer that every endpoint delegates to
     */
    public ProductController(IProductProducer productProducer) {
        _productProducer = productProducer;
    }

    /**
     * Handles POST requests: hands the validated request body to the producer.
     *
     * @param product the product sent in the request body
     */
    @PostMapping()
    void AddProduct(@Valid @RequestBody ProductViewModel product) {
        _productProducer.AddProduct(product);
    }

    /**
     * Handles GET requests.
     *
     * @return the list supplied by the producer
     */
    @GetMapping()
    List<ProductViewModel> Products() {
        // Call the producer exactly once. The previous version called GetProducts() twice and
        // threw the first result away, doubling whatever work the producer does per request.
        return _productProducer.GetProducts();
    }
}

服务层:

/**
 * {@link IProductProducer} implementation that publishes product messages through a
 * {@link KafkaTemplate}.
 */
@Service
public class ProductProducer implements IProductProducer {

    private final KafkaTemplate<String, Object> _template;

    /**
     * @param _template the Kafka template used for every send
     */
    public ProductProducer(KafkaTemplate<String, Object> _template) {
        this._template = _template;
    }

    /**
     * Sends a message with a {@code null} value to the GET_PRODUCTS topic and returns a
     * single placeholder element.
     *
     * @return a one-element list holding an empty placeholder product
     */
    @Override
    public List<ProductViewModel> GetProducts() {
        _template.send(ProductTopicConstants.GET_PRODUCTS, null);
        // Placeholder only: the value still needs to be returned from Kafka.
        ProductViewModel placeholder = new ProductViewModel("", "", 0, "");
        return List.of(placeholder);
    }

    /**
     * Sends the given product to the ADD_PRODUCT topic.
     *
     * @param product the product to publish
     */
    @Override
    public void AddProduct(ProductViewModel product) {
        _template.send(ProductTopicConstants.ADD_PRODUCT, product);
    }

}

Kafka监听器:

/**
 * Listener bound to the GET_PRODUCTS topic (the topic name doubles as the listener id).
 *
 * @return the result of {@code _productRepository.findAll()}
 */
@KafkaListener(topics = ProductTopicConstants.GET_PRODUCTS, id = ProductTopicConstants.GET_PRODUCTS)
public List<Product> GetProducts() {
    List<Product> allProducts = _productRepository.findAll();
    return allProducts;
}

在服务层的GetProducts()方法中,我需要返回来自_productRepository.findAll()的项目列表。

关于使用Spring Kafka进行REST API的最佳方法是什么?

英文:

I have designed a Spring Boot REST API with ADD and GET methods.

    @RestController(&quot;ProductV1Controller&quot;)
    public class ProductController 
     {

         private final IProductProducer _productProducer;
         public ProductController(IProductProducer productProducer) {
        _productProducer = productProducer;}

         @PostMapping()
            void AddProduct(@Valid @RequestBody ProductViewModel product) {
                _productProducer.AddProduct(product);
            }
        
        @GetMapping()
            List&lt;ProductViewModel&gt; Products() {
                var test = _productProducer.GetProducts();
                return _productProducer.GetProducts();
            }
}

Service layer

@Service

    public class ProductProducer implements IProductProducer{
        private final KafkaTemplate&lt;String, Object&gt; _template;
    
        public ProductProducer(KafkaTemplate&lt;String, Object&gt; _template) {
            this._template = _template;
        }
    
        @Override
        public List&lt;ProductViewModel&gt; GetProducts() {
            this._template.send(ProductTopicConstants.GET_PRODUCTS,null);
            return List.of(new ProductViewModel(&quot;&quot;,&quot;&quot;,0,&quot;&quot;)); --&gt; Need to return the value from the kafka
        }
    
        @Override
        public void AddProduct(ProductViewModel product) {
            this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
        }
       
    }

Kafka Listener

 @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
    public List&lt;Product&gt; GetProducts() {
        return _productRepository.findAll();
    }

In the service layer's GetProducts() method, I need to return the list of products that comes from _productRepository.findAll().

What is the best approach to building a REST API with Spring Kafka?

答案1

得分: 2

你需要使用ReplyingKafkaTemplate来将结果返回给REST控制器。

详见ReplyingKafkaTemplate

版本2.1.3引入了KafkaTemplate的子类,用于提供请求/回复语义。该类名为ReplyingKafkaTemplate,除了父类中的方法之外还有一个方法。

结果是一个ListenableFuture,异步填充结果(或超时的异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。您可以使用此future来确定发送操作的结果。

文档中有一个示例。

编辑

/**
 * Single-class demo of request/reply over Kafka: the REST endpoint sends a request record
 * with a {@link ReplyingKafkaTemplate}, a {@code @KafkaListener} in the same application
 * answers it, and the reply value is returned to the HTTP caller.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

	/** Topic the request records are sent to (also used as the listener id). */
	private static final String REQUEST_TOPIC = "so63058608-1";

	/** Topic the reply container is created for. */
	private static final String REPLY_TOPIC = "so63058608-2";

	/** Consumer group id assigned to the reply container. */
	private static final String REPLY_GROUP_ID = "so63058608-2";

	@Autowired
	private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}

	/**
	 * Sends a request record (partition 0, null key, null value) and blocks for the reply.
	 *
	 * @return the value of the reply record
	 * @throws Exception if the send or the reply does not complete within 10 seconds each,
	 *         or either future fails
	 */
	@GetMapping(path = "/get")
	public List<String> getThem() throws Exception {
		ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
		RequestReplyFuture<String, String, List<String>> pending = this.replyTemplate.sendAndReceive(request);
		// Wait for the send itself first and log where the request record landed.
		LOG.info(pending.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
		return pending.get(10, TimeUnit.SECONDS).value();
	}

	/**
	 * Server side of the exchange: returns a fixed list as the reply.
	 *
	 * @param payload the request value; optional because the request is sent with a null value
	 * @return a new mutable list containing "foo", "bar" and "baz"
	 */
	@KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
	@SendTo
	public List<String> returnList(@Payload(required = false) String payload) {
		List<String> canned = List.of("foo", "bar", "baz");
		return new ArrayList<>(canned);
	}

	/**
	 * Builds the replying template on top of the reply container, after first registering a
	 * plain template on the listener container factory as its reply template.
	 */
	@Bean
	public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		return new ReplyingKafkaTemplate<>(pf, replyContainer(containerFactory));
	}

	/**
	 * Creates the container for the reply topic, with its own group id and a logging batch
	 * error handler.
	 */
	@Bean
	public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
			ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

		ConcurrentMessageListenerContainer<String, List<String>> replies =
				containerFactory.createContainer(REPLY_TOPIC);
		replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
		replies.getContainerProperties().setGroupId(REPLY_GROUP_ID);
		return replies;
	}

	/** Plain template built from the given producer factory. */
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}

	/** Declares the request topic with one partition and one replica. */
	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name(REQUEST_TOPIC).partitions(1).replicas(1).build();
	}

	/** Declares the reply topic with one partition and one replica. */
	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name(REPLY_TOPIC).partitions(1).replicas(1).build();
	}

}
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
["foo","bar","baz"]

编辑2

还有一个返回某个对象列表的示例...

@SpringBootApplication
@RestController
public class So63058608Application {

	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}

	@Autowired
	private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

	@GetMapping(path = "/get")
	public List<Foo> getThem() throws Exception {
		RequestReplyFuture<String, String, List<Foo>> future =
				this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
		LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
		List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
		LOG.info(result.toString());
		return result;
	}

	@KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
	@SendTo
	public List<Foo> returnList(@Payload(required = false) String payload) {
		return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
	}

	@Bean
	public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
		ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
		return replyer;
	}

	@Bean
	public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
			ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

		ConcurrentMessageListenerContainer<String, List<Foo>> container =
				containerFactory.createContainer("so63058608-2");
		container.getContainerProperties().setGroupId("so63058608-2");
		container.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return container;
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}

	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
	}

	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
	}

	public static JavaType returnType(byte[] data, Headers headers) {
		return TypeFactory.defaultInstance()
				.constructCollectionLikeType(List.class, Foo.class);
	}

}

/**
 * Mutable bean with a single string property {@code bar}, a no-argument constructor and
 * a getter/setter pair.
 */
class Foo {

	private String bar;

	/** Creates an instance whose {@code bar} is {@code null}. */
	public Foo() {
	}

	/**
	 * @param bar the initial value of the property
	 */
	public Foo(String bar) {
		this.bar = bar;
	}

	/**
	 * @param bar the new value of the property
	 */
	public void setBar(String bar) {
		this.bar = bar;
	}

	/**
	 * @return the current value of the property, possibly {@code null}
	 */
	public String getBar() {
		return this.bar;
	}

	/**
	 * @return the property rendered as {@code Foo [bar=<value>]}
	 */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("Foo [bar=");
		text.append(this.bar);
		text.append("]");
		return text.toString();
	}

}
英文:

You need to use a ReplyingKafkaTemplate to return a result to the rest controller.

See ReplyingKafkaTemplate.

>Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass).

>The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.

The documentation has an example.

EDIT

@SpringBootApplication
@RestController
public class So63058608Application {

	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}

	@Autowired
	private ReplyingKafkaTemplate&lt;String, String, List&lt;String&gt;&gt; replyTemplate;

	@GetMapping(path = &quot;/get&quot;)
	public List&lt;String&gt; getThem() throws Exception {
		RequestReplyFuture&lt;String, String, List&lt;String&gt;&gt; future =
				this.replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(&quot;so63058608-1&quot;, 0, null, null));
		LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
		return future.get(10, TimeUnit.SECONDS).value();
	}

	@KafkaListener(id = &quot;so63058608-1&quot;, topics = &quot;so63058608-1&quot;, splitIterables = false)
	@SendTo
	public List&lt;String&gt; returnList(@Payload(required = false) String payload) {
		return new ArrayList&lt;&gt;(List.of(&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;));
	}

	@Bean
	public ReplyingKafkaTemplate&lt;String, String, List&lt;String&gt;&gt; replyer(ProducerFactory&lt;String, String&gt; pf,
			ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;String&gt;&gt; containerFactory) {

		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		ConcurrentMessageListenerContainer&lt;String, List&lt;String&gt;&gt; container = replyContainer(containerFactory);
		ReplyingKafkaTemplate&lt;String, String, List&lt;String&gt;&gt; replyer = new ReplyingKafkaTemplate&lt;&gt;(pf, container);
		return replyer;
	}

	@Bean
	public ConcurrentMessageListenerContainer&lt;String, List&lt;String&gt;&gt; replyContainer(
			ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;String&gt;&gt; containerFactory) {

		ConcurrentMessageListenerContainer&lt;String, List&lt;String&gt;&gt; container =
				containerFactory.createContainer(&quot;so63058608-2&quot;);
		container.getContainerProperties().setGroupId(&quot;so63058608-2&quot;);
		container.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return container;
	}

	@Bean
	public KafkaTemplate&lt;String, String&gt; kafkaTemplate(ProducerFactory&lt;String, String&gt; pf) {
		return new KafkaTemplate&lt;&gt;(pf);
	}

	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name(&quot;so63058608-1&quot;).partitions(1).replicas(1).build();
	}

	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name(&quot;so63058608-2&quot;).partitions(1).replicas(1).build();
	}

}
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
[&quot;foo&quot;,&quot;bar&quot;,&quot;baz&quot;]

EDIT2

And with a list of some object returned...

@SpringBootApplication
@RestController
public class So63058608Application {

	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}

	@Autowired
	private ReplyingKafkaTemplate&lt;String, String, List&lt;Foo&gt;&gt; replyTemplate;

	@GetMapping(path = &quot;/get&quot;)
	public List&lt;Foo&gt; getThem() throws Exception {
		RequestReplyFuture&lt;String, String, List&lt;Foo&gt;&gt; future =
				this.replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(&quot;so63058608-1&quot;, 0, null, null));
		LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
		List&lt;Foo&gt; result = future.get(10, TimeUnit.SECONDS).value();
		LOG.info(result.toString());
		return result;
	}

	@KafkaListener(id = &quot;so63058608-1&quot;, topics = &quot;so63058608-1&quot;, splitIterables = false)
	@SendTo
	public List&lt;Foo&gt; returnList(@Payload(required = false) String payload) {
		return new ArrayList&lt;&gt;(List.of(new Foo(&quot;foo&quot;), new Foo(&quot;bar&quot;), new Foo(&quot;baz&quot;)));
	}

	@Bean
	public ReplyingKafkaTemplate&lt;String, String, List&lt;Foo&gt;&gt; replyer(ProducerFactory&lt;String, String&gt; pf,
			ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;Foo&gt;&gt; containerFactory) {

		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		ConcurrentMessageListenerContainer&lt;String, List&lt;Foo&gt;&gt; container = replyContainer(containerFactory);
		ReplyingKafkaTemplate&lt;String, String, List&lt;Foo&gt;&gt; replyer = new ReplyingKafkaTemplate&lt;&gt;(pf, container);
		return replyer;
	}

	@Bean
	public ConcurrentMessageListenerContainer&lt;String, List&lt;Foo&gt;&gt; replyContainer(
			ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;Foo&gt;&gt; containerFactory) {

		ConcurrentMessageListenerContainer&lt;String, List&lt;Foo&gt;&gt; container =
				containerFactory.createContainer(&quot;so63058608-2&quot;);
		container.getContainerProperties().setGroupId(&quot;so63058608-2&quot;);
		container.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return container;
	}

	@Bean
	public KafkaTemplate&lt;String, String&gt; kafkaTemplate(ProducerFactory&lt;String, String&gt; pf) {
		return new KafkaTemplate&lt;&gt;(pf);
	}

	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name(&quot;so63058608-1&quot;).partitions(1).replicas(1).build();
	}

	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name(&quot;so63058608-2&quot;).partitions(1).replicas(1).build();
	}

	public static JavaType returnType(byte[] data, Headers headers) {
		return TypeFactory.defaultInstance()
				.constructCollectionLikeType(List.class, Foo.class);
	}

}

/**
 * Mutable bean with a single string property {@code bar}, a no-argument constructor and
 * a getter/setter pair.
 */
class Foo {

	private String bar;

	/** Creates an instance whose {@code bar} is {@code null}. */
	public Foo() {
	}

	/**
	 * @param bar the initial value of the property
	 */
	public Foo(String bar) {
		this.bar = bar;
	}

	/**
	 * @return the current value of the property, possibly {@code null}
	 */
	public String getBar() {
		return this.bar;
	}

	/**
	 * @param bar the new value of the property
	 */
	public void setBar(String bar) {
		this.bar = bar;
	}

	/**
	 * @return the property rendered as {@code Foo [bar=<value>]}
	 */
	@Override
	public String toString() {
		return "Foo [bar=" + this.bar + "]";
	}

}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]

huangapple
  • 本文由 发表于 2020年7月24日 00:10:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/63058608.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定