英文:
Spring boot Rest api with Spring Kafka
问题
我已经设计了一个Spring Boot REST API的ADD和GET方法。
/**
 * REST controller for products. Both endpoints delegate straight to the
 * injected {@link IProductProducer}.
 */
@RestController("ProductV1Controller")
public class ProductController {
    private final IProductProducer _productProducer;

    /**
     * @param productProducer producer that the endpoints delegate to
     */
    public ProductController(IProductProducer productProducer) {
        _productProducer = productProducer;
    }

    /**
     * Hands the validated request body to the producer.
     *
     * @param product product to add
     */
    @PostMapping()
    void AddProduct(@Valid @RequestBody ProductViewModel product) {
        _productProducer.AddProduct(product);
    }

    /**
     * @return whatever the producer returns from {@code GetProducts()}
     */
    @GetMapping()
    List<ProductViewModel> Products() {
        // Invoke the producer exactly once per request: a second call would
        // repeat its side effects (ProductProducer publishes a Kafka message
        // on every GetProducts() call).
        return _productProducer.GetProducts();
    }
}
服务层:
/**
 * {@link IProductProducer} implementation that publishes product messages
 * through a {@code KafkaTemplate}.
 */
@Service
public class ProductProducer implements IProductProducer {
    private final KafkaTemplate<String, Object> _template;

    /**
     * @param _template template used to publish to the product topics
     */
    public ProductProducer(KafkaTemplate<String, Object> _template) {
        this._template = _template;
    }

    /**
     * Publishes the given product to the ADD_PRODUCT topic.
     */
    @Override
    public void AddProduct(ProductViewModel product) {
        _template.send(ProductTopicConstants.ADD_PRODUCT, product);
    }

    /**
     * Publishes a message with a null payload to the GET_PRODUCTS topic and
     * returns a single placeholder element.
     */
    @Override
    public List<ProductViewModel> GetProducts() {
        _template.send(ProductTopicConstants.GET_PRODUCTS, null);
        // Placeholder: the value still needs to be returned from Kafka.
        ProductViewModel placeholder = new ProductViewModel("", "", 0, "");
        return List.of(placeholder);
    }
}
Kafka监听器:
/**
 * Kafka listener on the GET_PRODUCTS topic.
 *
 * @return every product returned by {@code _productRepository.findAll()}
 */
@KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
public List<Product> GetProducts() {
    List<Product> allProducts = _productRepository.findAll();
    return allProducts;
}
在服务层的GetProducts()方法中,我需要返回来自_productRepository.findAll()的产品列表。
关于使用Spring Kafka进行REST API的最佳方法是什么?
英文:
I have designed a Spring Boot REST API with ADD and GET methods:
    /**
     * REST controller for products. Both endpoints delegate straight to the
     * injected {@link IProductProducer}.
     */
    @RestController("ProductV1Controller")
    public class ProductController {
        private final IProductProducer _productProducer;

        /**
         * @param productProducer producer that the endpoints delegate to
         */
        public ProductController(IProductProducer productProducer) {
            _productProducer = productProducer;
        }

        /**
         * Hands the validated request body to the producer.
         *
         * @param product product to add
         */
        @PostMapping()
        void AddProduct(@Valid @RequestBody ProductViewModel product) {
            _productProducer.AddProduct(product);
        }

        /**
         * @return whatever the producer returns from {@code GetProducts()}
         */
        @GetMapping()
        List<ProductViewModel> Products() {
            // Invoke the producer exactly once per request: a second call would
            // repeat its side effects (ProductProducer publishes a Kafka message
            // on every GetProducts() call).
            return _productProducer.GetProducts();
        }
    }
Service layer
@Service
    public class ProductProducer implements IProductProducer{
        private final KafkaTemplate<String, Object> _template;
    
        public ProductProducer(KafkaTemplate<String, Object> _template) {
            this._template = _template;
        }
    
        @Override
        public List<ProductViewModel> GetProducts() {
            this._template.send(ProductTopicConstants.GET_PRODUCTS,null);
            return List.of(new ProductViewModel("","",0,"")); --> Need to return the value from the kafka
        }
    
        @Override
        public void AddProduct(ProductViewModel product) {
            this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
        }
       
    }
Kafka Listener
 /**
  * Kafka listener on the GET_PRODUCTS topic.
  *
  * @return every product returned by {@code _productRepository.findAll()}
  */
 @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
    public List<Product> GetProducts() {
        List<Product> allProducts = _productRepository.findAll();
        return allProducts;
    }
