Go routine: Making concurrent API requests

Question

I am trying to understand channels and goroutines, and tried to write a goroutine for making concurrent API requests to a server.

But when I run the code using a goroutine, it seems to take the same time as it does without a goroutine.

func sendUser(user string, ch chan<- string) {
	resp, err := http.Get("URL/" + user)
	// do the processing and get resp=string
	ch <- resp
}

func AsyncHTTP(users []string) ([]string, error) {
	ch := make(chan string)
	var responses []string
	var user string

	for _, user = range users {
		go sendUser(user, ch)

		for {
			select {
			case r := <-ch:
				if r.err != nil {
					fmt.Println(r.err)
				}
				responses = append(responses, r)
				// Is there a better way to show that the processing of response is complete?
				if len(responses) == len(users) {
					return responses, nil
				}
			case <-time.After(50 * time.Millisecond):
				fmt.Printf(".")
			}
		}
	}
	return responses, nil
}

Questions:

  1. Even though I am using a goroutine, the requests take as long to complete as they do without goroutines. Am I doing something wrong with goroutines?

  2. To tell the job not to wait anymore, I am using:

     if len(responses) == len(users)

     Is there a better way to show that the processing of responses is complete and tell ch not to wait anymore?

  3. What is sync.WaitGroup? How can I use it in my goroutine?

Answer 1

Score: 15

I might do something like this:

func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	resp, err := http.Get("URL/" + user)
	if err != nil {
		log.Println("err handle it")
		return // resp is nil on error; continuing would panic
	}
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("err handle it")
		return
	}
	ch <- string(b)
}

func AsyncHTTP(users []string) ([]string, error) {
	ch := make(chan string)
	var responses []string
	var wg sync.WaitGroup
	for _, user := range users {
		wg.Add(1)
		go sendUser(user, ch, &wg)
	}

	// close the channel in the background once all senders are done
	go func() {
		wg.Wait()
		close(ch)
	}()
	// read results from the channel as they come in, until it is closed
	for res := range ch {
		responses = append(responses, res)
	}

	return responses, nil
}

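This lets you read results from the channel as they are sent. The WaitGroup tells me when every goroutine has finished, and therefore when it is safe to close the channel. Because the wg.Wait() and close(ch) calls run in their own goroutine, the main function can read from the channel in "real time" without blocking on the wait.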
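The same pattern can be tried without a real server. The following is a minimal sketch, not the answer's own code: `fetchAll` and `slowEcho` are hypothetical names, and `slowEcho` merely sleeps to stand in for the HTTP request. It shows why the wait-and-close step runs in its own goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchAll is a hypothetical helper: it runs fetch once per input, each in
// its own goroutine, and collects the results in completion order.
func fetchAll(inputs []string, fetch func(string) string) []string {
	ch := make(chan string)
	var wg sync.WaitGroup

	for _, in := range inputs {
		wg.Add(1)
		go func(in string) {
			defer wg.Done()
			ch <- fetch(in)
		}(in)
	}

	// Close the channel once every sender has finished, so the range loop
	// below terminates. This runs in its own goroutine: calling wg.Wait()
	// directly here would deadlock, because the senders block on the
	// unbuffered channel until someone receives.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var out []string
	for s := range ch {
		out = append(out, s)
	}
	return out
}

// slowEcho stands in for the HTTP request (an assumption for illustration).
func slowEcho(user string) string {
	time.Sleep(50 * time.Millisecond)
	return "hello " + user
}

func main() {
	start := time.Now()
	res := fetchAll([]string{"a", "b", "c", "d"}, slowEcho)
	fmt.Println(len(res), "responses in", time.Since(start))
}
```

Since all four calls sleep at the same time, the elapsed time printed by main should be close to one sleep rather than four. Results arrive in completion order, not input order.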

Answer 2

Score: 6

For bounded parallelism / rate limiting, we can look at the example at https://blog.golang.org/pipelines#TOC_9.

Basically, the steps are:

  1. Stream the inputs / params / args used to call the API into an input channel.
  2. Run N worker goroutines, each consuming the same (shared) input channel. Each worker gets the args from the input channel, calls the API, and sends the result into a result channel.
  3. Consume the result channel, returning early if there is an error.

sync.WaitGroup is used to wait for all worker goroutines to complete (after the input channel is exhausted).

Below is a code example (you can run it right away; try changing NUM_PARALLEL to a different degree of parallelism). Change BASE_URL to your base URL.

package main

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
	"sync"
	"time"
)

// placeholder URL. Change it to your base URL.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"

// degree of parallelism (number of worker goroutines)
const NUM_PARALLEL = 20

// Stream inputs to the input channel
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
	inputCh := make(chan string)
	go func() {
		defer close(inputCh)
		for _, input := range inputs {
			select {
			case inputCh <- input:
			case <-done:
				// done was closed early (an error occurred midway):
				// stop sending; the deferred close shuts the input channel.
				// A bare break here would only leave the select, not the loop.
				return
			}
		}
	}()
	return inputCh
}

// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
	url := BASE_URL + user
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}

	bodyStr := string(body)
	return bodyStr, nil
}

// Wrapper for sendUser return value, used as result channel type
type result struct {
	bodyStr string
	err     error
}

func AsyncHTTP(users []string) ([]string, error) {
	done := make(chan struct{})
	defer close(done)

	inputCh := streamInputs(done, users)

	var wg sync.WaitGroup
	// bulk add goroutine counter at the start
	wg.Add(NUM_PARALLEL)

	resultCh := make(chan result)

	for i := 0; i < NUM_PARALLEL; i++ {
		// spawn N worker goroutines, each consuming the shared input channel.
		go func() {
			defer wg.Done()
			for input := range inputCh {
				bodyStr, err := sendUser(input)
				select {
				case resultCh <- result{bodyStr, err}:
				case <-done:
					// the caller returned early; stop instead of blocking on the send
					return
				}
			}
		}()
	}

	// Wait for all worker goroutines to finish, then close the result channel so the loop below ends.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	results := []string{}
	for result := range resultCh {
		if result.err != nil {
			// return early. done channel is closed, thus input channel is also closed.
			// all worker goroutines stop working (because input channel is closed)
			return nil, result.err
		}
		results = append(results, result.bodyStr)
	}

	return results, nil
}

func main() {
	// populate users param
	users := []string{}
	for i := 1; i <= 100; i++ {
		users = append(users, strconv.Itoa(i))
	}

	start := time.Now()

	results, err := AsyncHTTP(users)
	if err != nil {
		fmt.Println(err)
		return
	}

	for _, result := range results {
		fmt.Println(result)
	}

	fmt.Println("finished in ", time.Since(start))
}
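
When the full pipeline is more than is needed, concurrency can also be bounded with a buffered channel used as a counting semaphore. This is a different technique from the worker pool in the answer, sketched here under stated assumptions: `boundedMap`, `limit`, and the stand-in `shout` function are hypothetical names, and `limit` must be at least 1. Unlike the answer's code, this sketch waits for every call to finish before reporting the first error, and it keeps results in input order.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// boundedMap (hypothetical helper) applies fn to every input with at most
// limit calls in flight at once. limit must be >= 1.
func boundedMap(inputs []string, limit int, fn func(string) (string, error)) ([]string, error) {
	results := make([]string, len(inputs))
	errs := make([]error, len(inputs))
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var wg sync.WaitGroup

	for i, in := range inputs {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(i int, in string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			// Each goroutine writes only to its own index, so no mutex is needed.
			results[i], errs[i] = fn(in)
		}(i, in)
	}
	wg.Wait()

	// Report the first error in input order, if any.
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

// shout stands in for the API call (an assumption for illustration).
func shout(s string) (string, error) {
	return strings.ToUpper(s), nil
}

func main() {
	out, err := boundedMap([]string{"a", "b", "c"}, 2, shout)
	fmt.Println(out, err)
}
```

The trade-off is that one goroutine is still created per input (only their concurrency is capped), whereas the worker pool above creates exactly NUM_PARALLEL goroutines.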

huangapple
  • Posted on July 27, 2017, 05:38:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/45337881.html