英文:
Modifying metadata on Go GRPC server streaming interceptor
问题
我一直在尝试在服务器流拦截器上设置元数据,以便实际的RPC函数可以在下游读取它们:
// UserIDInterceptor is a gRPC stream server interceptor that sends an
// "X-User-Id" header on the stream and then invokes the handler with the
// original, unmodified stream.
//
// NOTE(review): SendHeader emits header metadata on the response side of the
// stream; the handler is still given ss as-is, so ss.Context() and the
// incoming metadata it carries are not changed by this call.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md := metadata.New(map[string]string{"X-User-Id": "real_user_id"})
	// Do not drop the SendHeader error: if the header cannot be sent the
	// stream is unusable, so abort instead of running the handler.
	if err := ss.SendHeader(md); err != nil {
		return err
	}
	return handler(srv, ss)
}
// GetObjects is the stream handler for the GetObjects RPC. It looks up the
// "X-User-Id" entry in the incoming metadata of the stream's context, logs the
// value together with whether it was found, and returns nil.
func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
	id, found := HeaderFromMetadata(client.Context(), "X-User-Id")
	log.Printf("User ID: %s, Ok: %t\n", id, found)
	return nil
}
// HeaderFromMetadata searches the incoming gRPC metadata attached to ctx for
// the given header names, in the order supplied, and returns the first value
// of the first header that has at least one value.
//
// The boolean result is false when ctx carries no incoming metadata or when
// none of the requested headers has a value.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	md, present := metadata.FromIncomingContext(ctx)
	if !present {
		return "", false
	}
	for _, name := range headers {
		values := md.Get(name)
		if len(values) == 0 {
			continue
		}
		return values[0], true
	}
	return "", false
}
我的服务器注册如下:
// Create the gRPC server with UserIDInterceptor installed as its stream
// interceptor, then register the value returned by NewServer() on it via
// RegisterIAMServer.
server := grpc.NewServer(
grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())
我遇到的问题是找不到用户ID头。我可以看到当客户端发送请求时拦截器被调用,并且我可以看到元数据包含头部,但实际的RPC似乎无法提取它。我在这里做错了什么?
英文:
I've been trying to set metadata in a server stream interceptor so that it can be read downstream by the actual RPC function:
// UserIDInterceptor is a gRPC stream server interceptor that sends an
// "X-User-Id" header on the stream and then invokes the handler with the
// original, unmodified stream.
//
// NOTE(review): SendHeader emits header metadata on the response side of the
// stream; the handler is still given ss as-is, so ss.Context() and the
// incoming metadata it carries are not changed by this call.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md := metadata.New(map[string]string{"X-User-Id": "real_user_id"})
	// Do not drop the SendHeader error: if the header cannot be sent the
	// stream is unusable, so abort instead of running the handler.
	if err := ss.SendHeader(md); err != nil {
		return err
	}
	return handler(srv, ss)
}
// GetObjects is the stream handler for the GetObjects RPC. It looks up the
// "X-User-Id" entry in the incoming metadata of the stream's context, logs the
// value together with whether it was found, and returns nil.
func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
	id, found := HeaderFromMetadata(client.Context(), "X-User-Id")
	log.Printf("User ID: %s, Ok: %t\n", id, found)
	return nil
}
// HeaderFromMetadata searches the incoming gRPC metadata attached to ctx for
// the given header names, in the order supplied, and returns the first value
// of the first header that has at least one value.
//
// The boolean result is false when ctx carries no incoming metadata or when
// none of the requested headers has a value.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	md, present := metadata.FromIncomingContext(ctx)
	if !present {
		return "", false
	}
	for _, name := range headers {
		values := md.Get(name)
		if len(values) == 0 {
			continue
		}
		return values[0], true
	}
	return "", false
}
And my server is registered like this:
// Create the gRPC server with UserIDInterceptor installed as its stream
// interceptor, then register the value returned by NewServer() on it via
// RegisterIAMServer.
server := grpc.NewServer(
grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())
The problem I'm having is that the user ID header isn't being found. I can see that the interceptor is called when the client sends the request, and I can see that the metadata contains the header, but the actual RPC can't seem to extract it. What am I doing wrong here?
答案1
得分: 5
更新
更简单的解决方案是只覆盖ServerStream的Context()方法。
// serverStream wraps a grpc.ServerStream and substitutes the context it
// reports. Every other ServerStream method is promoted from the embedded
// stream.
type serverStream struct {
	grpc.ServerStream
	// ctx is what Context returns in place of the embedded stream's context.
	ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the Context
// method of the embedded ServerStream.
func (st *serverStream) Context() context.Context {
	return st.ctx
}
// UserIDInterceptor is a gRPC stream server interceptor that injects the
// "X-User-Id" entry into the stream's incoming metadata and passes the handler
// a stream whose Context() carries that metadata, so the RPC implementation
// can read it with metadata.FromIncomingContext.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy: older grpc-go releases document that the MD returned by
	// FromIncomingContext must not be modified. Copy also yields a usable,
	// empty MD when the request carried no metadata, so the user ID is
	// injected in that case too instead of being silently skipped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set (not Append) so a client-supplied value for the same key is
	// replaced rather than left in front of the trusted one.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, &serverStream{ss, metadata.NewIncomingContext(ctx, md)})
}
更新
另一个简单的解决方案是定义一个grpc.ServerStream的包装器,如下所示:
// serverStreamWrapper implements grpc.ServerStream by delegating every call to
// the wrapped stream, except Context, which returns the wrapper's own context.
type serverStreamWrapper struct {
	ss  grpc.ServerStream // underlying stream that receives all delegated calls
	ctx context.Context   // context reported by Context
}

