如何直接从 protobuf 创建 GRPC 客户端,而无需将其编译成 Java 代码。

huangapple go评论76阅读模式
英文:

How to create GRPC client directly from protobuf without compiling it into java code

问题

在使用gRPC时,我们需要通过协议缓冲编译器(protoc)或使用Gradle或Maven的protoc构建插件,从我们的.proto服务定义生成gRPC客户端和服务器接口。

流程如下:protobuf文件 -> Java代码 -> gRPC客户端。

那么,有没有办法跳过这个步骤呢?

如何创建一个通用的gRPC客户端,可以直接从protobuf文件调用服务器,而无需编译成Java代码?
或者,有没有在运行时生成代码的方法?

期望流程:protobuf文件 -> gRPC客户端。

我想构建一个通用的gRPC客户端系统,输入是protobuf文件以及方法、包、消息请求的描述,而无需为每个protobuf重新编译。

非常感谢。

英文:

When working with gRPC, we need to generate the gRPC client and server interfaces from our .proto service definition via protocol buffer compiler (protoc) or using Gradle or Maven protoc build plugin.

Flow now: protobuf file -> java code -> gRPC client.

So, is there any way to skip this step?

How to create a generic gRPC client that can call the server directly from the protobuf file without compile into java code?
Or, is there a way to Generated Code at runtime?

Flow expect: protobuf file -> gRPC client.

I want to build a generic gRPC client system with the input are protobuf files along with description of method, package, message request ... without having to compile again for each protobuf.

Thank you very much.

答案1

得分: 8

Protobuf系统确实需要运行protoc。但是,可以跳过生成的代码。而不是像传递--java_out--grpc_java_out这样的内容给protoc,你可以传递--descriptor_set_out=FILE,这将会将.proto文件解析为一个_descriptor_文件。描述符文件是一个proto编码的FileDescriptorSet。这与反射服务使用的基本格式相同。

一旦你有了描述符,你可以一次加载一个FileDescriptor,并且创建一个DynamicMessage

然后对于gRPC部分,你需要创建一个gRPC的MethodDescriptor。

static MethodDescriptor from(
  Descriptors.MethodDescriptor methodDesc
) {
  return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
    // UNKNOWN is fine, but the "correct" value can be computed from
    // methodDesc.toProto().getClientStreaming()/getServerStreaming()
    .setType(getMethodTypeFromDesc(methodDesc))
    .setFullMethodName(MethodDescriptor.generateFullMethodName(
        serviceDesc.getFullName(), methodDesc.getName()))
    .setRequestMarshaller(ProtoUtils.marshaller(
        DynamicMessage.getDefaultInstance(methodDesc.getInputType())))
    .setResponseMarshaller(ProtoUtils.marshaller(
        DynamicMessage.getDefaultInstance(methodDesc.getOutputType())))
    .build();
}

static MethodDescriptor.MethodType getMethodTypeFromDesc(
  Descriptors.MethodDescriptor methodDesc
) {
  if (!methodDesc.isServerStreaming()
    && !methodDesc.isClientStreaming()) {
    return MethodDescriptor.MethodType.UNARY;
  } else if (methodDesc.isServerStreaming()
        && !methodDesc.isClientStreaming()) {
    return MethodDescriptor.MethodType.SERVER_STREAMING;
  } else if (!methodDesc.isServerStreaming()) {
    return MethodDescriptor.MethodType.CLIENT_STREAMING;
  } else {
    return MethodDescriptor.MethodType.BIDI_STREAMING;
  }
}

在那一点上,你拥有了你所需要的一切,可以在gRPC中调用Channel.newCall(method, CallOptions.DEFAULT)。你也可以自由地使用ClientCalls,以使用更类似于存根API的东西。

所以动态调用绝对是可行的,而且被用于诸如grpcurl之类的事情。但这也不容易,因此通常只在必要时才会这样做。

英文:

Protobuf systems really need protoc to be run. However, the generated code could be skipped. Instead of passing something like --java_out and --grpc_java_out to protoc you can pass --descriptor_set_out=FILE which will parse the .proto file into a descriptor file. A descriptor file is a proto-encoded FileDescriptorSet. This is the same basic format as used with the reflection service.

Once you have a descriptor, you can load it a FileDescriptor at a time and create a DynamicMessage.

Then for the gRPC piece, you need to create a gRPC MethodDescriptor.

