修改Go GRPC服务器流拦截器上的元数据

huangapple go评论86阅读模式
英文:

Modifying metadata on Go GRPC server streaming interceptor

问题

我一直在尝试在服务器流拦截器上设置元数据,以便实际的RPC函数可以在下游读取它们:

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
    return handler(srv, ss)
}

func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
    ctx := client.Context()
    userID, ok := HeaderFromMetadata(ctx, "X-User-Id")

    log.Printf("User ID: %s, Ok: %t\n", userID, ok)
    return nil
}

func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
    meta, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "", false
    }

    for _, header := range headers {
        if value := meta.Get(header); len(value) > 0 {
            return value[0], true
        }
    }

    return "", false
}

我的服务器注册如下:

server := grpc.NewServer(
    grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())

我遇到的问题是找不到用户ID头。我可以看到当客户端发送请求时拦截器被调用,并且我可以看到元数据包含头部,但实际的RPC似乎无法提取它。我在这里做错了什么?

英文:

I've been trying to set metadata on a server stream interceptor so they could be read downstream by the actual RPC function:

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
	return handler(srv, ss)
}

func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
    ctx := client.Context()
    userID, ok := HeaderFromMetadata(ctx, "X-User-Id")

    log.Printf("User ID: %s, Ok: %t\n", userID, ok)
    return nil
}

func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	meta, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return "", false
	}

	for _, header := range headers {
		if value := meta.Get(header); len(value) > 0 {
			return value[0], true
		}
	}

	return "", false
}

And my server is registered like this:

server := grpc.NewServer(
    grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())

The problem I'm having is that the user ID header isn't being found. I can see that the interceptor is called when the client sends the request, and I can see that the metadata contains the header, but the actual RPC can't seem to extract it. What am I doing wrong here?

答案1

得分: 5

更新

更简单的解决方案是只覆盖ServerStreamContext()方法。

type serverStream struct {
	grpc.ServerStream
	ctx context.Context
}

func (s *serverStream) Context() context.Context {
	return s.ctx
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		md.Append("X-User-Id", "real_user_id")
	}
	newCtx := metadata.NewIncomingContext(ss.Context(), md)

	return handler(srv, &serverStream{ss, newCtx})
}

更新

另一个简单的解决方案是定义一个grpc.ServerStream的包装器,如下所示:

type serverStreamWrapper struct {
	ss  grpc.ServerStream
	ctx context.Context
}

func (w serverStreamWrapper) Context() context.Context        { return w.ctx }
func (w serverStreamWrapper) RecvMsg(msg interface{}) error   { return w.ss.RecvMsg(msg) }
func (w serverStreamWrapper) SendMsg(msg interface{}) error   { return w.ss.SendMsg(msg) }
func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }
func (w serverStreamWrapper) SetHeader(md metadata.MD) error  { return w.ss.SetHeader(md) }
func (w serverStreamWrapper) SetTrailer(md metadata.MD)       { w.ss.SetTrailer(md) }

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		md.Append("X-User-Id", "real_user_id")
	}
	newCtx := metadata.NewIncomingContext(ss.Context(), md)

	return handler(srv, serverStreamWrapper{ss, newCtx})
}

你可以使用NewIncomingContext在流中创建当前上下文的副本。

由于没有设置grpc.ServerStreamcontext的方法,为了将上下文设置回ServerStream,定义了一个带有context.ContextwrappedStream,以及一个SetContext方法来设置context.Context

type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) SetContext(ctx context.Context) {
    w.ctx = ctx
}

完整的示例代码

type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
	return w.ctx
}

func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	return w.ServerStream.SendMsg(m)
}

type StreamContextWrapper interface {
	grpc.ServerStream
	SetContext(context.Context)
}

func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	ctx := ss.Context()
	return &wrappedStream{
		ss,
		ctx,
	}
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		md.Append("X-User-Id", "real_user_id")
	}
	newCtx := metadata.NewIncomingContext(ss.Context(), md)

	sw := newStreamContextWrapper(ss)
	sw.SetContext(newCtx)

	return handler(srv, sw)
}
英文:

Update

The more simple solution is only override the Context() method of ServerStream

type serverStream struct {
	grpc.ServerStream
	ctx context.Context
}

func (s *serverStream) Context() context.Context {
	return s.ctx
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		md.Append("X-User-Id", "real_user_id")
	}
	newCtx := metadata.NewIncomingContext(ss.Context(), md)

	return handler(srv, &serverStream{ss, newCtx})
}

Update

Another simple solution is to define one wrapper to grpc.ServerStream as below

type serverStreamWrapper struct {
	ss  grpc.ServerStream
	ctx context.Context
}

func (w serverStreamWrapper) Context() context.Context        { return w.ctx }
func (w serverStreamWrapper) RecvMsg(msg interface{}) error   { return w.ss.RecvMsg(msg) }
func (w serverStreamWrapper) SendMsg(msg interface{}) error   { return w.ss.SendMsg(msg) }
func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }
func (w serverStreamWrapper) SetHeader(md metadata.MD) error  { return w.ss.SetHeader(md) }
func (w serverStreamWrapper) SetTrailer(md metadata.MD)       { w.ss.SetTrailer(md) }

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		md.Append("X-User-Id", "real_user_id")
	}
	newCtx := metadata.NewIncomingContext(ss.Context(), md)

	return handler(srv, serverStreamWrapper{ss, newCtx})
}

You could use NewIncomingContext to create a copy of the current context in the stream.

Since there is no method to set the context of grpc.ServerStream, in order to set the context back to ServerStream, the wrappedStream is defined with context.Context, and SetContext method to set context.Context

type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) SetContext(ctx context.Context) {
    w.ctx = ctx
}

Full sample codes

type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
	return w.ctx
}

func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	return w.ServerStream.SendMsg(m)
}

type StreamContextWrapper interface {
	grpc.ServerStream
	SetContext(context.Context)
}

func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	ctx := ss.Context()
	return &wrappedStream{
		ss,
		ctx,
	}
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, ok := metadata.FromIncomingContext(ss.Context())
	if ok {
		md.Append("X-User-Id", "real_user_id")
	}
	newCtx := metadata.NewIncomingContext(ss.Context(), md)

	sw := newStreamContextWrapper(ss)
	sw.SetContext(newCtx)

	return handler(srv, sw)
}

huangapple
  • 本文由 发表于 2022年10月21日 11:05:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/74148348.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定