How can I implement a gRPC ServerWritableStream in nest.js?
Question

The nest.js documentation doesn't mention the case of a unidirectional ServerWritableStream. I want to receive a normal request and use call.write to stream messages to the client. This works fine in plain TypeScript using the code below, but it doesn't work from within a nest.js gRPC controller. I am also using Envoy, which works fine with unary calls on nest.js as well as with the simple server.

function doOnAdd(call) {
  setInterval(() => {
    const myTodo = JSON.stringify({
      id: 'b779cb10-72c8-416f-9399-273eab8e3421',
      title: 'Fix the server streaming',
      completed: false,
    });
    console.log('Sending streaming data', myTodo);
    call.write({message: myTodo});
  }, 5000);

  call.on('end', () => {
    console.log('end');
  });

  setTimeout(() => {
    call.end();
  }, 30000);
}

But this nest.js code does not work (unary gRPC calls work fine in the same controller).

@GrpcMethod('TodoService', 'OnAdded')
async onAdded(
  request: todo.OnAddedTodoRequest,
  metadata: Metadata,
  call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
) {
  setInterval(() => {
    const myTodo = JSON.stringify({
      id: 'b779cb10-72c8-416f-9399-273eab8e3421',
      title: 'Fix the server streaming',
      completed: false,
    });
    console.log('Sending streaming data', myTodo);
    const message = new todo.ServerMessage({ message: myTodo });
    call.write(message);
  }, 5000);

  call.on('end', () => {
    console.log('end');
  });

  setTimeout(() => {
    call.end();
  }, 30000);
}

Finally, here is the simplified protobuf:

syntax = "proto3";

package todo;

service TodoService {
  rpc OnAdded (OnAddedTodoRequest) returns (stream ServerMessage);
}

message OnAddedTodoRequest {}

message ServerMessage {
  string message = 1;
}

Answer 1

Score: 0

I finally realised that nest.js doesn't wait for the interval to complete: the handler returns immediately. My solution was to wrap the timers in a promise and await it, so the handler stays pending while the stream is open:

@GrpcMethod('TodoService', 'OnAdded')
async onAdded(
  request: todo.OnAddedTodoRequest,
  metadata: Metadata,
  call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
) {
  // Awaiting this promise keeps the handler from returning right away.
  await new Promise<void>((resolve) => {
    const timer = setInterval(() => {
      const myTodo = JSON.stringify({
        id: 'b779cb10-72c8-416f-9399-273eab8e3421',
        title: 'Fix the server streaming',
        completed: false,
      });
      console.log('Sending streaming data', myTodo);
      const message = new todo.ServerMessage({ message: myTodo });
      call.write(message);
    }, 5000);

    call.on('end', () => {
      console.log('end');
      resolve();
    });

    setTimeout(() => {
      // Stop the interval first so nothing is written after the stream is closed.
      clearInterval(timer);
      call.end();
    }, 30000);
  });
}
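
The idea in this answer (keep the handler's promise pending until the stream has been closed, and stop the interval before closing) can be tried out without a gRPC server. The following is only a minimal sketch, not the author's code: `StreamingCall`, `streamTodos` and `FakeCall` are hypothetical names introduced for illustration, and `StreamingCall` models just the `write` and `end` methods that the code above uses on the real `ServerWritableStream`. Client cancellation is not handled here.

```typescript
// Hypothetical stand-in for the two methods of ServerWritableStream used above.
interface StreamingCall<T> {
  write(message: T): void;
  end(): void;
}

// Mirrors the ServerMessage from the protobuf in the question.
interface ServerMessage {
  message: string;
}

// Writes one message every `intervalMs` and ends the call after `totalMs`.
// The returned promise stays pending until the call has been ended, so a
// handler that awaits it does not return early.
function streamTodos(
  call: StreamingCall<ServerMessage>,
  intervalMs: number,
  totalMs: number,
): Promise<void> {
  return new Promise<void>((resolve) => {
    const ticker = setInterval(() => {
      const myTodo = JSON.stringify({
        id: 'b779cb10-72c8-416f-9399-273eab8e3421',
        title: 'Fix the server streaming',
        completed: false,
      });
      call.write({ message: myTodo });
    }, intervalMs);

    setTimeout(() => {
      clearInterval(ticker); // stop producing before the stream is closed
      call.end();
      resolve();
    }, totalMs);
  });
}

// In-memory fake (assumption: test use only) that records what was written
// and rejects writes that arrive after end().
class FakeCall implements StreamingCall<ServerMessage> {
  readonly written: ServerMessage[] = [];
  ended = false;

  write(message: ServerMessage): void {
    if (this.ended) {
      throw new Error('write after end');
    }
    this.written.push(message);
  }

  end(): void {
    this.ended = true;
  }
}
```

With the fake, `streamTodos(new FakeCall(), 10, 55)` returns a promise that is still pending right after the call and settles once `end()` has run. Separately, it may be worth checking the nest.js gRPC documentation for returning an RxJS Observable from a `@GrpcMethod` handler of a server-streaming RPC, which would avoid manual timers; whether that fits depends on the nest.js version in use.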

Posted by huangapple on 2023-03-31 22:42:03
Link: https://go.coder-hub.com/75899822.html