Spring Boot与Spring Kafka的Rest API

huangapple go评论106阅读模式
英文:

Spring Boot REST API with Spring Kafka

问题

我已经设计了一个Spring Boot REST API的ADD和GET方法。

  1. @RestController("ProductV1Controller")
  2. public class ProductController {
  3. private final IProductProducer _productProducer;
  4. public ProductController(IProductProducer productProducer) {
  5. _productProducer = productProducer;
  6. }
  7. @PostMapping()
  8. void AddProduct(@Valid @RequestBody ProductViewModel product) {
  9. _productProducer.AddProduct(product);
  10. }
  11. @GetMapping()
  12. List<ProductViewModel> Products() {
  13. List<ProductViewModel> test = _productProducer.GetProducts();
  14. return _productProducer.GetProducts();
  15. }
  16. }

服务层:

  1. @Service
  2. public class ProductProducer implements IProductProducer {
  3. private final KafkaTemplate<String, Object> _template;
  4. public ProductProducer(KafkaTemplate<String, Object> _template) {
  5. this._template = _template;
  6. }
  7. @Override
  8. public List<ProductViewModel> GetProducts() {
  9. this._template.send(ProductTopicConstants.GET_PRODUCTS, null);
  10. return List.of(new ProductViewModel("", "", 0, "")); // 需要从Kafka返回值
  11. }
  12. @Override
  13. public void AddProduct(ProductViewModel product) {
  14. this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
  15. }
  16. }

Kafka监听器:

  1. @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
  2. public List<Product> GetProducts() {
  3. return _productRepository.findAll();
  4. }

在服务层的GetProducts()方法中,我需要返回由_productRepository.findAll()查询得到的产品列表。

使用Spring Kafka实现REST API的最佳方式是什么?

英文:

I have designed ADD and GET methods for a Spring Boot REST API.

  1. @RestController(&quot;ProductV1Controller&quot;)
  2. public class ProductController
  3. {
  4. private final IProductProducer _productProducer;
  5. public ProductController(IProductProducer productProducer) {
  6. _productProducer = productProducer;}
  7. @PostMapping()
  8. void AddProduct(@Valid @RequestBody ProductViewModel product) {
  9. _productProducer.AddProduct(product);
  10. }
  11. @GetMapping()
  12. List&lt;ProductViewModel&gt; Products() {
  13. var test = _productProducer.GetProducts();
  14. return _productProducer.GetProducts();
  15. }
  16. }

Service layer

  1. @Service
  2. public class ProductProducer implements IProductProducer{
  3. private final KafkaTemplate&lt;String, Object&gt; _template;
  4. public ProductProducer(KafkaTemplate&lt;String, Object&gt; _template) {
  5. this._template = _template;
  6. }
  7. @Override
  8. public List&lt;ProductViewModel&gt; GetProducts() {
  9. this._template.send(ProductTopicConstants.GET_PRODUCTS,null);
  10. return List.of(new ProductViewModel(&quot;&quot;,&quot;&quot;,0,&quot;&quot;)); --&gt; Need to return the value from the kafka
  11. }
  12. @Override
  13. public void AddProduct(ProductViewModel product) {
  14. this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
  15. }
  16. }

Kafka Listener

  1. @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
  2. public List&lt;Product&gt; GetProducts() {
  3. return _productRepository.findAll();
  4. }

In the service layer's GetProducts() method, I need to return the list of products that comes from _productRepository.findAll().

What is the best approach for building a REST API with Spring Kafka?

答案1

得分: 2

你需要使用ReplyingKafkaTemplate来将结果返回给REST控制器。

详见ReplyingKafkaTemplate

版本2.1.3引入了KafkaTemplate的子类,用于提供请求/回复语义。该类名为ReplyingKafkaTemplate,除了父类中的方法之外还有一个方法。