static MethodDescriptor from(
  Descriptors.MethodDescriptor methodDesc
) {
  return MethodDescriptor.&lt;DynamicMessage, DynamicMessage&gt;newBuilder()
    // UNKNOWN is fine, but the &quot;correct&quot; value can be computed from
    // methodDesc.toProto().getClientStreaming()/getServerStreaming()
    .setType(getMethodTypeFromDesc(methodDesc))
    .setFullMethodName(MethodDescriptor.generateFullMethodName(
        serviceDesc.getFullName(), methodDesc.getName()))
    .setRequestMarshaller(ProtoUtils.marshaller(
        DynamicMessage.getDefaultInstance(methodDesc.getInputType())))
    .setResponseMarshaller(ProtoUtils.marshaller(
        DynamicMessage.getDefaultInstance(methodDesc.getOutputType())))
    .build();

static MethodDescriptor.MethodType getMethodTypeFromDesc(
  Descriptors.MethodDescriptor methodDesc
) {
  if (!methodDesc.isServerStreaming()
    &amp;&amp; !methodDesc.isClientStreaming()) {
    return MethodDescriptor.MethodType.UNARY;
  } else if (methodDesc.isServerStreaming()
        &amp;&amp; !methodDesc.isClientStreaming()) {
    return MethodDescriptor.MethodType.SERVER_STREAMING;
  } else if (!methodDesc.isServerStreaming()) {
    return MethodDescriptor.MethodType.CLIENT_STREAMING);
  } else {
    return MethodDescriptor.MethodType.BIDI_STREAMING);
  }
}

At that point you have everything you need and can call Channel.newCall(method, CallOptions.DEFAULT) in gRPC. You're also free to use ClientCalls to use something more similar to the stub APIs.

So dynamic calls are definitely possible, and is used for things like grpcurl. But it also is not easy and so is generally only done when necessary.

答案2

得分: 3

import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
public class ReflectionCall {

    public static void main(String[] args) throws InterruptedException {
        String methodSymbol = "io.github.helloworlde.grpc.HelloService.SayHello";
        String requestContent = "{\"message\": \"Reflection\"}";

        ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9090)
                                                      .usePlaintext()
                                                      .build();
        
        ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
        
        StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
            @Override
            public void onNext(ServerReflectionResponse response) {
                try {
                    if (response.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
                        List<ByteString> fileDescriptorProtoList = response.getFileDescriptorResponse().getFileDescriptorProtoList();
                        handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContent);
                    } else {
                        log.warn("Unknown response type: " + response.getMessageResponseCase());
                    }
                } catch (Exception e) {
                    log.error("Failed to handle response: {}", e.getMessage(), e);
                }
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onCompleted() {
                log.info("Complete");
            }
        };

        StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);

        ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder()
                                                                                        .setFileContainingSymbol(methodSymbol)
                                                                                        .build();
        requestStreamObserver.onNext(getFileContainingSymbolRequest);
        channel.awaitTermination(10, TimeUnit.SECONDS);
    }

    // Other code methods and classes...
}

Please note that the provided Java code snippet is a partial translation of your original code. If you need further assistance or specific translations for other parts, feel free to ask.

英文:

I did it in Java, and the step is:

  1. Call reflection service to get FileDescriptorProto list by method name
  2. Get FileDescriptor of method from FileDescriptorProto list by package name, service name
  3. Get MethodDescriptor from ServiceDescriptor which get from the FileDescriptor
  4. Generate a MethodDescriptor&lt;DynamicMessage, DynamicMessage&gt; by MethodDescriptor
  5. Build request DynamicMessage from content like JSON or others
  6. Call method
  7. Parse response content to JSON from DynamicMessage response

You can reference the full sample in project helloworlde/grpc-java-sample#reflection


And proto is:

syntax = &quot;proto3&quot;;

package io.github.helloworlde.grpc;

option go_package = &quot;api;grpc_gateway&quot;;
option java_package = &quot;io.github.helloworlde.grpc&quot;;
option java_multiple_files = true;
option java_outer_classname = &quot;HelloWorldGrpc&quot;;

service HelloService{
  rpc SayHello(HelloMessage) returns (HelloResponse){
  }
}

message HelloMessage {
  string message = 2;
}

message HelloResponse {
  string message = 1;
}

Start server for this proto by yourself, and the full code in Java just like:

import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
public class ReflectionCall {

    public static void main(String[] args) throws InterruptedException {
        // 反射方法的格式只支持 package.service.method 或者 package.service
        String methodSymbol = &quot;io.github.helloworlde.grpc.HelloService.SayHello&quot;;
        String requestContent = &quot;{\&quot;message\&quot;: \&quot;Reflection\&quot;}&quot;;

        // 构建 Channel
        ManagedChannel channel = ManagedChannelBuilder.forAddress(&quot;127.0.0.1&quot;, 9090)
                                                      .usePlaintext()
                                                      .build();
        // 使用 Channel 构建 BlockingStub
        ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
        // 响应观察器
        StreamObserver&lt;ServerReflectionResponse&gt; streamObserver = new StreamObserver&lt;ServerReflectionResponse&gt;() {
            @Override
            public void onNext(ServerReflectionResponse response) {
                try {
                    // 只需要关注文件描述类型的响应
                    if (response.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
                        List&lt;ByteString&gt; fileDescriptorProtoList = response.getFileDescriptorResponse().getFileDescriptorProtoList();
                        handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContent);
                    } else {
                        log.warn(&quot;未知响应类型: &quot; + response.getMessageResponseCase());
                    }
                } catch (Exception e) {
                    log.error(&quot;处理响应失败: {}&quot;, e.getMessage(), e);
                }
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onCompleted() {
                log.info(&quot;Complete&quot;);
            }
        };
        // 请求观察器
        StreamObserver&lt;ServerReflectionRequest&gt; requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);

