GRPC Async + Blocking Stub Java
Question
I am running into a bit of a chicken and an egg problem.
Case: A file is generated on a remote client. The client should transmit the file to the server over an async stub. The client must also transmit metadata via a blocking stub to be stored in a database.
Problems:
- If I do the asynchronous operation first, then the file data is sent prior to the metadata, and therefore the server has no context as to what to name the file or where to put it. I originally intended to return this information from the server (bidirectionally); however, stream observers do not lend themselves to setting variables outside their anonymous definition.
- If I do the synchronous operation first, I can get file-naming information back from the server (see the sketch after this list); however, I will need to package this into the "chunks" of data. This would also require constantly opening and closing the save file while gRPC iterates over its stream data, as iterators are not easily reset (so I can't just peel off the first request).
- As a last option, I could package all of this into the asynchronous request and dispense with the synchronous call. I believe this would provide a working solution, but I am concerned about the amount of data being sent on already large requests, as well as the inefficiency mentioned before.
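For reference, the synchronous half of this flow would look roughly like the sketch below. The host/port, the GrpcServerComm outer class, and the ContentMetaDataGrpc stub name are assumptions inferred from the protos shown further down, not code from this post.

    // Sketch only: the blocking metadata call that returns the server-chosen filename.
    ManagedChannel channel = ManagedChannelBuilder
            .forAddress("server.example.com", 50051)   // assumed host/port
            .usePlaintext()
            .build();

    ContentMetaDataGrpc.ContentMetaDataBlockingStub metaStub =
            ContentMetaDataGrpc.newBlockingStub(channel);

    GrpcServerComm.UploadStatus status = metaStub.uploadMetaData(
            GrpcServerComm.VideoMetadata.newBuilder()
                    .setPublisher("publisher")
                    .setDescription("description")
                    .build());

    String serverFilename = status.getFilename();   // naming information returned by the server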
So my question is:
- Is there a way to set a global variable to a field of the response (e.g. 'value.Message') from inside the response observer?
- Alternatively, is there a way to pass information from the synchronous call to the asynchronous call on the server side?
Async response observer:
    StreamObserver<GrpcServerComm.UploadStatus> responseObserver = new StreamObserver<GrpcServerComm.UploadStatus>() {
        @Override
        public void onNext(GrpcServerComm.UploadStatus value) {
            if (value.getCode() != 1) {
                Log.d("Error", "Upload Procedure Failure");
                finishLatch.countDown();
            }
        }

        @Override
        public void onError(Throwable t) {
            Log.d("Error", "Upload Response");
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            finishLatch.countDown();
        }
    };
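For context, this observer would typically be handed to the async stub, which returns a request observer used to stream the chunks. The DataUploadGrpc stub, the chunk size, and the file-reading loop below are a sketch based on the protos that follow, not the original client code; serverFilename assumes the metadata call already returned it, which is exactly the ordering question raised above.

    // Sketch only: streaming the file through the async stub with the observer above.
    DataUploadGrpc.DataUploadStub asyncStub = DataUploadGrpc.newStub(channel);
    StreamObserver<GrpcServerComm.DataChunk> requestObserver = asyncStub.uploadData(responseObserver);

    try (FileInputStream in = new FileInputStream(videoFile)) {   // videoFile: the generated file
        byte[] buffer = new byte[64 * 1024];
        int read;
        while ((read = in.read(buffer)) != -1) {
            requestObserver.onNext(GrpcServerComm.DataChunk.newBuilder()
                    .setFilename(serverFilename)    // only known if the metadata call ran first
                    .setChunk(ByteString.copyFrom(buffer, 0, read))
                    .build());
        }
        requestObserver.onCompleted();
    } catch (IOException e) {
        requestObserver.onError(e);
    }

    finishLatch.await(1, TimeUnit.MINUTES);   // throws InterruptedException; wait for the server's UploadStatus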
Relevant protobufs
    message UploadStatus {
        string filename = 1;
        int32 code = 2;
    }

    message DataChunk {
        string filename = 1;
        bytes chunk = 2;
    }

    message VideoMetadata {
        string publisher = 1;
        string description = 2;
        string tags = 3;
        double videolat = 4;
        double videolong = 5;
    }

    service DataUpload {
        rpc UploadData (stream DataChunk) returns (UploadStatus);
    }

    service ContentMetaData {
        rpc UploadMetaData (VideoMetadata) returns (UploadStatus);
    }
Python Server-side functions
    # server.py (assumed imports for this snippet)
    import random

    import proto_test_pb2
    import proto_test_pb2_grpc
    import filestream


    class DataUploadServicer(proto_test_pb2_grpc.DataUploadServicer):
        def UploadData(self, request_it, context):
            filename = str(random.getrandbits(32))  # server decides filename
            return filestream.writefile(filename, request_it)


    # filestream.py: writes the streamed chunks and builds the status reply
    def writefile(filename, chunks):
        response = proto_test_pb2.UploadStatus()
        filename = 'tmp/' + filename
        app_file = open(filename, "ab")
        for chunk in chunks:
            app_file.write(chunk.chunk)
        app_file.close()
        print('File Written')
        # UploadStatus (see proto above) defines only `code` and `filename`
        response.code = 1
        response.filename = filename
        return response
Answer 1
Score: 0
Java users, a detailed article on this is here.
I think it is not a good idea to have these as two separate requests. Instead, Metadata and DataChunk should be combined into a single type, as shown here.
    message FileUploadRequest {
        VideoMetadata metaData = 1;
        DataChunk dataChunk = 2;
    }
Now you might ask why we would have to send the metadata with every request. This is where the gRPC oneof type helps.
    message FileUploadRequest {
        oneof upload_data {
            VideoMetadata metaData = 1;
            DataChunk dataChunk = 2;
        }
    }
Your service would be like this.
    service FileuploadService {
        rpc UploadData (stream FileUploadRequest) returns (UploadStatus);
    }
When you use oneof, the generated code gives the oneof fields the same getters and setters as regular fields. You also get a special method for checking which value (if any) in the oneof is set. First you send the metadata, and then you send the chunks. Based on which oneof field is set, the server can then act accordingly.
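On the Java client side, the combined stream could then be sent roughly as sketched below; the FileuploadServiceGrpc stub and the generated setMetaData / setDataChunk / getUploadDataCase names are assumptions based on the proto above.

    // Sketch only: one client-streaming call; metadata goes first, chunks follow.
    FileuploadServiceGrpc.FileuploadServiceStub stub = FileuploadServiceGrpc.newStub(channel);
    StreamObserver<FileUploadRequest> requestObserver = stub.uploadData(responseObserver);

    // First message of the stream: the metadata.
    requestObserver.onNext(FileUploadRequest.newBuilder()
            .setMetaData(VideoMetadata.newBuilder()
                    .setPublisher("publisher")
                    .setDescription("description")
                    .build())
            .build());

    // Every later message: a chunk. The oneof guarantees only one field is set per message,
    // and the receiver can call getUploadDataCase() to see which one it was.
    requestObserver.onNext(FileUploadRequest.newBuilder()
            .setDataChunk(DataChunk.newBuilder()
                    .setChunk(ByteString.copyFrom(chunkBytes))   // chunkBytes: one slice of the file
                    .build())
            .build());

    requestObserver.onCompleted();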