结果是一个ListenableFuture,异步填充结果(或超时的异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。您可以使用此future来确定发送操作的结果。

文档中有一个示例。

编辑

  1. @SpringBootApplication
  2. @RestController
  3. public class So63058608Application {
  4. private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
  5. public static void main(String[] args) {
  6. SpringApplication.run(So63058608Application.class, args);
  7. }
  8. @Autowired
  9. private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;
  10. @GetMapping(path = "/get")
  11. public List<String> getThem() throws Exception {
  12. RequestReplyFuture<String, String, List<String>> future =
  13. this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
  14. LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
  15. return future.get(10, TimeUnit.SECONDS).value();
  16. }
  17. @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
  18. @SendTo
  19. public List<String> returnList(@Payload(required = false) String payload) {
  20. return new ArrayList<>(List.of("foo", "bar", "baz"));
  21. }
  22. @Bean
  23. public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
  24. ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
  25. containerFactory.setReplyTemplate(kafkaTemplate(pf));
  26. ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
  27. ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
  28. return replyer;
  29. }
  30. @Bean
  31. public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
  32. ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
  33. ConcurrentMessageListenerContainer<String, List<String>> container =
  34. containerFactory.createContainer("so63058608-2");
  35. container.getContainerProperties().setGroupId("so63058608-2");
  36. container.setBatchErrorHandler(new BatchLoggingErrorHandler());
  37. return container;
  38. }
  39. @Bean
  40. public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
  41. return new KafkaTemplate<>(pf);
  42. }
  43. @Bean
  44. public NewTopic topic1() {
  45. return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
  46. }
  47. @Bean
  48. public NewTopic topic3() {
  49. return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
  50. }
  51. }
  1. spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  2. spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  3. spring.kafka.consumer.auto-offset-reset=earliest
  4. spring.kafka.consumer.properties.spring.json.trusted.packages=*
  5. spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  6. spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  1. $ curl localhost:8080/get
  2. ["foo","bar","baz"]

编辑2

还有一个返回某个对象列表的示例...

  1. @SpringBootApplication
  2. @RestController
  3. public class So63058608Application {
  4. private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
  5. public static void main(String[] args) {
  6. SpringApplication.run(So63058608Application.class, args);
  7. }
  8. @Autowired
  9. private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;
  10. @GetMapping(path = "/get")
  11. public List<Foo> getThem() throws Exception {
  12. RequestReplyFuture<String, String, List<Foo>> future =
  13. this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
  14. LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
  15. List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
  16. LOG.info(result.toString());
  17. return result;
  18. }
  19. @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
  20. @SendTo
  21. public List<Foo> returnList(@Payload(required = false) String payload) {
  22. return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
  23. }
  24. @Bean
  25. public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
  26. ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
  27. containerFactory.setReplyTemplate(kafkaTemplate(pf));
  28. ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
  29. ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
  30. return replyer;
  31. }
  32. @Bean
  33. public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
  34. ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
  35. ConcurrentMessageListenerContainer<String, List<Foo>> container =
  36. containerFactory.createContainer("so63058608-2");
  37. container.getContainerProperties().setGroupId("so63058608-2");
  38. container.setBatchErrorHandler(new BatchLoggingErrorHandler());
  39. return container;
  40. }
  41. @Bean
  42. public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
  43. return new KafkaTemplate<>(pf);
  44. }
  45. @Bean
  46. public NewTopic topic1() {
  47. return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
  48. }
  49. @Bean
  50. public NewTopic topic3() {
  51. return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
  52. }
  53. public static JavaType returnType(byte[] data, Headers headers) {
  54. return TypeFactory.defaultInstance()
  55. .constructCollectionLikeType(List.class, Foo.class);
  56. }
  57. }
  /**
   * Simple mutable value holder with a single {@code bar} property. The no-arg constructor
   * and the setter are kept; presumably they are what the JSON deserializer uses to
   * populate instances - confirm.
   */
  class Foo {

      private String bar;

      public Foo() {
      }

      public Foo(String bar) {
          this.bar = bar;
      }

      public String getBar() {
          return this.bar;
      }

      public void setBar(String bar) {
          this.bar = bar;
      }

      /** @return a string of the form {@code Foo [bar=<value>]} */
      @Override
      public String toString() {
          return "Foo [bar=" + this.bar + "]";
      }
  }
