How to write to two different csv files concurrently in Go?

huangapple go评论75阅读模式
英文:

How to write to two different csv files concurrently in Go?

问题

我已经创建了一个最小可重现的示例。基本上,我有两个不同的服务,在两个不同的goroutine中运行record方法。它们分别在不同的时间创建并写入不同的CSV文件。当我运行这段代码时,CSV文件被创建了,但是没有数据。在运行过程中没有出现任何错误。我读到应该使用互斥锁,我已经实现了,但是也没有起作用。在这种情况下,我应该怎么做?

以下是修复问题的建议:

  1. 问题可能出在writer.Flush()方法没有被调用,导致数据没有被写入文件。在record方法的case <-a.QuitChan:case <-b.QuitChan:分支中,添加writer.Flush()语句,确保数据被刷新到文件中。

  2. 为了避免并发访问文件的问题,可以使用互斥锁来保护对文件的写操作。在AB结构体中分别添加一个互斥锁字段,并在写入文件之前使用Lock方法获取锁,在写入完成后使用Unlock方法释放锁。

修改后的代码如下:

package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

var (
	csvOnePath = "test.csv"
	csvTwoPath = "test_two.csv"
)

type A struct {
	Running   int32 // used atomically
	QuitChan  chan struct{}
	mutex     sync.Mutex
}

func NewA() *A {
	return &A{
		QuitChan: make(chan struct{}),
	}
}

func (a *A) Start() error {
	if ok := atomic.CompareAndSwapInt32(&a.Running, 0, 1); !ok {
		return fmt.Errorf("Cannot start service A: service already started")
	}
	go a.record()
	return nil
}

func (a *A) Stop() error {
	if ok := atomic.CompareAndSwapInt32(&a.Running, 1, 0); !ok {
		return fmt.Errorf("Cannot stop service A: service already stopped")
	}
	close(a.QuitChan)
	return nil
}

func (a *A) record() {
	a.mutex.Lock()
	defer a.mutex.Unlock()

	file_one, err := os.Create(csvOnePath)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer file_one.Close()

	writer := csv.NewWriter(file_one)
	defer writer.Flush()

	header := []string{"this", "is", "a", "test"}
	err = writer.Write(header)
	if err != nil {
		fmt.Println(err)
		return
	}

	ticker := time.NewTicker(10 * time.Second)
	for {
		select {
		case t := <-ticker.C:
			err = writer.Write([]string{fmt.Sprintf("%2d:%2d:%2d", t.Hour(), t.Minute(), t.Second())})
			if err != nil {
				fmt.Println(err)
				a.QuitChan <- struct{}{}
			}
		case <-a.QuitChan:
			ticker.Stop()
			fmt.Println("Stopped recording.")
			return
		}
	}
}

type B struct {
	Running   int32 // used atomically
	QuitChan  chan struct{}
	mutex     sync.Mutex
}

func NewB() *B {
	return &B{
		QuitChan: make(chan struct{}),
	}
}

func (b *B) Start() error {
	if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
		return fmt.Errorf("Cannot start service B: service already started")
	}
	go b.record()
	return nil
}

func (b *B) Stop() error {
	if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
		return fmt.Errorf("Cannot stop service B: service already stopped")
	}
	close(b.QuitChan)
	return nil
}

func (b *B) record() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	file_two, err := os.Create(csvTwoPath)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer file_two.Close()

	writer := csv.NewWriter(file_two)
	defer writer.Flush()

	header := []string{"this", "is", "a", "second", "test"}
	err = writer.Write(header)
	if err != nil {
		fmt.Println(err)
		return
	}

	ticker := time.NewTicker(1 * time.Second)
	ticks := 0
	for {
		select {
		case <-ticker.C:
			if ticks%15 == 0 {
				err = writeMsgToReport(writer, "YEET "+strconv.Itoa(ticks))
				if err != nil {
					fmt.Println(err)
					b.QuitChan <- struct{}{}
				}
			}
			ticks++
		case <-b.QuitChan:
			ticker.Stop()
			fmt.Println("Stopped recording.")
			return
		}
	}
}

func writeMsgToReport(report *csv.Writer, msg string) error {
	ct := time.Now()
	timestamp := fmt.Sprintf("%2d:%2d:%2d", ct.Hour(), ct.Minute(), ct.Second())
	return report.Write([]string{timestamp, msg})
}

func main() {
	serviceA := NewA()
	err := serviceA.Start()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer serviceA.Stop()

	serviceB := NewB()
	err = serviceB.Start()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer serviceB.Stop()

	time.Sleep(600 * time.Second)
}

请尝试使用上述修改后的代码,并确保在写入文件之后调用writer.Flush()方法。这样应该能够解决你的问题。

英文:

I've created a minimal reproduceable example

