Go GRPC刷新令牌以进行双向流。

huangapple go评论72阅读模式
英文:

Go GRPC Refresh token for a bidirectional stream

问题

TLDR:我正在寻找一种在每次调用stream.Send(msg)时更新打开流的标头的方法,而无需关闭流并打开新流。

摘要:

我有一个用于处理双向流的GRPC客户端和服务器。为了与服务器进行身份验证,客户端必须在请求标头中发送一个JWT,设置为"authorization"。令牌的有效期为30分钟。令牌过期后,服务器将终止连接。

我正在寻找一种从客户端刷新授权令牌并保持流打开的方法。客户端应该在每30分钟执行一次新请求的循环中使用更新后的令牌和更新后的有效载荷。我还没有找到一种在客户端上更新已打开流的标头的方法。

让我们看一下下面的代码,以了解客户端的情况。下面的代码有一个函数用于创建客户端的新实例,另一个函数用于建立与GRPC服务器的连接。

func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
	cc, err := newConnection(config, logger)
	if err != nil {
		return nil, err
	}

	service := proto.NewWatchServiceClient(cc)

	return &WatchClient{
		config:  config,
		conn:    cc,
		logger:  entry,
		service: service,
	}, nil
}

func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
	address := fmt.Sprintf("%s:%d", config.Host, config.Port)

    // rpcCredential implements credentials.PerRPCCredentials
	rpcCredential := newTokenAuth(config.Auth, config.TenantID)

	return grpc.Dial(
		address,
        grpc.WithPerRPCCredentials(rpcCredential),
	)
}

从上面的newConnection函数中可以看到,调用了另一个函数newTokenAuth来创建一个身份验证令牌。这个函数返回一个实现了PerRPCCredentials接口的结构体。

有两种方法可以设置请求的授权:

  1. 使用grpc.WithPerRPCCredentials在创建与服务器的连接时添加授权。
  2. 使用grpc.PerRPCCredentials在与服务器建立的每个流上添加授权。

在这种情况下,我使用grpc.WithPerRPCCredentials在创建与服务器的连接时附加令牌。

现在,让我们看一下PerRPCCredentials的定义。

type PerRPCCredentials interface {
	// GetRequestMetadata gets the current request metadata, refreshing
	// tokens if required. This should be called by the transport layer on
	// each request, and the data should be populated in headers or other
	// context. If a status code is returned, it will be used as the status
	// for the RPC. uri is the URI of the entry point for the request.
	// When supported by the underlying implementation, ctx can be used for
	// timeout and cancellation. Additionally, RequestInfo data will be
	// available via ctx to this call.
	// TODO(zhaoq): Define the set of the qualified keys instead of leaving
	// it as an arbitrary string.
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	// RequireTransportSecurity indicates whether the credentials requires
	// transport security.
	RequireTransportSecurity() bool
}

该接口要求您定义两个方法。GetRequestMetadata的文档说:

> GetRequestMetadata获取当前请求的元数据,如果需要,刷新令牌。

因此,看起来我的PerRPCCredentials实现应该能够处理流或连接的令牌刷新。让我们看一下我的PerRPCCredentials实现。

// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
	tenantID       string
	tokenRequester auth.PlatformTokenGetter
    token          string
}

// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
	return false
}

// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
	token, err := t.tokenRequester.GetToken()
	if err != nil {
		return nil, err
	}
	t.token = token

	go func() {
		time.Sleep(25 * time.Minute)
		token, _ := t.tokenRequester.GetToken()
		t.token = token
	}()

	return map[string]string{
		"tenant-id": t.tenantID,
		"authorization":     "Bearer " + t.token,
	}, nil
}

正如您所看到的,调用GetRequestMetadata将建立一个goroutine,每25分钟尝试刷新令牌。在这里添加一个goroutine可能不是正确的方法。这是为了尝试刷新授权标头,但这并不起作用。

让我们看一下流。

func (w WatchClient) CreateWatch() error {
	topic := &proto.Request{SelfLink: w.config.TopicSelfLink}

	stream, err := w.service.CreateWatch(context.Background())
	if err != nil {
		return err
	}

	for {
		err = stream.Send(topic)
		if err != nil {
			return err
		}
		time.Sleep(25 * time.Minute)
	}
}

客户端每25分钟在流上发送一条消息。我想要的是在调用stream.Send时,同时发送更新后的令牌。

这个GetRequestMetadata函数只会被调用一次,无论是通过grpc.WithPerRPCCredentials还是grpc.PerRPCCredsCallOption设置授权,所以似乎没有办法更新授权标头。

如果您对我在尝试使用PerRPCCredentials进行令牌刷新时遗漏了什么有任何想法,请告诉我。

谢谢。

英文:

TLDR: I am looking for a way to update headers on an open stream for each call to stream.Send(msg) without closing the stream and opening a new one.