英文:

You need to use a ReplyingKafkaTemplate to return a result to the rest controller.

See ReplyingKafkaTemplate.

>Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass).

>The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.

The documentation has an example.

EDIT

  1. @SpringBootApplication
  2. @RestController
  3. public class So63058608Application {
  4. private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
  5. public static void main(String[] args) {
  6. SpringApplication.run(So63058608Application.class, args);
  7. }
  8. @Autowired
  9. private ReplyingKafkaTemplate&lt;String, String, List&lt;String&gt;&gt; replyTemplate;
  10. @GetMapping(path = &quot;/get&quot;)
  11. public List&lt;String&gt; getThem() throws Exception {
  12. RequestReplyFuture&lt;String, String, List&lt;String&gt;&gt; future =
  13. this.replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(&quot;so63058608-1&quot;, 0, null, null));
  14. LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
  15. return future.get(10, TimeUnit.SECONDS).value();
  16. }
  17. @KafkaListener(id = &quot;so63058608-1&quot;, topics = &quot;so63058608-1&quot;, splitIterables = false)
  18. @SendTo
  19. public List&lt;String&gt; returnList(@Payload(required = false) String payload) {
  20. return new ArrayList&lt;&gt;(List.of(&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;));
  21. }
  22. @Bean
  23. public ReplyingKafkaTemplate&lt;String, String, List&lt;String&gt;&gt; replyer(ProducerFactory&lt;String, String&gt; pf,
  24. ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;String&gt;&gt; containerFactory) {
  25. containerFactory.setReplyTemplate(kafkaTemplate(pf));
  26. ConcurrentMessageListenerContainer&lt;String, List&lt;String&gt;&gt; container = replyContainer(containerFactory);
  27. ReplyingKafkaTemplate&lt;String, String, List&lt;String&gt;&gt; replyer = new ReplyingKafkaTemplate&lt;&gt;(pf, container);
  28. return replyer;
  29. }
  30. @Bean
  31. public ConcurrentMessageListenerContainer&lt;String, List&lt;String&gt;&gt; replyContainer(
  32. ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;String&gt;&gt; containerFactory) {
  33. ConcurrentMessageListenerContainer&lt;String, List&lt;String&gt;&gt; container =
  34. containerFactory.createContainer(&quot;so63058608-2&quot;);
  35. container.getContainerProperties().setGroupId(&quot;so63058608-2&quot;);
  36. container.setBatchErrorHandler(new BatchLoggingErrorHandler());
  37. return container;
  38. }
  39. @Bean
  40. public KafkaTemplate&lt;String, String&gt; kafkaTemplate(ProducerFactory&lt;String, String&gt; pf) {
  41. return new KafkaTemplate&lt;&gt;(pf);
  42. }
  43. @Bean
  44. public NewTopic topic1() {
  45. return TopicBuilder.name(&quot;so63058608-1&quot;).partitions(1).replicas(1).build();
  46. }
  47. @Bean
  48. public NewTopic topic3() {
  49. return TopicBuilder.name(&quot;so63058608-2&quot;).partitions(1).replicas(1).build();
  50. }
  51. }
  1. spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  2. spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  3. spring.kafka.consumer.auto-offset-reset=earliest
  4. spring.kafka.consumer.properties.spring.json.trusted.packages=*
  5. spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  6. spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  1. $ curl localhost:8080/get
  2. ["foo","bar","baz"]

EDIT2