package main
import (
&quot;encoding/csv&quot;
&quot;fmt&quot;
&quot;os&quot;
&quot;strconv&quot;
&quot;sync/atomic&quot;
&quot;time&quot;
)
var (
csvOnePath = &quot;test.csv&quot;
csvTwoPath = &quot;test_two.csv&quot;
)
type A struct {
Running 	int32 // used atomically
QuitChan 	chan struct{}
}
func NewA() *A {
return &amp;A{
QuitChan: make(chan struct{}),
}
}
func (a *A) Start() error {
if ok := atomic.CompareAndSwapInt32(&amp;a.Running, 0, 1); !ok {
return fmt.Errorf(&quot;Cannot start service A: service already started&quot;)
}
go a.record()
return nil
}
func (a *A) Stop() error {
if ok := atomic.CompareAndSwapInt32(&amp;a.Running, 1, 0); !ok {
return fmt.Errorf(&quot;Cannot stop service A: service already stopped&quot;)
}
close(a.QuitChan)
return nil
}
func (a *A) record() {
//file_one, err := os.OpenFile(csvOnePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
file_one, err := os.Create(csvOnePath)
if err != nil {
fmt.Println(err)
return
}
writer := csv.NewWriter(file_one)
// writer, closeFileFunc, err := NewCsvWriter(csvOnePath)
if err != nil {
fmt.Println(err)
return
}
header := []string{&quot;this&quot;, &quot;is&quot;, &quot;a&quot;, &quot;test&quot;}
err = writer.Write(header)
if err != nil {
fmt.Println(err)
return
}
ticker := time.NewTicker(10*time.Second)
for {
select {
case t := &lt;-ticker.C:
err = writer.Write([]string{fmt.Sprintf(&quot;%2d:%2d:%2d&quot;, t.Hour(), t.Minute(), t.Second())})
if err != nil {
fmt.Println(err)
a.QuitChan &lt;- struct{}{}
}
case &lt;-a.QuitChan:
ticker.Stop()
writer.Flush()
file_one.Close()
fmt.Println(&quot;Stopped recording.&quot;)
break
}
}
}
type B struct {
Running 	int32 // used atomically
QuitChan	chan struct{}
}
func NewB() *B {
return &amp;B{
QuitChan: make(chan struct{}),
}
}
func (b *B) Start() error {
if ok := atomic.CompareAndSwapInt32(&amp;b.Running, 0, 1); !ok {
return fmt.Errorf(&quot;Cannot start service B: service already started&quot;)
}
go b.record()
return nil
}
func (b *B) Stop() error {
if ok := atomic.CompareAndSwapInt32(&amp;b.Running, 1, 0); !ok {
return fmt.Errorf(&quot;Cannot stop service B: service already stopped&quot;)
}
close(b.QuitChan)
return nil
}
func writeMsgToReport(report *csv.Writer, msg string) error {
ct := time.Now()
timestamp := fmt.Sprintf(&quot;%2d:%2d:%2d&quot;, ct.Hour(), ct.Minute(), ct.Second())
return report.Write([]string{timestamp, msg})
}
func (b *B) record() {
//file_two, err := os.OpenFile(csvTwoPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
file_two, err := os.Create(csvTwoPath)
if err != nil {
fmt.Println(err)
return
}
writer := csv.NewWriter(file_two)
//writer, closeFileFunc, err := NewCsvWriter(csvTwoPath)
if err != nil {
fmt.Println(err)
return
}
header := []string{&quot;this&quot;, &quot;is&quot;, &quot;a&quot;, &quot;second&quot;, &quot;test&quot;}
err = writer.Write(header)
if err != nil {
fmt.Println(err)
return
}
ticker := time.NewTicker(1*time.Second)
ticks := 0
for {
select {
case &lt;-ticker.C:
if ticks % 15 == 0 {
err = writeMsgToReport(writer, &quot;YEET &quot;+strconv.Itoa(ticks))
if err != nil {
fmt.Println(err)
b.QuitChan &lt;- struct{}{}
}
}
ticks++
case &lt;-b.QuitChan:
ticker.Stop()
writer.Flush()
file_two.Close()
fmt.Println(&quot;Stopped recording.&quot;)
break
}
}
}
func main() {
serviceA := NewA()
err := serviceA.Start()
if err != nil {
fmt.Println(err)
return
}
defer serviceA.Stop()
serviceB := NewB()
err = serviceB.Start()
if err != nil {
fmt.Println(err)
return
}
defer serviceB.Stop()
time.Sleep(600*time.Second)
}

Essentially, I have two different services that run a record method in two different goroutines. They each create and write to a different csv file at different times. When I run this, the csv files are created but never have data. No errors are ever raised while running this. I read that I should use a mutex which I've implemented but this hasn't worked either. What should I do here?

答案1

得分: 1

根据评论中的详细说明,当main()函数执行完毕时,程序将退出;规范中指出:“它不会等待其他(非主)goroutine完成。”。

这意味着你的goroutine很可能不会处理关闭文件的代码,这意味着缓冲数据可能不会被写入。

我在playground上创建了一个简化版本的应用程序来演示这一点。

有多种方法可以解决这个问题,但最简单的方法可能是添加一个WaitGroup,这样你的应用程序可以在终止之前等待goroutine退出。

英文:

As detailed in the comments the program will exit when main() completes; the spec states "It does not wait for other (non-main) goroutines to complete.".

This means that it is unlikely that your go routines will process the code that closes the files meaning that buffered data may not be written.

I created a simplified version of your application in the playground to demonstrate this.

There are a number of ways to fix this but the simplest is probably to add a WaitGroup so your application can wait for the go routines to exit before terminating.

huangapple
  • 本文由 发表于 2021年11月2日 02:42:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/69801368.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定