处理来自RabbitMQ的消息时限制并发性。

huangapple go评论102阅读模式
英文:

Limiting concurrency when processing messages from RabbitMQ

问题

我正在尝试从队列(RabbitMQ)中读取URL并进行有限数量的并发HTTP请求,即使用一个包含10个工作线程的池,从队列中接收URL并进行并发请求(无限循环)。

到目前为止,我已经按照RabbitMQ教程实现了一个消费者:
https://www.rabbitmq.com/tutorials/tutorial-one-go.html

并尝试了一些从网上找到的示例方法,最后在这个示例中停下来:
http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我的当前代码运行大约一分钟,然后无限期地冻结。我尝试添加/移动了一些goroutine,但似乎无法按预期工作(我对Go语言非常陌生)。

当前代码:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/Xide/bloom"
	"github.com/streadway/amqp"
)

// failOnError aborts the program when err is non-nil, logging msg followed by
// the error text. It does nothing when err is nil.
//
// NOTE(review): log.Fatalf exits the process after logging, so the panic on
// the following line is never reached.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

// netClient is the HTTP client used by getRequest. Its Timeout caps each
// request at 10 seconds.
var netClient = &http.Client{
	Timeout: time.Second * 10,
}

// getRequest performs a GET on url through netClient and prints the response
// status code and the request URL. A failed request is logged and otherwise
// ignored.
//
// NOTE(review): resp.Body is never closed in this function, so every
// successful request leaves its response body (and the underlying
// connection) open.
func getRequest(url string) {
	//resp, err := http.Get(string(url))
	resp, err := netClient.Get(string(url))
	if err != nil {
		log.Printf("HTTP request error: %s", err)
		return
	}
	fmt.Println("StatusCode:", resp.StatusCode)
	fmt.Println(resp.Request.URL)
}

