GRPC-Web 流:只有在流关闭时才接收数据。

huangapple go评论65阅读模式
英文:

GRPC-Web Stream: Only receiving data when stream is closed

问题

I have set up a simple Java & Spring Boot backend to stream data to a gRPC-web client. Streaming works as expected when calling from Postman; however, when starting the stream from the web client, data is received only after the stream is closed on the server-side. I can't determine why this is happening.

Here's the relevant code:

Proto Definition:

syntax = "proto3";

package com.example.api;

option java_multiple_files = true;
option java_package = "com.example.api";

service SSEService {
  rpc Connect (SSEConnectionRequest) returns (stream SSEMessageResponse) {}
  rpc Ping (PingRequest) returns (PongResponse) {}
}

message SSEConnectionRequest {
  repeated string topics = 1;
}

message SSEMessageResponse {
  string id = 1;
  string type = 2;
  string payload = 3;
}

message PingRequest {
  string message = 1;
}

message PongResponse {
  string message = 1;
}

Backend handler:

@GrpcService
@Slf4j
public class SSEHandler extends SSEServiceGrpc.SSEServiceImplBase {
    @SneakyThrows
    @Override
    public void connect(SSEConnectionRequest request, StreamObserver<SSEMessageResponse> responseObserver) {
        // ...
    }

    @Override
    public void ping(PingRequest request, StreamObserver<PongResponse> responseObserver) {
        // ...
    }
}

Web client:

const grpcClient = new SSEServiceClient('http://localhost:9999');
const stream = grpcClient.connect(new SSEConnectionRequest());
// ...

Envoy static config:

admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }
# ... (the rest of the config)

It seems like the issue might be related to how the server-side streaming is handled in your gRPC service. You may want to ensure that the server is sending data as soon as it's available and not waiting for the stream to be closed.

Please review your server-side code to ensure that data is sent promptly in the connect method without waiting for the stream to be completed.

英文:

I have setup a simple Java & Spring Boot backend in order to stream data to a gRPC-web client.

The Streaming is working as expected when calling from Postman. I call the endpoint and receive the data every 2 seconds.

But when I start the stream from the web client, I only get the data once the stream is closed on server-side. I can't figure out why this is happening.

I have the Envoy gateway to proxy the calls from the FE to BE. This is the whole code I have:

Proto Definition:

syntax = &quot;proto3&quot;;

package com.example.api;

option java_multiple_files = true;
option java_package = &quot;com.example.api&quot;;

service SSEService {
  rpc Connect (SSEConnectionRequest) returns (stream SSEMessageResponse) {}
  rpc Ping (PingRequest) returns (PongResponse) {}
}

message SSEConnectionRequest {
  repeated string topics = 1;
}

message SSEMessageResponse {
  string id = 1;
  string type = 2;
  string payload = 3;
}

message PingRequest {
  string message = 1;
}

message PongResponse {
  string message = 1;
}

This the Backend handler:

