grpc::ClientBidiReactor在连续创建ClientBidiReactor时崩溃

huangapple go评论122阅读模式
英文:

grpc::ClientBidiReactor crashes when create ClientBidiReactor Continuously

问题

以下是您提供的代码部分的翻译:

我有一个如下所示的函数:

ErrorCode SendData(const char *data, size_t length)
{
auto stream = std::make_unique(stub_.get(), client_id_, timeout_seconds_);
return stream->SendMessage(data, length);
}

和 Stream 类:

```cpp
class Stream : public grpc::ClientBidiReactor<SendBinaryDataRequest, SendBinaryDataResponse> {
   public:
    Stream(MessageTransfer::Stub *stub)
        : context_(), status_(grpc::Status::OK), done_(false), request_(), response_(), mu_(), cv_()
    {
      stub->async()->SendStreamBinaryDataWithAck(&context_, this);
      StartCall();
      std::cout << "新建构造函数:" << this << std::endl;
    }

    void OnWriteDone(bool ok) override
    {
      std::unique_lock<std::mutex> l(mu_);
      if (ok) {
        StartRead(&response_);
      }
    }

    void OnWritesDoneDone(bool) override
    {
      std::unique_lock<std::mutex> l(mu_);
      cv_.notify_one();
    }
    
    void OnReadDone(bool ok) override
    {
      std::unique_lock<std::mutex> l(mu_);
      if (ok) {
        cv_.notify_one();
      }
    }

    void OnDone(const grpc::Status &s) override
    {
      std::unique_lock<std::mutex> l(mu_);
      LOG("%p 流关闭 状态:%d 错误:%s\n", this, s.error_code(), s.error_message().c_str());
      status_ = s;
      done_   = true;
      cv_.notify_one();
    }

    ErrorCode SendMessage(const char *data, size_t length)
    {
      std::unique_lock<std::mutex> l(mu_);

      if (done_) {
        return convertToErrorCode(status_);
      }

      request_.set_data(data, length);
      LOG("开始写入 %p\n", this);
      StartWrite(&request_);
      if(std::cv_status::timeout == cv_.wait_for(l, std::chrono::seconds(timeout_sec_))) {
        return ErrorCode_TIMEOUT;
      }
      if (done_) {
        return convertToErrorCode(status_);
      }
      return static_cast<ErrorCode>(response_.error_code());
    }
  }

当我使用 SendData 去连接不存在的服务器时,它会打印 "failed to connect to all addresses",这是正确的;但是当我多次调用 SendData 时,它崩溃了。

纯虚函数被调用
在没有活动异常的情况下终止调用
中止(核心已转储)

日志:

新建构造函数:0x168dbc0
开始写入 0x168dbc0
0x168dbc0 流关闭 状态:14 错误:failed to connect to all addresses; 最后错误: UNKNOWN: unix:/tmp/hs_message.sock: 连接被拒绝
新建构造函数:0x168dbc0
开始写入 0x168dbc0
分段错误 (核心已转储)

似乎每次我创建一个流时,指针都是相同的。但我不认为这是崩溃的原因。

英文:

I have a function as follows:

  ErrorCode SendData(const char *data, size_t length)
  {
    auto stream = std::make_unique&lt;Stream&gt;(stub_.get(), client_id_, timeout_seconds_);
    return stream-&gt;SendMessage(data, length);
  }

and Stream class:

class Stream : public grpc::ClientBidiReactor&lt;SendBinaryDataRequest, SendBinaryDataResponse&gt; {
   public:
    Stream(MessageTransfer::Stub *stub)
        : context_(), status_(grpc::Status::OK), done_(false), request_(), response_(), mu_(), cv_(), 
    {
      stub-&gt;async()-&gt;SendStreamBinaryDataWithAck(&amp;context_, this);
      StartCall();
      std::cout &lt;&lt; &quot;new construtor:&quot; &lt;&lt; this &lt;&lt; std::endl;
    }

    void OnWriteDone(bool ok) override
    {
      std::unique_lock&lt;std::mutex&gt; l(mu_);
      if (ok) {
        StartRead(&amp;response_);
      }
    }

    void OnWritesDoneDone(bool) override
    {
      std::unique_lock&lt;std::mutex&gt; l(mu_);
      cv_.notify_one();
    }
    
    void OnReadDone(bool ok) override
    {
      std::unique_lock&lt;std::mutex&gt; l(mu_);
      if (ok) {
        cv_.notify_one();
      }
    }

    void OnDone(const grpc::Status &amp;s) override
    {
      std::unique_lock&lt;std::mutex&gt; l(mu_);
      LOG(&quot;%p stream closed status:%d error:%s\n&quot;, this, s.error_code(), s.error_message().c_str());
      status_ = s;
      done_   = true;
      cv_.notify_one();
    }

    ErrorCode SendMessage(const char *data, size_t length)
    {
      std::unique_lock&lt;std::mutex&gt; l(mu_);

      if (done_) {
        return convertToErrorCode(status_);
      }

      request_.set_data(data, length);
      LOG(&quot;StartWrite %p\n&quot;, this);
      StartWrite(&amp;request_);
      if(std::cv_status::timeout == cv_.wait_for(l, std::chrono::seconds(timeout_sec_))) {
        return ErrorCode_TIMEOUT;
      }
      if (done_) {
      return convertToErrorCode(status_);
      }
      return static_cast&lt;ErrorCode&gt;(response_.error_code());
    }
  }

when I use SendData to non-existent server, it will print "failed to connect to all addresses", this is right; However when I call SendData several times, it crashes:

pure virtual method called
terminate called without an active exception
Aborted (core dumped)

log:

new construtor:0x168dbc0
StartWrite 0x168dbc0
0x168dbc0 stream closed status:14 error:failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/hs_message.sock: Connection refused
new construtor:0x168dbc0
StartWrite 0x168dbc0
Segmentation fault (core dumped)

It seems the ptr is same every time I make a stream. But I dont think it should be cause of the crash

gdb info
another gdb info
Can anyone help me? thanks

答案1

得分: 0

以下是翻译好的部分:

最终,我在添加AddHold/RemoveHold之后发现程序成功运行
类似于这样的代码:

class Streampublic grpc::ClientBidiReactor<SendBinaryDataRequest, SendBinaryDataResponse> {
public:
StreamMessageTransfer::Stub *stub
context_(),status_grpc::Status::OK),done_false),request_(),response_(),mu_(),cv_(),
{
stub->async()->SendStreamBinaryDataWithAck(&context_, this);
AddHold()
StartCall();
std::cout << "new construtor:" << this << std::endl;
}

void OnWriteDone(bool ok) override
{
std::unique_lock<std::mutex> l(mu_);
if (ok) {
StartRead(&response_);
} else {
RemoveHold();
}
}

void OnWritesDoneDone(bool) override
{
std::unique_lock<std::mutex> l(mu_);
cv_.notify_one();
}

void OnReadDone(bool ok) override
{
std::unique_lock<std::mutex> l(mu_);
if (ok) {
cv_.notify_one();
} else {
RemoveHold();
}
}

void OnDone(const grpc::Status &s) override
{
std::unique_lock<std::mutex> l(mu_);
LOG("%p stream closed status:%d error:%s\n", this, s.error_code(), s.error_message().c_str());
status_ = s;
done_   = true;
cv_.notify_one();
}

ErrorCode SendMessage(const char *data, size_t length)
{
std::unique_lock<std::mutex> l(mu_);

if (done_) {
return convertToErrorCode(status_);
}

request_.set_data(data, length);
LOG("StartWrite %p\n", this);
StartWrite(&request_);
if(std::cv_status::timeout == cv_.wait_for(l, std::chrono::seconds(timeout_sec_))) {
return ErrorCode_TIMEOUT;
}
if (done_) {
return convertToErrorCode(status_);
}
return static_cast<ErrorCode>(response_.error_code());
}
}
英文:

At last I find the program runs successfully after I added AddHold/RemoveHold
Codes like this:

class Stream : public grpc::ClientBidiReactor&lt;SendBinaryDataRequest,SendBinaryDataResponse&gt; {
public:
Stream(MessageTransfer::Stub *stub)
: context_(), status_(grpc::Status::OK), done_(false), request_(), response_(), mu_(), cv_(), 
{
stub-&gt;async()-&gt;SendStreamBinaryDataWithAck(&amp;context_, this);
AddHold()
StartCall();
std::cout &lt;&lt; &quot;new construtor:&quot; &lt;&lt; this &lt;&lt; std::endl;
}
void OnWriteDone(bool ok) override
{
std::unique_lock&lt;std::mutex&gt; l(mu_);
if (ok) {
StartRead(&amp;response_);
} else {
RemoveHold();
}
}
void OnWritesDoneDone(bool) override
{
std::unique_lock&lt;std::mutex&gt; l(mu_);
cv_.notify_one();
}
void OnReadDone(bool ok) override
{
std::unique_lock&lt;std::mutex&gt; l(mu_);
if (ok) {
cv_.notify_one();
} else {
RemoveHold();
}
}
void OnDone(const grpc::Status &amp;s) override
{
std::unique_lock&lt;std::mutex&gt; l(mu_);
LOG(&quot;%p stream closed status:%d error:%s\n&quot;, this, s.error_code(), s.error_message().c_str());
status_ = s;
done_   = true;
cv_.notify_one();
}
ErrorCode SendMessage(const char *data, size_t length)
{
std::unique_lock&lt;std::mutex&gt; l(mu_);
if (done_) {
return convertToErrorCode(status_);
}
request_.set_data(data, length);
LOG(&quot;StartWrite %p\n&quot;, this);
StartWrite(&amp;request_);
if(std::cv_status::timeout == cv_.wait_for(l, std::chrono::seconds(timeout_sec_))) {
return ErrorCode_TIMEOUT;
}
if (done_) {
return convertToErrorCode(status_);
}
return static_cast&lt;ErrorCode&gt;(response_.error_code());
}

}

答案2

得分: 0

如下是您要的翻译部分:

"So, one immediate issue I noticed is that your reactor object stream can go out of scope and be deleted before the RPC is done."
"所以,我立即注意到的一个问题是,您的反应堆对象 stream 可能会在RPC完成之前超出范围并被删除。"

"The reactor may not be deleted until OnDone is called."
"直到调用 OnDone 之前,反应堆不能被删除。"

"Consider the following events -"
"考虑以下事件 -"

"SendData is invoked, which invokes SendMessage() on a locally scoped stream object."
"调用 SendData,它在本地作用域的 stream 对象上调用 SendMessage()。"

"In SendMessage(), the cv_.wait_for() times out and returns."
"在 SendMessage() 中,cv_.wait_for() 超时并返回。"

"stream object gets deleted."
"stream 对象被删除。"

"At this point, the RPC fails and the gRPC library tries to invoke one of the callbacks, but the object is already deleted, leading to a SEGFAULT."
"在这一点上,RPC 失败,gRPC库尝试调用其中一个回调,但对象已经被删除,导致 SEGFAULT。"

"I think fixing this probably fixes the issue that you are seeing, but I haven't run this code myself."
"我认为修复这个问题可能会解决您遇到的问题,但我自己没有运行过这段代码。"

"I'll link to the examples just in case"
"我将提供示例链接以防万一"
" https://github.com/grpc/grpc/blob/master/examples/cpp/interceptors/client.cc."

英文:

So, one immediate issue I noticed is that your reactor object stream can go out of scope and be deleted before the RPC is done.

https://github.com/grpc/grpc/blob/2892b24eabbb22b2344aba9c3ba84e529017b684/include/grpcpp/support/client_callback.h#L234
"The reactor may not be deleted until OnDone is called."

Consider the following events -

  1. SendData is invoked, which invokes SendMessage() on a locally scoped stream object.
  2. In SendMessage(), the cv_.wait_for() times out and returns.
  3. stream object gets deleted.
  4. At this point, the RPC fails and the gRPC library tries to invoke one of the callbacks, but the object is already deleted, leading to a SEGFAULT.

I think fixing this probably fixes the issue that you are seeing, but I haven't run this code myself.

I'll link to the examples just in case https://github.com/grpc/grpc/blob/master/examples/cpp/interceptors/client.cc.

huangapple
  • 本文由 发表于 2023年3月15日 19:03:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/75743856.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定