// main connects to a local RabbitMQ broker, consumes URLs from the durable
// "urls" queue and fetches each URL that the bloom filter has not matched
// before, using a buffered channel as a semaphore to cap the number of
// in-flight requests at `concurrency`.
func main() {
	// Filter used to skip URLs that were already fed to it.
	// presumably 0.1 is the target false-positive rate; confirm against the bloom package.
	bf := bloom.NewDefaultScalable(0.1)

	conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"urls",    // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	failOnError(err, "Failed to declare a queue")

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, //global
	)
	failOnError(err, "Failed to set Qos")

	// auto-ack is false, so every delivery is acknowledged explicitly below.
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// Nothing ever sends on forever; receiving from it at the end of main
	// keeps the process alive until it is killed.
	forever := make(chan bool)

	concurrency := 10
	// sem is a counting semaphore: a send takes a slot, a receive frees one.
	sem := make(chan bool, concurrency)
	go func() {
		for d := range msgs {
			// Take a slot for every delivery; blocks once all slots are in use.
			sem <- true
			url := string(d.Body)
			if bf.Match(url) == false {
				bf.Feed(url)
				log.Printf("Not seen: %s", d.Body)
				go func(url string) {
					// The slot is freed only here, when a spawned request finishes.
					defer func() { <-sem }()
					getRequest(url)
				}(url)
			} else {
				// NOTE(review): this branch never frees the slot taken above.
				// After `concurrency` already-seen URLs every slot is leaked
				// and the send at the top of the loop blocks forever.
				log.Printf("Already seen: %s", d.Body)
			}
			// NOTE(review): the delivery is acknowledged right after the
			// request goroutine is started, not after the request completes;
			// the error returned by Ack is discarded.
			d.Ack(false)
		}
		// Once msgs is closed, fill every slot; this blocks until all
		// outstanding request goroutines have released theirs.
		for i := 0; i < cap(sem); i++ {
			sem <- true
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

希望对你有所帮助!

英文:

I'm attempting to read URLs from a queue (RabbitMQ) and make a limited number of concurrent HTTP requests i.e. have a pool of 10 workers making concurrent requests to URLs received from the queue (forever).

So far I've implemented a consumer as per the RabbitMQ tutorials:
https://www.rabbitmq.com/tutorials/tutorial-one-go.html

And have tried a number of methods from examples discovered on the web, ending at the example here:
http://jmoiron.net/blog/limiting-concurrency-in-go/

Unfortunately, my current code runs for approximately one minute and then freezes indefinitely. I've tried adding/moving go routines around but I can't seem to get it to work as intended (I'm very new to Go).

Current code:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/Xide/bloom"
	"github.com/streadway/amqp"
)

// failOnError aborts the program when err is non-nil, logging msg followed by
// the error text. It does nothing when err is nil.
//
// NOTE(review): log.Fatalf exits the process after logging, so the panic on
// the following line is never reached.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

// netClient is the HTTP client used by getRequest. Its Timeout caps each
// request at 10 seconds.
var netClient = &http.Client{
	Timeout: time.Second * 10,
}

// getRequest performs a GET on url through netClient and prints the response
// status code and the request URL. A failed request is logged and otherwise
// ignored.
//
// NOTE(review): resp.Body is never closed in this function, so every
// successful request leaves its response body (and the underlying
// connection) open.
func getRequest(url string) {
	//resp, err := http.Get(string(url))
	resp, err := netClient.Get(string(url))
	if err != nil {
		log.Printf("HTTP request error: %s", err)
		return
	}
	fmt.Println("StatusCode:", resp.StatusCode)
	fmt.Println(resp.Request.URL)
}

// main connects to a local RabbitMQ broker, consumes URLs from the durable
// "urls" queue and fetches each URL that the bloom filter has not matched
// before, using a buffered channel as a semaphore to cap the number of
// in-flight requests at `concurrency`.
func main() {
	// Filter used to skip URLs that were already fed to it.
	// presumably 0.1 is the target false-positive rate; confirm against the bloom package.
	bf := bloom.NewDefaultScalable(0.1)

	conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"urls", // name
		true,   // durable
		false,  // delete when unused
		false,  // exclusive
		false,  // no-wait
		nil,    // arguments
	)
	failOnError(err, "Failed to declare a queue")

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set Qos")

	// auto-ack is false, so every delivery is acknowledged explicitly below.
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// Nothing ever sends on forever; receiving from it at the end of main
	// keeps the process alive until it is killed.
	forever := make(chan bool)

	concurrency := 10
	// sem is a counting semaphore: a send takes a slot, a receive frees one.
	sem := make(chan bool, concurrency)
	go func() {
		for d := range msgs {
			// Take a slot for every delivery; blocks once all slots are in use.
			sem <- true
			url := string(d.Body)
			if bf.Match(url) == false {
				bf.Feed(url)
				log.Printf("Not seen: %s", d.Body)
				go func(url string) {
					// The slot is freed only here, when a spawned request finishes.
					defer func() { <-sem }()
					getRequest(url)
				}(url)
			} else {
				// NOTE(review): this branch never frees the slot taken above.
				// After `concurrency` already-seen URLs every slot is leaked
				// and the send at the top of the loop blocks forever.
				log.Printf("Already seen: %s", d.Body)
			}
			// NOTE(review): the delivery is acknowledged right after the
			// request goroutine is started, not after the request completes;
			// the error returned by Ack is discarded.
			d.Ack(false)
		}
		// Once msgs is closed, fill every slot; this blocks until all
		// outstanding request goroutines have released theirs.
		for i := 0; i < cap(sem); i++ {
			sem <- true
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

答案1

得分: 3

你没有正确处理HTTP响应,导致了一组不断增长的开放连接。尝试使用以下代码:

// getRequest performs a GET on url through netClient and prints the response
// status code and the request URL. A failed request is logged and otherwise
// ignored.
func getRequest(url string) {
	resp, err := netClient.Get(url)
	if err != nil {
		log.Printf("HTTP请求错误:%s", err)
		return
	}

	// Deferred calls run last-in-first-out: the remaining body is drained
	// first and then closed, which lets the transport reuse the connection.
	defer resp.Body.Close()
	defer io.Copy(ioutil.Discard, resp.Body)

	fmt.Println("StatusCode:", resp.StatusCode)
	fmt.Println(resp.Request.URL)
}

在你从通道中读取完消息后,下面这部分代码似乎是不必要的,而且可能会引发问题:

for i := 0; i < cap(sem); i++ {
    sem <- true
}

为什么要在从队列中读取所有消息后填充sem通道呢?你已经向通道中添加了与你希望从中读取的消息数量相同的消息,所以这是毫无意义的,最好情况下是多余的,如果对代码的其他部分进行错误更改,可能会引发问题。

与你的问题无关,但这部分代码是多余的:

if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
}

根据文档Fatalf已经会退出程序,所以panic永远不会被调用。如果你想要记录日志并panic,可以尝试使用log.Panicf,它专门用于此目的。

英文:

You're not properly handling your HTTP responses, leading to a growing set of open connections. Try this:

func getRequest(url string) {
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf(&quot;HTTP request error: %s&quot;, err)
return
}
// Add this bit:
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fmt.Println(&quot;StatusCode:&quot;, resp.StatusCode)
fmt.Println(resp.Request.URL)
}

This, after you finish reading messages from the channel, seems unnecessary and potentially problematic:

    for i := 0; i < cap(sem); i++ {
        sem <- true
    }

Why fill the sem channel after you've read all the messages from the queue? You've added exactly as many messages to the channel as you expect to read from it, so this is pointless at best, and could cause problems if you make the wrong change to the rest of the code.

Unrelated to your issue, but this is redundant:

if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
}

Per the documentation, Fatalf already exits, so the panic will never be called. If you want to log and panic, try log.Panicf, which is designed for that purpose.

答案2

得分: 0

你在收到消息时向sem添加了一个计数,但只有在没有看到URL时才从sem中减去计数。

所以,一旦你已经“已经看到”10个URL,你的应用程序就会锁定。
因此,你需要在记录“已经看到”的else语句中添加<-sem

无论如何,这是一种相当奇怪的并发方式。
下面是一个更符合惯用方式的版本(可在 Go Playground 上查看)。

注意,在这个版本中,我们只是生成了10个goroutine来监听rabbit通道。

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/Xide/bloom"
	"github.com/streadway/amqp"
)

