Passing a csv.NewWriter() to another func in Golang to Write to file Asynchronously

Question


I am making API calls (potentially thousands in a single job) and as they return and complete, I'd like to be able to write them to a single shared file (say CSV for simplicity) instead of waiting for all of them to complete before writing.

How could I share a single csv.Writer() in a way that effectively writes to a single file shared by many threads? This may be too daunting a task, but I was curious if there was a way to go about it.

package main

import (
    "encoding/csv"
    "os"
)

type Row struct {
    Field1 string
    Field2 string
}

func main() {
    file, _ := os.Create("file.csv")
    w := csv.NewWriter(file)

    // Some operations to create a slice of Row structs that will contain
    // the rows to write
    var rowsToWrite []Row
    // Now let's iterate over and write to file.
    // Ideally, I'd like to do this in a goroutine but not entirely sure
    // about thread-safe writes
    for _, r := range rowsToWrite {
        go func(row Row, writer *csv.Writer) {
            err := writeToFile(row, writer)
            if err != nil {
                // Handle error
            }
        }(r, w)
    }

}

func writeToFile(row Row, writer *csv.Writer) error {

    // Use the shared writer to maintain where I am at in the file so I
    // can append to the CSV
    if err := writer.Write([]string{row.Field1, row.Field2}); err != nil {
        return err
    }
    writer.Flush() // flush the buffer so the row reaches the file
    return nil

}


Answer 1

Score: 1


I would (personally) not have the same file open for writing at two separate points in the code. Depending on how the OS handles buffered writes, etc., you can end up with "interesting" things happening.

Given how you've described your goals, one might do something like (this is off the top of my head and not rigorously tested):

  1. Create a channel to queue blocks of text (I assume) to be written - make(chan []byte, depth) - depth could be tuneable based on some tests you'd run, presumably.
  2. Have a goroutine open a filehandle for writing on your file, then read from that queueing channel, writing whatever it gets from the channel to that file
  3. You could then have n goroutines writing to the queueing channel, and as long as you don't exceed the channel capacity (outrun your ability to write), you might never need to worry about locks.

If you did want to use locks, then you'd need a sync.Mutex shared between the goroutines responsible for enqueueing.

Season to taste, obviously.

Answer 2

Score: 1


Lots of back and forth on this one for me 🙂

I originally thought you could use the Write() method on a csv.Writer from multiple goroutines, but a csv.Writer is not safe for concurrent use: one goroutine can flush the buffer to disk while another is still writing to it.

Anyways, to get back to what you were originally asking for...

Still using the same setup to download Todo objects from <https://jsonplaceholder.typicode.com>, as an example:

package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
)

type Todo struct {
	UserID    int    `json:"userId"`
	ID        int    `json:"id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

// toRecord converts Todo struct to []string, for writing to CSV.
func (t Todo) toRecord() []string {
	userID := strconv.Itoa(t.UserID)
	id := strconv.Itoa(t.ID)
	completed := strconv.FormatBool(t.Completed)

	return []string{userID, id, t.Title, completed}
}

// getTodo gets endpoint and unmarshalls the response JSON into todo.
func getTodo(endpoint string) (todo Todo) {
	resp, err := http.Get(endpoint)
	if err != nil {
		log.Println("error:", err)
		return // resp is nil on error, so bail out before using it
	}
	defer resp.Body.Close()
	json.NewDecoder(resp.Body).Decode(&todo)
	return
}

The following:

  • Will start one "parent" goroutine to start filling the todos channel:
    • inside that routine, goroutines will be started for each HTTP request and will send the response Todo on todos
    • the parent will wait till all the request routines are done
    • when they're done, the parent will close the todos channel
  • Meanwhile, main has moved on and is ranging over todos, picking a Todo off one-at-a-time and writing it to the CSV.
  • When the original, "parent" goroutine finally closes todos, the for-loop will break, the writer does a final Flush(), and the program will complete.

func main() {
	todos := make(chan Todo)

	go func() {
		const nAPICalls = 200
		var wg sync.WaitGroup

		wg.Add(nAPICalls)
		for i := 0; i < nAPICalls; i++ {
			s := fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i+1)

			go func(x string) {
				todos <- getTodo(x)
				wg.Done()
			}(s)
		}
		wg.Wait()

		close(todos)
	}()

	w := csv.NewWriter(os.Stdout)
	w.Write([]string{"UserID", "ID", "Title", "Completed"})
	for todo := range todos {
		w.Write(todo.toRecord())
	}
	w.Flush()
}

huangapple
  • Published on 2023-02-01 21:00:36
  • Please keep this link when reposting: https://go.coder-hub.com/75310555.html