Vertx如何使Eventbus请求等待消费者消息回复

huangapple go评论82阅读模式
英文:

Vertx how to make Eventbus request to wait for consumer message reply

问题

由于我是一个文本模型,我会为您提供翻译的部分,而不包括代码。以下是您提供的文本的翻译:

"我是Vertx的新手,我试图使用Eventbus进行请求-响应,但在尝试时遇到了问题。我已经完成了以下操作:

EventBus bus = vertx.eventBus();
// 在这里我调用请求
bus.<JsonObject>request("previewdata", m ,this::handle);

public  void handle(AsyncResult<Message<JsonObject>> result) {
    // 在30秒内,如果消费者没有发送回复,请求将失败
    if(result.succeeded()){
        System.out.println("答案"+Thread.currentThread().getName());
        System.out.println(result.result().body());
    } else{
        result.cause().printStackTrace();
    }
}

IDEng ie = new IDEng();
// 请求的消费者
bus.<JsonObject>consumer("previewdata", this::getPreviewData);

public void getPreviewData(Message<JsonObject> message) { 
     JsonObject json = message.body();
     for (int i=0; i<10000; i++) {
         json.put("flag"+i, "got"+i);
     }
     try {
         // 仅为示例,我等待了40秒(因此它失败,因为超过30秒),可能需要更多时间
         Thread.sleep(40000);
     }catch (Exception e) {
        // 处理异常
     }
     message.reply(json);
} 

我创建了一个使用Worker Verticle的Vertx示例,其中包括一个请求和一个消费者,但请求不等待消费者回复响应,因为默认超时时间为30秒。但根据我们的实际情况,有时不确定请求需要多长时间。所以请帮助我如何等待响应。

我知道我们可以使用DeliveryOptions来设置超时,但这不是一个合适的方式,因为我们无法估计任务的时间,特别是当它依赖于第三方服务器时。

例如:

new DeliveryOptions().setSendTimeout(50000)

请告诉我是否有人可以帮助我解决这个问题,或者我是不是在错误的方式中进行操作?"

英文:

As I am new to Vertx, I was trying request- response using Eventbus, But While trying that I am stuck at once place, what I had done:

EventBus bus = vertx.eventBus();
//Here I call the request
bus.<JsonObject>request("previewdata", m ,this::handle);

public  void handle(AsyncResult<Message<JsonObject>> result) {
    //request get fails before consumer don't send reply within 30 seconds
    if(result.succeeded()){
        System.out.println("Answer: "+Thread.currentThread().getName());
        System.out.println(result.result().body());
    } else{
        result.cause().printStackTrace();
    }
}

IDEng ie = new IDEng();
//Consumer of request
bus.<JsonObject>consumer("previewdata", this::getPreviewData);

public void getPreviewData(Message<JsonObject> message) { 
	 JsonObject json = message.body();
	 for (int i=0; i<10000; i++) {
		 json.put("flag"+i, "got"+i);
	 }
	 try {
         //Only for example I had put wait of 40 seconds (so it fail as greate than 30 seconds) it may take more time
	     Thread.sleep(40000);
	 }catch (Exception e) {
		// TODO: handle exception
	 }
	 message.reply(json);
} 

I had created one vertx example using Worker Verticle and I have one request and consumer, But request is not waiting for consumer to reply the response and it's get failed in 30 seconds as it's Default timeout is 30 seconds, But per our real time scenario sometime it is not sure that how much time will request take. So please help me how can I wait for response.

I know we can set Timeout using DeliveryOptions but that's not a proper way as we never estimate some tasks time if it depends on third server.
E.g

new DeliveryOptions().setSendTimeout(50000)

Please let me know if someone can help me out is this or am I doing this in wrong way?

答案1

得分: 1

关于将响应解耦到不同的总线地址,例如 "previewdataresponse":

A 顶点:)

bus.<JsonObject>request("previewdata", m, this::handle);
// 如果回复不是成功,则显示错误

bus.<JsonObject>consumer("previewdataresponse", this::handleResponse);

public void handleResponse(Message<JsonObject> message) {
    // 对响应进行处理
}

B 顶点:)

bus.<JsonObject>consumer("previewdata", this::getPreviewData);

public void getPreviewData(Message<JsonObject> message) {
    JsonObject json = message.body();
    //...
    message.reply(json); // 确认已收到消息
    try {
        Thread.sleep(40000);
        bus.<JsonObject>send("previewdataresponse", json);
    } catch (Exception e) {
        // TODO: 处理异常
    }
}
英文:

how about decoupling the response to a different bus-address e.g "previewdataresponse":

Verticle A.)

bus.&lt;JsonObject&gt;request(&quot;previewdata&quot;, m ,this::handle);
// show error if reply is not success

bus.&lt;JsonObject&gt;consumer(&quot;previewdataresponse&quot;, this::handleResponse);

public void handleResponse(Message&lt;JsonObject&gt; message) { 
	// do something with the reponse
}

Verticle B.)

bus.&lt;JsonObject&gt;consumer(&quot;previewdata&quot;, this::getPreviewData);

public void getPreviewData(Message&lt;JsonObject&gt; message) { 
     JsonObject json = message.body();
     //...
     message.reply(json); // acknowledge that you received the message
     try {
         Thread.sleep(40000);
         bus.&lt;JsonObject&gt;send(&quot;previewdataresponse&quot;, json);
     }catch (Exception e) {
        // TODO: handle exception
     }
} 

huangapple
  • 本文由 发表于 2020年8月28日 14:26:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/63628557.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定