// failOnError logs msg followed by err and exits the process through
// log.Fatalf when err is non-nil. It does nothing when err is nil.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

// netClient is the HTTP client shared by all workers. Its Timeout caps each
// request at 10 seconds so a stalled server cannot hold a worker forever.
var netClient = &http.Client{
	Timeout: time.Second * 10,
}

// getRequest performs a GET on url through netClient and prints the response
// status code and the request URL. A failed request is logged and otherwise
// ignored.
func getRequest(url string) {
	resp, err := netClient.Get(url)
	if err != nil {
		log.Printf("HTTP请求错误:%s", err)
		return
	}
	// Deferred so the body is closed on every path out of the function.
	defer resp.Body.Close()

	fmt.Println("状态码:", resp.StatusCode)
	fmt.Println(resp.Request.URL)
}

// main connects to a local RabbitMQ broker and starts a fixed pool of worker
// goroutines that all read from the same delivery channel. Each worker
// fetches URLs the bloom filter has not matched before and acknowledges the
// delivery once it has been handled. main returns when the delivery channel
// is closed and every worker has exited.
func main() {
	// concurrency is both the number of workers and the consumer prefetch
	// count; the two must agree for the workers to run in parallel.
	const concurrency = 10

	// presumably 0.1 is the target false-positive rate; confirm against the bloom package.
	bf := bloom.NewDefaultScalable(0.1)
	// bfMu makes the Match-then-Feed sequence atomic. Without it two workers
	// holding the same URL could both see "not matched" and both fetch it.
	// NOTE(review): the bloom package's own thread-safety is not visible
	// here, so the filter is never touched outside this lock.
	var bfMu sync.Mutex
	markIfNew := func(url string) bool {
		bfMu.Lock()
		defer bfMu.Unlock()
		if bf.Match(url) {
			return false
		}
		bf.Feed(url)
		return true
	}

	conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
	failOnError(err, "连接到RabbitMQ失败")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "打开通道失败")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"urls", // name
		true,   // durable
		false,  // delete when unused
		false,  // exclusive
		false,  // no-wait
		nil,    // arguments
	)
	failOnError(err, "声明队列失败")

	// Deliveries are acknowledged only after the request finishes, so the
	// broker must be allowed `concurrency` unacknowledged messages at once.
	// With a prefetch count of 1 it would hand out one message at a time and
	// the workers would effectively run serially.
	err = ch.Qos(
		concurrency, // prefetch count
		0,           // prefetch size
		false,       // global
	)
	failOnError(err, "设置Qos失败")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "注册消费者失败")

	// wg tracks the workers so main can wait for all of them to finish,
	// e.g. after the RabbitMQ connection is closed.
	var wg sync.WaitGroup
	for x := 0; x < concurrency; x++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The loop ends when msgs is closed.
			for d := range msgs {
				url := string(d.Body)
				if markIfNew(url) {
					log.Printf("未见过:%s", d.Body)
					getRequest(url)
				} else {
					log.Printf("已见过:%s", d.Body)
				}
				if err := d.Ack(false); err != nil {
					log.Printf("确认消息失败:%s", err)
				}
			}
			log.Println("msgs通道已关闭")
		}()
	}

	log.Printf(" [*] 等待消息。要退出,请按CTRL+C")
	wg.Wait() // returns once every worker has exited
}
英文:

