使用自定义处理程序在NATS Golang中保持订阅方法。

huangapple go评论97阅读模式
英文:

Hold subscribe method with custom handler nats golang

问题

我正在使用golang在nats客户端之上编写包装器,我想要一个处理程序函数,一旦我从nats服务器接收到消息,就可以从消费者中调用它。
我想要在接收到nats消息之前暂停自定义订阅方法。

发布:

func (busConfig BusConfig) Publish(service string, data []byte) error {
    pubErr := conn.Publish(service, data)
    if pubErr != nil {
        return pubErr
    }
    return nil
}

订阅:

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    fmt.Println("订阅中:", subject)

    //wg := sync.WaitGroup{}
    //wg.Add(1)
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
        }()
        //wg.Done()
    })
    if err != nil {
        fmt.Println("订阅者错误:", err)
    }
    //wg.Wait()
    defer subscription.Unsubscribe()

}

测试用例:

func TestLifeCycleEvent(t *testing.T) {
    busClient := GetBusClient()
    busClient.Subscribe(SUBJECT, func(input []byte) {
        fmt.Println("收到生命周期事件:", string(input))
    })

    busClient.Publish(SUBJECT, []byte("完成通知"))
}

我发现消息已经发布,但没有被订阅,我尝试使用waitgroup来暂停订阅方法,但我认为这不是正确的解决方案。

英文:

I am writing wrapper on top of nats client in golang, I want to take handler function which can be invoked from consumer once I get the message from nats server.
I want to hold custom subscribe method until it receives the message from nats.

Publish:

func (busConfig BusConfig) Publish(service string, data []byte) error {
	pubErr := conn.Publish(service, data)
	if pubErr != nil {
		return pubErr
	}
	return nil
}

Subscribe:

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
	fmt.Println("Subscrbing on : ", subject)

	//wg := sync.WaitGroup{}
	//wg.Add(1)
	subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
		go func() {
			handler(msg.Data)
		}()
		//wg.Done()
	})
	if err != nil {
		fmt.Println("Subscriber error : ", err)
	}
	//wg.Wait()
	defer subscription.Unsubscribe()

}

test case:

func TestLifeCycleEvent(t *testing.T) {
	busClient := GetBusClient()
	busClient.Subscribe(SUBJECT, func(input []byte) {
		fmt.Println("Life cycle event received :", string(input))
	})

	busClient.Publish(SUBJECT, []byte("complete notification"))
}

I am seeing message is published but not subscribed, I tried to hold subscribe method using waitgroup but I think this is not the correct solution.

答案1

得分: 3

你看不到消息被传递的原因是Subscribe是一个异步方法,它会生成一个goroutine来处理传入的消息并调用回调函数。

在调用busClient.Publish()之后,你的应用程序立即退出。它不会等待Subscribe()内部发生任何事情。

当你使用nats.Subscribe()时,通常是在一个长时间运行的应用程序中,在特定条件下退出(比如接收到关闭信号)。WaitGroup在这里可以工作,但可能只适用于测试,而不适用于真实的应用程序。

你还应该在退出程序之前调用NATS连接的Flush()方法,以确保所有缓冲的消息都已发送。

如果你想要一个同步的方法,可以使用nats.SubscribeSync()

在这里查看示例:https://natsbyexample.com/examples/messaging/pub-sub/go

英文:

You don't see the message being delivered because Subscribe is an async method that spawns a goroutine to handle the incoming messages and call the callback.

Straight after calling busClient.Publish() your application exits. It does not wait for anything to happen inside Subscribe().

When you use nats.Subscribe(), you usually have a long-running application that exits in specific conditions (like receiving a shutdown signal). WaitGroup can work here, but probably not for real applications, just for tests.

You should also call Flush() method on NATS connection to ensure all buffered messages have been sent before exiting the program.

If you want a synchronous method, you can use nats.SubscribeSync()

Check out examples here: https://natsbyexample.com/examples/messaging/pub-sub/go

答案2

得分: 0

根据我的理解,在NATs中,即使我们没有提供回复地址,我们也需要对消息进行响应,以便它可以回复该消息。

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
            msg.Respond(nil)
        }()
    })
}
英文:

For my understanding, I think in NATs we need to respond to the message even if we are not providing the reply address, so it can respond to the message.

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
            msg.Respond(nil)
        }()
      })
   }

huangapple
  • 本文由 发表于 2022年12月19日 22:22:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/74851779.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定