英文:
Non-blocking streaming of data between two Quarkus services (Vert.x with Mutiny in Java)
问题
Update!
我在解决了一些与主要问题无关的问题后,已经修复了示例代码中的一些小错误。主要问题仍然是有关服务之间的非阻塞流式传输。
Background info:
我正在将一个基于Spring WebFlux的服务移植到Quarkus平台下。该服务在多个大数据集上运行长时间搜索,并在Flux(text/event-stream)中返回部分结果,这些结果在可用时逐步生成。
Problem:
目前,我正试图在Quarkus下使用Mutiny Multi与Vert.x,但无法弄清楚消费者服务如何在不阻塞的情况下接收此流。
在所有示例中,消费者要么是JS前端页面,要么生产者的内容类型是application/json,似乎会阻塞直到Multi完成,然后将其作为一个JSON对象发送(在我的应用程序中没有意义)。
Questions:
- 如何使用Mutiny风格的Vert.x WebClient接收text/event-stream?
- 如果问题是WebClient无法接收连续的流:在两个Quarkus服务之间流式传输数据的标准方法是什么?
Here is a simplified example
Test entity
public class SearchResult implements Serializable {
private String content;
public SearchResult(String content) {
this.content = content;
}
//.. toString, getters and setters
}
Producer 1. 简单的无限流 -> 阻塞
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 2. 带有Vertx Paths的无限流 -> 阻塞
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
log.info("routed run");
return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(n -> new SearchResult(n.toString())));
}
Producer 3. 简单的有限流 -> 阻塞直到完成
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.transform().byTakingFirstItems(5)
.onItem().transform(n -> new SearchResult(n.toString()));
}
Consumer:
尝试了多种不同的解决方案,既在生产者端,也在消费者端,但是在每种情况下,流都会阻塞,直到完全完成,或者在无限流的情况下无限期地挂起。我使用httpie得到了相同的结果。这是最新的迭代:
WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
client.get("/string")
.send()
.onFailure().invoke(resp -> log.error("error: " + resp))
.onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
.toMulti()
.subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
英文:
Update!
I have fixed minor bugs in the sample code after solving some of the problems that were irrelevant to the main question which is still about non-blocking streaming between services.
Background info:
I'm porting a Spring WebFlux service under Quarkus. The service runs long searches on multiple huge data sets and returns the partial results in a Flux (text/event-stream) as they become available.
Problem:
Right now I'm trying to use Mutiny Multi with Vert.x under Quarkus but cannot figure out how the consumer service could receive this stream without blocking.
In all examples the consumer is either a JS front end page or the producer's content type is application/json that seems to bluck until the Multi completes before sending it over in one JSON object (which makes no sense in my application).
Questions:
- How to receive text/event-stream with the Mutiny-flavoured Vert.x WebClient?
- If the problem would be that the WebClient is not able to receive continuous steams: What is the standard way to stream data between two Quarkus services?
Here is a simplified example
Test entity
public class SearchResult implements Serializable {
private String content;
public SearchResult(String content) {
this.content = content;
}
//.. toString, getters and setters
}
Producer 1. simple infinite stream -> hangs
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2) .onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 2. with Vertx Paths infinite stream -> hangs
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
log.info("routed run");
return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 3. simple finite stream -> blocks until completion
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.transform().byTakingFirstItems(5)
.onItem().transform(n -> new SearchResult(n.toString()));
}
Consumer:
Tried multiple different solutions on both producer and consumer sides, but in every case the the stream blocks until it is complete or hangs indefinitely without transferring data for infinite streams. I get the same results with httpie. Here is the latest iteration:
WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
client.get("/string")
.send()
.onFailure().invoke(resp -> log.error("error: " + resp))
.onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
.toMulti()
.subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
答案1
得分: 4
Vert.x Web客户端在不进行配置的情况下无法与SSE(服务器发送事件)一起使用。
来自https://vertx.io/docs/vertx-web-client/java/:
> 响应完全缓冲,使用BodyCodec.pipe将响应导管到写入流
它会等待响应完成。您可以使用原始的Vert.x HTTP客户端,或者使用pipe
编解码器。示例可在https://vertx.io/docs/vertx-web-client/java/#_decoding_responses中找到。
另外,您可以使用SSE客户端,例如:
https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34
英文:
The Vert.x Web Client does not work with SSE (Without configuration).
From https://vertx.io/docs/vertx-web-client/java/:
> Responses are fully buffered, use BodyCodec.pipe to pipe the response to a write stream
It waits until the response completes. You can either use the raw Vert.x HTTP Client or use the pipe
codec. Examples are given on https://vertx.io/docs/vertx-web-client/java/#_decoding_responses.
Alternatively, you can use an SSE client such as in:
https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论