修改connect-go拦截器中的responsebody。

huangapple go评论90阅读模式
英文:

Modifying responsebody in connect-go interceptor

问题

我正在使用Buf的connect-go库来实现一个gRPC服务器。

许多gRPC调用都是时间敏感的,因此它们包含一个字段,客户端使用该字段发送其当前时间戳。服务器将客户端时间戳与本地时间戳进行比较,并返回它们之间的差异。以下是.proto定义中的示例:

service EventService {
    // Start performing a task
    rpc Start (StartRequest) returns (StartResponse);
}

message StartRequest {
    int64 location_id = 1;
    int64 task_id = 2;
    Location user_latlng = 3;
    google.protobuf.Timestamp now_on_device = 4;
}

message StartResponse {
    TaskPerformanceInfo info = 1;
    google.protobuf.Duration device_offset = 2;
}

由于我已经为多个RPC方法实现了这个功能,我想看看是否可以使用拦截器来处理它,这样我就不需要确保在所有单独的RPC方法实现中都处理它。

由于protoc-gen-go编译器如何为字段定义getter方法,通过定义一个接口并使用类型断言来检查请求消息是否包含now_on_device字段非常容易:

type hasNowOnDevice interface {
	GetNowOnDevice() *timestamppb.Timestamp
}
if reqWithNow, ok := req.Any().(hasNowOnDevice); ok {
   // ...
}

这使得拦截器的大部分代码编写起来非常容易:

func MakeDeviceTimeInterceptor() func(connect.UnaryFunc) connect.UnaryFunc {
	return connect.UnaryInterceptorFunc(
		func(next connect.UnaryFunc) connect.UnaryFunc {
			return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
				now := time.Now().UTC()
				ctxa := context.WithValue(ctx, CurrentTimestampKey{}, now)

				var deviceTimeOffset time.Duration
				// 如果protobuf消息有`NowOnDevice`字段,使用它来获取设备时间和服务器时间之间的差异。
				if reqWithNow, ok := req.Any().(hasNowOnDevice); ok {
					deviceTime := reqWithNow.GetNowOnDevice().AsTime()
					deviceTimeOffset = now.Sub(deviceTime)
					ctxa = context.WithValue(ctxa, DeviceTimeDiffKey{}, deviceTimeOffset)
				}

				res, err := next(ctxa, req)

				// TODO: 如何在这里修改响应?

				return res, err
			})
		},
	)
}

我遇到的问题(如上面的注释所述)是如何修改响应。

我无法像为请求那样为响应定义一个接口,因为protoc-gen-go没有定义setter方法。然后我想我可以使用类型切换,像这样(在上面的TODO注释处):

switch resMsg := res.Any().(type) {
case *livev1.StartResponse:
	resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
	return &connect.Response[livev1.StartResponse]{
		Msg: resMsg,
	}, err
case *livev1.StatusResponse:
	resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
	return &connect.Response[livev1.StatusResponse]{
		Msg: resMsg,
	}, err
}

这种方法有三个问题:

  1. 我找不到一种方法将旧响应的头部/尾部复制到这个新响应中。(我认为它们实际上在这一点上还没有设置,但我不能确定。)
  2. 使用类型断言要求我为每种类型重复几乎相同的代码块。
  3. 这不再比在每个RPC方法中单独实现这个功能更简单。

有没有更简单的方法使用拦截器来修改响应中的字段?或者我应该以其他方式来做这个?

英文:

I am using Buf's connect-go library to implement a gRPC server.

Many of the gRPC calls are time-sensitive, so they include a field that the client uses to send its current timestamp. The server compares the client timestamp with the local timestamp and returns the difference between them. Here is the an example from the .proto definitions:

service EventService {
    // Start performing a task
    rpc Start (StartRequest) returns (StartResponse);
}

message StartRequest {
    int64 location_id = 1;
    int64 task_id = 2;
    Location user_latlng = 3;
    google.protobuf.Timestamp now_on_device = 4;
}

message StartResponse {
    TaskPerformanceInfo info = 1;
    google.protobuf.Duration device_offset = 2;
}

Because I have this implemented for several RPC methods, I wanted to see if I could use an interceptor to handle it so I don't need to make sure it is being handled in all of the individual RPC method implementations.

Because of how the protoc-gen-go compiler defines getters for the fields, checking if the request message contains the now_on_device field is easily done by defining an interface and using type assertion:

type hasNowOnDevice interface {
	GetNowOnDevice() *timestamppb.Timestamp
}
if reqWithNow, ok := req.Any().(hasNowOnDevice); ok {
   // ...
}

This makes most of the interceptor very easy to write:

