NATS async reply to request is not asynchronous

Question

I am trying to implement request/response functionality with gnatsd using Go, and I realized that gnatsd does not reply to requests asynchronously.

I started my investigation with the NATS GitHub examples at https://github.com/nats-io/go-nats/tree/master/examples, specifically nats-req.go and nats-rply.go. The examples work well.

Then I modified them to test parallel requests on gnatsd and to print debug information showing which goroutine ID processes each async reply.
The source code of the modified examples follows.

nats-rply.go has been modified to return the text of the incoming request together with the current goroutine ID. I have also added a 1-second sleep to the async processing function to simulate some processing time.

package main

import (
	"bytes"
	"flag"
	"fmt"
	"log"
	"runtime"
	"strconv"
	"time"

	"github.com/nats-io/go-nats"
)

// NOTE: Use tls scheme for TLS, e.g. nats-rply -s tls://demo.nats.io:4443 foo hello
func usage() {
	log.Fatalf("Usage: nats-rply [-s server][-t] <subject> \n")
}

func printMsg(m *nats.Msg, i int) {
	log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, string(m.Data))
}

func main() {
	log.Printf("Main goroutine ID:%d\n", getGID())
	var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
	var showTime = flag.Bool("t", false, "Display timestamps")

	//log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()

	args := flag.Args()
	if len(args) < 1 {
		usage()
	}

	nc, err := nats.Connect(*urls)
	if err != nil {
		log.Fatalf("Can't connect: %v\n", err)
	}

	subj, i := args[0], 0

	nc.Subscribe(subj, func(msg *nats.Msg) {
		i++
		printMsg(msg, i)
		// simulation of some processing time
		time.Sleep(1 * time.Second)
		newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
		nc.Publish(msg.Reply, []byte(newreply))
	})
	nc.Flush()

	if err := nc.LastError(); err != nil {
		log.Fatal(err)
	}

	log.Printf("Listening on [%s]\n", subj)
	if *showTime {
		log.SetFlags(log.LstdFlags)
	}

	runtime.Goexit()
}

// getGID parses the current goroutine ID out of the stack trace header.
func getGID() uint64 {
	b := make([]byte, 64)
	b = b[:runtime.Stack(b, false)]
	b = bytes.TrimPrefix(b, []byte("goroutine "))
	b = b[:bytes.IndexByte(b, ' ')]
	n, _ := strconv.ParseUint(string(b), 10, 64)
	return n
}

nats-req.go has been modified to send 10 requests from 10 separate goroutines started in parallel, with the request timeout set to 3.5 seconds. I tried goroutines sharing one NATS connection (function oneReq()) and goroutines each with their own NATS connection (function oneReqSeparateConnect()), with the same unsuccessful results.

package main

import (
	"flag"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/nats-io/go-nats"
)

// NOTE: Use tls scheme for TLS, e.g. nats-req -s tls://demo.nats.io:4443 foo hello
func usage() {
	log.Fatalf("Usage: nats-req [-s server (%s)] <subject> \n", nats.DefaultURL)
}

func main() {
	//var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")

	//log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()

	args := flag.Args()
	if len(args) < 1 {
		usage()
	}

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Can't connect: %v\n", err)
	}
	defer nc.Close()
	subj := args[0]

	var wg sync.WaitGroup
	wg.Add(10)
	for i := 1; i <= 10; i++ {
		//go oneReq(subj, fmt.Sprintf("Request%d", i), nc, &wg)
		go oneReqSeparateConnect(subj, fmt.Sprintf("Request%d", i), &wg)
	}

	wg.Wait()
}

// oneReq sends one request over the shared connection.
func oneReq(subj string, payload string, nc *nats.Conn, wg *sync.WaitGroup) {
	defer wg.Done()
	msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
	if err != nil {
		if nc.LastError() != nil {
			log.Printf("Error in Request: %v\n", nc.LastError())
		}
		log.Printf("Error in Request: %v\n", err)
	} else {
		log.Printf("Published [%s] : '%s'\n", subj, payload)
		log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data))
	}
}

// oneReqSeparateConnect sends one request over its own connection.
func oneReqSeparateConnect(subj string, payload string, wg *sync.WaitGroup) {
	defer wg.Done()
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Printf("Can't connect: %v\n", err)
		return
	}
	defer nc.Close()
	msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
	if err != nil {
		if nc.LastError() != nil {
			log.Printf("Error in Request: %v\n", nc.LastError())
		}
		log.Printf("Error in Request: %v\n", err)
	} else {
		log.Printf("Published [%s] : '%s'\n", subj, payload)
		log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data))
	}
}

Here is the result, which is unwanted behaviour: it looks like nats-rply.go creates only one goroutine for processing incoming requests, and the requests are processed serially.
nats-req.go sends all 10 requests at once with the timeout set to 3.5 seconds. nats-rply.go responds to the requests serially at one-second intervals, so only 3 requests are satisfied before the 3.5-second timeout expires; the remaining requests time out. The response message also contains the goroutine ID, which is the same for all incoming requests! Even when nats-req is started again the goroutine ID is the same; it changes only when the nats-rply.go server is restarted.

nats-req.go log

D:\PRAC\TSP\AMON>nats-req foo
2017/08/29 18:46:48 Sending: 'Request9'
2017/08/29 18:46:48 Sending: 'Request7'
2017/08/29 18:46:48 Sending: 'Request10'
2017/08/29 18:46:48 Sending: 'Request4'
2017/08/29 18:46:48 Sending: 'Request8'
2017/08/29 18:46:48 Sending: 'Request6'
2017/08/29 18:46:48 Sending: 'Request1'
2017/08/29 18:46:48 Sending: 'Request5'
2017/08/29 18:46:48 Sending: 'Request2'
2017/08/29 18:46:48 Sending: 'Request3'
2017/08/29 18:46:49 Published [foo] : 'Request9'
2017/08/29 18:46:49 Received [_INBOX.xrsXYOB2QmW1f52pkfLHya.xrsXYOB2QmW1f52pkfLHzJ] : 'REPLY TO request "Request9", GoroutineId:36'
2017/08/29 18:46:50 Published [foo] : 'Request7'
2017/08/29 18:46:50 Received [_INBOX.xrsXYOB2QmW1f52pkfLI02.xrsXYOB2QmW1f52pkfLI0l] : 'REPLY TO request "Request7", GoroutineId:36'
2017/08/29 18:46:51 Published [foo] : 'Request10'
2017/08/29 18:46:51 Received [_INBOX.xrsXYOB2QmW1f52pkfLI1U.xrsXYOB2QmW1f52pkfLI2D] : 'REPLY TO request "Request10", GoroutineId:36'
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout

nats-rply.go log

C:\Users\belunek>nats-rply foo
2017/08/29 18:46:46 Main goroutine ID:1
2017/08/29 18:46:46 Listening on [foo]
2017/08/29 18:46:48 [#1] Received on [foo]: 'Request9'
2017/08/29 18:46:49 [#2] Received on [foo]: 'Request7'
2017/08/29 18:46:50 [#3] Received on [foo]: 'Request10'
2017/08/29 18:46:51 [#4] Received on [foo]: 'Request4'
2017/08/29 18:46:52 [#5] Received on [foo]: 'Request8'
2017/08/29 18:46:53 [#6] Received on [foo]: 'Request6'
2017/08/29 18:46:54 [#7] Received on [foo]: 'Request1'
2017/08/29 18:46:55 [#8] Received on [foo]: 'Request5'
2017/08/29 18:46:56 [#9] Received on [foo]: 'Request2'
2017/08/29 18:46:57 [#10] Received on [foo]: 'Request3'

Does anyone have ideas on how to correctly implement request/response communication in NATS with async (parallel) response processing?
Thanks for any info.

Answer 1

Score: 2

gnatsd replies to requests asynchronously, but the Go client does not start a goroutine for each incoming message; the subscription callback is invoked from a single goroutine. Because you simulate processing load with time.Sleep, which blocks the calling goroutine, the processing looks synchronous. If you modify your example to start a goroutine per message, everything works well.

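...
nc.Subscribe(subj, func(msg *nats.Msg) {
	// Hand each message to its own goroutine so the callback returns immediately.
	go handler(msg, i, nc)
})
...

// handler receives i by value, so i++ only changes the local copy;
// that is why every line in the output below is numbered #1.
func handler(msg *nats.Msg, i int, nc *nats.Conn) {
	i++
	printMsg(msg, i)
	// simulation of some processing time
	time.Sleep(1 * time.Second)
	newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
	nc.Publish(msg.Reply, []byte(newreply))
}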

Output:

./nats-rply test
2017/08/30 00:17:05 Main goroutine ID:1
2017/08/30 00:17:05 Listening on [test]
2017/08/30 00:17:11 [#1] Received on [test]: 'Request6'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request5'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request1'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request8'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request3'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request7'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request9'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request4'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request2'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request10'
./nats-req test
2017/08/30 00:17:12 Published [test] : 'Request3'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Bq] : 'REPLY TO request "Request3", GoroutineId:37'
2017/08/30 00:17:12 Published [test] : 'Request7'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5z6] : 'REPLY TO request "Request7", GoroutineId:42'
2017/08/30 00:17:12 Published [test] : 'Request10'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5wY] : 'REPLY TO request "Request10", GoroutineId:43'
2017/08/30 00:17:12 Published [test] : 'Request5'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6EO] : 'REPLY TO request "Request5", GoroutineId:34'
2017/08/30 00:17:12 Published [test] : 'Request8'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm66k] : 'REPLY TO request "Request8", GoroutineId:36'
2017/08/30 00:17:12 Published [test] : 'Request1'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm64C] : 'REPLY TO request "Request1", GoroutineId:35'
2017/08/30 00:17:12 Published [test] : 'Request2'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Gw] : 'REPLY TO request "Request2", GoroutineId:41'
2017/08/30 00:17:12 Published [test] : 'Request4'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm69I] : 'REPLY TO request "Request4", GoroutineId:40'
2017/08/30 00:17:12 Published [test] : 'Request9'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm61e] : 'REPLY TO request "Request9", GoroutineId:39'
2017/08/30 00:17:12 Published [test] : 'Request6'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5u0] : 'REPLY TO request "Request6", GoroutineId:38'

Answer 2

Score: 1

Keep in mind that by starting a goroutine from the message handler, your processing order goes out the window. This is the reason NATS calls the message handler serially: to give the user guaranteed ordering. If order is not important to you, then indeed it is easy to process each message in a separate goroutine (or a pool of goroutines).
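
For the pool variant, one possible shape is a fixed number of workers reading from a channel, which bounds concurrency instead of starting an unbounded number of goroutines. The sketch below uses only the standard library and plain strings in place of *nats.Msg; processWithPool and its parameters are hypothetical names, and the map keyed by payload assumes payloads are unique. In a real subscriber, the callback would send the message into the jobs channel and each worker would publish its reply.

```go
package main

import (
	"fmt"
	"sync"
)

// processWithPool fans messages out to a fixed number of workers and collects
// one reply per message. Completion order across workers is not guaranteed.
func processWithPool(workers int, messages []string, handle func(string) string) map[string]string {
	jobs := make(chan string)
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		replies = make(map[string]string, len(messages))
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range jobs {
				reply := handle(msg)
				mu.Lock()
				replies[msg] = reply
				mu.Unlock()
			}
		}()
	}
	for _, msg := range messages {
		jobs <- msg // a subscription callback would do this send instead of the work
	}
	close(jobs) // lets the workers' range loops finish
	wg.Wait()
	return replies
}

func main() {
	msgs := []string{"Request1", "Request2", "Request3", "Request4"}
	replies := processWithPool(2, msgs, func(m string) string {
		return "REPLY TO " + m
	})
	for _, m := range msgs {
		fmt.Println(replies[m])
	}
}
```

Because the jobs channel is unbuffered, the sender blocks whenever all workers are busy, which gives natural backpressure; a buffered channel would trade that for queueing.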

huangapple
  • Posted on August 30, 2017, 01:02:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/45944632.html