英文:
Modifying metadata on Go GRPC server streaming interceptor
问题
我一直在尝试在服务器流拦截器上设置元数据,以便实际的RPC函数可以在下游读取它们:
// UserIDInterceptor is a stream server interceptor that sends an "X-User-Id"
// header on the stream and then invokes the wrapped handler.
//
// NOTE(review): per the grpc-go documentation, ServerStream.SendHeader writes
// response header metadata; it does not modify the incoming metadata reachable
// through ss.Context(), so a handler reading incoming metadata will not see
// this value.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	header := metadata.New(map[string]string{"X-User-Id": "real_user_id"})
	// The error returned by SendHeader is discarded.
	_ = ss.SendHeader(header)
	return handler(srv, ss)
}
// GetObjects looks up the "X-User-Id" entry in the stream context's incoming
// metadata, logs the value together with whether it was found, and returns nil.
func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
	userID, ok := HeaderFromMetadata(client.Context(), "X-User-Id")
	log.Printf("User ID: %s, Ok: %t\n", userID, ok)
	return nil
}
// HeaderFromMetadata returns the first value of the first header in headers
// that has at least one value in ctx's incoming metadata. The boolean result
// is false when ctx carries no incoming metadata or none of the headers has
// a value.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	meta, found := metadata.FromIncomingContext(ctx)
	if !found {
		return "", false
	}

	// Headers are tried in argument order; the first one with a value wins.
	for _, name := range headers {
		values := meta.Get(name)
		if len(values) == 0 {
			continue
		}
		return values[0], true
	}
	return "", false
}
我的服务器注册如下:
// Create the gRPC server with UserIDInterceptor installed as its stream
// interceptor, then register the IAM service implementation on it.
server := grpc.NewServer(grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())
我遇到的问题是找不到用户ID头。我可以看到当客户端发送请求时拦截器被调用,并且我可以看到元数据包含头部,但实际的RPC似乎无法提取它。我在这里做错了什么?
英文:
I've been trying to set metadata in a server stream interceptor so that it can be read downstream by the actual RPC function:
// UserIDInterceptor is a stream server interceptor that sends an "X-User-Id"
// header on the stream and then invokes the wrapped handler.
//
// NOTE(review): per the grpc-go documentation, ServerStream.SendHeader writes
// response header metadata; it does not modify the incoming metadata reachable
// through ss.Context(), so a handler reading incoming metadata will not see
// this value.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	header := metadata.New(map[string]string{"X-User-Id": "real_user_id"})
	// The error returned by SendHeader is discarded.
	_ = ss.SendHeader(header)
	return handler(srv, ss)
}
// GetObjects looks up the "X-User-Id" entry in the stream context's incoming
// metadata, logs the value together with whether it was found, and returns nil.
func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
	userID, ok := HeaderFromMetadata(client.Context(), "X-User-Id")
	log.Printf("User ID: %s, Ok: %t\n", userID, ok)
	return nil
}
// HeaderFromMetadata returns the first value of the first header in headers
// that has at least one value in ctx's incoming metadata. The boolean result
// is false when ctx carries no incoming metadata or none of the headers has
// a value.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	meta, found := metadata.FromIncomingContext(ctx)
	if !found {
		return "", false
	}

	// Headers are tried in argument order; the first one with a value wins.
	for _, name := range headers {
		values := meta.Get(name)
		if len(values) == 0 {
			continue
		}
		return values[0], true
	}
	return "", false
}
And my server is registered like this:
// Create the gRPC server with UserIDInterceptor installed as its stream
// interceptor, then register the IAM service implementation on it.
server := grpc.NewServer(grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())
The problem I'm having is that the user ID header isn't being found. I can see that the interceptor is called when the client sends the request, and I can see that the metadata contains the header, but the actual RPC can't seem to extract it. What am I doing wrong here?
答案1
得分: 5
更新
更简单的解决方案是只重写 ServerStream 的 Context() 方法:
// serverStream decorates a grpc.ServerStream so that Context returns a
// replacement context; all other methods are promoted from the embedded stream.
type serverStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the replacement context instead of the embedded stream's own.
func (s *serverStream) Context() context.Context {
	return s.ctx
}

