GRPC Async + Blocking Stub Java

Question

I am running into a bit of a chicken and an egg problem.

Case: A file is generated on a remote client. The client should transmit the file to the server over an async stub. The client must also transmit metadata via a blocking stub to be stored in a database.

Problems:

  1. If I do the asynchronous operation first, the file data is sent before the metadata, so the server has no context for what to name the file or where to put it. I originally intended to return this information from the server (bidirectionally), but stream observers do not lend themselves to setting variables outside their anonymous definition.

  2. If I do the synchronous operation first, I can get file naming information back from the server; however, I will need to package this into the "Chunks" of data. This would also require constantly opening and closing the save file while gRPC iterates over its stream data, as iterators are not easily reset (so I can't just peel off the first request).

  3. As a last option, I could package all of this into the asynchronous request and dispense with the synchronous call. I believe this would provide a working solution, but I am concerned about the amount of data being sent on already large requests, as well as the inefficiency mentioned before.

So my question is:

  1. Is there a way to set a global variable to 'value.Message' from the response observer?
  2. Alternatively, is there a way to pass information from the synchronous call to the asynchronous call on the server side?

Async response observer:

StreamObserver<GrpcServerComm.UploadStatus> responseObserver = new StreamObserver<GrpcServerComm.UploadStatus>() {
    @Override
    public void onNext(GrpcServerComm.UploadStatus value) {
        if (value.getCode() != 1) {
            Log.d("Error", "Upload Procedure Failure");
            finishLatch.countDown();
        }
    }

    @Override
    public void onError(Throwable t) {
        Log.d("Error", "Upload Response");
        finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
        finishLatch.countDown();
    }
};

Relevant protobufs

message UploadStatus {
    string filename = 1;
    int32 code = 2;
}
message DataChunk {
    string filename = 1;
    bytes chunk = 2;
}
message VideoMetadata {
    string publisher = 1;
    string description = 2;
    string tags = 3;
    double videolat = 4;
    double videolong = 5;
}
service DataUpload {
    rpc UploadData (stream DataChunk) returns (UploadStatus);
}
service ContentMetaData {
    rpc UploadMetaData (VideoMetadata) returns (UploadStatus);
}

Python Server-side functions

import random

import filestream  # module containing writefile() below
import proto_test_pb2
import proto_test_pb2_grpc

class DataUploadServicer(proto_test_pb2_grpc.DataUploadServicer):
    def UploadData(self, request_it, context):
        filename = str(random.getrandbits(32))  # server decides filename
        return filestream.writefile(filename, request_it)

def writefile(filename, chunks):
    response = proto_test_pb2.UploadStatus()
    path = 'tmp/' + filename
    # Open the file once and append every chunk from the request stream.
    with open(path, "ab") as app_file:
        for chunk in chunks:
            app_file.write(chunk.chunk)
    print('File Written')
    # Field names follow the UploadStatus message above (filename, code);
    # that message has no Code or Message field.
    response.code = 1
    response.filename = filename
    return response

Answer 1

Score: 0

For Java users, there is a detailed article on this here.

I don't think it is a good idea to send these as two separate requests. Instead, VideoMetadata and DataChunk should be combined into a single message type, as shown here.

message FileUploadRequest {
    VideoMetadata metaData = 1;
    DataChunk dataChunk = 2;
}

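Now you might ask why the metadata has to be sent with every request. This is where the Protocol Buffers oneof feature helps: each message in the stream carries either the metadata or a data chunk, never both.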

message FileUploadRequest {
    oneof upload_data {
        VideoMetadata metaData = 1;
        DataChunk dataChunk = 2;
    }
}

Your service would look like this.

service FileuploadService {
    rpc UploadData (stream FileUploadRequest) returns(UploadStatus);
}

When you use oneof, the oneof fields in the generated code have the same getters and setters as regular fields. You also get a special method for checking which value (if any) in the oneof is set. Send the metadata first, then the chunks; on the server, check which oneof field is set and handle each message accordingly.
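
Since the server in the question is written in Python, here is a minimal sketch of how the "metadata first, then chunks" flow might look on both ends. It is an illustration under assumptions, not the actual generated code: the dataclasses are stand-ins for the protoc-generated classes so the example runs without gRPC installed, and `build_requests` and `handle_upload` are hypothetical helper names. The stand-in `WhichOneof` mimics the method of the same name that generated Python protobuf messages provide. Calling `next()` on the request iterator is one way to peel off the first message without resetting the stream.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Tuple


# Stand-ins for the generated message classes (assumption: the real ones
# would come from the compiled .proto and use the same field names).
@dataclass
class VideoMetadata:
    publisher: str = ""
    description: str = ""


@dataclass
class DataChunk:
    filename: str = ""
    chunk: bytes = b""


@dataclass
class FileUploadRequest:
    metaData: Optional[VideoMetadata] = None
    dataChunk: Optional[DataChunk] = None

    def WhichOneof(self, group: str) -> Optional[str]:
        # Mimics the generated protobuf method: name of the set field, or None.
        if group != "upload_data":
            raise ValueError("unknown oneof group: " + group)
        if self.metaData is not None:
            return "metaData"
        if self.dataChunk is not None:
            return "dataChunk"
        return None


def build_requests(meta: VideoMetadata, payload: bytes,
                   chunk_size: int = 4) -> Iterator[FileUploadRequest]:
    """Client side (hypothetical helper): metadata once, then the chunks."""
    yield FileUploadRequest(metaData=meta)
    for start in range(0, len(payload), chunk_size):
        piece = payload[start:start + chunk_size]
        yield FileUploadRequest(dataChunk=DataChunk(chunk=piece))


def handle_upload(requests: Iterable[FileUploadRequest], directory: str,
                  filename: str) -> Tuple[VideoMetadata, str]:
    """Server side (hypothetical helper): read metadata, then write chunks."""
    stream = iter(requests)
    first = next(stream, None)  # peel off the first message only
    if first is None or first.WhichOneof("upload_data") != "metaData":
        raise ValueError("first message must carry the metadata")
    path = os.path.join(directory, filename)
    # The file is opened once and stays open for the rest of the stream.
    with open(path, "wb") as out:
        for request in stream:
            if request.WhichOneof("upload_data") != "dataChunk":
                raise ValueError("expected a data chunk after the metadata")
            out.write(request.dataChunk.chunk)
    return first.metaData, path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        meta = VideoMetadata(publisher="alice", description="demo clip")
        received, saved = handle_upload(
            build_requests(meta, b"hello world"), workdir, "upload.bin")
        print(received.publisher, os.path.getsize(saved))
```

In a real servicer, the body of `handle_upload` would sit inside `UploadData(self, request_iterator, context)`, and the metadata could be stored in the database before any chunk is written, so a separate blocking call would no longer be needed.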

huangapple
  • Posted on July 23, 2020 at 05:41:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63043620.html