使用Google PubSub与Cloud Run gRPC服务

huangapple go评论86阅读模式
英文:

Using google PubSub with a Cloud Run gRPC service

问题

我有一个Cloud Run gRPC服务,我希望它监听PubSub主题。
官方文档中唯一的方法是使用触发器,但这仅适用于接受HTTP请求的REST服务器。
我在网上找不到任何关于如何在Cloud Run gRPC服务中使用pubsub的好例子。
我的服务是使用Go按照官方的grpc 说明构建的。

main.go:

func main() {
    ctx := context.Background()

    pubSubClient, err := pubsub.NewClient(ctx, 'project-id')
    if err != nil {
        log.Fatal(err)
    }
    pubSubSubscription := pubSubClient.Subscription("subscription-id")
    go func(sub *pubsub.Subscription) {
        err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
            defer m.Ack()
            log.Printf("sub.Receiv called %+v", m)
        })
        if err != nil {
            // handle error
        }
    }(pubSubSubscription)

    ....

    lis, err := net.Listen("tcp", ":"+port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    s, _ := server.New(ctx, ...)
    grpcServer := grpc.NewServer()

    grpcpkg.RegisterReportServiceServer(grpcServer, s)
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %s", err)
    }
}

只要至少有一个实例处于活动状态并监听,这将起作用。但是,如果没有活动实例,并且发布了新的pubsub消息,Cloud Run不会“唤醒”实例,因为它的自动缩放是基于HTTP请求的利用率。
使上述代码工作的唯一方法是在Cloud Run中设置min-instance 1,但对于许多用例来说可能效率不高(例如,服务在夜间不活动)。

有没有任何解决方法?
有没有办法从pubsub消息触发grpc cloud run服务?

英文:

I have a Cloud Run gRPC service and I want it to listen to PubSub topic.
The only way to do it from the official documentations is to use a trigger but this only works with REST server that accept http requests.
I couldn't find any good example on the web of how to use pubsub with cloud run grpc services.
My service is built with Go following the official grpc instructions

main.go:

func main() {
	ctx := context.Background()

	pubSubClient, err := pubsub.NewClient(ctx, 'project-id')
	if err != nil {
		log.Fatal(err)
	}
	pubSubSubscription := pubSubClient.Subscription("subscription-id")
	go func(sub *pubsub.Subscription) {
		err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
			defer m.Ack()
			log.Printf("sub.Receiv called %+v", m)
		})
		if err != nil {
			// handle error
		}
	}(pubSubSubscription)

    ....

    lis, err := net.Listen("tcp", ":"+port)
    if err != nil {
	    log.Fatalf("failed to listen: %v", err)
    }

    s, _ := server.New(ctx, ...)
    grpcServer := grpc.NewServer()

    grpcpkg.RegisterReportServiceServer(grpcServer, s)
    if err := grpcServer.Serve(lis); err != nil {
	    log.Fatalf("failed to serve: %s", err)
    }
}

This will work as long as there is at least one instance up and listening. But if there is no instances active, and a new pubsub message publishes, cloud run won't "wake up" the insane because it's autoscaling is utilized based on http requests.
The only way to make the above code work is by setting min-instance 1 in cloud run, but this can be not efficient for a lot of use cases (i.e the service is not active at night)

Is there any work around for this?
Is there any way to trigger a grpc cloud run service from a pubsub message?

答案1

得分: 2

您正在尝试在客户端库上使用的sub.Receive方法是用于拉取消息传递,而不是推送传递。您可以阅读有关这两种传递类型的比较以进行选择的方式

如果您希望将消息作为传入请求接收,您必须使用推送传递,并可以按照Cloud Run Pubsub使用指南进行操作。请注意,Cloud Pub/Sub仅将消息作为具有指定JSON有效负载的HTTPS POST请求传递。要处理来自gRPC服务器的这些请求,您可以使用gRPC Gateway插件生成一个反向代理,以公开HTTP API。

一旦您的Cloud Run实例使用反向代理接受HTTP流量,它将根据传入请求进行自动缩放,因此您不需要始终保持1个实例运行。请注意,这可能意味着在冷启动时,消息处理延迟最多为10秒。

英文:

The sub.Receive method you are attempting to use on the client library is meant for Pull message delivery and not Push delivery. You can read about how these two delivery types compare to make a choice.

If you wish to receive messages as incoming requests, you must use Push delivery and can follow the the Cloud Run Pubsub usage guide. Note that Cloud Pub/Sub only delivers messages as HTTPS POST requests with the specified JSON payload. To handle these requests from a gRPC server, you can use the gRPC Gateway plugin to generate a reverse proxy that exposes an HTTP API.

Once your Cloud Run instance is accepting HTTP traffic using the reverse proxy, it will be autoscaled with incoming requests, so you won't need to keep 1 instance always running. Note that this may mean that message processing latency is upto 10s on a cold start.

huangapple
  • 本文由 发表于 2022年7月25日 15:29:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/73105438.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定