You are adding to sem when you get a message, but only removing from sem when you haven't seen a url.

So, once you've "already seen" 10 URLs, your app will lock up.
So you need to add <-sem to your else statement that logs "Already seen".

Either way, that's a fairly odd way to do this kind of concurrency.
Here's a version that does this in a more idiomatic way on Play.

Note, in this version, we just spawn 10 goroutines that listen to the rabbit channel.

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/Xide/bloom"
	"github.com/streadway/amqp"
)

// failOnError logs msg followed by err and exits the process through
// log.Fatalf when err is non-nil. It does nothing when err is nil.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

// netClient is the HTTP client shared by all workers. Its Timeout caps each
// request at 10 seconds so a stalled server cannot hold a worker forever.
var netClient = &http.Client{
	Timeout: time.Second * 10,
}

// getRequest performs a GET on url through netClient and prints the response
// status code and the request URL. A failed request is logged and otherwise
// ignored.
func getRequest(url string) {
	resp, err := netClient.Get(url)
	if err != nil {
		log.Printf("HTTP request error: %s", err)
		return
	}
	// Deferred so the body is closed on every path out of the function.
	defer resp.Body.Close()

	fmt.Println("StatusCode:", resp.StatusCode)
	fmt.Println(resp.Request.URL)
}

// main connects to a local RabbitMQ broker and starts a fixed pool of worker
// goroutines that all read from the same delivery channel. Each worker
// fetches URLs the bloom filter has not matched before and acknowledges the
// delivery once it has been handled. main returns when the delivery channel
// is closed and every worker has exited.
func main() {
	// concurrency is both the number of workers and the consumer prefetch
	// count; the two must agree for the workers to run in parallel.
	const concurrency = 10

	// presumably 0.1 is the target false-positive rate; confirm against the bloom package.
	bf := bloom.NewDefaultScalable(0.1)
	// bfMu makes the Match-then-Feed sequence atomic. Without it two workers
	// holding the same URL could both see "not matched" and both fetch it.
	// NOTE(review): the bloom package's own thread-safety is not visible
	// here, so the filter is never touched outside this lock.
	var bfMu sync.Mutex
	markIfNew := func(url string) bool {
		bfMu.Lock()
		defer bfMu.Unlock()
		if bf.Match(url) {
			return false
		}
		bf.Feed(url)
		return true
	}

	conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"urls", // name
		true,   // durable
		false,  // delete when unused
		false,  // exclusive
		false,  // no-wait
		nil,    // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// Deliveries are acknowledged only after the request finishes, so the
	// broker must be allowed `concurrency` unacknowledged messages at once.
	// With a prefetch count of 1 it would hand out one message at a time and
	// the workers would effectively run serially.
	err = ch.Qos(
		concurrency, // prefetch count
		0,           // prefetch size
		false,       // global
	)
	failOnError(err, "Failed to set Qos")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// wg tracks the workers so main can wait for all of them to finish,
	// e.g. after the RabbitMQ connection is closed.
	var wg sync.WaitGroup
	for x := 0; x < concurrency; x++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The loop ends when msgs is closed.
			for d := range msgs {
				url := string(d.Body)
				if markIfNew(url) {
					log.Printf("Not seen: %s", d.Body)
					getRequest(url)
				} else {
					log.Printf("Already seen: %s", d.Body)
				}
				if err := d.Ack(false); err != nil {
					log.Printf("Failed to ack delivery: %s", err)
				}
			}
			log.Println("msgs channel closed")
		}()
	}

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	wg.Wait() // returns once every worker has exited
}

huangapple
  • 本文由 发表于 2017年8月24日 18:02:01
  • 转载请务必保留本文链接:https://go.coder-hub.com/45858684.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定