Integration test with the Cloud Pub/Sub emulator: sending and receiving from the same code block

Question

I've been trying to test interactions with Cloud Pub/Sub using the emulator. The message is published to the topic, but the receiver is never triggered. Here is a walkthrough of the code:

func TestPubSubEmulator(t *testing.T) {
	ctx := context.Background()
	topic, sub, err := CreateTestTopicAndSubscription(ctx, "project-id", "topic-id")
	if err != nil {
		t.Fatal(err)
	}

	cctx, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()

	var messageRecieved int32

	sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		t.Log(m.Data)
		atomic.AddInt32(&messageRecieved, 1)
		m.Ack()
	})

	topic.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello World"),
	})

	time.Sleep(5 * time.Second)

	t.Log(messageRecieved)
	if messageRecieved != 1 {
		t.Fatal("Message was never sent")
	}
}

This is the code that creates the topic and subscription:

func CreateTestTopicAndSubscription(ctx context.Context, projectID, topicID string) (*pubsub.Topic, *pubsub.Subscription, error) {
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, nil, fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic, err := client.CreateTopic(ctx, topicID)
	if err != nil {
		return nil, nil, fmt.Errorf("CreateTopic: %v", err)
	}

	// Create a new subscription to the created topic and ensure it never expires.
	sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
		Topic:            topic,
		AckDeadline:      10 * time.Second,
		ExpirationPolicy: time.Duration(0),
	})
	if err != nil {
		return nil, nil, err
	}

	return topic, sub, nil
}

I am currently trying to send the message from a different program to see whether the receiver gets triggered.

Answer 1

Score: 1

I'm sorry I didn't update this question earlier. I found that the problem was caused by the pointer to the subscription: it wasn't listening for messages.
I needed to create a new pointer to the subscription that listens for messages.

Here's the concept:

// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
	Topic:            topic,
	AckDeadline:      10 * time.Second,
	ExpirationPolicy: time.Duration(0),
})
if err != nil {
	return nil, nil, err
}
...
// This subscription won't work for some reason
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
	t.Log(m.Data)
	atomic.AddInt32(&messageRecieved, 1)
	m.Ack()
})
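
One likely explanation, offered here as an assumption rather than something the original post confirms: the helper in the question calls `defer client.Close()`, so the client is already closed by the time the returned topic and subscription handles are used, and handles obtained from a client stop working once that client is closed. The sketch below models this with hypothetical stand-in types (`fakeClient`, `fakeSub`, `brokenSetup`, `fixedSetup`), not the real pubsub API. It shows the lifetime problem and one way around it: return the client as well and let the caller close it.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeClient and fakeSub are hypothetical stand-ins for a client and a
// subscription handle; they are not part of cloud.google.com/go/pubsub.
type fakeClient struct{ closed bool }

func (c *fakeClient) Close() { c.closed = true }

// fakeSub keeps a pointer to the client that created it.
type fakeSub struct{ client *fakeClient }

// Receive fails if the owning client has already been closed.
func (s *fakeSub) Receive() error {
	if s.client.closed {
		return errors.New("client is closed")
	}
	return nil
}

// brokenSetup mirrors the question's helper: the deferred Close runs as soon
// as the function returns, so the caller gets a handle to a closed client.
func brokenSetup() *fakeSub {
	c := &fakeClient{}
	defer c.Close()
	return &fakeSub{client: c}
}

// fixedSetup returns the client too, so the caller decides when to close it.
func fixedSetup() (*fakeClient, *fakeSub) {
	c := &fakeClient{}
	return c, &fakeSub{client: c}
}

func main() {
	fmt.Println("broken:", brokenSetup().Receive())

	c, sub := fixedSetup()
	defer c.Close()
	fmt.Println("fixed:", sub.Receive())
}
```

In a real test, the equivalent change would be to create the client in the test itself (or return it from the helper) and defer its `Close` there, so it outlives every use of the topic and subscription.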

Instead, the subscription should first be created and then listened to through a new pointer obtained from the client.

client.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{Topic: topic})

// This subscription would be able to receive messages
sub := client.Subscription(subId)
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
	t.Log(m.Data)
	atomic.AddInt32(&messageRecieved, 1)
	m.Ack()
})

huangapple
  • Posted on March 23, 2022, 16:30:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/71583988.html