In the service layer's GetProducts() method, I need to return the list of products that comes from _productRepository.findAll().
What is the best approach for building a REST API with Spring Kafka?
答案1
得分: 2
你需要使用ReplyingKafkaTemplate来将结果返回给REST控制器。
版本2.1.3引入了KafkaTemplate的子类,用于提供请求/回复语义。该类名为ReplyingKafkaTemplate,除了父类中的方法之外还有一个方法。
结果是一个ListenableFuture,异步填充结果(或超时的异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。您可以使用此future来确定发送操作的结果。
文档中有一个示例。
编辑
/**
 * Single-class demo of Kafka request/reply behind a REST endpoint: the
 * {@code /get} handler sends a request with a {@link ReplyingKafkaTemplate}
 * and the {@code @KafkaListener} in the same class answers it with a list
 * of strings.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

	/** Topic the REST handler sends requests to; also used as the listener id. */
	private static final String REQUEST_TOPIC = "so63058608-1";

	/** Topic the reply container consumes from; also used as its group id. */
	private static final String REPLY_TOPIC = "so63058608-2";

	@Autowired
	private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}

	/**
	 * Sends a request record (partition 0, null key, null value), logs the
	 * send metadata, then waits for the reply.
	 *
	 * @return the value of the reply record
	 * @throws Exception if the send or the reply fails or does not complete within 10 seconds
	 */
	@GetMapping(path = "/get")
	public List<String> getThem() throws Exception {
		ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
		RequestReplyFuture<String, String, List<String>> pending = this.replyTemplate.sendAndReceive(request);
		LOG.info(pending.getSendFuture()
				.get(10, TimeUnit.SECONDS)
				.getRecordMetadata()
				.toString());
		return pending.get(10, TimeUnit.SECONDS).value();
	}

	/**
	 * Server side of the exchange: returns a fixed, mutable list of three strings.
	 *
	 * @param payload request payload; may be null
	 */
	@KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
	@SendTo
	public List<String> returnList(@Payload(required = false) String payload) {
		List<String> reply = new ArrayList<>();
		reply.add("foo");
		reply.add("bar");
		reply.add("baz");
		return reply;
	}

	/**
	 * Registers the plain template as the listener factory's reply template,
	 * then builds the replying template on top of the reply container.
	 */
	@Bean
	public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		return new ReplyingKafkaTemplate<>(pf, replyContainer(containerFactory));
	}

	/**
	 * Container that consumes the reply topic on behalf of the replying template.
	 */
	@Bean
	public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
			ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
		ConcurrentMessageListenerContainer<String, List<String>> replies =
				containerFactory.createContainer(REPLY_TOPIC);
		replies.getContainerProperties().setGroupId(REPLY_TOPIC);
		replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return replies;
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}

	/** Request topic: one partition, one replica. */
	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name(REQUEST_TOPIC)
				.partitions(1)
				.replicas(1)
				.build();
	}

	/** Reply topic: one partition, one replica. */
	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name(REPLY_TOPIC)
				.partitions(1)
				.replicas(1)
				.build();
	}
}
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
["foo","bar","baz"]
编辑2
还有一个返回某个对象列表的示例...
@SpringBootApplication
@RestController
public class So63058608Application {
	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}
	@Autowired
	private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;
	@GetMapping(path = "/get")
	public List<Foo> getThem() throws Exception {
		RequestReplyFuture<String, String, List<Foo>> future =
				this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
		LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
		List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
		LOG.info(result.toString());
		return result;
	}
	@KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
	@SendTo
	public List<Foo> returnList(@Payload(required = false) String payload) {
		return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
	}
	@Bean
	public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
		ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
		return replyer;
	}
	@Bean
	public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
			ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
		ConcurrentMessageListenerContainer<String, List<Foo>> container =
				containerFactory.createContainer("so63058608-2");
		container.getContainerProperties().setGroupId("so63058608-2");
		container.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return container;
	}
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}
	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
	}
	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
	}
	public static JavaType returnType(byte[] data, Headers headers) {
		return TypeFactory.defaultInstance()
				.constructCollectionLikeType(List.class, Foo.class);
	}
}
/**
 * Mutable bean with a single string property {@code bar}.
 * NOTE(review): the no-arg constructor and setter are presumably there for
 * JSON deserialization; confirm before removing either.
 */
class Foo {

	private String bar;

	/** Creates an instance whose {@code bar} is null. */
	public Foo() {
	}

	/**
	 * @param bar initial value of the property
	 */
	public Foo(String bar) {
		this.bar = bar;
	}

	public void setBar(String bar) {
		this.bar = bar;
	}

	public String getBar() {
		return this.bar;
	}

