将值发送到通道并在准备好时读取输出。

huangapple go评论83阅读模式
英文:

Sending value into Channel and Reading output when Ready

问题

我正在尝试使用两个通道在Golang中构建一个接收器和发送器模式。我正在执行一个任务(API调用),并接收一个Response结构体。我的目标是,当接收到一个响应时,我想将其发送到另一个通道(writeChan)进行额外处理。

我想要不断地读取/监听接收器通道(respChan),并处理通过的任何内容(比如一个Response)。然后,我想要启动一个线程,在另一个goroutine中对该响应进行进一步操作。

我想了解如何将这个模式链接在一起,以允许数据从API调用流动,并同时写入它(每个Response将被写入一个单独的文件目标,Write()函数处理)。

基本上,我当前的模式如下:

package main

import (
    "fmt"
    "sync"
)

func main() {

    var wg sync.WaitGroup
    respChan := make(chan Response) // Response是一个包含API响应元数据的结构体
    defer close(respChan)
    // requests只是一个要发送到API的请求切片
    // 这部分工作得很好
    for _, req := range requests {
        wg.Add(1)
        go func(r Request) {
            defer wg.Done()
            resp, _ := r.Get() // 进行API调用并接收一个Response结构体
            respChan <- resp // 将响应放入我们的通道中
        }(req)
    }

    // 现在,我想在可用时提取响应并将其发送到另一个函数进行一些处理。我不确定如何正确处理这个。
    writeChan := make(chan string)
    defer close(writeChan)
    select {
        case resp := <-respChan: // 从响应通道接收
            go func(response Response) {
                signal, _ := Write(response) // 将响应写入文件的单独函数。在这个上下文中不重要。
                writeChan <- signal // 将信号数据放入通道中,它是一个文件路径的字符串,表示文件写入的位置(将用于后续处理)

            }(resp)
        case <-time.After(15 *time.Second):
            fmt.Println("15秒内没有收到任何内容...")

    }
    wg.Wait()
}

以上是你提供的代码的翻译。

英文:

I am trying to construct a receiver and sender pattern using two channels in Golang. I am doing a task (API call), and receiving back a Response struct. My goal is that when a response is received I'd like to send it to another channel (writeChan) for additional processing.

I'd like to continuously read/listen on that receiver channel (respChan) and process anything that comes through (such as a Response). Then I'd like to spin up a thread to go and do a further operation with that Response in another goroutine.

I'd like to understand how I can chain together this pattern to allow data to flow from the API calls and concurrently write it (each Response will be written to a separate file destination which the Write() func handles.

Essentially my current pattern is the following:

package main

import (
    &quot;fmt&quot;
    &quot;sync&quot;
)

func main() {

    var wg sync.WaitGroup
    respChan := make(chan Response) // Response is a struct that contains API response metadata
    defer close(respChan)
    // requests is just a slice of requests to be made to an API
    // This part is working well
    for _, req := range requests {
        wg.Add(1)
        go func(r Request) {
            defer wg.Done()
            resp, _ := r.Get() // Make the API call and receive back a Response struct
            respChan &lt;- resp // Put the response into our channel
        }(req)
    }

    // Now, I want to extract the responses as they become available and send them to another function to do some processing. This I am unsure of how to handle properly
    writeChan := make(chan string)
    defer close(writeChan)
    select {
        case resp := &lt;-respChan: // receive from response channel
            go func(response Response) {
                signal, _ := Write(response) // Separate func to write the response to a file. Not important here in this context.
                writeChan &lt;- signal // Put the signal data into the channel which is a string file path of where the file was written (will be used for a later process)

            }(resp)
        case &lt;-time.After(15 *time.Second):
            fmt.Println(&quot;15 seconds have passed without receiving anything...&quot;)

    }
    wg.Wait()
}

答案1

得分: 2

让我与您分享一个您可以受益的工作示例。首先,我将呈现代码,然后逐步介绍所有相关部分。

首先,我定义了两个在示例中将使用的结构体:RequestResponse。在前者中,我添加了一个DelayInSeconds字段来模拟一些繁重的负载和耗时操作。然后,我定义了包含所有请求的requests变量。

写入部分:
在这里,我遍历requests变量。对于每个请求,我将向目标URL发出一个HTTP请求。time.Sleep模拟了繁重的负载。然后,我将响应写入未缓冲的respChan通道。

读取部分:
在这里,主要的变化是将select语句包装在一个for循环中。通过这样做,我们将确保迭代正确的次数(基于requests变量的长度)。

最后的说明:
首先,请记住,这段代码过于简化,只是为了展示相关部分。因此,缺少了很多错误处理,并且一些内联函数可以提取为命名函数。您不需要使用sync.WaitGroup来实现所需的功能,使用通道就足够了。
请随意调整延迟并检查写入的文件!

如果这对您有帮助,请告诉我!

