Go routine: Making concurrent API requests
Question
I am trying to understand channels and goroutines, so I tried writing a goroutine that makes concurrent API requests to a server.
But when I run the code with the goroutine, it seems to take the same amount of time as it does without one.
func sendUser(user string, ch chan<- string) {
    resp, err := http.Get("URL/" + user)
    // do the processing and get resp as a string
    ch <- resp
}
func AsyncHTTP(users []string) ([]string, error) {
    ch := make(chan string)
    var responses []string
    var user string
    for _, user = range users {
        go sendUser(user, ch)
        for {
            select {
            case r := <-ch:
                if r.err != nil {
                    fmt.Println(r.err)
                }
                responses = append(responses, r)
                // Is there a better way to show that the processing of the response is complete?
                if len(responses) == len(users) {
                    return responses, nil
                }
            case <-time.After(50 * time.Millisecond):
                fmt.Printf(".")
            }
        }
    }
    return responses, nil
}
Questions:
- Even though I am using a goroutine, the requests complete in the same time as they do without goroutines. Am I doing something wrong with the goroutines?
- To tell the job not to wait anymore, I am using:
  if len(responses) == len(users)
  Is there a better way to signal that all responses have been processed and tell ch not to wait anymore?
- What is sync.WaitGroup? How can I use it in my goroutine?
Answer 1 (Score: 15)
I might do something like this:
func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get("URL/" + user)
    if err != nil {
        log.Println("err handle it")
        return // resp is nil here, so don't dereference it below
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Println("err handle it")
        return
    }
    ch <- string(b)
}
func AsyncHTTP(users []string) ([]string, error) {
    ch := make(chan string)
    var responses []string
    var wg sync.WaitGroup

    for _, user := range users {
        wg.Add(1)
        go sendUser(user, ch, &wg)
    }

    // close the channel in the background
    go func() {
        wg.Wait()
        close(ch)
    }()

    // read from the channel as results come in, until it is closed
    for res := range ch {
        responses = append(responses, res)
    }

    return responses, nil
}
This allows reading from the channel as results are sent. By using a WaitGroup I know when it is safe to close the channel, and by putting the wg.Wait() and close() in their own goroutine I can keep reading from the channel in "real time" without blocking.
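As a side note on the sync.WaitGroup question above, here is a minimal, self-contained sketch of the same WaitGroup-plus-close pattern; the squared integers and the int channel are just placeholders standing in for the HTTP responses, not part of the original answer:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        ch := make(chan int)
        var wg sync.WaitGroup

        // Launch one goroutine per piece of work: Add before starting,
        // Done (deferred) when the goroutine finishes.
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                ch <- n * n // stand-in for the API response
            }(i)
        }

        // Close the channel only after every worker has called Done,
        // so the range loop below knows when to stop.
        go func() {
            wg.Wait()
            close(ch)
        }()

        for v := range ch {
            fmt.Println(v)
        }
    }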
Answer 2 (Score: 6)
For bounded parallelism / rate limiting, take a look at the example at https://blog.golang.org/pipelines#TOC_9.
Basically the steps are:
- Stream the inputs / params / args used to call the API into an input channel.
- Run N worker goroutines, each consuming the same (shared) input channel: take the args from the input channel, call the API, and send the result into a result channel.
- Consume the result channel, returning early if there is an error.
sync.WaitGroup is used to wait for all worker goroutines to complete (after the input channel is exhausted).
Below is a code example (you can run it right away; try changing NUM_PARALLEL to a different degree of parallelism). Change BASE_URL to your base URL.
package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "sync"
    "time"
)

// placeholder url. Change it to your base url.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"

// number of parallel workers
const NUM_PARALLEL = 20

// Stream inputs to the input channel.
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
    inputCh := make(chan string)
    go func() {
        defer close(inputCh)
        for _, input := range inputs {
            select {
            case inputCh <- input:
            case <-done:
                // done was closed prematurely (an error midway), so stop
                // streaming; the deferred close shuts the input channel.
                // (return, not break: break would only exit the select)
                return
            }
        }
    }()
    return inputCh
}
// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
    url := BASE_URL + user

    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    bodyStr := string(body)
    return bodyStr, nil
}

// Wrapper for sendUser return values, used as the result channel type
type result struct {
    bodyStr string
    err     error
}
func AsyncHTTP(users []string) ([]string, error) {
    done := make(chan struct{})
    defer close(done)

    inputCh := streamInputs(done, users)

    var wg sync.WaitGroup
    // bulk add goroutine counter at the start
    wg.Add(NUM_PARALLEL)

    resultCh := make(chan result)

    for i := 0; i < NUM_PARALLEL; i++ {
        // spawn N worker goroutines, each consuming the shared input channel.
        go func() {
            for input := range inputCh {
                bodyStr, err := sendUser(input)
                resultCh <- result{bodyStr, err}
            }
            wg.Done()
        }()
    }

    // Wait for all worker goroutines to finish, then close the result channel.
    // This happens when there is no error (no early return).
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    results := []string{}
    for result := range resultCh {
        if result.err != nil {
            // Return early. The done channel is closed (via defer), so the input
            // channel is closed too and the workers stop picking up new inputs.
            return nil, result.err
        }
        results = append(results, result.bodyStr)
    }

    return results, nil
}
func main() {
    // populate the users param
    users := []string{}
    for i := 1; i <= 100; i++ {
        users = append(users, strconv.Itoa(i))
    }

    start := time.Now()

    results, err := AsyncHTTP(users)
    if err != nil {
        fmt.Println(err)
        return
    }

    for _, result := range results {
        fmt.Println(result)
    }

    fmt.Println("finished in ", time.Since(start))
}
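To make the core of the pattern easier to follow, here is a stripped-down sketch of the same bounded-parallelism idea, with simulated work standing in for the HTTP call and no early-return handling (the worker count and inputs are arbitrary placeholders):

    package main

    import (
        "fmt"
        "sync"
    )

    const numWorkers = 3 // degree of parallelism

    func main() {
        inputs := make(chan int)
        results := make(chan int)

        // Feed the inputs and close the channel when done.
        go func() {
            defer close(inputs)
            for i := 1; i <= 10; i++ {
                inputs <- i
            }
        }()

        // Start a fixed number of workers, all reading from the same input channel.
        var wg sync.WaitGroup
        wg.Add(numWorkers)
        for w := 0; w < numWorkers; w++ {
            go func() {
                defer wg.Done()
                for n := range inputs {
                    results <- n * n // stand-in for the API call
                }
            }()
        }

        // Close the results channel once every worker has finished.
        go func() {
            wg.Wait()
            close(results)
        }()

        for r := range results {
            fmt.Println(r)
        }
    }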
Comments