func MakeDeviceTimeInterceptor() func(connect.UnaryFunc) connect.UnaryFunc {
	return connect.UnaryInterceptorFunc(
		func(next connect.UnaryFunc) connect.UnaryFunc {
			return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
				now := time.Now().UTC()
				ctxa := context.WithValue(ctx, CurrentTimestampKey{}, now)

				var deviceTimeOffset time.Duration
				// If the protobuf message has a `NowOnDevice` field, use it
				// to get the difference betweent the device time and server time.
				if reqWithNow, ok := req.Any().(hasNowOnDevice); ok {
					deviceTime := reqWithNow.GetNowOnDevice().AsTime()
					deviceTimeOffset = now.Sub(deviceTime)
					ctxa = context.WithValue(ctxa, DeviceTimeDiffKey{}, deviceTimeOffset)
				}

				res, err := next(ctxa, req)

				// TODO: How do I modify the response here?

				return res, err
			})
		},
	)
}

The problem I have (as noted in the comment above) is how to modify the response.

I can't define an interface for the response the same way I did for the request because protoc-gen-go does not define setters. Then I thought I could just use a type switch, like this (where the TODO comment is above):

switch resMsg := res.Any().(type) {
case *livev1.StartResponse:
	resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
	return &connect.Response[livev1.StartResponse]{
		Msg: resMsg,
	}, err
case *livev1.StatusResponse:
	resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
	return &connect.Response[livev1.StatusResponse]{
		Msg: resMsg,
	}, err
}

There are three problems with this approach:

  1. I can't find a way to copy the headers/trailers from the old response into this new response. (I don't think they are actually set yet at this point, but I don't know that for sure.)
  2. Using type assertion requires me to repeat almost the same code block over and over for each type.
  3. This is no longer simpler than implementing this in each RPC method individually.

Is there an easier way to use an interceptor to modify a field in the response? Or is there some other way I should be doing this?

答案1

得分: 3

Deepankar提出了一个解决方案,尽管我确实看到将所有响应数据保留在模式定义的响应结构中的吸引力。如果protoc-gen-go生成与获取器配套的设置器,这肯定会更简单!

> 我找不到一种方法将旧响应的头部/尾部复制到这个新响应中。(我认为它们实际上在这一点上还没有设置,但我不能确定。)

你不需要这样做。在你的示例中,res.Any()返回一个指向protobuf消息的指针 - 你可以直接修改它。你的类型切换可以像这样:

switch resMsg := res.Any().(type) {
case *livev1.StartResponse:
    resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
case *livev1.StatusResponse:
    resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
}
return res, err

> 使用类型断言需要我为每种类型重复几乎相同的代码块。

不幸的是,你最好的选择可能是使用反射。你可以选择标准的Go反射或protobuf反射 - 两者都可以工作。使用protobuf反射,类似下面的代码可以解决问题:

res, err := next(ctx, req)
if err != nil {
	return nil, err
}
msg, ok := res.Any().(proto.Message)
if !ok {
	return res, nil
}

// 保留计算偏移量的逻辑!
var deviceTimeOffset time.Duration

// 你可以将这个作为全局变量。
durationName := (*durationpb.Duration)(nil).ProtoReflect().Descriptor().FullName()

refMsg := msg.ProtoReflect()
offsetFD := refMsg.Descriptor().Fields().ByName("DeviceOffset")
if offsetFD != nil &&
    offsetFD.Message() != nil &&
    offsetFD.Message().FullName() == durationName {
    refOffset := durationpb.New(deviceTimeOffset).ProtoReflect()
    refMsg.Set(
        offsetFD, 
        protoreflect.ValueOf(refOffset),
    )
}
return res, nil

你可以自行决定这是否比重复的类型切换更好 - 它要复杂得多,但确实更加DRY(不重复)一些。

英文:

Deepankar outlines one solution, though I do see the appeal of keeping all the response data in the schema-defined response structure. This would certainly be simpler if protoc-gen-go generated setters to go along with the getters!

> I can't find a way to copy the headers/trailers from the old response into this new response. (I don't think they are actually set yet at this point, but I don't know that for sure.)

You don't need to do this. In your example, res.Any() returns a pointer to the protobuf message - you can modify it in place. Your type switch can look like this:

switch resMsg := res.Any().(type) {
case *livev1.StartResponse:
    resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
case *livev1.StatusResponse:
    resMsg.DeviceOffset = durationpb.New(deviceTimeOffset)
}
return res, err

> Using type assertion requires me to repeat almost the same code block over and over for each type.

Unfortunately, your best bet here is likely reflection. You can choose between standard Go reflection or protobuf reflection - either should work. With protobuf reflection, something like this should do the trick:

res, err := next(ctx, req)
if err != nil {
	return nil, err
}
msg, ok := res.Any().(proto.Message)
if !ok {
	return res, nil
}

// Keep your logic to calculate offset!
var deviceTimeOffset time.Duration

// You could make this a global.
durationName := (*durationpb.Duration)(nil).ProtoReflect().Descriptor().FullName()

refMsg := msg.ProtoReflect()
offsetFD := refMsg.Descriptor().Fields().ByName("DeviceOffset")
if offsetFD != nil &&
    offsetFD.Message() != nil &&
    offsetFD.Message().FullName() == durationName {
    refOffset := durationpb.New(deviceTimeOffset).ProtoReflect()
    refMsg.Set(
        offsetFD, 
        protoreflect.ValueOf(refOffset),
    )
}
return res, nil