// UserIDInterceptor is a stream server interceptor that stores an "X-User-Id"
// entry in the incoming metadata and passes the handler a stream whose
// Context carries that metadata.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy rather than mutating the MD obtained from the context.
	// When the context carries no incoming metadata the returned MD is nil;
	// Copy still yields an empty, writable map, so the user ID is never
	// silently dropped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set replaces any existing values for the key, so a value supplied by the
	// client cannot sit ahead of the one assigned here.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, &serverStream{
		ServerStream: ss,
		ctx:          metadata.NewIncomingContext(ctx, md),
	})
}
更新
另一个简单的解决方案是为 grpc.ServerStream 定义一个包装器,如下所示:
// serverStreamWrapper forwards every grpc.ServerStream method to the wrapped
// stream ss, except Context, which returns the replacement context ctx.
type serverStreamWrapper struct {
	ss  grpc.ServerStream
	ctx context.Context
}

// Context returns the replacement context.
func (w serverStreamWrapper) Context() context.Context { return w.ctx }

// RecvMsg delegates to the wrapped stream.
func (w serverStreamWrapper) RecvMsg(msg interface{}) error { return w.ss.RecvMsg(msg) }

// SendMsg delegates to the wrapped stream.
func (w serverStreamWrapper) SendMsg(msg interface{}) error { return w.ss.SendMsg(msg) }

// SendHeader delegates to the wrapped stream.
func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }

// SetHeader delegates to the wrapped stream.
func (w serverStreamWrapper) SetHeader(md metadata.MD) error { return w.ss.SetHeader(md) }

// SetTrailer delegates to the wrapped stream.
func (w serverStreamWrapper) SetTrailer(md metadata.MD) { w.ss.SetTrailer(md) }

// UserIDInterceptor is a stream server interceptor that stores an "X-User-Id"
// entry in the incoming metadata and passes the handler a wrapper whose
// Context carries that metadata.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy rather than mutating the MD obtained from the context.
	// When the context carries no incoming metadata the returned MD is nil;
	// Copy still yields an empty, writable map, so the user ID is never
	// silently dropped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set replaces any existing values for the key, so a value supplied by the
	// client cannot sit ahead of the one assigned here.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, serverStreamWrapper{
		ss:  ss,
		ctx: metadata.NewIncomingContext(ctx, md),
	})
}
你可以使用 NewIncomingContext 基于流的当前上下文创建一个携带更新后元数据的新上下文。
由于 grpc.ServerStream 没有提供设置 context 的方法,为了把新的上下文设置回 ServerStream,这里定义了一个持有 context.Context 的 wrappedStream,并为它提供 SetContext 方法来设置该 context.Context:
// wrappedStream pairs an embedded grpc.ServerStream with a context that can be
// replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// SetContext stores ctx on the wrapper, replacing the previously held context.
func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}
完整的示例代码
// wrappedStream pairs an embedded grpc.ServerStream with a replaceable
// context. Every ServerStream method other than Context (including RecvMsg
// and SendMsg) is promoted from the embedded stream, so no explicit
// delegating methods are needed.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the context held by the wrapper rather than the embedded
// stream's own context.
func (w *wrappedStream) Context() context.Context {
	return w.ctx
}

// SetContext replaces the context returned by Context.
func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}

// StreamContextWrapper is a grpc.ServerStream whose context can be replaced.
type StreamContextWrapper interface {
	grpc.ServerStream
	SetContext(context.Context)
}

// newStreamContextWrapper wraps ss, initially exposing ss's own context.
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	return &wrappedStream{
		ServerStream: ss,
		ctx:          ss.Context(),
	}
}
// UserIDInterceptor is a stream server interceptor that stores an "X-User-Id"
// entry in the incoming metadata and passes the handler a wrapped stream
// whose context carries that metadata.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy rather than mutating the MD obtained from the context.
	// When the context carries no incoming metadata the returned MD is nil;
	// Copy still yields an empty, writable map, so the user ID is never
	// silently dropped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set replaces any existing values for the key, so a value supplied by the
	// client cannot sit ahead of the one assigned here.
	md.Set("X-User-Id", "real_user_id")

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}
英文:
Update
A simpler solution is to override only the Context() method of ServerStream:
// serverStream decorates a grpc.ServerStream so that Context returns a
// replacement context; all other methods are promoted from the embedded stream.
type serverStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the replacement context instead of the embedded stream's own.
func (s *serverStream) Context() context.Context {
	return s.ctx
}