        // 构建并发送获取方法文件描述请求
        ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder()
                                                                                        .setFileContainingSymbol(methodSymbol)
                                                                                        .build();
        requestStreamObserver.onNext(getFileContainingSymbolRequest);
        channel.awaitTermination(10, TimeUnit.SECONDS);
    }

    /**
     * 处理响应
     */
    private static void handleResponse(List&lt;ByteString&gt; fileDescriptorProtoList,
                                       ManagedChannel channel,
                                       String methodFullName,
                                       String requestContent) {
        try {
            // 解析方法和服务名称
            String fullServiceName = extraPrefix(methodFullName);
            String methodName = extraSuffix(methodFullName);
            String packageName = extraPrefix(fullServiceName);
            String serviceName = extraSuffix(fullServiceName);

            // 根据响应解析 FileDescriptor
            Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);

            // 查找服务描述
            Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
            // 查找方法描述
            Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);

            // 发起请求
            executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * 解析并查找方法对应的文件描述
     */
    private static Descriptors.FileDescriptor getFileDescriptor(List&lt;ByteString&gt; fileDescriptorProtoList,
                                                                String packageName,
                                                                String serviceName) throws Exception {

        Map&lt;String, DescriptorProtos.FileDescriptorProto&gt; fileDescriptorProtoMap =
                fileDescriptorProtoList.stream()
                                       .map(bs -&gt; {
                                           try {
                                               return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
                                           } catch (InvalidProtocolBufferException e) {
                                               e.printStackTrace();
                                           }
                                           return null;
                                       })
                                       .filter(Objects::nonNull)
                                       .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -&gt; f));


        if (fileDescriptorProtoMap.isEmpty()) {
            log.error(&quot;服务不存在&quot;);
            throw new IllegalArgumentException(&quot;方法的文件描述不存在&quot;);
        }

        // 查找服务对应的 Proto 描述
        DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);

        // 获取这个 Proto 的依赖
        Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);

        // 生成 Proto 的 FileDescriptor
        return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
    }


    /**
     * 根据包名和服务名查找相应的文件描述
     */
    private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName,
                                                                                       String serviceName,
                                                                                       Map&lt;String, DescriptorProtos.FileDescriptorProto&gt; fileDescriptorProtoMap) {
        for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
            if (proto.getPackage().equals(packageName)) {
                boolean exist = proto.getServiceList()
                                     .stream()
                                     .anyMatch(s -&gt; serviceName.equals(s.getName()));
                if (exist) {
                    return proto;
                }
            }
        }

        throw new IllegalArgumentException(&quot;服务不存在&quot;);
    }

    /**
     * 获取前缀
     */
    private static String extraPrefix(String content) {
        int index = content.lastIndexOf(&quot;.&quot;);
        return content.substring(0, index);
    }

    /**
     * 获取后缀
     */
    private static String extraSuffix(String content) {
        int index = content.lastIndexOf(&quot;.&quot;);
        return content.substring(index + 1);
    }

    /**
     * 获取依赖类型
     */
    private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto,
                                                                Map&lt;String, DescriptorProtos.FileDescriptorProto&gt; finalDescriptorProtoMap) {
        return proto.getDependencyList()
                    .stream()
                    .map(finalDescriptorProtoMap::get)
                    .map(f -&gt; toFileDescriptor(f, getDependencies(f, finalDescriptorProtoMap)))
                    .toArray(Descriptors.FileDescriptor[]::new);
    }

    /**
     * 将 FileDescriptorProto 转为 FileDescriptor
     */
    @SneakyThrows
    private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto,
                                                               Descriptors.FileDescriptor[] dependencies) {
        return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
    }


    /**
     * 执行方法调用
     */
    private static void executeCall(ManagedChannel channel,
                                    Descriptors.FileDescriptor fileDescriptor,
                                    Descriptors.MethodDescriptor originMethodDescriptor,
                                    String requestContent) throws Exception {

        // 重新生成 MethodDescriptor
        MethodDescriptor&lt;DynamicMessage, DynamicMessage&gt; methodDescriptor = generateMethodDescriptor(originMethodDescriptor);

        CallOptions callOptions = CallOptions.DEFAULT;

        TypeRegistry registry = TypeRegistry.newBuilder()
                                            .add(fileDescriptor.getMessageTypes())
                                            .build();

        // 将请求内容由 JSON 字符串转为相应的类型
        JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(registry);
        DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
        parser.merge(requestContent, messageBuilder);
        DynamicMessage requestMessage = messageBuilder.build();

        // 调用,调用方式可以通过 originMethodDescriptor.isClientStreaming() 和 originMethodDescriptor.isServerStreaming() 推断
        DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);

        // 将响应解析为 JSON 字符串
        JsonFormat.Printer printer = JsonFormat.printer()
                                               .usingTypeRegistry(registry)
                                               .includingDefaultValueFields();
        String responseContent = printer.print(response);

        log.info(&quot;响应: {}&quot;, responseContent);
    }

    /**
     * 重新生成方法描述
     */
    private static MethodDescriptor&lt;DynamicMessage, DynamicMessage&gt; generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
        // 生成方法全名
        String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
        // 请求和响应类型
        MethodDescriptor.Marshaller&lt;DynamicMessage&gt; inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
                                                                                                              .buildPartial());
        MethodDescriptor.Marshaller&lt;DynamicMessage&gt; outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
                                                                                                               .buildPartial());

        // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确
        return MethodDescriptor.&lt;DynamicMessage, DynamicMessage&gt;newBuilder()
                .setFullMethodName(fullMethodName)
                .setRequestMarshaller(inputTypeMarshaller)
                .setResponseMarshaller(outputTypeMarshaller)
                // 使用 UNKNOWN,自动修改
                .setType(MethodDescriptor.MethodType.UNKNOWN)
                .build();
    }
}

