使用 Reactor Netty 连接到服务器消息队列

huangapple go评论72阅读模式
英文:

Connecting to a Server message queue using Reactor Netty

问题

我正在尝试使用 Reactor Netty 连接到在 Docker 容器上运行的消息队列,我正在作为独立应用进行,而不使用 Spring Flux,因为有依赖性问题。

根据 Reactor Netty 文档中的示例,我看到了一种连接到服务器并获取响应的方法:

public static void main(String[] args) {
    String response =
        HttpClient.create()
                  .headers(h -> h.add("my header", my_header))
                  .get()
                  .uri(my_uri)
                  .responseContent() 
                  .aggregate()       
                  .asString()        
                  .block();
}

但是,当我尝试通过 System.out.println() 显示输出时,没有任何反应。

我还尝试了解如何使用:

Flux<V> response(BiFunction<HttpClientResponse, ByteBufFlux, Publisher<V>> receiver)

但我不太确定应该怎么做。我在文档中看到了一个名为 Connection 的类,它使用 TCPClient 并且有一个 subscribe 方法。

我有点迷茫,您能否指导我如何在 Reactor Netty 中实现这一点,而不使用 spring-flux?

谢谢

编辑:

经过一些尝试,我得到了以下代码:

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header))
               .get()
               .uri(my_uri)
               .response((res, bytes) -> {
                   System.out.println(bytes.asString());
                   return bytes.asString();})
               .subscribe();
}

这给了我一个 FluxHandle,我如何使用它来实际读取响应的主体内容呢?

英文:

I am trying to connect to a message queue running on a docker container using Reactor Netty. I am doing this as standalone, not using SpringFlux because of dependency issues.

From the examples in the Reactor Netty documentation I saw there is a way to connect to the Server and get a response:

public static void main(String[] args) {
        String response =
                HttpClient.create()
                          .headers(h -&gt; h.add(&quot;my header&quot;, my_header)
                          .get()
                          .uri(my_uri)
                          .responseContent() 
                          .aggregate()       
                          .asString()        
                          .block();
    }

but when I try afterwards to display the output via System.out.println() nothing happens.

I also tried to understand how to use:

Flux&lt;V&gt; response(BiFunction&lt;HttpClientResponse,ByteBufFlux,Publisher&lt;V&gt;&gt; receiver)

But I am not sure exactly what to do.
I saw in the documentation there is a class called Connection, which uses a TCPClient and has a method subscribe.

I am kind of lost, can you possibly point me in the right direction of implementing this in Reactor Netty without the use of spring-flux?

Thank you

EDIT:

After some experimentation i got this:

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -&gt; h.add(&quot;my header&quot;, my_header)
               .get()
               .uri(my_uri)
               .response((res, bytes) - &gt; {
                   System.out.println(bytes.asString());
                   return bytes.asString();})
               .subscribe();
}

This gives me a FluxHandle, how can I use that to actually read the body of the response?

答案1

得分: 0

所以我弄清楚了如何订阅并读取从服务器接收的数据,甚至使用jackson库将数据转换为JSON格式,以便我的代码更容易读取。

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header))
               .get()
               .uri(my_uri)
               .response((resp, bytes) -> {
                    return bytes.asString();
                })
                .subscribe(response -> {
                    try {
                        consumeData(new ObjectMapper()
                                .readValue(response, MyData.class));
                    } catch (IOException ex) {
                        System.out.println("ERROR converting to json: " + ex);
                    }
                    });
}

似乎当使用subscribe()方法时,我可以监听传入的响应并对其进行处理。我仍然需要添加一种在服务器停止时或消息队列关闭时关闭连接的方法,以免客户端在不存在的消息队列上挂起。

英文:

So I figured out how to subscibe and read the data recieved from the Server and even transform the data to JSON, using the jackson library, to be more easily read by my code.

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -&gt; h.add(&quot;my header&quot;, my_header)
               .get()
               .uri(my_uri)
               .response((resp, bytes) -&gt; {
                    return bytes.asString();
                })
                .subscribe(response -&gt; {
                    try {
                        consumeData(new ObjectMapper()
                                .readValue(response, MyData.class));
                    } catch (IOException ex) {
                        System.out.println(&quot;ERROR converting to json: &quot; + ex);
                    }
                    });
}

it seems that when using the subscribe() method i can listen to incoming responses and do something with them. I still need to add a way for the connection to close when the server stops, or the message queue is shut down, so the client doesn't hang on non-existent message queue.

huangapple
  • 本文由 发表于 2020年9月14日 01:25:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/63873606.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定