英文:
Connecting to a Server message queue using Reactor Netty
问题
我正在尝试使用 Reactor Netty 连接到在 Docker 容器上运行的消息队列,我正在作为独立应用进行,而不使用 Spring Flux,因为有依赖性问题。
根据 Reactor Netty 文档中的示例,我看到了一种连接到服务器并获取响应的方法:
public static void main(String[] args) {
String response =
HttpClient.create()
.headers(h -> h.add("my header", my_header))
.get()
.uri(my_uri)
.responseContent()
.aggregate()
.asString()
.block();
}
但是,当我尝试通过 System.out.println()
显示输出时,没有任何反应。
我还尝试了解如何使用:
Flux<V> response(BiFunction<HttpClientResponse, ByteBufFlux, Publisher<V>> receiver)
但我不太确定应该怎么做。我在文档中看到了一个名为 Connection 的类,它使用 TCPClient 并且有一个 subscribe 方法。
我有点迷茫,您能否指导我如何在 Reactor Netty 中实现这一点,而不使用 spring-flux?
谢谢
编辑:
经过一些尝试,我得到了以下代码:
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header))
.get()
.uri(my_uri)
.response((res, bytes) -> {
System.out.println(bytes.asString());
return bytes.asString();})
.subscribe();
}
这给了我一个 FluxHandle,我如何使用它来实际读取响应的主体内容呢?
英文:
I am trying to connect to a message queue running on a docker container using Reactor Netty. I am doing this as standalone, not using SpringFlux because of dependency issues.
From the examples in the Reactor Netty documentation I saw there is a way to connect to the Server and get a response:
public static void main(String[] args) {
String response =
HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.responseContent()
.aggregate()
.asString()
.block();
}
but when I try afterwards to display the output via System.out.println() nothing happens.
I also tried to understand how to use:
Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)
But I am not sure exactly what to do.
I saw in the documentation there is a class called Connection, which uses a TCPClient and has a method subscribe.
I am kind of lost, can you possibly point me in the right direction of implementing this in Reactor Netty without the use of spring-flux?
Thank you
EDIT:
After some experimentation i got this:
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((res, bytes) - > {
System.out.println(bytes.asString());
return bytes.asString();})
.subscribe();
}
This gives me a FluxHandle, how can I use that to actually read the body of the response?
答案1
得分: 0
所以我弄清楚了如何订阅并读取从服务器接收的数据,甚至使用jackson
库将数据转换为JSON格式,以便我的代码更容易读取。
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header))
.get()
.uri(my_uri)
.response((resp, bytes) -> {
return bytes.asString();
})
.subscribe(response -> {
try {
consumeData(new ObjectMapper()
.readValue(response, MyData.class));
} catch (IOException ex) {
System.out.println("ERROR converting to json: " + ex);
}
});
}
似乎当使用subscribe()
方法时,我可以监听传入的响应并对其进行处理。我仍然需要添加一种在服务器停止时或消息队列关闭时关闭连接的方法,以免客户端在不存在的消息队列上挂起。
英文:
So I figured out how to subscibe and read the data recieved from the Server and even transform the data to JSON, using the jackson
library, to be more easily read by my code.
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((resp, bytes) -> {
return bytes.asString();
})
.subscribe(response -> {
try {
consumeData(new ObjectMapper()
.readValue(response, MyData.class));
} catch (IOException ex) {
System.out.println("ERROR converting to json: " + ex);
}
});
}
it seems that when using the subscribe() method i can listen to incoming responses and do something with them. I still need to add a way for the connection to close when the server stops, or the message queue is shut down, so the client doesn't hang on non-existent message queue.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论