It's up to you whether you think this is better or worse than the repetitive type switch - it's quite a bit more complex, but it does keep things a bit more DRY.

答案2

得分: 1

你是否有可能使用请求头而不是请求体?如果客户端可以通过请求头发送NowOnDevice,那么你可以将响应发送到响应头中。Unix时间戳可能是最好的方法。

func MakeDeviceTimeInterceptor() connect.UnaryInterceptorFunc {
	return func(next connect.UnaryFunc) connect.UnaryFunc {
		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			now := time.Now().UTC()
			ctxa := context.WithValue(ctx, CurrentTimestampKey{}, now)

			var deviceTimeOffset time.Duration
			// 检查请求头中的`now-on-device`字段,而不是请求体
			reqWithNow := req.Header().Get("now-on-device")

			if reqWithNow != "" {
				val, err := strconv.Atoi(reqWithNow)
				if err != nil {
					return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("invalid timestamp"))
				}

				deviceTime := time.Unix(int64(val), 0)
				deviceTimeOffset = now.Sub(deviceTime)
				ctxa = context.WithValue(ctxa, DeviceTimeDiffKey{}, deviceTimeOffset)
			}

			res, err := next(ctxa, req)

			// 如果值已设置,则设置到响应头中
			if deviceTimeOffset != 0 {
				res.Header().Set("device-time-offset", fmt.Sprintf("%d", deviceTimeOffset))
			}

			return res, err
		}
	}
}

然后你会得到以下响应:

curl -v \
    --header "Content-Type: application/json" --header "now-on-device: 1656442814" \
    --data '{"name": "Jane"}' \
    http://localhost:8080/greet.v1.GreetService/Greet
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /greet.v1.GreetService/Greet HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.79.1
> Accept: */*
> Content-Type: application/json
> now-on-device: 1656442814
> Content-Length: 16
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Accept-Encoding: gzip
< Content-Type: application/json
< Device-Time-Offset: 7259524766000
< Greet-Version: v1
< Date: Tue, 28 Jun 2022 21:01:13 GMT
< Content-Length: 27
<
* Connection #0 to host localhost left intact
{"greeting":"Hello, Jane!"}
英文:

Do you have the possibility use Headers instead of body. If Clients can send NowOnDevice through request headers, then you can send back the response in response headers instead. Unix Timestamp might be the best approach.

func MakeDeviceTimeInterceptor() connect.UnaryInterceptorFunc {
	return func(next connect.UnaryFunc) connect.UnaryFunc {
		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			now := time.Now().UTC()
			ctxa := context.WithValue(ctx, CurrentTimestampKey{}, now)

			var deviceTimeOffset time.Duration
			// Check the header message `now-on-device` field, instead of body
			reqWithNow := req.Header().Get(&quot;now-on-device&quot;)

			if reqWithNow != &quot;&quot; {
				val, err := strconv.Atoi(reqWithNow)
				if err != nil {
					return nil, connect.NewError(connect.CodeInvalidArgument, errors.New(&quot;invalid timestamp&quot;))
				}

				deviceTime := time.Unix(int64(val), 0)
				deviceTimeOffset = now.Sub(deviceTime)
				ctxa = context.WithValue(ctxa, DeviceTimeDiffKey{}, deviceTimeOffset)
			}

			res, err := next(ctxa, req)

			// Set to response header if value is set
			if deviceTimeOffset != 0 {
				res.Header().Set(&quot;device-time-offset&quot;, fmt.Sprintf(&quot;%d&quot;, deviceTimeOffset))
			}

			return res, err
		}
	}
}

Then you have the response:

curl -v \
    --header &quot;Content-Type: application/json&quot; --header &quot;now-on-device: 1656442814&quot; \
    --data &#39;{&quot;name&quot;: &quot;Jane&quot;}&#39; \
    http://localhost:8080/greet.v1.GreetService/Greet
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
&gt; POST /greet.v1.GreetService/Greet HTTP/1.1
&gt; Host: localhost:8080
&gt; User-Agent: curl/7.79.1
&gt; Accept: */*
&gt; Content-Type: application/json
&gt; now-on-device: 1656442814
&gt; Content-Length: 16
&gt;
* Mark bundle as not supporting multiuse
&lt; HTTP/1.1 200 OK
&lt; Accept-Encoding: gzip
&lt; Content-Type: application/json
&lt; Device-Time-Offset: 7259524766000
&lt; Greet-Version: v1
&lt; Date: Tue, 28 Jun 2022 21:01:13 GMT
&lt; Content-Length: 27
&lt;
* Connection #0 to host localhost left intact
{&quot;greeting&quot;:&quot;Hello, Jane!&quot;}

huangapple
  • 本文由 发表于 2022年6月28日 15:15:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/72782161.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定