How can I implement a gRPC ServerWritableStream in nest.js?
Question
The nest.js documentation doesn't mention the case of a unidirectional ServerWritableStream (a server-streaming RPC). I want to receive a normal request and use call.write to stream messages to the client. This works fine in a plain TypeScript server using the handler below, but it doesn't work from within a nest.js gRPC controller. I am also using Envoy, which works fine with unary calls on nest.js as well as with the plain server.
function doOnAdd(call) {
  setInterval(() => {
    const myTodo = JSON.stringify({
      id: 'b779cb10-72c8-416f-9399-273eab8e3421',
      title: 'Fix the server streaming',
      completed: false,
    });
    console.log('Sending streaming data', myTodo);
    call.write({message: myTodo});
  }, 5000);
  call.on('end', () => {
    console.log('end');
  });
  setTimeout(() => {
    call.end();
  }, 30000);
}
But this nest.js code does not work (unary gRPC calls work fine in the same controller).
@GrpcMethod('TodoService', 'OnAdded')
async onAdded(
  request: todo.OnAddedTodoRequest,
  metadata: Metadata,
  call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
) {
  setInterval(() => {
    const myTodo = JSON.stringify({
      id: 'b779cb10-72c8-416f-9399-273eab8e3421',
      title: 'Fix the server streaming',
      completed: false,
    });
    console.log('Sending streaming data', myTodo);
    const message = new todo.ServerMessage({ message: myTodo });
    call.write(message);
  }, 5000);
  call.on('end', () => {
    console.log('end');
  });
  setTimeout(() => {
    call.end();
  }, 30000);
}
Finally, here is the simplified protobuf:
syntax = "proto3";

package todo;

service TodoService {
  rpc OnAdded (OnAddedTodoRequest) returns (stream ServerMessage);
}

message OnAddedTodoRequest {}

message ServerMessage {
  string message = 1;
}
Answer 1
Score: 0
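I finally realised that the handler returns immediately, before the interval has fired even once, and nest.js does not wait for the interval to complete: it appears to close the stream as soon as the handler's promise resolves. My solution was to wrap the streaming logic in a promise that only resolves when the stream ends, like this: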
@GrpcMethod('TodoService', 'OnAdded')
async onAdded(
  request: todo.OnAddedTodoRequest,
  metadata: Metadata,
  call: ServerWritableStream<todo.OnAddedTodoRequest, todo.ServerMessage>,
) {
  // Keep the handler pending until the stream has ended; otherwise the
  // async method returns right away.
  await new Promise<void>((resolve) => {
    const interval = setInterval(() => {
      const myTodo = JSON.stringify({
        id: 'b779cb10-72c8-416f-9399-273eab8e3421',
        title: 'Fix the server streaming',
        completed: false,
      });
      console.log('Sending streaming data', myTodo);
      const message = new todo.ServerMessage({ message: myTodo });
      call.write(message);
    }, 5000);
    call.on('end', () => {
      console.log('end');
      clearInterval(interval);
      resolve();
    });
    setTimeout(() => {
      // Stop the interval first so nothing is written after the stream ends.
      clearInterval(interval);
      call.end();
    }, 30000);
  });
}
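The promise-wrapping idea can also be tried outside nest.js. The following is a minimal sketch, not the author's code: `WritableCall`, `streamTodos`, and `fakeCall` are hypothetical names introduced for illustration, and the interface only models the `write` and `end` methods used above rather than the full gRPC stream type. The helper returns a promise that settles only after the stream has been ended and the interval cleared, so a handler that awaits it stays pending for the whole stream.

```typescript
// Minimal shape of the stream object this sketch relies on (assumption:
// only write/end are modelled, not the real ServerWritableStream type).
interface WritableCall<T> {
  write(message: T): unknown;
  end(): void;
}

interface ServerMessage {
  message: string;
}

// Hypothetical helper: writes one message every `intervalMs`, ends the
// stream after `totalMs`, and resolves with the number of messages written.
function streamTodos(
  call: WritableCall<ServerMessage>,
  intervalMs: number,
  totalMs: number,
): Promise<number> {
  return new Promise<number>((resolve) => {
    let sent = 0;
    const timer = setInterval(() => {
      const myTodo = JSON.stringify({
        id: 'b779cb10-72c8-416f-9399-273eab8e3421',
        title: 'Fix the server streaming',
        completed: false,
      });
      call.write({ message: myTodo });
      sent += 1;
    }, intervalMs);

    setTimeout(() => {
      clearInterval(timer); // stop writing before the stream is closed
      call.end();
      resolve(sent); // only now may the awaiting handler return
    }, totalMs);
  });
}

// In-memory stand-in for the gRPC call object, used to exercise the helper.
const written: ServerMessage[] = [];
let ended = false;
const fakeCall: WritableCall<ServerMessage> = {
  write: (m) => written.push(m),
  end: () => {
    ended = true;
  },
};
```

Inside the controller, the handler body would then reduce to something like `await streamTodos(call, 5000, 30000)`, assuming the real call object is compatible with the two-method interface above.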