英文:
Sending value into Channel and Reading output when Ready
问题
我正在尝试使用两个通道在Golang中构建一个接收器和发送器模式。我正在执行一个任务(API调用),并接收一个Response
结构体。我的目标是,当接收到一个响应时,我想将其发送到另一个通道(writeChan
)进行额外处理。
我想要不断地读取/监听接收器通道(respChan
),并处理通过的任何内容(比如一个Response
)。然后,我想要启动一个线程,在另一个goroutine中对该响应进行进一步操作。
我想了解如何将这个模式链接在一起,以允许数据从API调用流动,并同时写入它(每个Response将被写入一个单独的文件目标,Write()
函数处理)。
基本上,我当前的模式如下:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
respChan := make(chan Response) // Response是一个包含API响应元数据的结构体
defer close(respChan)
// requests只是一个要发送到API的请求切片
// 这部分工作得很好
for _, req := range requests {
wg.Add(1)
go func(r Request) {
defer wg.Done()
resp, _ := r.Get() // 进行API调用并接收一个Response结构体
respChan <- resp // 将响应放入我们的通道中
}(req)
}
// 现在,我想在可用时提取响应并将其发送到另一个函数进行一些处理。我不确定如何正确处理这个。
writeChan := make(chan string)
defer close(writeChan)
select {
case resp := <-respChan: // 从响应通道接收
go func(response Response) {
signal, _ := Write(response) // 将响应写入文件的单独函数。在这个上下文中不重要。
writeChan <- signal // 将信号数据放入通道中,它是一个文件路径的字符串,表示文件写入的位置(将用于后续处理)
}(resp)
case <-time.After(15 *time.Second):
fmt.Println("15秒内没有收到任何内容...")
}
wg.Wait()
}
以上是你提供的代码的翻译。
英文:
I am trying to construct a receiver and sender pattern using two channels in Golang. I am doing a task (API call), and receiving back a Response
struct. My goal is that when a response is received I'd like to send it to another channel (writeChan
) for additional processing.
I'd like to continuously read/listen on that receiver channel (respChan
) and process anything that comes through (such as a Response
). Then I'd like to spin up a thread to go and do a further operation with that Response in another goroutine.
I'd like to understand how I can chain together this pattern to allow data to flow from the API calls and concurrently write it (each Response will be written to a separate file destination which the Write()
func handles.
Essentially my current pattern is the following:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
respChan := make(chan Response) // Response is a struct that contains API response metadata
defer close(respChan)
// requests is just a slice of requests to be made to an API
// This part is working well
for _, req := range requests {
wg.Add(1)
go func(r Request) {
defer wg.Done()
resp, _ := r.Get() // Make the API call and receive back a Response struct
respChan <- resp // Put the response into our channel
}(req)
}
// Now, I want to extract the responses as they become available and send them to another function to do some processing. This I am unsure of how to handle properly
writeChan := make(chan string)
defer close(writeChan)
select {
case resp := <-respChan: // receive from response channel
go func(response Response) {
signal, _ := Write(response) // Separate func to write the response to a file. Not important here in this context.
writeChan <- signal // Put the signal data into the channel which is a string file path of where the file was written (will be used for a later process)
}(resp)
case <-time.After(15 *time.Second):
fmt.Println("15 seconds have passed without receiving anything...")
}
wg.Wait()
}
答案1
得分: 2
让我与您分享一个您可以受益的工作示例。首先,我将呈现代码,然后逐步介绍所有相关部分。
首先,我定义了两个在示例中将使用的结构体:Request
和Response
。在前者中,我添加了一个DelayInSeconds
字段来模拟一些繁重的负载和耗时操作。然后,我定义了包含所有请求的requests
变量。
写入部分:
在这里,我遍历requests
变量。对于每个请求,我将向目标URL发出一个HTTP请求。time.Sleep
模拟了繁重的负载。然后,我将响应写入未缓冲的respChan
通道。
读取部分:
在这里,主要的变化是将select
语句包装在一个for
循环中。通过这样做,我们将确保迭代正确的次数(基于requests
变量的长度)。
最后的说明:
首先,请记住,这段代码过于简化,只是为了展示相关部分。因此,缺少了很多错误处理,并且一些内联函数可以提取为命名函数。您不需要使用sync.WaitGroup
来实现所需的功能,使用通道就足够了。
请随意调整延迟并检查写入的文件!
如果这对您有帮助,请告诉我!
编辑:
根据您的要求,我将为您提供一个更准确的解决方案。新的读取部分将类似于以下内容:
count := 0
for {
// 这个检查是为了退出循环,而不是无限等待
// 根据您的需求,可以删除它
if count == 3 {
fmt.Println("所有响应已到达...")
return
}
res := <-respChan
count++
go func(r Response) {
f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
if err != nil {
panic(err)
}
defer f.Close()
f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
writeChan <- struct{}{}
}(res)
}
在这里,执行在for
循环中无限等待。无论每个请求需要多长时间完成,只要它到达,就会立即获取。我在for
循环的顶部放置了一个if
语句,在处理我们需要的请求后退出。但是,您可以避免它,让代码运行直到收到取消信号(这取决于您)。
如果这样更符合您的要求,请告诉我,谢谢!
英文:
Let me share with you a working example that you can benefit from. First, I'm gonna present the code, then, I'm gonna walk you through all the relevant sections.
package main
import (
"fmt"
"net/http"
"os"
"strings"
"time"
)
type Request struct {
Url string
DelayInSeconds time.Duration
}
type Response struct {
Url string
StatusCode int
}
func main() {
requests := []Request{
{"https://www.google.com", 0},
{"https://stackoverflow.com", 1},
{"https://www.wikipedia.com", 4},
}
respChan := make(chan Response)
defer close(respChan)
for _, req := range requests {
go func(r Request) {
fmt.Printf("%q - %v\n", r.Url, strings.Repeat("#", 30))
// simulate heavy work
time.Sleep(time.Second * r.DelayInSeconds)
resp, _ := http.Get(r.Url)
res := Response{r.Url, resp.StatusCode}
fmt.Println(time.Now())
respChan <- res
}(req)
}
writeChan := make(chan struct{})
defer close(writeChan)
for i := 0; i < len(requests); i++ {
select {
case res := <-respChan:
go func(r Response) {
f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
if err != nil {
panic(err)
}
defer f.Close()
f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
writeChan <- struct{}{}
}(res)
case <-time.After(time.Second * 2):
fmt.Println("Timeout")
}
}
}
Set up
First, I've defined the two structs that will be used in the example: Request
and Response
. In the former, I put a DelayInSeconds
to mock some heavy loads and time-consuming operations. Then, I defined the requests
variable that contains all the requests that have to be done.
The writing part
Here, I range over the requests
variable. For each request, I'm gonna issue an HTTP request to the target URL. The time.Sleep
emulate the heavy load. Then, I write the response to the respChan
channel which is unbuffered.
The reading part
Here, the major change is to wrap the select
construct into a for
loop. Thanks to this, we'll make sure to iterate the right times (based on the length of the requests
variable).
Final notes
First of all, bear in mind that the code is oversimplified just to show off the relevant parts. Due to this, a lot of error handling is missing and some inline functions could be extrapolated into named functions. You don't need to use sync.WaitGroup
to achieve what you need, the usage of channels will be enough.
Feel free to play with delays and check which files are written!
Let me know if this helps you!
Edit
As requested, I'm gonna provide you with a more accurate solution based on your needs. The new reading part will be something like the following:
count := 0
for {
// this check is need to exit the for loop and not wait indefinitely
// it can be removed based on your needs
if count == 3 {
fmt.Println("all responses arrived...")
return
}
res := <-respChan
count++
go func(r Response) {
f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
if err != nil {
panic(err)
}
defer f.Close()
f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
writeChan <- struct{}{}
}(res)
}
Here, the execution is waiting indefinitely within the for
loop. No matter how long each request takes to complete, it will be fetched as soon as it arrives. I put, at the top of the for
loop, an if
to exit after it processed the requests that we need. However, you can avoid it and let the code run till a cancellation signal comes in (it's up to you).
Let me know if this better meets your requirements, thanks!
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论