How to use a goroutine pool

Question

I want to use Go for downloading stock price spreadsheets from Yahoo finance. I'll be making an http request for every stock in its own goroutine. I have a list of around 2500 symbols, but instead of making 2500 requests in parallel, I'd prefer making 250 at a time. In Java I'd create a thread pool and reuse threads as and when they get free. I was trying to find something similar, a goroutine pool, if you will, but was unable to find any resources. I'd appreciate if someone can tell me how to accomplish the task at hand or point me to resources for the same. Thanks!

Answer 1

Score: 74

The simplest way, I suppose, is to create 250 goroutines and pass them a channel that the main goroutine uses to send links to the child goroutines, which all listen on that channel.

When all links have been passed to the goroutines, you close the channel and all goroutines simply finish their jobs.

To keep the main goroutine from finishing before the children have processed the data, you can use sync.WaitGroup.

Here is some code to illustrate the point described above (not a final working version, but it shows the idea):

func worker(linkChan chan string, wg *sync.WaitGroup) {
    // Decreasing internal counter for wait-group as soon as goroutine finishes
    defer wg.Done()

    for url := range linkChan {
        // Analyze value and do the job here
    }
}

func main() {
    lCh := make(chan string)
    wg := new(sync.WaitGroup)

    // Adding routines to workgroup and running them
    for i := 0; i < 250; i++ {
        wg.Add(1)
        go worker(lCh, wg)
    }

    // Processing all links by spreading them to `free` goroutines
    for _, link := range yourLinksSlice {
        lCh <- link
    }

    // Closing channel (waiting in goroutines won't continue any more)
    close(lCh)

    // Waiting for all goroutines to finish (otherwise they die as main routine dies)
    wg.Wait()
}
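
For reference, a complete runnable variant of the same pattern could look like the following; the URL list and the plain http.Get call are placeholders standing in for the Yahoo Finance downloads from the question, not part of the original answer.

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func worker(linkChan chan string, wg *sync.WaitGroup) {
    // Decrease the wait group counter when this goroutine finishes
    defer wg.Done()

    for url := range linkChan {
        // Placeholder work: fetch the URL and report the status
        resp, err := http.Get(url)
        if err != nil {
            fmt.Println(url, "failed:", err)
            continue
        }
        resp.Body.Close()
        fmt.Println(url, resp.Status)
    }
}

func main() {
    // Placeholder list; in the question this would be ~2500 spreadsheet URLs
    yourLinksSlice := []string{
        "https://example.com/a",
        "https://example.com/b",
    }

    lCh := make(chan string)
    wg := new(sync.WaitGroup)

    // Start 250 workers that all read from the same channel
    for i := 0; i < 250; i++ {
        wg.Add(1)
        go worker(lCh, wg)
    }

    // Hand each link to whichever worker is free
    for _, link := range yourLinksSlice {
        lCh <- link
    }

    // No more links: closing the channel ends each worker's range loop
    close(lCh)

    // Wait for all workers to finish before main exits
    wg.Wait()
}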

Answer 2

Score: 3

You can use the thread pool implementation library for Go from this git repo.

Here is a nice blog about how to use channels as a thread pool.

Snippet from the blog:

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool chan chan Job
    JobChannel chan Job
    quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

Answer 3

Score: 2

This example uses two channels, one for the inputs and another for the output. The workers can scale to whatever size you need; each goroutine works off the input queue and saves all of its output to the output channel. Feedback on easier methods is very welcome.

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func worker(input chan string, output chan string) {
    defer wg.Done()
    // Consumer: Process items from the input channel and send results to the output channel
    for value := range input {
        output <- value + " processed"
    }
}

func main() {
    var jobs = []string{"one", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two"}
    input := make(chan string, len(jobs))
    output := make(chan string, len(jobs))
    workers := 250

    // Increment waitgroup counter and create goroutines
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go worker(input, output)
    }

    // Producer: load up input channel with jobs
    for _, job := range jobs {
        input <- job
    }

    // Close input channel since no more jobs are being sent to input channel
    close(input)
    // Wait for all goroutines to finish processing
    wg.Wait()
    // Close output channel since all workers have finished processing
    close(output)

    // Read from output channel
    for result := range output {
        fmt.Println(result)
    }
}

Answer 4

Score: 2

You can take a look at this.

We have created a thread pool in Go and have been using it for our production systems.

I took reference from here.

It's pretty simple to use and also has a Prometheus client that tells you how many workers are in use.

To initialize, just create an instance of the dispatcher:

dispatcher = workerpool.NewDispatcher(
    "DispatcherName",
    workerpool.SetMaxWorkers(10),
)

Create an object (let's say job) that implements this interface, i.e. it should implement the Process method:

// IJob : Interface for the Job to be processed
type IJob interface {
    Process() error
}
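
For the use case in the question, such a job might look something like this; the type and field names are illustrative, not part of the library:

// downloadJob is a hypothetical job that fetches the spreadsheet for one symbol.
type downloadJob struct {
    symbol string
}

// Process satisfies the IJob interface.
func (j downloadJob) Process() error {
    // Download the price spreadsheet for j.symbol here.
    return nil
}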

Then just send the job to the dispatcher:

dispatcher.JobQueue <- job // object of job

This is it.
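
Tying this back to the question, the usage could be roughly as follows, assuming the hypothetical downloadJob type above and using only the calls shown in this answer:

dispatcher = workerpool.NewDispatcher(
    "StockDownloader",             // hypothetical dispatcher name
    workerpool.SetMaxWorkers(250), // at most 250 downloads run at once
)

// Push one job per stock symbol; idle workers pick them up from the queue.
for _, symbol := range symbols {
    dispatcher.JobQueue <- downloadJob{symbol: symbol}
}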
