How can I implement a gRPC ServerWritableStream in nest.js?

Question

The nest.js documentation doesn't cover the case of a unidirectional ServerWritableStream (server streaming). I want to receive a normal request and use call.write to stream messages to the client. This works fine in plain TypeScript with the function below, but it doesn't work inside a nest.js gRPC controller. I am also using Envoy, which works fine with unary calls on nest.js as well as with the simple server.

    function doOnAdd(call) {
      setInterval(() => {
        const myTodo = JSON.stringify({
          id: 'b779cb10-72c8-416f-9399-273eab8e3421',
          title: 'Fix the server streaming',
          completed: false,
        });
        console.log('Sending streaming data', myTodo);
        call.write({message: myTodo});
      }, 5000);
      call.on('end', () => {
        console.log('end');
      });
      setTimeout(() => {
        call.end();
      }, 30000);
    }

But this nest.js code does not work (unary gRPC calls work fine in the same controller).

    @GrpcMethod('TodoService', 'OnAdded')
    async onAdded(
      request: todo.OnAddedTodoRequest,
      metadata: Metadata,
      call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
    ) {
      setInterval(() => {
        const myTodo = JSON.stringify({
          id: 'b779cb10-72c8-416f-9399-273eab8e3421',
          title: 'Fix the server streaming',
          completed: false,
        });
        console.log('Sending streaming data', myTodo);
        const message = new todo.ServerMessage({ message: myTodo });
        call.write(message);
      }, 5000);
      call.on('end', () => {
        console.log('end');
      });
      setTimeout(() => {
        call.end();
      }, 30000);
    }

Finally here is the simplified protobuf:

    syntax = "proto3";
    package todo;
    service TodoService {
      rpc OnAdded (OnAddedTodoRequest) returns (stream ServerMessage);
    }
    message OnAddedTodoRequest {}
    message ServerMessage {
      string message = 1;
    }

Answer 1

Score: 0

I finally realised that nest.js doesn't wait for the interval to complete; the handler returns immediately. My solution was to wrap the streaming logic in a promise, like this:

    @GrpcMethod('TodoService', 'OnAdded')
    async onAdded(
      request: todo.OnAddedTodoRequest,
      metadata: Metadata,
      call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
    ) {
      // Keep the handler pending while messages are still being written.
      await new Promise<void>((resolve) => {
        const timer = setInterval(() => {
          const myTodo = JSON.stringify({
            id: 'b779cb10-72c8-416f-9399-273eab8e3421',
            title: 'Fix the server streaming',
            completed: false,
          });
          console.log('Sending streaming data', myTodo);
          const message = new todo.ServerMessage({ message: myTodo });
          call.write(message);
        }, 5000);
        call.on('end', () => {
          console.log('end');
          resolve();
        });
        setTimeout(() => {
          clearInterval(timer); // stop writing before closing the stream
          call.end();
        }, 30000);
      });
    }
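
The promise wrapper above can be factored into a small helper. The following is a minimal sketch, not part of the original answer: `WritableCall` and `streamForDuration` are hypothetical names, and the interface models only the `write` and `end` methods used here. It clears the interval before closing the stream and resolves right after `end()` instead of waiting for an `'end'` event, on the assumption that whether and when that event fires may differ between gRPC library versions.

```typescript
// Minimal shape of the stream this sketch needs; a real
// ServerWritableStream has many more members.
interface WritableCall<T> {
  write(message: T): unknown;
  end(): unknown;
}

// Hypothetical helper: writes makeMessage() every intervalMs, closes the
// stream after totalMs, and resolves with the number of messages written.
function streamForDuration<T>(
  call: WritableCall<T>,
  makeMessage: () => T,
  intervalMs: number,
  totalMs: number,
): Promise<number> {
  return new Promise<number>((resolve) => {
    let sent = 0;
    const timer = setInterval(() => {
      call.write(makeMessage());
      sent += 1;
    }, intervalMs);
    setTimeout(() => {
      clearInterval(timer); // stop writing before closing the stream
      call.end();
      resolve(sent); // only now may the awaiting handler return
    }, totalMs);
  });
}
```

Inside the controller method this would be used as `await streamForDuration(call, () => new todo.ServerMessage({ message: myTodo }), 5000, 30000);`, so the handler stays pending for as long as messages are being written.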

huangapple
  • Published on March 31, 2023 at 22:42:03
  • Please keep the link to this article when reposting: https://go.coder-hub.com/75899822.html