编辑:
根据您的要求,我将为您提供一个更准确的解决方案。新的读取部分将类似于以下内容:

count := 0
for {
	// 这个检查是为了退出循环,而不是无限等待
	// 根据您的需求,可以删除它
	if count == 3 {
		fmt.Println("所有响应已到达...")
		return
	}
	res := <-respChan
	count++
	go func(r Response) {
		f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
		if err != nil {
			panic(err)
		}
		defer f.Close()
		f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
		writeChan <- struct{}{}
	}(res)
}

在这里,执行在for循环中无限等待。无论每个请求需要多长时间完成,只要它到达,就会立即获取。我在for循环的顶部放置了一个if语句,在处理我们需要的请求后退出。但是,您可以避免它,让代码运行直到收到取消信号(这取决于您)。

如果这样更符合您的要求,请告诉我,谢谢!

英文:

Let me share with you a working example that you can benefit from. First, I'm gonna present the code, then, I'm gonna walk you through all the relevant sections.

package main

import (
	&quot;fmt&quot;
	&quot;net/http&quot;
	&quot;os&quot;
	&quot;strings&quot;
	&quot;time&quot;
)

type Request struct {
	Url            string
	DelayInSeconds time.Duration
}

type Response struct {
	Url        string
	StatusCode int
}

func main() {
	requests := []Request{
		{&quot;https://www.google.com&quot;, 0},
		{&quot;https://stackoverflow.com&quot;, 1},
		{&quot;https://www.wikipedia.com&quot;, 4},
	}

	respChan := make(chan Response)
	defer close(respChan)

	for _, req := range requests {
		go func(r Request) {
			fmt.Printf(&quot;%q - %v\n&quot;, r.Url, strings.Repeat(&quot;#&quot;, 30))
			// simulate heavy work
			time.Sleep(time.Second * r.DelayInSeconds)
			resp, _ := http.Get(r.Url)
			res := Response{r.Url, resp.StatusCode}
			fmt.Println(time.Now())
			respChan &lt;- res
		}(req)
	}

	writeChan := make(chan struct{})
	defer close(writeChan)

	for i := 0; i &lt; len(requests); i++ {
		select {
		case res := &lt;-respChan:
			go func(r Response) {
				f, err := os.Create(fmt.Sprintf(&quot;%v.txt&quot;, strings.Replace(r.Url, &quot;https://&quot;, &quot;&quot;, 1)))
				if err != nil {
					panic(err)
				}
				defer f.Close()
				f.Write([]byte(fmt.Sprintf(&quot;%q OK with %d\n&quot;, r.Url, r.StatusCode)))
				writeChan &lt;- struct{}{}
			}(res)
		case &lt;-time.After(time.Second * 2):
			fmt.Println(&quot;Timeout&quot;)
		}
	}
}

Set up

First, I've defined the two structs that will be used in the example: Request and Response. In the former, I put a DelayInSeconds to mock some heavy loads and time-consuming operations. Then, I defined the requests variable that contains all the requests that have to be done.

The writing part

Here, I range over the requests variable. For each request, I'm gonna issue an HTTP request to the target URL. The time.Sleep emulate the heavy load. Then, I write the response to the respChan channel which is unbuffered.

The reading part

Here, the major change is to wrap the select construct into a for loop. Thanks to this, we'll make sure to iterate the right times (based on the length of the requests variable).

Final notes

First of all, bear in mind that the code is oversimplified just to show off the relevant parts. Due to this, a lot of error handling is missing and some inline functions could be extrapolated into named functions. You don't need to use sync.WaitGroup to achieve what you need, the usage of channels will be enough.
Feel free to play with delays and check which files are written!

Let me know if this helps you!

Edit

As requested, I'm gonna provide you with a more accurate solution based on your needs. The new reading part will be something like the following:

count := 0
for {
	// this check is need to exit the for loop and not wait indefinitely
	// it can be removed based on your needs
	if count == 3 {
		fmt.Println(&quot;all responses arrived...&quot;)
		return
	}
	res := &lt;-respChan
	count++
	go func(r Response) {
		f, err := os.Create(fmt.Sprintf(&quot;%v.txt&quot;, strings.Replace(r.Url, &quot;https://&quot;, &quot;&quot;, 1)))
		if err != nil {
			panic(err)
		}
		defer f.Close()
		f.Write([]byte(fmt.Sprintf(&quot;%q OK with %d\n&quot;, r.Url, r.StatusCode)))
		writeChan &lt;- struct{}{}
	}(res)
}

Here, the execution is waiting indefinitely within the for loop. No matter how long each request takes to complete, it will be fetched as soon as it arrives. I put, at the top of the for loop, an if to exit after it processed the requests that we need. However, you can avoid it and let the code run till a cancellation signal comes in (it's up to you).

Let me know if this better meets your requirements, thanks!

huangapple
  • 本文由 发表于 2023年2月3日 06:08:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/75329453.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定