// Context returns the context held by the wrapper rather than the wrapped
// stream's context.
func (sw serverStreamWrapper) Context() context.Context {
	return sw.ctx
}

// SetHeader delegates to the wrapped stream.
func (sw serverStreamWrapper) SetHeader(md metadata.MD) error {
	return sw.ss.SetHeader(md)
}

// SendHeader delegates to the wrapped stream.
func (sw serverStreamWrapper) SendHeader(md metadata.MD) error {
	return sw.ss.SendHeader(md)
}

// SetTrailer delegates to the wrapped stream.
func (sw serverStreamWrapper) SetTrailer(md metadata.MD) {
	sw.ss.SetTrailer(md)
}

// SendMsg delegates to the wrapped stream.
func (sw serverStreamWrapper) SendMsg(msg interface{}) error {
	return sw.ss.SendMsg(msg)
}

// RecvMsg delegates to the wrapped stream.
func (sw serverStreamWrapper) RecvMsg(msg interface{}) error {
	return sw.ss.RecvMsg(msg)
}
// UserIDInterceptor is a gRPC stream server interceptor that injects the
// "X-User-Id" entry into the stream's incoming metadata and passes the handler
// a wrapper whose Context() carries that metadata, so the RPC implementation
// can read it with metadata.FromIncomingContext.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy: older grpc-go releases document that the MD returned by
	// FromIncomingContext must not be modified. Copy also yields a usable,
	// empty MD when the request carried no metadata, so the user ID is
	// injected in that case too instead of being silently skipped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set (not Append) so a client-supplied value for the same key is
	// replaced rather than left in front of the trusted one.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, serverStreamWrapper{ss, metadata.NewIncomingContext(ctx, md)})
}
你可以使用NewIncomingContext在流中创建当前上下文的副本。
由于没有设置grpc.ServerStream的context的方法,为了将上下文设置回ServerStream,定义了一个带有context.Context的wrappedStream,以及一个SetContext方法来设置context.Context。
// wrappedStream embeds a grpc.ServerStream and carries a context of its own
// that can be replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is the context stored by SetContext.
	ctx context.Context
}