答案3

得分: 1

以下是您要翻译的内容:

"从技术上讲,并没有太多方法可以阻止这种情况。两个主要障碍是:

  1. 需要一个可在运行时调用的解析器来读取 .proto 文件,以及
  2. 需要一个通用的 gRPC 客户端,可以将诸如服务方法名称之类的内容作为文字处理。

这两点都是可能的,但都不是简单的。

对于第一个障碍,一个粗略的方法是使用描述符集选项来调用 protoc,以生成模式二进制文件,然后将其反序列化为 FileDescriptorSet(来自 descriptor.proto);这个模型使您可以访问 protoc 如何查看文件。一些平台还具有本地解析器(基本上是在该平台中将 protoc 重新实现为库),例如 .NET 中的 protobuf-net.Reflection 就是这样做的。

对于第二个障碍,这里有一个在 C# 中的实现。这种方法在 Java 中也应该是相当可移植的,尽管细节会有所不同。您可以查看生成的实现,以了解它在任何特定语言中的工作方式。

(很抱歉,具体的示例是针对 C#/.NET 的,但那是我所熟悉的领域;方法应该是可移植的,尽管具体的代码不是直接适用的。)"

英文:

There isn't much to prevent this technically. The two big hurdles are:

  1. having a runtime-callable parser for reading the .proto, and
  2. having a general purpose gRPC client available that takes things like the service method name as literals

Both are possible, but neither is trivial.

For 1, the crude way would be to shell/invoke protoc using the descriptor-set option to generate a schema binary, then deserialize that as a FileDescriptorSet (from descriptor.proto); this model gives you access to how protoc sees the file. Some platforms also have native parsers (essentially reimplementing protoc as a library in that platform), for example protobuf-net.Reflection does this in .NET-land

For 2, here's an implementation of that in C#. The approach should be fairly portable to Java, even if the details vary. You can look at a generated implementation to see how it works in any particular language.

(Sorry that the specific examples are C#/.NET, but that's where I live; the approaches should be portable, even if the specific code: not directly)

答案4

得分: 0

技术上两者都是可行的。

代码生成只是简单地生成了一些类;主要是protobuf消息、grpc方法描述符和存根。您可以实现它或者在生成的代码中进行检查以绕过代码生成。老实说,我不确定这样做的好处是什么。而且,如果proto文件发生了更改,这样做将会非常麻烦。

使用字节码生成也是可能的,只要您在代码库中提交一些接口/抽象类来表示那些生成的存根/方法描述符和protobuf消息。但是您必须确保这些非动态代码与proto定义保持同步(很可能需要运行时检查/异常处理)。

英文:

technically both are possible.

The codegen is simply generating a handful of classes; mainly protobuf messages, grpc method descriptors and stubs. You can implement it or check in the generated code to bypass the codegen. i am not sure what is the benefit of doing this tbh. Also, it will be very annoying if the proto is changed.

It is also possible to do it dynamically using byte codegen as long as you check-in some interfaces/abstract classes to represent those generated stub/method descriptors and protobuf messages. you have to make sure those non dynamic code is in sync with the proto definition though (most likely runtime check/exception).

huangapple
  • 本文由 发表于 2020年4月10日 11:19:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/61133529.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定