英文:
Golang Nats subscribe issue
问题
我目前正在使用微服务架构进行工作。
在将NATS插入我的项目之前,我想用它测试一些简单的场景。
在一个场景中,我有一个简单的发布者,它在本地主机的基本NATS服务器上通过for循环发布了100,000条消息。
问题出在订阅者那里。当订阅者接收到30,000到40,000条消息时,我的整个main.go程序和所有其他Go协程都停止工作,什么都不做。我只能使用Ctrl + C退出。但是发布者仍然继续发送消息。当我打开一个新的终端并启动一个新的订阅者实例时,一切又恢复正常,直到订阅者接收到约30,000条消息。最糟糕的是,没有出现任何错误,服务器上也没有日志,所以我不知道发生了什么。
之后,我尝试用QueueSubscribe方法替换Subscribe方法,一切都正常工作。
Subscribe和QueueSubscribe之间的主要区别是什么?
NATS-Streaming是一个更好的选择吗?或者在哪些情况下我应该使用Streaming,在哪些情况下应该使用标准的NATS服务器?
以下是你的代码:
发布者:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
func main() {
go createPublisher()
for {
}
}
func createPublisher() {
log.Println("pub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
nc.Publish("alenSub", msg)
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond)
}
log.Println("pub finish")
nc.Flush()
}
订阅者:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
var received int64
func main() {
received = 0
go createSubscriber()
go check()
for {
}
}
func createSubscriber() {
log.Println("sub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("alenSub", func(msg *nats.Msg) {
received++
})
nc.Flush()
for {
}
}
func check() {
for {
fmt.Println("-----------------------")
fmt.Println("still running")
fmt.Println("received", received)
fmt.Println("-----------------------")
time.Sleep(time.Second * 2)
}
}
英文:
I work currently on a micro service architecture.
Before I insert NATS into my project I wanted to test some simple scenarios with it.
In one scenario I have a simple publisher, which publishes 100.000 messages in a for loop over a basic Nats server running on localhost:4222.
The big problem with it, is the subscriber. When he receive between 30.000 - 40.000 messages my whole main.go program and all other go routines just stops and do nothing. I can just quit with ctrl + c. But the Publisher is still keep sending the messages. When I open a new terminal and start a new instance of the subscriber all again works well, till the Subscriber receive about 30000 messages. And the worst thing is that there appears not even one error and also no logs on the server so I have no idea whats going on.
After that I was trying replace the Subscribe-method with the QueueSubscribe-method and all works fine.
What is the main difference between Subscribe and QueueSubscribe?
Is NATS-Streaming a better opportunity? Or in which cases I should prefer Streaming and in which the standard NATS-Server
Here is my code:
Publisher:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
func main() {
go createPublisher()
for {
}
}
func createPublisher() {
log.Println("pub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
nc.Publish("alenSub", msg)
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond)
}
log.Println("pub finish")
nc.Flush()
}
Subscriber:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
var received int64
func main() {
received = 0
go createSubscriber()
go check()
for {
}
}
func createSubscriber() {
log.Println("sub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("alenSub", func(msg *nats.Msg) {
received++
})
nc.Flush()
for {
}
}
func check() {
for {
fmt.Println("-----------------------")
fmt.Println("still running")
fmt.Println("received", received)
fmt.Println("-----------------------")
time.Sleep(time.Second * 2)
}
}
答案1
得分: 1
无限的for
循环可能会导致垃圾回收器无法正常工作:https://github.com/golang/go/issues/15442#issuecomment-214965471
我通过运行发布者程序成功复现了这个问题。为了解决这个问题,我建议使用sync.WaitGroup
。以下是我对评论中提供的代码进行更新以完成任务的方式:
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/nats-io/go-nats"
)
// 创建一个等待组
var wg sync.WaitGroup
func main() {
// 添加一个等待者
wg.Add(1)
go createPublisher()
// 等待等待组完成
wg.Wait()
}
func createPublisher() {
log.Println("pub started")
// 在 createPublisher 完成后标记等待组为已完成
defer wg.Done()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
if errPub := nc.Publish("alenSub", msg); errPub != nil {
panic(errPub)
}
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond * 1)
}
log.Println("pub finish")
errFlush := nc.Flush()
if errFlush != nil {
panic(errFlush)
}
errLast := nc.LastError()
if errLast != nil {
panic(errLast)
}
}
我建议以类似的方式更新上述的订阅者代码。
Subscribe
和 QueueSubscribe
的主要区别在于,在 Subscribe
中,所有订阅者都会收到所有的消息,而在 QueueSubscribe
中,QueueGroup
中的一个订阅者会收到每个消息。
有关 NATS Streaming 的其他功能的详细信息,请参阅以下链接:
https://nats.io/documentation/streaming/nats-streaming-intro/
我们看到 NATS 和 NATS Streaming 在各种用例中被用于数据流水线和控制平面。您的选择应该根据您的用例需求来确定。
英文:
The infinite for
loops are likely starving the garbage collector: https://github.com/golang/go/issues/15442#issuecomment-214965471
I was able to reproduce the issue by just running the publisher. To resolve, I recommend using a sync.WaitGroup
. Here's how I updated the code linked to in the comments to get it to complete:
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/nats-io/go-nats"
)
// create wait group
var wg sync.WaitGroup
func main() {
// add 1 waiter
wg.Add(1)
go createPublisher()
// wait for wait group to complete
wg.Wait()
}
func createPublisher() {
log.Println("pub started")
// mark wait group done after createPublisher completes
defer wg.Done()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
if errPub := nc.Publish("alenSub", msg); errPub != nil {
panic(errPub)
}
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond * 1)
}
log.Println("pub finish")
errFlush := nc.Flush()
if errFlush != nil {
panic(errFlush)
}
errLast := nc.LastError()
if errLast != nil {
panic(errLast)
}
}
I'd recommend updating the above subscriber code similarly.
The main difference between Subscribe
and QueueSubscriber
is that in Subscribe
all subscribers are sent all messages from. While in QueueSubscribe
only one subscriber in a QueueGroup
is sent each message.
Some details on additional features for NATS Streaming are here:
https://nats.io/documentation/streaming/nats-streaming-intro/
We see both NATS and NATS Streaming used in a variety of use cases from data pipelines to control planes. Your choice should be driven by the needs of your use case.
答案2
得分: 1
如所述,删除for{}循环。用runtime.Goexit()替换。
对于订阅者,你不需要在Go协程中创建订阅者。异步订阅者已经有了自己的Go协程用于回调。
还可以使用atomic或mutex来保护接收到的变量。
在这里也可以看到一些示例。
https://github.com/nats-io/go-nats/tree/master/examples
英文:
As stated, remove the for{} loop. Replace with runtime.Goexit().
For subscriber you don't need to create the subscriber in a Go routine. Async subscribers already have their own Go routine for callbacks.
Also protected the received variable with atomic or a mutex.
See the examples here as well.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论