@GrpcService
@Slf4j
public class SSEHandler extends SSEServiceGrpc.SSEServiceImplBase {
    @SneakyThrows
    @Override
    public void connect(SSEConnectionRequest request, StreamObserver&lt;SSEMessageResponse&gt; responseObserver) {
        var newObserver = (ServerCallStreamObserver&lt;SSEMessageResponse&gt;) responseObserver;
        log.info(&quot;Is observer ready? {}&quot;, newObserver.isReady());

        while(!newObserver.isReady()) {
            log.info(&quot;Waiting for the stream to be ready&quot;);
        }

        for (var i = 0; i &lt; 5; i++) {
            var response = SSEMessageResponse.newBuilder()
                .setId(UUID.randomUUID().toString())
                .setType(&quot;MessageType&quot;)
                .setPayload(&quot;Message number #&quot; + i)
                .build();

            log.info(&quot;Sending response down the stream to client. Number {}&quot;, i);
            newObserver.onNext(response);

            Thread.sleep(2000);
        }
            responseObserver.onCompleted();

    }

    @Override
    public void ping(PingRequest request, StreamObserver&lt;PongResponse&gt; responseObserver) {
        log.info(&quot;Will send a response now&quot;);
        var response = PongResponse.newBuilder()
            .setMessage(&quot;Test response&quot;)
            .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

This is the web client:

const grpcClient = new SSEServiceClient(&#39;http://localhost:9999&#39;);
const stream =   grpcClient.connect(new SSEConnectionRequest());
    stream.on(&#39;data&#39;, (data: SSEMessageResponse) =&gt; {
      console.log(&#39;data&#39;, data);
    });

    stream.on(&#39;status&#39;, (status: any) =&gt; {
      console.log(&#39;STATUS&#39;, status);
    });

    stream.on(&#39;error&#39;, (e: Error) =&gt; {
      console.log(&#39;error&#39;, e);
      stream.cancel();
    });

And this is the Envoy static config:

admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address: { address: 0.0.0.0, port_value: 9999 }
      filter_chains:
        - filters:
          - name: envoy.filters.network.http_connection_manager
            typed_config:
              &quot;@type&quot;: type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
              codec_type: auto
              stat_prefix: ingress_http
              stream_idle_timeout: 0s
              route_config:
                name: local_route
                virtual_hosts:
                  - name: local_service
                    domains: [&quot;*&quot;]
                    routes:
                      - match: { prefix: &quot;/&quot; }
                        route:
                          cluster: sse_service
                          timeout: 0s
                          max_stream_duration:
                            grpc_timeout_header_max: 0s
                    cors:
                      allow_origin_string_match:
                        - prefix: &quot;*&quot;
                      allow_methods: GET, PUT, DELETE, POST, OPTIONS
                      allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                      max_age: &quot;1728000&quot;
                      expose_headers: custom-header-1,grpc-status,grpc-message
              http_filters:
                - name: envoy.filters.http.grpc_web
                  typed_config:
                    &quot;@type&quot;: type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
                - name: envoy.filters.http.cors
                  typed_config:
                    &quot;@type&quot;: type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
                - name: envoy.filters.http.router
                  typed_config:
                    &quot;@type&quot;: type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
    - name: sse_service
      connect_timeout: 60s
      type: logical_dns
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          &quot;@type&quot;: type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options: {}
      http2_protocol_options: {}
      lb_policy: round_robin
      load_assignment:
        cluster_name: cluster_0
        endpoints:
          - lb_endpoints:
            - endpoint:
                address:
                  socket_address:
                    address: localhost
                    port_value: 9090

Any clue on what my be going on?

答案1

得分: 1

I found a workaround. The problem wasn't in the Backend nor the Envoy proxy but on how I compiled the proto stubs. For some reason when I compiled with the options:

--js_out=import_style=commonjs,binary:$OUT_DIR --grpc-web_out=import_style=typescript,mode=grpcweb:$OUT_DIR

I just got the data when the stream was closed on server-side. But when I compiled with:

--grpc-web_out=import_style=typescript,mode=grpcwebtext:$OUT_DIR

Basically changing from mode=grpcweb to mode=grpcwebtext did the trick. The stream worked normally, sending the data every 2 seconds as expected.

英文:

To whomever bumped into this, I found a workaround. The problem wasn't in the Backend nor the Envoy proxy but on how I compiled the proto stubs. For some reason reason when I compiled with the options:

--js_out=import_style=commonjs,binary:$OUT_DIR --grpc-web_out=import_style=typescript,mode=grpcweb:$OUT_DIR

I just got the data when the stream was closed on server-side. But when I compiled with:

--grpc-web_out=import_style=typescript,mode=grpcwebtext:$OUT_DIR

Basically changing from mode=grpcweb to mode=grpcwebtext did the trick. The stream worked normally, sending the data every 2 seconds as expected.

huangapple
  • 本文由 发表于 2023年6月21日 23:20:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/76524868.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定