And with a list of some object returned...

  1. @SpringBootApplication
  2. @RestController
  3. public class So63058608Application {
  4. private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
  5. public static void main(String[] args) {
  6. SpringApplication.run(So63058608Application.class, args);
  7. }
  8. @Autowired
  9. private ReplyingKafkaTemplate&lt;String, String, List&lt;Foo&gt;&gt; replyTemplate;
  10. @GetMapping(path = &quot;/get&quot;)
  11. public List&lt;Foo&gt; getThem() throws Exception {
  12. RequestReplyFuture&lt;String, String, List&lt;Foo&gt;&gt; future =
  13. this.replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(&quot;so63058608-1&quot;, 0, null, null));
  14. LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
  15. List&lt;Foo&gt; result = future.get(10, TimeUnit.SECONDS).value();
  16. LOG.info(result.toString());
  17. return result;
  18. }
  19. @KafkaListener(id = &quot;so63058608-1&quot;, topics = &quot;so63058608-1&quot;, splitIterables = false)
  20. @SendTo
  21. public List&lt;Foo&gt; returnList(@Payload(required = false) String payload) {
  22. return new ArrayList&lt;&gt;(List.of(new Foo(&quot;foo&quot;), new Foo(&quot;bar&quot;), new Foo(&quot;baz&quot;)));
  23. }
  24. @Bean
  25. public ReplyingKafkaTemplate&lt;String, String, List&lt;Foo&gt;&gt; replyer(ProducerFactory&lt;String, String&gt; pf,
  26. ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;Foo&gt;&gt; containerFactory) {
  27. containerFactory.setReplyTemplate(kafkaTemplate(pf));
  28. ConcurrentMessageListenerContainer&lt;String, List&lt;Foo&gt;&gt; container = replyContainer(containerFactory);
  29. ReplyingKafkaTemplate&lt;String, String, List&lt;Foo&gt;&gt; replyer = new ReplyingKafkaTemplate&lt;&gt;(pf, container);
  30. return replyer;
  31. }
  32. @Bean
  33. public ConcurrentMessageListenerContainer&lt;String, List&lt;Foo&gt;&gt; replyContainer(
  34. ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;Foo&gt;&gt; containerFactory) {
  35. ConcurrentMessageListenerContainer&lt;String, List&lt;Foo&gt;&gt; container =
  36. containerFactory.createContainer(&quot;so63058608-2&quot;);
  37. container.getContainerProperties().setGroupId(&quot;so63058608-2&quot;);
  38. container.setBatchErrorHandler(new BatchLoggingErrorHandler());
  39. return container;
  40. }
  41. @Bean
  42. public KafkaTemplate&lt;String, String&gt; kafkaTemplate(ProducerFactory&lt;String, String&gt; pf) {
  43. return new KafkaTemplate&lt;&gt;(pf);
  44. }
  45. @Bean
  46. public NewTopic topic1() {
  47. return TopicBuilder.name(&quot;so63058608-1&quot;).partitions(1).replicas(1).build();
  48. }
  49. @Bean
  50. public NewTopic topic3() {
  51. return TopicBuilder.name(&quot;so63058608-2&quot;).partitions(1).replicas(1).build();
  52. }
  53. public static JavaType returnType(byte[] data, Headers headers) {
  54. return TypeFactory.defaultInstance()
  55. .constructCollectionLikeType(List.class, Foo.class);
  56. }
  57. }
  /**
   * Simple mutable value holder with a single {@code bar} property. The no-arg constructor
   * and the setter are kept; presumably they are what the JSON deserializer uses to
   * populate instances - confirm.
   */
  class Foo {

      private String bar;

      public Foo() {
      }

      public Foo(String bar) {
          this.bar = bar;
      }

      public String getBar() {
          return this.bar;
      }

      public void setBar(String bar) {
          this.bar = bar;
      }

      /** @return a string of the form {@code Foo [bar=<value>]} */
      @Override
      public String toString() {
          return "Foo [bar=" + this.bar + "]";
      }
  }
  1. spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
  1. [Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]

huangapple
  • 本文由 发表于 2020年7月24日 00:10:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/63058608.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定