将代码并发化以提高效率

huangapple go评论93阅读模式
英文:

Adding goroutine concurrency to make code efficient

问题

func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
    // 数据处理
	response, err := http.Post(postURL, "application/json", responseBody)
	if err != nil{
		panic(err)
	}
	fmt.Printf("POST : %s\n", status)
	return response
}

func callback(data string) {

	parsedData := make(map[string]interface{})
	err := json.Unmarshal([]byte(data), &parsedData)
	if err != nil {
		panic(err)
	}

	sender := parsedData["sender"].(string)
	recipient := parsedData["recipient"].(string)
	gatewayMessageId := parsedData["gateway_message_id"].(string)
  
    sendCallback("sent", sender, recipient, gatewayMessageId)
    sendCallback("delivered", sender, recipient, gatewayMessageId)
}

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	})
	for {
		data, err := rdb.RPop(ctx, "callback").Result()
		if err == redis.Nil {
			fmt.Println("Sleeping")
			time.Sleep(2 * time.Second) // 睡眠2秒
			continue
		}
		go callback(data)
	}
}

我理解你想要翻译的代码部分。以上是你提供的代码的翻译结果。

英文:
func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
    // data massaging
	response, err := http.Post(postURL, "application/json", responseBody)
	if err != nil{
		panic(err)
	}
	fmt.Printf("POST : %s\n", status)
	return response
}

func callback(rdb *redis.Client) {
	for {
		data, err := rdb.RPop(ctx, "callback").Result()
		if err == redis.Nil {
			fmt.Println("Sleeping")
			time.Sleep(2 * time.Second) // sleep for 2s
			continue
		}

		// more work

       sendCallback(status, sender, recipient, gatewayMessageId)
	}

}



func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	})
	callback(rdb)
}

I understand that the above code is flawed as I'm waiting for "data" to be processed. However, I want it to be non-blocking, and as soon as data is present, I want it to be processed. I have been consuming tutorials on goroutines, but I'm unable to wrap my head around it.

EDIT

Based on @torek explanation, if I take the infinite loop out of the callback function, do it in the main function, and let the callback focus on its part, is this how goroutines should work?

func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
    // data massaging
    response, err := http.Post(postURL, "application/json", responseBody)
    if err != nil{
        panic(err)
    }
    fmt.Printf("POST : %s\n", status)
    return response
}

func callback(data string) {

	parsedData := make(map[string]interface{})
	err := json.Unmarshal([]byte(data), &parsedData)
	if err != nil {
		panic(err)
	}

	sender := parsedData["sender"].(string)
	recipient := parsedData["recipient"].(string)
	gatewayMessageId := parsedData["gateway_message_id"].(string)
  
    sendCallback("sent", sender, recipient, gatewayMessageId)
    sendCallback("delivered", sender, recipient, gatewayMessageId)
}

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	})
	for {
		data, err := rdb.RPop(ctx, "callback").Result()
		if err == redis.Nil {
			fmt.Println("Sleeping")
			time.Sleep(2 * time.Second) // sleep for 2s
			continue
		}
		go callback(data)
	}
}

答案1

得分: 1

我建议使用工作池解决方案。工作池应该有助于控制CPU和内存的使用。由于上下文切换开销,大量的goroutine进行CPU密集型操作并不是最优的。但最重要的好处是对http客户端的控制。你的代码为从redis接收到的每条消息创建了一个goroutine。然后这些goroutine发起HTTP请求。如果从redis接收到的消息数量超过目标服务在一段时间内可以处理的HTTP请求数量,目标服务将崩溃。或者,由于大量的goroutine,应用程序可能会在达到内存限制时崩溃。你可以设置MaxConnsPerHost,它可以防止无控制地创建连接,但无法防止创建新的goroutine。在我的建议中,应用程序通过利用通道来使处理速度适应目标服务的能力。如果发现目标服务可以处理更多的请求并且有多余的CPU资源,你可以增加工作人员的数量。

type message struct {
	Sender           string `json:"sender"`
	Recipient        string `json:"recipient"`
	GatewayMessageID string `json:"gateway_message_id"`
}

func sendCallback(status string, m message) *http.Response {
	// 数据处理
    ...
	response, err := http.Post(postURL, "application/json", responseBody)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("POST : %s\n", status)
	return response
}

func worker(messages chan []byte) {
	for rawMessage := range messages {
		m := message{}
		err := json.Unmarshal(rawMessage, &m)
		if err != nil {
			log.Fatal(err)
		}
		sendCallback("sent", m)
		sendCallback("delivered", m)
	}
}

const numberOfWorkers = 10

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	})
	messages := make(chan []byte, numberOfWorkers)
	for i := 0; i < numberOfWorkers; i++ {
		go worker(messages)
	}

	for {
		data, err := rdb.RPop(ctx, "callback").Result()
		if err == redis.Nil {
			fmt.Println("Sleeping")
			time.Sleep(2 * time.Second) // 睡眠2秒
			continue
		} else if err != nil {
			log.Fatal(err)
		}
		messages <- []byte(data)
	}
}
英文:

I suggest a worker pool solution. The worker pool should help control CPU and memory usage. Big number of goroutines making CPU intensive operation are not optimal due to context switch overhead. But the most important benefit is having the http client under control. Your code creates a goroutine for each message received from redis. These goroutines then make a HTTP request. If the number of messages received from redis is higher than the number of HTTP requests that can be handled by the target service in a period of time, the target service will crash. Alternatively, the application may crash if it reaches a memory limit due to a large number of goroutines. You can set MaxConnsPerHost which will prevent uncontrolled connection creation, but it will not prevent the creation of new goroutines. In my proposal, the application adapts the processing speed to the capabilities of the target service by leveraging the channel. You can increase the number of workers if you find that the target service can handle more requests and you have spare CPU power.

type message struct {
Sender           string `json:&quot;sender&quot;`
Recipient        string `json:&quot;recipient&quot;`
GatewayMessageID string `json:&quot;gateway_message_id&quot;`
}
func sendCallback(status string, m message) *http.Response {
// data massaging
...
response, err := http.Post(postURL, &quot;application/json&quot;, responseBody)
if err != nil {
log.Fatal(err)
}
fmt.Printf(&quot;POST : %s\n&quot;, status)
return response
}
func worker(messages chan []byte) {
for rawMessage := range  messages {
m := message{}
err := json.Unmarshal(rawMessage, &amp;m)
if err != nil {
log.Fatal(err)
}
sendCallback(&quot;sent&quot;, m)
sendCallback(&quot;delivered&quot;, m)
}
}
const numberOfWorkers = 10
func main() {
rdb := redis.NewClient(&amp;redis.Options{
Addr:     &quot;127.0.0.1:6379&quot;,
Password: &quot;&quot;,
DB:       0,
})
messages := make(chan []byte, numberOfWorkers)
for i := 0; i &lt; numberOfWorkers; i++ {
go worker(messages)
}
for {
data, err := rdb.RPop(ctx, &quot;callback&quot;).Result()
if err == redis.Nil {
fmt.Println(&quot;Sleeping&quot;)
time.Sleep(2 * time.Second) // sleep for 2s
continue
} else if err != nil {
log.Fatal(err)
}
messages &lt;- []byte(data)
}
}

huangapple
  • 本文由 发表于 2021年9月1日 00:34:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/69002356.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定