// SetContext replaces the context stored in the wrapper with ctx.
func (ws *wrappedStream) SetContext(ctx context.Context) {
	ws.ctx = ctx
}
完整的示例代码
// wrappedStream embeds a grpc.ServerStream and overrides the context it
// reports with one that can be replaced through SetContext.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is returned by Context and replaced by SetContext.
	ctx context.Context
}

// SetContext replaces the context stored in the wrapper with ctx.
func (ws *wrappedStream) SetContext(ctx context.Context) {
	ws.ctx = ctx
}

// Context returns the context stored in the wrapper, shadowing the Context
// method of the embedded ServerStream.
func (ws *wrappedStream) Context() context.Context {
	return ws.ctx
}

// SendMsg forwards m to the embedded ServerStream unchanged.
func (ws *wrappedStream) SendMsg(m interface{}) error {
	return ws.ServerStream.SendMsg(m)
}

// RecvMsg forwards m to the embedded ServerStream unchanged.
func (ws *wrappedStream) RecvMsg(m interface{}) error {
	return ws.ServerStream.RecvMsg(m)
}
// StreamContextWrapper is a grpc.ServerStream whose context can be replaced
// through SetContext.
type StreamContextWrapper interface {
	grpc.ServerStream
	SetContext(context.Context)
}

// newStreamContextWrapper wraps ss in a *wrappedStream whose stored context is
// initialised to ss.Context().
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	return &wrappedStream{
		ServerStream: ss,
		ctx:          ss.Context(),
	}
}
// UserIDInterceptor is a gRPC stream server interceptor that injects the
// "X-User-Id" entry into the stream's incoming metadata and passes the handler
// a wrapped stream whose context carries that metadata, so the RPC
// implementation can read it with metadata.FromIncomingContext.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy: older grpc-go releases document that the MD returned by
	// FromIncomingContext must not be modified. Copy also yields a usable,
	// empty MD when the request carried no metadata, so the user ID is
	// injected in that case too instead of being silently skipped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set (not Append) so a client-supplied value for the same key is
	// replaced rather than left in front of the trusted one.
	md.Set("X-User-Id", "real_user_id")

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}
英文:
Update
A simpler solution is to override only the Context() method of ServerStream:
// serverStream wraps a grpc.ServerStream and substitutes the context it
// reports. Every other ServerStream method is promoted from the embedded
// stream.
type serverStream struct {
	grpc.ServerStream
	// ctx is what Context returns in place of the embedded stream's context.
	ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the Context
// method of the embedded ServerStream.
func (st *serverStream) Context() context.Context {
	return st.ctx
}
// UserIDInterceptor is a gRPC stream server interceptor that injects the
// "X-User-Id" entry into the stream's incoming metadata and passes the handler
// a stream whose Context() carries that metadata, so the RPC implementation
// can read it with metadata.FromIncomingContext.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy: older grpc-go releases document that the MD returned by
	// FromIncomingContext must not be modified. Copy also yields a usable,
	// empty MD when the request carried no metadata, so the user ID is
	// injected in that case too instead of being silently skipped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set (not Append) so a client-supplied value for the same key is
	// replaced rather than left in front of the trusted one.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, &serverStream{ss, metadata.NewIncomingContext(ctx, md)})
}
Update
Another simple solution is to define a wrapper around grpc.ServerStream, as shown below:
// serverStreamWrapper implements grpc.ServerStream by delegating every call to
// the wrapped stream, except Context, which returns the wrapper's own context.
type serverStreamWrapper struct {
	ss  grpc.ServerStream // underlying stream that receives all delegated calls
	ctx context.Context   // context reported by Context
}

// Context returns the context held by the wrapper rather than the wrapped
// stream's context.
func (sw serverStreamWrapper) Context() context.Context {
	return sw.ctx
}

// SetHeader delegates to the wrapped stream.
func (sw serverStreamWrapper) SetHeader(md metadata.MD) error {
	return sw.ss.SetHeader(md)
}