Summary

I have a GRPC client and server built to handle bidirectional streams. To authenticate with the server the client must send a JWT in the request headers, set as "authorization". The token is valid for 30 minutes. After the token has expired, the server will terminate the connection.

I am looking for a way to refresh my authorization token from the client, and keep the stream open. The client should run in a loop executing a new request every 30 minutes with the updated token, and the updated payload. I have not seen a way to update a header from the client side for an already opened stream.

Let's look at some code to get an idea of what the client side looks like. The code below has a function to create a new instance of the client, and another function to establish the connection to the GRPC server.

func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
	cc, err := newConnection(config, logger)
	if err != nil {
		return nil, err
	}

	service := proto.NewWatchServiceClient(cc)

	return &WatchClient{
		config:  config,
		conn:    cc,
		logger:  entry,
		service: service,
	}, nil
}

func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
	address := fmt.Sprintf("%s:%d", config.Host, config.Port)

    // rpcCredential implements credentials.PerRPCCredentials
	rpcCredential := newTokenAuth(config.Auth, config.TenantID)

	return grpc.Dial(
		address,
        grpc.WithPerRPCCredentials(rpcCredential),
	)
}

Looking at the newConnection function above I can see that there is a call to another function, newTokenAuth, to create an auth token. This func returns a struct that implements the PerRPCCredentials interface.

There are two ways to set the authorization for a request.

  1. Use grpc.WithPerRPCCredentials to add the authorization at the time of creating the connection to the server.

  2. Use grpc.PerRPCCredentials to add the authorization to each stream opened on the connection to the server.

In this case, I am using grpc.WithPerRPCCredentials to attach the token at the time of creating the connection to the server.

Now, let's take a look at the definition of PerRPCCredentials.

type PerRPCCredentials interface {
	// GetRequestMetadata gets the current request metadata, refreshing
	// tokens if required. This should be called by the transport layer on
	// each request, and the data should be populated in headers or other
	// context. If a status code is returned, it will be used as the status
	// for the RPC. uri is the URI of the entry point for the request.
	// When supported by the underlying implementation, ctx can be used for
	// timeout and cancellation. Additionally, RequestInfo data will be
	// available via ctx to this call.
	// TODO(zhaoq): Define the set of the qualified keys instead of leaving
	// it as an arbitrary string.
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	// RequireTransportSecurity indicates whether the credentials requires
	// transport security.
	RequireTransportSecurity() bool
}

The interface requires that you define two methods. The documentation of GetRequestMetadata says

> GetRequestMetadata gets the current request metadata, refreshing tokens if required

So, it looks like my implementation of PerRPCCredentials should be able to handle a token refresh for my stream or connection. Let's take a look at my implementation of PerRPCCredentials.

// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
	tenantID       string
	tokenRequester auth.PlatformTokenGetter
    token          string
}

// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
	return false
}

// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
	token, err := t.tokenRequester.GetToken()
	if err != nil {
		return nil, err
	}
	t.token = token

	go func() {
		time.Sleep(25 * time.Minute)
		token, _ := t.tokenRequester.GetToken()
		t.token = token
	}()

	return map[string]string{
		"tenant-id": t.tenantID,
		"authorization":     "Bearer " + t.token,
	}, nil
}

As you can see, the call to GetRequestMetadata will establish a go routine that will attempt to refresh a token every 25 minutes. Adding a go routine right here is probably not the right way to do it. It was an attempt to get the auth header to refresh, which doesn't work.

Let's take a look at the stream.

func (w WatchClient) CreateWatch() error {
	topic := &proto.Request{SelfLink: w.config.TopicSelfLink}

	stream, err := w.service.CreateWatch(context.Background())
	if err != nil {
		return err
	}

	for {
		err = stream.Send(topic)
		if err != nil {
			return err
		}
		time.Sleep(25 * time.Minute)
	}
}

The client sends a message on the stream every 25 minutes. All I'm looking to get here is that when stream.Send is called, the updated token is also sent.

This function, GetRequestMetadata only gets called once, regardless if I am setting the auth through grpc.WithPerRPCCredentials or grpc.PerRPCCredsCallOption so there appears to be no way to update the authorization header.

If you have any idea what I have missed in my attempt to utilize the PerRPCCredentials for token refresh then please let me know.

Thank you.

答案1

得分: 4

头部信息在 RPC 开始时发送,并且在 RPC 过程中无法更新。如果你需要在流的生命周期中发送数据,它需要作为请求消息的一部分包含在你的 proto 定义中。

英文:

Headers are sent at the beginning of an RPC, and cannot be updated during the RPC. If you need to send data during the life of a stream, it needs to be part of the request message in your proto definition.

huangapple
  • 本文由 发表于 2021年10月13日 15:26:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/69551307.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定