Golang Nats 订阅问题

huangapple go评论106阅读模式
英文:

Golang Nats subscribe issue

问题

我目前正在使用微服务架构进行工作。
在将NATS插入我的项目之前,我想用它测试一些简单的场景。

在一个场景中,我有一个简单的发布者,它在本地主机的基本NATS服务器上通过for循环发布了100,000条消息。

问题出在订阅者那里。当订阅者接收到30,000到40,000条消息时,我的整个main.go程序和所有其他Go协程都停止工作,什么都不做。我只能使用Ctrl + C退出。但是发布者仍然继续发送消息。当我打开一个新的终端并启动一个新的订阅者实例时,一切又恢复正常,直到订阅者接收到约30,000条消息。最糟糕的是,没有出现任何错误,服务器上也没有日志,所以我不知道发生了什么。

之后,我尝试用QueueSubscribe方法替换Subscribe方法,一切都正常工作。

Subscribe和QueueSubscribe之间的主要区别是什么?

NATS-Streaming是一个更好的选择吗?或者在哪些情况下我应该使用Streaming,在哪些情况下应该使用标准的NATS服务器?

以下是你的代码:

发布者:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/go-nats"
)

func main() {
	go createPublisher()

	for {

	}
}

func createPublisher() {

	log.Println("pub started")

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	msg := make([]byte, 16)

	for i := 0; i < 100000; i++ {
		nc.Publish("alenSub", msg)
		if (i % 100) == 0 {
			fmt.Println("i", i)
		}
		time.Sleep(time.Millisecond)
	}

	log.Println("pub finish")

	nc.Flush()

}

订阅者:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/go-nats"
)

var received int64

func main() {
	received = 0

	go createSubscriber()
	go check()

	for {

	}
}

func createSubscriber() {

	log.Println("sub started")

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	nc.Subscribe("alenSub", func(msg *nats.Msg) {
		received++
	})
	nc.Flush()

	for {

	}
}

func check() {
	for {
		fmt.Println("-----------------------")
		fmt.Println("still running")
		fmt.Println("received", received)
		fmt.Println("-----------------------")
		time.Sleep(time.Second * 2)
	}
}
英文:

I work currently on a micro service architecture.
Before I insert NATS into my project I wanted to test some simple scenarios with it.

In one scenario I have a simple publisher, which publishes 100.000 messages in a for loop over a basic Nats server running on localhost:4222.

The big problem with it, is the subscriber. When he receive between 30.000 - 40.000 messages my whole main.go program and all other go routines just stops and do nothing. I can just quit with ctrl + c. But the Publisher is still keep sending the messages. When I open a new terminal and start a new instance of the subscriber all again works well, till the Subscriber receive about 30000 messages. And the worst thing is that there appears not even one error and also no logs on the server so I have no idea whats going on.

After that I was trying replace the Subscribe-method with the QueueSubscribe-method and all works fine.

What is the main difference between Subscribe and QueueSubscribe?

Is NATS-Streaming a better opportunity? Or in which cases I should prefer Streaming and in which the standard NATS-Server

Here is my code:

Publisher:

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;time&quot;

	&quot;github.com/nats-io/go-nats&quot;
)

func main() {
	go createPublisher()

	for {

	}
}

func createPublisher() {

	log.Println(&quot;pub started&quot;)

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	msg := make([]byte, 16)

	for i := 0; i &lt; 100000; i++ {
		nc.Publish(&quot;alenSub&quot;, msg)
		if (i % 100) == 0 {
			fmt.Println(&quot;i&quot;, i)
		}
		time.Sleep(time.Millisecond)
	}

	log.Println(&quot;pub finish&quot;)

	nc.Flush()

}

Subscriber:

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;time&quot;

	&quot;github.com/nats-io/go-nats&quot;
)

var received int64

func main() {
	received = 0

	go createSubscriber()
	go check()

	for {

	}
}

func createSubscriber() {

	log.Println(&quot;sub started&quot;)

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	nc.Subscribe(&quot;alenSub&quot;, func(msg *nats.Msg) {
		received++
	})
	nc.Flush()

	for {

	}
}

func check() {
	for {
		fmt.Println(&quot;-----------------------&quot;)
		fmt.Println(&quot;still running&quot;)
		fmt.Println(&quot;received&quot;, received)
		fmt.Println(&quot;-----------------------&quot;)
		time.Sleep(time.Second * 2)
	}
}