// SendHeader delegates to the wrapped stream.
func (sw serverStreamWrapper) SendHeader(md metadata.MD) error {
	return sw.ss.SendHeader(md)
}

// SetTrailer delegates to the wrapped stream.
func (sw serverStreamWrapper) SetTrailer(md metadata.MD) {
	sw.ss.SetTrailer(md)
}

// SendMsg delegates to the wrapped stream.
func (sw serverStreamWrapper) SendMsg(msg interface{}) error {
	return sw.ss.SendMsg(msg)
}

// RecvMsg delegates to the wrapped stream.
func (sw serverStreamWrapper) RecvMsg(msg interface{}) error {
	return sw.ss.RecvMsg(msg)
}
// UserIDInterceptor is a gRPC stream server interceptor that injects the
// "X-User-Id" entry into the stream's incoming metadata and passes the handler
// a wrapper whose Context() carries that metadata, so the RPC implementation
// can read it with metadata.FromIncomingContext.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy: older grpc-go releases document that the MD returned by
	// FromIncomingContext must not be modified. Copy also yields a usable,
	// empty MD when the request carried no metadata, so the user ID is
	// injected in that case too instead of being silently skipped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set (not Append) so a client-supplied value for the same key is
	// replaced rather than left in front of the trusted one.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, serverStreamWrapper{ss, metadata.NewIncomingContext(ctx, md)})
}
You can use metadata.NewIncomingContext to derive a new context from the stream's current context with the updated metadata attached.
grpc.ServerStream has no method for replacing its context, so to hand the new context to the handler we define wrappedStream, which embeds the ServerStream, stores its own context.Context, and exposes a SetContext method to set it:
// wrappedStream embeds a grpc.ServerStream and carries a context of its own
// that can be replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is the context stored by SetContext.
	ctx context.Context
}

// SetContext replaces the context stored in the wrapper with ctx.
func (ws *wrappedStream) SetContext(ctx context.Context) {
	ws.ctx = ctx
}
Full sample code:
// wrappedStream embeds a grpc.ServerStream and overrides the context it
// reports with one that can be replaced through SetContext.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is returned by Context and replaced by SetContext.
	ctx context.Context
}

// SetContext replaces the context stored in the wrapper with ctx.
func (ws *wrappedStream) SetContext(ctx context.Context) {
	ws.ctx = ctx
}

// Context returns the context stored in the wrapper, shadowing the Context
// method of the embedded ServerStream.
func (ws *wrappedStream) Context() context.Context {
	return ws.ctx
}

// SendMsg forwards m to the embedded ServerStream unchanged.
func (ws *wrappedStream) SendMsg(m interface{}) error {
	return ws.ServerStream.SendMsg(m)
}

// RecvMsg forwards m to the embedded ServerStream unchanged.
func (ws *wrappedStream) RecvMsg(m interface{}) error {
	return ws.ServerStream.RecvMsg(m)
}
// StreamContextWrapper is a grpc.ServerStream whose context can be replaced
// through SetContext.
type StreamContextWrapper interface {
	grpc.ServerStream
	SetContext(context.Context)
}

// newStreamContextWrapper wraps ss in a *wrappedStream whose stored context is
// initialised to ss.Context().
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	return &wrappedStream{
		ServerStream: ss,
		ctx:          ss.Context(),
	}
}
// UserIDInterceptor is a gRPC stream server interceptor that injects the
// "X-User-Id" entry into the stream's incoming metadata and passes the handler
// a wrapped stream whose context carries that metadata, so the RPC
// implementation can read it with metadata.FromIncomingContext.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy: older grpc-go releases document that the MD returned by
	// FromIncomingContext must not be modified. Copy also yields a usable,
	// empty MD when the request carried no metadata, so the user ID is
	// injected in that case too instead of being silently skipped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set (not Append) so a client-supplied value for the same key is
	// replaced rather than left in front of the trusted one.
	md.Set("X-User-Id", "real_user_id")

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论