GRPC-Web Stream: Only receiving data when stream is closed
Question
I have set up a simple Java & Spring Boot backend to stream data to a gRPC-web client.
Streaming works as expected when calling from Postman: I call the endpoint and receive data every 2 seconds.
But when I start the stream from the web client, I only get the data once the stream is closed on the server side. I can't figure out why this is happening.
I have an Envoy gateway proxying the calls from the frontend to the backend. This is the whole code I have:
Proto Definition:
syntax = "proto3";
package com.example.api;
option java_multiple_files = true;
option java_package = "com.example.api";
service SSEService {
rpc Connect (SSEConnectionRequest) returns (stream SSEMessageResponse) {}
rpc Ping (PingRequest) returns (PongResponse) {}
}
message SSEConnectionRequest {
repeated string topics = 1;
}
message SSEMessageResponse {
string id = 1;
string type = 2;
string payload = 3;
}
message PingRequest {
string message = 1;
}
message PongResponse {
string message = 1;
}
This is the backend handler:
@GrpcService
@Slf4j
public class SSEHandler extends SSEServiceGrpc.SSEServiceImplBase {

    @SneakyThrows
    @Override
    public void connect(SSEConnectionRequest request, StreamObserver<SSEMessageResponse> responseObserver) {
        var newObserver = (ServerCallStreamObserver<SSEMessageResponse>) responseObserver;
        log.info("Is observer ready? {}", newObserver.isReady());
        while (!newObserver.isReady()) {
            log.info("Waiting for the stream to be ready");
        }
        for (var i = 0; i < 5; i++) {
            var response = SSEMessageResponse.newBuilder()
                    .setId(UUID.randomUUID().toString())
                    .setType("MessageType")
                    .setPayload("Message number #" + i)
                    .build();
            log.info("Sending response down the stream to client. Number {}", i);
            newObserver.onNext(response);
            Thread.sleep(2000);
        }
        responseObserver.onCompleted();
    }

    @Override
    public void ping(PingRequest request, StreamObserver<PongResponse> responseObserver) {
        log.info("Will send a response now");
        var response = PongResponse.newBuilder()
                .setMessage("Test response")
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}
This is the web client:
const grpcClient = new SSEServiceClient('http://localhost:9999');
const stream = grpcClient.connect(new SSEConnectionRequest());

stream.on('data', (data: SSEMessageResponse) => {
  console.log('data', data);
});

stream.on('status', (status: any) => {
  console.log('STATUS', status);
});

stream.on('error', (e: Error) => {
  console.log('error', e);
  stream.cancel();
});
And this is the Envoy static config:
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address: { address: 0.0.0.0, port_value: 9999 }
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                codec_type: auto
                stat_prefix: ingress_http
                stream_idle_timeout: 0s
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: local_service
                      domains: ["*"]
                      routes:
                        - match: { prefix: "/" }
                          route:
                            cluster: sse_service
                            timeout: 0s
                            max_stream_duration:
                              grpc_timeout_header_max: 0s
                      cors:
                        allow_origin_string_match:
                          - prefix: "*"
                        allow_methods: GET, PUT, DELETE, POST, OPTIONS
                        allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                        max_age: "1728000"
                        expose_headers: custom-header-1,grpc-status,grpc-message
                http_filters:
                  - name: envoy.filters.http.grpc_web
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
                  - name: envoy.filters.http.cors
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
                  - name: envoy.filters.http.router
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

  clusters:
    - name: sse_service
      connect_timeout: 60s
      type: logical_dns
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options: {}
      http2_protocol_options: {}
      lb_policy: round_robin
      load_assignment:
        cluster_name: cluster_0
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: localhost
                      port_value: 9090
Any clue on what may be going on?
Answer 1
Score: 1
To whomever bumps into this: I found a workaround. The problem wasn't in the backend or the Envoy proxy, but in how I compiled the proto stubs. For some reason, when I compiled with the options:
--js_out=import_style=commonjs,binary:$OUT_DIR --grpc-web_out=import_style=typescript,mode=grpcweb:$OUT_DIR
I only got the data once the stream was closed on the server side. But when I compiled with:
--grpc-web_out=import_style=typescript,mode=grpcwebtext:$OUT_DIR
the stream worked normally, sending the data every 2 seconds as expected. Basically, changing from mode=grpcweb to mode=grpcwebtext did the trick.
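For anyone reproducing the fix, a full protoc invocation with the text mode might look roughly like the sketch below. The proto file name, include path, and $OUT_DIR are assumptions from my setup, so substitute whatever paths you already use; the part that matters is the mode=grpcwebtext option.

# Minimal sketch: regenerate the web stubs in grpcwebtext mode.
# "sse_service.proto" and "./proto" are placeholder names; $OUT_DIR is the
# folder where the web client picks up the generated code.
protoc -I=./proto ./proto/sse_service.proto \
  --js_out=import_style=commonjs,binary:$OUT_DIR \
  --grpc-web_out=import_style=typescript,mode=grpcwebtext:$OUT_DIR

As far as I can tell from the grpc-web project's feature notes, server-side streaming is only supported in grpcwebtext mode (base64-encoded responses), which would explain why the binary mode only delivered the messages once the call completed.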