// UserIDInterceptor is a stream server interceptor that stores an "X-User-Id"
// entry in the incoming metadata and passes the handler a stream whose
// Context carries that metadata.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy rather than mutating the MD obtained from the context.
	// When the context carries no incoming metadata the returned MD is nil;
	// Copy still yields an empty, writable map, so the user ID is never
	// silently dropped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set replaces any existing values for the key, so a value supplied by the
	// client cannot sit ahead of the one assigned here.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, &serverStream{
		ServerStream: ss,
		ctx:          metadata.NewIncomingContext(ctx, md),
	})
}
Update
Another simple solution is to define a wrapper around grpc.ServerStream, as shown below:
// serverStreamWrapper forwards every grpc.ServerStream method to the wrapped
// stream ss, except Context, which returns the replacement context ctx.
type serverStreamWrapper struct {
	ss  grpc.ServerStream
	ctx context.Context
}

// Context returns the replacement context.
func (w serverStreamWrapper) Context() context.Context { return w.ctx }

// RecvMsg delegates to the wrapped stream.
func (w serverStreamWrapper) RecvMsg(msg interface{}) error { return w.ss.RecvMsg(msg) }

// SendMsg delegates to the wrapped stream.
func (w serverStreamWrapper) SendMsg(msg interface{}) error { return w.ss.SendMsg(msg) }

// SendHeader delegates to the wrapped stream.
func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }

// SetHeader delegates to the wrapped stream.
func (w serverStreamWrapper) SetHeader(md metadata.MD) error { return w.ss.SetHeader(md) }

// SetTrailer delegates to the wrapped stream.
func (w serverStreamWrapper) SetTrailer(md metadata.MD) { w.ss.SetTrailer(md) }

// UserIDInterceptor is a stream server interceptor that stores an "X-User-Id"
// entry in the incoming metadata and passes the handler a wrapper whose
// Context carries that metadata.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy rather than mutating the MD obtained from the context.
	// When the context carries no incoming metadata the returned MD is nil;
	// Copy still yields an empty, writable map, so the user ID is never
	// silently dropped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set replaces any existing values for the key, so a value supplied by the
	// client cannot sit ahead of the one assigned here.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, serverStreamWrapper{
		ss:  ss,
		ctx: metadata.NewIncomingContext(ctx, md),
	})
}
You could use NewIncomingContext to create a new context, derived from the stream's current context, that carries the updated metadata.
Since grpc.ServerStream has no method for setting its context, a wrappedStream is defined that holds a context.Context and exposes a SetContext method to replace it:
// wrappedStream pairs an embedded grpc.ServerStream with a context that can be
// replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// SetContext stores ctx on the wrapper, replacing the previously held context.
func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}
Full sample code
// wrappedStream pairs an embedded grpc.ServerStream with a replaceable
// context. Every ServerStream method other than Context (including RecvMsg
// and SendMsg) is promoted from the embedded stream, so no explicit
// delegating methods are needed.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the context held by the wrapper rather than the embedded
// stream's own context.
func (w *wrappedStream) Context() context.Context {
	return w.ctx
}

// SetContext replaces the context returned by Context.
func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}

// StreamContextWrapper is a grpc.ServerStream whose context can be replaced.
type StreamContextWrapper interface {
	grpc.ServerStream
	SetContext(context.Context)
}

// newStreamContextWrapper wraps ss, initially exposing ss's own context.
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	return &wrappedStream{
		ServerStream: ss,
		ctx:          ss.Context(),
	}
}
// UserIDInterceptor is a stream server interceptor that stores an "X-User-Id"
// entry in the incoming metadata and passes the handler a wrapped stream
// whose context carries that metadata.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// Work on a copy rather than mutating the MD obtained from the context.
	// When the context carries no incoming metadata the returned MD is nil;
	// Copy still yields an empty, writable map, so the user ID is never
	// silently dropped.
	md, _ := metadata.FromIncomingContext(ctx)
	md = md.Copy()

	// Set replaces any existing values for the key, so a value supplied by the
	// client cannot sit ahead of the one assigned here.
	md.Set("X-User-Id", "real_user_id")

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论