Integration test with the Cloud Pub/Sub emulator: sending and receiving from the same code block
Question
I've been trying to test interactions with Cloud PubSub using the emulator. The test publishes the message to the topic, but the receiver never gets triggered. Here's a walkthrough of the code:
func TestPubSubEmulator(t *testing.T) {
	ctx := context.Background()
	topic, sub, err := CreateTestTopicAndSubscription(ctx, "project-id", "topic-id")
	if err != nil {
		t.Fatal(err)
	}
	cctx, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()
	var messageRecieved int32
	sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		t.Log(m.Data)
		atomic.AddInt32(&messageRecieved, 1)
		m.Ack()
	})
	topic.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello World"),
	})
	time.Sleep(5 * time.Second)
	t.Log(messageRecieved)
	if messageRecieved != 1 {
		t.Fatal("Message was never sent")
	}
}
This is the code for creating the topic and subscription:
func CreateTestTopicAndSubscription(ctx context.Context, projectID, topicID string) (*pubsub.Topic, *pubsub.Subscription, error) {
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, nil, fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()
	topic, err := client.CreateTopic(ctx, topicID)
	if err != nil {
		return nil, nil, fmt.Errorf("CreateTopic: %v", err)
	}
	// Create a new subscription to the created topic and ensure it never expires.
	sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
		Topic:            topic,
		AckDeadline:      10 * time.Second,
		ExpirationPolicy: time.Duration(0),
	})
	if err != nil {
		return nil, nil, err
	}
	return topic, sub, nil
}
I am currently trying to send the message from a different program to see whether the receiver gets triggered.
Answer 1
Score: 1
Sorry I didn't update this question earlier. I found that the problem was caused by the pointer to the subscription: it wasn't listening for messages.
I needed to create a new pointer to the subscription that would listen for messages.
Here's the concept:
// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
	Topic:            topic,
	AckDeadline:      10 * time.Second,
	ExpirationPolicy: time.Duration(0),
})
if err != nil {
	return nil, nil, err
}
...
// This subscription won't work for some reason
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
	t.Log(m.Data)
	atomic.AddInt32(&messageRecieved, 1)
	m.Ack()
})
Instead, the subscription should first be created and then listened to through a new pointer:
client.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{Topic: topic})
// This subscription would be able to receive messages
sub := client.Subscription(subId)
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
	t.Log(m.Data)
	atomic.AddInt32(&messageRecieved, 1)
	m.Ack()
})
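As a rough illustration of receiving and publishing from the same test, here is a minimal sketch that uses only the standard library. `fakeSub`, `publish`, and `receiveOne` are hypothetical stand-ins and are not part of the Pub/Sub client library. The sketch assumes that the real `Receive` likewise blocks until its context is cancelled, which is why it runs in a goroutine here and the caller waits on a channel rather than sleeping.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// fakeSub is a hypothetical in-memory stand-in for a subscription handle.
// It is NOT the real Pub/Sub client; it only mimics a blocking Receive.
type fakeSub struct {
	msgs chan []byte
}

// Receive calls handler for every message and returns only once ctx is
// cancelled, mirroring the blocking behavior assumed for the real call.
func (s *fakeSub) Receive(ctx context.Context, handler func(context.Context, []byte)) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case m := <-s.msgs:
			handler(ctx, m)
		}
	}
}

// publish queues one message for delivery (hypothetical helper).
func (s *fakeSub) publish(data []byte) {
	s.msgs <- data
}

// receiveOne starts Receive in a goroutine, publishes one message, waits up
// to timeout for the handler to fire, and returns how many messages arrived.
func receiveOne(timeout time.Duration) int32 {
	sub := &fakeSub{msgs: make(chan []byte, 1)}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var received int32
	got := make(chan struct{}, 1)
	done := make(chan error, 1)

	// Receive blocks, so it must not run on the goroutine that publishes.
	go func() {
		done <- sub.Receive(ctx, func(_ context.Context, m []byte) {
			atomic.AddInt32(&received, 1)
			select {
			case got <- struct{}{}: // signal the first delivery
			default:
			}
		})
	}()

	sub.publish([]byte("Hello World"))

	// Wait for the handler instead of sleeping for a fixed time.
	select {
	case <-got:
	case <-time.After(timeout):
	}

	cancel() // stop Receive
	<-done   // wait for Receive to return before reading the counter
	return atomic.LoadInt32(&received)
}

func main() {
	fmt.Println("messages received:", receiveOne(2*time.Second))
}
```

With the real client the same shape would apply, and the client that produced the subscription handle needs to stay open for as long as `Receive` runs. Note that the helper in the question closes its client with `defer client.Close()` before returning, which may be why the returned handle never received anything while a handle taken from a live client did.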