英文:
kubernetes is not routing traffic to grpc service
问题
我正在尝试在digitalocean
的Kubernetes
集群中使用GRPC,但尚未成功。
> PS:对于内容较长,我表示歉意。
我找到了一些相关内容,但这些内容都涉及到一些入口。对我来说,这些是内部服务。
我有一个定义为.proto
的文件,内容如下:
syntax = "proto3";
package imgproto;
option go_package = ".;imgproto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
service ImaginaryServer {
rpc Ping(PingPayload) returns (PongPayload) {}
}
message PingPayload {
google.protobuf.Timestamp ts = 1;
}
message PongPayload {
google.protobuf.Timestamp ts = 1;
}
在运行proto-gen-go
之后,我使用以下代码填充implementation
:
type ImaginaryServerImpl struct {
imgproto.UnimplementedImaginaryServer
}
func (s *ImaginaryServerImpl) Ping(_ context.Context, in *imgproto.PingPayload) (*imgproto.PongPayload, error) {
fmt.Printf("ImaginaryServerImpl.Ping: %v\n", in)
return &imgproto.PongPayload{
Ts: timestamppb.New(time.Now()),
}, nil
}
创建并将其注册到GRPC服务器:
grpcServer := grpc.NewServer()
imgproto.RegisterImaginaryServer(grpcServer, &ImaginaryServerImpl{})
然后启动服务器:
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", constants.PORT_GRPC))
if err != nil {
return err
}
go func() {
if err := grpcServer.Serve(grpcListener); err != nil {
fmt.Println("GRPC Server startup failed with", err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
我编写了以下客户端代码:
grpcConnection, err := grpc.Dial(
endpoint,
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("Calling GRPC:", method, req, reply, invoker)
return nil
}))
if err != nil {
return ctx, err
}
client := pmqproto.NewImaginaryClient(grpcConnection)
fmt.Println(" >>>>>>>> PING:")
pong, pingErr := client.Ping(ctx, &imgproto.PingPayload{Ts: timestamppb.Now()}, grpc.WaitForReady(false))
if pingErr != nil {
fmt.Println(pingErr)
}
fmt.Println(" >>>>>>>> PONG: ", pong.Ts.AsTime().String())
但是看起来客户端返回时并没有实际调用RPC。
客户端中的日志:
>>>>>>>> PING:
Calling GRPC: /imgproto.ImaginaryServer/Ping ts:{seconds:1666113879 nanos:778900352} 0x127aa60
>>>>>>>> PONG: 1970-01-01 00:00:00 +0000 UTC
> 服务器中没有日志。
我的k8s
YAML 文件如下:
apiVersion: v1
kind: Service
metadata:
name: reality
namespace: reality-ns
spec:
type: ClusterIP
selector:
app: reality
ports:
- name: grpc
protocol: TCP
port: 6772
targetPort: 6772
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: reality
namespace: reality-ns
labels:
app: reality
spec:
selector:
matchLabels:
app: reality
template:
metadata:
labels:
app: reality
spec:
containers:
- name: reality
image: registry.example.com/binaek/reality
imagePullPolicy: Always
command: ["cmd"]
ports:
- name: grpc
containerPort: 6772
我无法找到我做错了什么。在这一点上,我非常需要帮助。
> 该镜像使用gcr.io/distroless/base
作为基础镜像。
英文:
I am trying to make GRPC work in a digitalocean
Kubernetes
cluster - but am still to succeed.
> PS: Apologies for the long content
I have found some content regarding this but those revolve around some ingress. For me, these are internal services.
I have a .proto
defined as such:
syntax = "proto3";
package imgproto;
option go_package = ".;imgproto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
service ImaginaryServer {
rpc Ping(PingPayload) returns (PongPayload) {}
}
message PingPayload {
google.protobuf.Timestamp ts = 1;
}
message PongPayload {
google.protobuf.Timestamp ts = 1;
}
After running proto-gen-go
, I populate the implementation
with:
type ImaginaryServerImpl struct {
imgproto.UnimplementedImaginaryServer
}
func (s *ImaginaryServerImpl) Ping(_ context.Context, in *imgproto.PingPayload) (*imgproto.PongPayload, error) {
fmt.Printf("ImaginaryServerImpl.Ping: %v\n", in)
return &imgproto.PongPayload{
Ts: timestamppb.New(time.Now()),
}, nil
}
Create and register it to a GRPC server:
grpcServer := grpc.NewServer()
imgproto.RegisterImaginaryServer(grpcServer, &ImaginaryServerImpl{})
And start the server:
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", constants.PORT_GRPC))
if err != nil {
return err
}
go func() {
if err := grpcServer.Serve(grpcListener); err != nil {
fmt.Println("GRPC Server startup failed with", err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
I wrote up the client as:
grpcConnection, err := grpc.Dial(
endpoint,
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("Calling GRPC:", method, req, reply, invoker)
return nil
}))
if err != nil {
return ctx, err
}
client := pmqproto.NewImaginaryClient(grpcConnection)
fmt.Println(" >>>>>>>>>>> PING:")
pong, pingErr := client.Ping(ctx, &imgproto.PingPayload{Ts: timestamppb.Now()}, grpc.WaitForReady(false))
if pingErr != nil {
fmt.Println(pingErr)
}
fmt.Println(" >>>>>>>>>>> PONG: ", pong.Ts.AsTime().String())
But it looks like the client is returning without actually invoking the RPC.
Log
that I am seeing in the client:
>>>>>>>>>>> PING:
Calling GRPC: /imgproto.ImaginaryServer/Ping ts:{seconds:1666113879 nanos:778900352} 0x127aa60
>>>>>>>>>>> PONG: 1970-01-01 00:00:00 +0000 UTC
> There are no logs in the server.
My k8s
yaml is as such:
apiVersion: v1
kind: Service
metadata:
name: reality
namespace: reality-ns
spec:
type: ClusterIP
selector:
app: reality
ports:
- name: grpc
protocol: TCP
port: 6772
targetPort: 6772
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: reality
namespace: reality-ns
labels:
app: reality
spec:
selector:
matchLabels:
app: reality
template:
metadata:
labels:
app: reality
spec:
containers:
- name: reality
image: registry.example.com/binaek/reality
imagePullPolicy: Always
command: ["cmd"]
ports:
- name: grpc
containerPort: 6772
I am not able to locate what I am doing wrong. Desperately need help at this point.
> The image uses gcr.io/distroless/base
as it's base.
答案1
得分: 1
您的拦截器使用可能会阻止RPC的执行。
// 当在ClientConn上设置一元拦截器时,gRPC将所有一元RPC调用委托给拦截器,拦截器有责任调用invoker来完成RPC的处理。
https://github.com/grpc/grpc-go/blob/9127159caf5a3879dad56b795938fde3bc0a7eaa/interceptor.go#L31-L34
因此,您的拦截器函数应该像这样:
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("Calling GRPC:", method, req, reply, invoker)
return invoker(ctx, method, req, reply, cc, opts...)
}))
英文:
Your interceptor usage is likely preventing execution of the RPC.
// When a unary interceptor(s) is set on a ClientConn, gRPC
// delegates all unary RPC invocations to the interceptor, and it is the
// responsibility of the interceptor to call invoker to complete the processing
// of the RPC.
https://github.com/grpc/grpc-go/blob/9127159caf5a3879dad56b795938fde3bc0a7eaa/interceptor.go#L31-L34
So your interceptor function should instead look like:
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("Calling GRPC:", method, req, reply, invoker)
return invoker(ctx, method, req, reply, cc, opts...)
}))
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论