答案1

得分: 1

无限的for循环可能会导致垃圾回收器无法正常工作:https://github.com/golang/go/issues/15442#issuecomment-214965471

我通过运行发布者程序成功复现了这个问题。为了解决这个问题,我建议使用sync.WaitGroup。以下是我对评论中提供的代码进行更新以完成任务的方式:

package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/nats-io/go-nats"
)

// 创建一个等待组
var wg sync.WaitGroup

func main() {
	// 添加一个等待者
	wg.Add(1)
	go createPublisher()

	// 等待等待组完成
	wg.Wait()
}

func createPublisher() {

	log.Println("pub started")
	// 在 createPublisher 完成后标记等待组为已完成
	defer wg.Done()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	msg := make([]byte, 16)

	for i := 0; i < 100000; i++ {
		if errPub := nc.Publish("alenSub", msg); errPub != nil {
			panic(errPub)
		}

		if (i % 100) == 0 {
			fmt.Println("i", i)
		}
		time.Sleep(time.Millisecond * 1)
	}

	log.Println("pub finish")

	errFlush := nc.Flush()
	if errFlush != nil {
		panic(errFlush)
	}

	errLast := nc.LastError()
	if errLast != nil {
		panic(errLast)
	}

}

我建议以类似的方式更新上述的订阅者代码。

SubscribeQueueSubscribe 的主要区别在于,在 Subscribe 中,所有订阅者都会收到所有的消息,而在 QueueSubscribe 中,QueueGroup 中的一个订阅者会收到每个消息。

有关 NATS Streaming 的其他功能的详细信息,请参阅以下链接:
https://nats.io/documentation/streaming/nats-streaming-intro/

我们看到 NATS 和 NATS Streaming 在各种用例中被用于数据流水线和控制平面。您的选择应该根据您的用例需求来确定。

英文:

The infinite for loops are likely starving the garbage collector: https://github.com/golang/go/issues/15442#issuecomment-214965471

I was able to reproduce the issue by just running the publisher. To resolve, I recommend using a sync.WaitGroup. Here's how I updated the code linked to in the comments to get it to complete:

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;sync&quot;
	&quot;time&quot;

	&quot;github.com/nats-io/go-nats&quot;
)

// create wait group
var wg sync.WaitGroup

func main() {
	// add 1 waiter
	wg.Add(1)
	go createPublisher()
	
	// wait for wait group to complete
	wg.Wait()
}

func createPublisher() {

	log.Println(&quot;pub started&quot;)
	// mark wait group done after createPublisher completes
	defer wg.Done()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	msg := make([]byte, 16)

	for i := 0; i &lt; 100000; i++ {
		if errPub := nc.Publish(&quot;alenSub&quot;, msg); errPub != nil {
			panic(errPub)
		}

		if (i % 100) == 0 {
			fmt.Println(&quot;i&quot;, i)
		}
		time.Sleep(time.Millisecond * 1)
	}

	log.Println(&quot;pub finish&quot;)

	errFlush := nc.Flush()
	if errFlush != nil {
		panic(errFlush)
	}

	errLast := nc.LastError()
	if errLast != nil {
		panic(errLast)
	}

}

I'd recommend updating the above subscriber code similarly.

The main difference between Subscribe and QueueSubscriber is that in Subscribe all subscribers are sent all messages from. While in QueueSubscribe only one subscriber in a QueueGroup is sent each message.

Some details on additional features for NATS Streaming are here:
https://nats.io/documentation/streaming/nats-streaming-intro/

We see both NATS and NATS Streaming used in a variety of use cases from data pipelines to control planes. Your choice should be driven by the needs of your use case.

答案2

得分: 1

如所述,删除for{}循环。用runtime.Goexit()替换。

对于订阅者,你不需要在Go协程中创建订阅者。异步订阅者已经有了自己的Go协程用于回调。

还可以使用atomic或mutex来保护接收到的变量。

在这里也可以看到一些示例。

https://github.com/nats-io/go-nats/tree/master/examples

英文:

As stated, remove the for{} loop. Replace with runtime.Goexit().

For subscriber you don't need to create the subscriber in a Go routine. Async subscribers already have their own Go routine for callbacks.

Also protected the received variable with atomic or a mutex.

See the examples here as well.

https://github.com/nats-io/go-nats/tree/master/examples

huangapple
  • 本文由 发表于 2017年8月26日 01:05:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/45886359.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定