gRPC custom codec for measuring compression/decompression times

Question

I want to measure the cost/time of compressing and decompressing the payloads handled by the gRPC library. I've written some functions that I believe give reasonably accurate timings (see MeasureTimeAndCpu), and from what I have read I need to create my own codec that wraps the default proto codec so that I can 'override' the Marshal/Unmarshal functions:

package codec

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"

	"google.golang.org/grpc/encoding"
	"log"
	"my/json-over-grpc/pkg/metrics"
)

func init() {
	fmt.Println("registering custom codec")
	encoding.RegisterCodec(&TimerCodec{
		encoding.GetCodec("proto"),
	})
}

type TimerCodec struct {
	encoding.Codec
}

func (g *TimerCodec) Marshal(v interface{}) ([]byte, error) {
	fmt.Println("g.codec ", g.Codec)
	return g.Codec.Marshal(v)
}

func (g *TimerCodec) Unmarshal(data []byte, v interface{}) error {
	fmt.Println("unmarshalling")
	// Check if the data is compressed with gzip
	if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
		var err error
		startTime, startCpuTime := metrics.MeasureTimeAndCpu(func() {
			var reader *gzip.Reader
			reader, err = gzip.NewReader(bytes.NewReader(data))
			if err != nil {
				return
			}
			defer reader.Close()
			uncompressed, _ := ioutil.ReadAll(reader)
			data = uncompressed
		})
		if err != nil {
			return err
		}

		log.Printf("Decompression wall time: %d, CPU time: %d", startTime, startCpuTime)
	}

	return g.Codec.Unmarshal(data, v)
}

func (g *TimerCodec) Name() string {
	return "mytimercodec" // use a unique name for your codec
}

Then, on my server, I am doing

	cdc := &codec.TimerCodec{
		Codec: encoding.GetCodec("proto"),
	}
	encoding.RegisterCodec(cdc)

and on my client I'm doing

	conn, err := grpc.Dial(":10000",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mytimercodec")),
	)

and I've also tried doing this when making the call in the client:

	opts := []grpc.CallOption{
		grpc.CallContentSubtype((&codec.TimerCodec{
			Codec: encoding.GetCodec("proto"),
		}).Name()), // Use TimerCodec for message transmission
	}

but what I keep hitting on the client side is panic: runtime error: invalid memory address or nil pointer dereference, which comes from the line return g.Codec.Marshal(v) in the Marshal function of my custom codec, i.e. g.Codec is nil.

I'm struggling to find docs on how to do this, so I suspect I am slightly off somewhere, or there may even be a simpler way to do this...
Thanks!

Answer 1

Score: 2

The "proto" codec never gets registered, so encoding.GetCodec("proto") always returns nil. encoding/proto/proto.go registers this codec in its own init() function, so you need to make sure this subpackage gets loaded.
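
To see why a nil lookup ends in that panic, here is a minimal, self-contained sketch. It does not use grpc itself: Codec, registry, RegisterCodec, GetCodec, echoCodec and Wrapper are local stand-ins invented for illustration, under the assumption that the real registry behaves like a name-to-codec map that yields nil for unknown names.

```go
package main

import (
	"errors"
	"fmt"
)

// Codec is a local stand-in that mirrors the method set of grpc's encoding.Codec.
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	Name() string
}

// registry plays the role of the package-level codec table (hypothetical).
var registry = map[string]Codec{}

// RegisterCodec stores c under its own name.
func RegisterCodec(c Codec) { registry[c.Name()] = c }

// GetCodec returns a nil interface when nothing is registered under name.
func GetCodec(name string) Codec { return registry[name] }

// echoCodec is a trivial codec used only for this demonstration.
type echoCodec struct{}

func (echoCodec) Marshal(v any) ([]byte, error) { return []byte(fmt.Sprint(v)), nil }

func (echoCodec) Unmarshal(data []byte, v any) error {
	p, ok := v.(*string)
	if !ok {
		return errors.New("echoCodec: want *string")
	}
	*p = string(data)
	return nil
}

func (echoCodec) Name() string { return "proto" }

// Wrapper embeds a Codec, like TimerCodec in the question.
type Wrapper struct{ Codec }

// SafeMarshal reports a nil inner codec as an error instead of panicking,
// which is what a direct w.Codec.Marshal(v) call would do.
func (w Wrapper) SafeMarshal(v any) ([]byte, error) {
	if w.Codec == nil {
		return nil, errors.New("inner codec is nil: was it registered before the lookup?")
	}
	return w.Codec.Marshal(v)
}

func main() {
	early := Wrapper{GetCodec("proto")} // looked up before registration: nil
	_, err := early.SafeMarshal("hi")
	fmt.Println("before registration:", err)

	RegisterCodec(echoCodec{})
	late := Wrapper{GetCodec("proto")} // looked up after registration: usable
	out, err := late.SafeMarshal("hi")
	fmt.Println("after registration:", string(out), err)
}
```

The point of the sketch is only the ordering: whatever wraps the codec has to look it up after the registration has happened.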

Based on the way the package is organized, I suggest:

import (
	"google.golang.org/grpc/encoding"
	// Importing encoding/proto runs its init(), which registers the "proto" codec.
	"google.golang.org/grpc/encoding/proto"
)

func init() {
	encoding.RegisterCodec(&TimerCodec{
		encoding.GetCodec(proto.Name),
	})
}

type TimerCodec struct {
	encoding.Codec
}

However, you should probably use the built-in Go benchmarking support instead of writing your own timing logic: https://pkg.go.dev/testing#hdr-Benchmarks

huangapple
  • Posted on May 23, 2023, 22:13:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76315731.html