	/** Renders as {@code Foo [bar=<value>]}; a null value prints as "null". */
	@Override
	public String toString() {
		return String.format("Foo [bar=%s]", this.bar);
	}
}
英文:
You need to use a ReplyingKafkaTemplate to return a result to the rest controller.
>Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass).
>The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.
The documentation has an example.
EDIT
/**
 * Single-class demo of Kafka request/reply behind a REST endpoint: the
 * {@code /get} handler sends a request with a {@link ReplyingKafkaTemplate}
 * and the {@code @KafkaListener} in the same class answers it with a list
 * of strings.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

	/** Topic the REST handler sends requests to; also used as the listener id. */
	private static final String REQUEST_TOPIC = "so63058608-1";

	/** Topic the reply container consumes from; also used as its group id. */
	private static final String REPLY_TOPIC = "so63058608-2";

	@Autowired
	private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}

	/**
	 * Sends a request record (partition 0, null key, null value), logs the
	 * send metadata, then waits for the reply.
	 *
	 * @return the value of the reply record
	 * @throws Exception if the send or the reply fails or does not complete within 10 seconds
	 */
	@GetMapping(path = "/get")
	public List<String> getThem() throws Exception {
		ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
		RequestReplyFuture<String, String, List<String>> pending = this.replyTemplate.sendAndReceive(request);
		LOG.info(pending.getSendFuture()
				.get(10, TimeUnit.SECONDS)
				.getRecordMetadata()
				.toString());
		return pending.get(10, TimeUnit.SECONDS).value();
	}

	/**
	 * Server side of the exchange: returns a fixed, mutable list of three strings.
	 *
	 * @param payload request payload; may be null
	 */
	@KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
	@SendTo
	public List<String> returnList(@Payload(required = false) String payload) {
		List<String> reply = new ArrayList<>();
		reply.add("foo");
		reply.add("bar");
		reply.add("baz");
		return reply;
	}

	/**
	 * Registers the plain template as the listener factory's reply template,
	 * then builds the replying template on top of the reply container.
	 */
	@Bean
	public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		return new ReplyingKafkaTemplate<>(pf, replyContainer(containerFactory));
	}

	/**
	 * Container that consumes the reply topic on behalf of the replying template.
	 */
	@Bean
	public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
			ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
		ConcurrentMessageListenerContainer<String, List<String>> replies =
				containerFactory.createContainer(REPLY_TOPIC);
		replies.getContainerProperties().setGroupId(REPLY_TOPIC);
		replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return replies;
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}

	/** Request topic: one partition, one replica. */
	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name(REQUEST_TOPIC)
				.partitions(1)
				.replicas(1)
				.build();
	}

	/** Reply topic: one partition, one replica. */
	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name(REPLY_TOPIC)
				.partitions(1)
				.replicas(1)
				.build();
	}
}
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
["foo","bar","baz"]
EDIT2
And here is the same example returning a list of objects instead of strings...
@SpringBootApplication
@RestController
public class So63058608Application {
	private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);
	public static void main(String[] args) {
		SpringApplication.run(So63058608Application.class, args);
	}
	@Autowired
	private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;
	@GetMapping(path = "/get")
	public List<Foo> getThem() throws Exception {
		RequestReplyFuture<String, String, List<Foo>> future =
				this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
		LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
		List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
		LOG.info(result.toString());
		return result;
	}
	@KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
	@SendTo
	public List<Foo> returnList(@Payload(required = false) String payload) {
		return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
	}
	@Bean
	public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
		containerFactory.setReplyTemplate(kafkaTemplate(pf));
		ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
		ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
		return replyer;
	}
	@Bean
	public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
			ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
		ConcurrentMessageListenerContainer<String, List<Foo>> container =
				containerFactory.createContainer("so63058608-2");
		container.getContainerProperties().setGroupId("so63058608-2");
		container.setBatchErrorHandler(new BatchLoggingErrorHandler());
		return container;
	}
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}
	@Bean
	public NewTopic topic1() {
		return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
	}
	@Bean
	public NewTopic topic3() {
		return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
	}
	public static JavaType returnType(byte[] data, Headers headers) {
		return TypeFactory.defaultInstance()
				.constructCollectionLikeType(List.class, Foo.class);
	}
}
/**
 * Mutable bean with a single string property {@code bar}.
 * NOTE(review): the no-arg constructor and setter are presumably there for
 * JSON deserialization; confirm before removing either.
 */
class Foo {

	private String bar;

	/** Creates an instance whose {@code bar} is null. */
	public Foo() {
	}

	/**
	 * @param bar initial value of the property
	 */
	public Foo(String bar) {
		this.bar = bar;
	}

	public void setBar(String bar) {
		this.bar = bar;
	}

	public String getBar() {
		return this.bar;
	}

	/** Renders as {@code Foo [bar=<value>]}; a null value prints as "null". */
	@Override
	public String toString() {
		return String.format("Foo [bar=%s]", this.bar);
	}
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论