从gRPC服务器拦截器获取流式文件的大小。

huangapple go评论69阅读模式
英文:

Get streamed file size from grpc server interceptor

问题

在这种拦截器中,你可以通过grpc.ServerStream参数(ss)来获取传入文件的大小。具体来说,你可以使用ss.RecvMsg方法来接收FileForm消息,并通过获取Upfile1、Upfile2和Upfile3字段的长度来计算文件的大小。以下是一个示例代码片段,展示了如何在拦截器中获取传入文件的大小:

func logInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	fmt.Println("Log Interceptor")

	// 接收FileForm消息
	fileForm := &pdfcompose.FileForm{}
	err := ss.RecvMsg(fileForm)
	if err != nil {
		return err
	}

	// 计算文件大小
	fileSize := len(fileForm.Upfile1) + len(fileForm.Upfile2) + len(fileForm.Upfile3)
	fmt.Println("File size:", fileSize)

	// 继续处理请求
	err = handler(srv, ss)
	if err != nil {
		return err
	}
	return nil
}

在这个示例中,我们首先创建一个FileForm实例,然后使用ss.RecvMsg方法将接收到的消息存储到FileForm实例中。接下来,我们计算Upfile1、Upfile2和Upfile3字段的长度,并将它们相加得到文件的大小。最后,我们打印文件大小,并继续处理请求。

请注意,这只是一个示例代码片段,你可能需要根据你的实际情况进行适当的修改和调整。

英文:

I have a Go server bidirectional stream method defined like this in proto-file:

syntax = "proto3";
option go_package="pdfcompose/;pdfcompose";
package pdfcompose;

service PdfCompose {
  rpc Send (stream FileForm) returns (stream PdfFile) {}
}

message FileForm {
  bytes Upfile1 = 1;
  bytes Upfile2 = 2;
  bytes Upfile3 = 3;
}

message PdfFile {
  bytes File = 1;
}

And my log interceptor has the following interface:

func logInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)  error {
	fmt.Println("Log Interceptor")
	err := handler(srv, ss)
	if err != nil {
		return err
	}
	return nil
}

And I'm using https://github.com/grpc-ecosystem/go-grpc-middleware as an interceptor engine.
I need to implement logging of the streaming file size (for educational purposes) and trying to find out where I can get any data about FileForm and it contents.

My first guess was to look into grpc.ServerStream argument (ss) to find something about it and it looks like it contains a lot of data like max and min MessageSize, but noting about actual content length.

How I can get size of incoming file with this kind of interceptors?

答案1

得分: 0

所以,正如@Brits在上面提到的,实现我想要的工作方式是编写流的包装器。

这是一个示例:https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/validator/validator.go 我从这个存储库中获取了以下代码,并且我希望我正确理解了Apache 2.0许可证,并且复制了这部分代码没有问题:

// StreamServerInterceptor返回一个新的流服务器拦截器,用于验证传入的消息。
//
// 无效的消息将在RPC的类型上拒绝InvalidArgument之前发生。对于ServerStream(1:m)请求,它将在到达任何用户空间处理程序之前发生。对于ClientStream(n:1)或BidiStream(n:m)RPC,消息将在调用stream.Recv()时被拒绝。
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapper := &recvWrapper{stream}
return handler(srv, wrapper)
}
}

type recvWrapper struct {
grpc.ServerStream
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
if err := s.ServerStream.RecvMsg(m); err != nil {
return err
}

if err := validate(m); err != nil {
	return err
}

return nil

}

validate()函数实际上将请求作为interface{}获取,并且您需要使用类型断言将此请求转换为所需的类型,如v := req.(type)

例如,我将请求类型定义为FileForm,并能够检查其内容:

func logFileSize(req interface{}) error {
m := req.(*pdfcompose.FileForm)
println("SizeOfUpfile1: " + strconv.Itoa(int(binary.Size(m.Upfile1))))

if m.Upfile2 != nil {
	println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
}
if m.Upfile3 != nil {
	println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
}
return nil

}

func (s *recvWrapper) RecvMsg(m interface{}) error {
if err := s.ServerStream.RecvMsg(m); err != nil {
return err
}
//z := m.(pdfcompose.FileForm)
if err := logFileSize(m); err != nil {
return err
}

return nil

}

func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapper := &recvWrapper{stream}
return handler(srv, wrapper)
}
}

英文:

So, as @Brits has mentioned above, the working way to implement what I wanted is to write wrapper for stream.

Here is an example: https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/validator/validator.go I took the following code from this repo and I hope I understand apache2 license correct and there is no issues with copying a part of this code:

// StreamServerInterceptor returns a new streaming server interceptor that validates incoming messages.
//
// The stage at which invalid messages will be rejected with `InvalidArgument` varies based on the
// type of the RPC. For `ServerStream` (1:m) requests, it will happen before reaching any userspace
// handlers. For `ClientStream` (n:1) or `BidiStream` (n:m) RPCs, the messages will be rejected on
// calls to `stream.Recv()`.
func StreamServerInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		wrapper := &recvWrapper{stream}
		return handler(srv, wrapper)
	}
}

type recvWrapper struct {
	grpc.ServerStream
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
	if err := s.ServerStream.RecvMsg(m); err != nil {
		return err
	}

	if err := validate(m); err != nil {
		return err
	}

	return nil
}

validate() function actually gets request as interface{} and you need to use type assertion to cast this request to type you needed as v := req.(type).

For example, I typed request to FileForm and was able to check it's content:

func logFileSize(req interface{}) error {
	m := req.(*pdfcompose.FileForm)
	println("SizeOfUpfile1: " + strconv.Itoa(int(binary.Size(m.Upfile1))))

	if m.Upfile2 != nil {
		println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
	}
	if m.Upfile3 != nil {
		println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
	}
	return nil
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
	if err := s.ServerStream.RecvMsg(m); err != nil {
		return err
	}
	//z := m.(pdfcompose.FileForm)
	if err := logFileSize(m); err != nil {
		return err
	}

	return nil
}

func StreamServerInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		wrapper := &recvWrapper{stream}
		return handler(srv, wrapper)
	}
}

huangapple
  • 本文由 发表于 2021年7月28日 14:27:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/68555406.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定