How to write to two different csv files concurrently in Go?


Question


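Suggestions for fixing the problem (the original question and code appear further below):

  1. The data is most likely missing because the csv.Writer is never flushed. encoding/csv buffers its output, so nothing reaches the file until writer.Flush() runs, and the Flush in the case <-a.QuitChan: and case <-b.QuitChan: branches only runs if the goroutine handles the quit signal before main returns. Deferring writer.Flush() in record, and leaving the loop with return instead of break (which only exits the select), makes sure the data is flushed whenever record returns. main still has to wait for record to finish after calling Stop.

  2. A mutex is only required when several goroutines share the same file or csv.Writer. Here each service writes to its own file, so the lock is optional; the code below adds a mutex field to the A and B structs, takes it with Lock before writing, and releases it with Unlock when writing is done.

The modified code is as follows: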

    package main

    import (
        "encoding/csv"
        "fmt"
        "os"
        "strconv"
        "sync"
        "sync/atomic"
        "time"
    )

    var (
        csvOnePath = "test.csv"
        csvTwoPath = "test_two.csv"
    )

    type A struct {
        Running  int32 // used atomically
        QuitChan chan struct{}
        mutex    sync.Mutex     // optional: the file is only used by record
        wg       sync.WaitGroup // lets Stop wait until record has returned
    }

    func NewA() *A {
        return &A{
            QuitChan: make(chan struct{}),
        }
    }

    func (a *A) Start() error {
        if ok := atomic.CompareAndSwapInt32(&a.Running, 0, 1); !ok {
            return fmt.Errorf("Cannot start service A: service already started")
        }
        a.wg.Add(1)
        go a.record()
        return nil
    }

    // Stop signals record to quit and blocks until the file is flushed and closed.
    func (a *A) Stop() error {
        if ok := atomic.CompareAndSwapInt32(&a.Running, 1, 0); !ok {
            return fmt.Errorf("Cannot stop service A: service already stopped")
        }
        close(a.QuitChan)
        a.wg.Wait()
        return nil
    }

    func (a *A) record() {
        defer a.wg.Done() // deferred first, so it runs last, after Flush and Close
        a.mutex.Lock()
        defer a.mutex.Unlock()
        file_one, err := os.Create(csvOnePath)
        if err != nil {
            fmt.Println(err)
            return
        }
        defer file_one.Close()
        writer := csv.NewWriter(file_one)
        defer writer.Flush() // runs before file_one.Close()
        header := []string{"this", "is", "a", "test"}
        err = writer.Write(header)
        if err != nil {
            fmt.Println(err)
            return
        }
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case t := <-ticker.C:
                err = writer.Write([]string{fmt.Sprintf("%2d:%2d:%2d", t.Hour(), t.Minute(), t.Second())})
                if err != nil {
                    fmt.Println(err)
                    // Return directly: sending on QuitChan here would block forever,
                    // because this goroutine is the channel's only receiver.
                    return
                }
            case <-a.QuitChan:
                fmt.Println("Stopped recording.")
                return
            }
        }
    }

    type B struct {
        Running  int32 // used atomically
        QuitChan chan struct{}
        mutex    sync.Mutex     // optional: the file is only used by record
        wg       sync.WaitGroup // lets Stop wait until record has returned
    }

    func NewB() *B {
        return &B{
            QuitChan: make(chan struct{}),
        }
    }

    func (b *B) Start() error {
        if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
            return fmt.Errorf("Cannot start service B: service already started")
        }
        b.wg.Add(1)
        go b.record()
        return nil
    }

    // Stop signals record to quit and blocks until the file is flushed and closed.
    func (b *B) Stop() error {
        if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
            return fmt.Errorf("Cannot stop service B: service already stopped")
        }
        close(b.QuitChan)
        b.wg.Wait()
        return nil
    }

    func (b *B) record() {
        defer b.wg.Done() // deferred first, so it runs last, after Flush and Close
        b.mutex.Lock()
        defer b.mutex.Unlock()
        file_two, err := os.Create(csvTwoPath)
        if err != nil {
            fmt.Println(err)
            return
        }
        defer file_two.Close()
        writer := csv.NewWriter(file_two)
        defer writer.Flush() // runs before file_two.Close()
        header := []string{"this", "is", "a", "second", "test"}
        err = writer.Write(header)
        if err != nil {
            fmt.Println(err)
            return
        }
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        ticks := 0
        for {
            select {
            case <-ticker.C:
                if ticks%15 == 0 {
                    err = writeMsgToReport(writer, "YEET "+strconv.Itoa(ticks))
                    if err != nil {
                        fmt.Println(err)
                        // Return directly instead of sending on QuitChan (see A.record).
                        return
                    }
                }
                ticks++
            case <-b.QuitChan:
                fmt.Println("Stopped recording.")
                return
            }
        }
    }

    func writeMsgToReport(report *csv.Writer, msg string) error {
        ct := time.Now()
        timestamp := fmt.Sprintf("%2d:%2d:%2d", ct.Hour(), ct.Minute(), ct.Second())
        return report.Write([]string{timestamp, msg})
    }

    func main() {
        serviceA := NewA()
        err := serviceA.Start()
        if err != nil {
            fmt.Println(err)
            return
        }
        defer serviceA.Stop() // blocks until A's file has been flushed and closed
        serviceB := NewB()
        err = serviceB.Start()
        if err != nil {
            fmt.Println(err)
            return
        }
        defer serviceB.Stop() // blocks until B's file has been flushed and closed
        time.Sleep(600 * time.Second)
    }

Try the modified code above, and make sure writer.Flush() is called after the last write and before the program exits. That should resolve the problem.
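To see why the flush matters, here is a minimal sketch of csv.Writer buffering. It is not part of the original thread: flushDemo is a hypothetical helper, and a bytes.Buffer stands in for the file so the effect can be observed without touching disk.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
)

// flushDemo writes one record through a csv.Writer and reports how many bytes
// have reached the underlying buffer before and after Flush is called.
func flushDemo() (before, after int, err error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err = w.Write([]string{"this", "is", "a", "test"}); err != nil {
		return 0, 0, err
	}
	before = buf.Len() // the record is still held in the writer's internal buffer
	w.Flush()
	after = buf.Len()
	// Write and Flush errors are reported through Error.
	return before, after, w.Error()
}

func main() {
	before, after, err := flushDemo()
	if err != nil {
		fmt.Println("csv error:", err)
		return
	}
	fmt.Printf("bytes before Flush: %d, after Flush: %d\n", before, after)
}
```

A small record like this one stays in the writer's buffer until Flush runs, which matches the symptom in the question: the files exist but are empty.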

Original question:

I've created a minimal reproducible example:

    package main

    import (
        "encoding/csv"
        "fmt"
        "os"
        "strconv"
        "sync/atomic"
        "time"
    )

    var (
        csvOnePath = "test.csv"
        csvTwoPath = "test_two.csv"
    )

    type A struct {
        Running  int32 // used atomically
        QuitChan chan struct{}
    }

    func NewA() *A {
        return &A{
            QuitChan: make(chan struct{}),
        }
    }

    func (a *A) Start() error {
        if ok := atomic.CompareAndSwapInt32(&a.Running, 0, 1); !ok {
            return fmt.Errorf("Cannot start service A: service already started")
        }
        go a.record()
        return nil
    }

    func (a *A) Stop() error {
        if ok := atomic.CompareAndSwapInt32(&a.Running, 1, 0); !ok {
            return fmt.Errorf("Cannot stop service A: service already stopped")
        }
        close(a.QuitChan)
        return nil
    }

    func (a *A) record() {
        //file_one, err := os.OpenFile(csvOnePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
        file_one, err := os.Create(csvOnePath)
        if err != nil {
            fmt.Println(err)
            return
        }
        writer := csv.NewWriter(file_one)
        // writer, closeFileFunc, err := NewCsvWriter(csvOnePath)
        if err != nil {
            fmt.Println(err)
            return
        }
        header := []string{"this", "is", "a", "test"}
        err = writer.Write(header)
        if err != nil {
            fmt.Println(err)
            return
        }
        ticker := time.NewTicker(10*time.Second)
        for {
            select {
            case t := <-ticker.C:
                err = writer.Write([]string{fmt.Sprintf("%2d:%2d:%2d", t.Hour(), t.Minute(), t.Second())})
                if err != nil {
                    fmt.Println(err)
                    a.QuitChan <- struct{}{}
                }
            case <-a.QuitChan:
                ticker.Stop()
                writer.Flush()
                file_one.Close()
                fmt.Println("Stopped recording.")
                break
            }
        }
    }

    type B struct {
        Running  int32 // used atomically
        QuitChan chan struct{}
    }

    func NewB() *B {
        return &B{
            QuitChan: make(chan struct{}),
        }
    }

    func (b *B) Start() error {
        if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
            return fmt.Errorf("Cannot start service B: service already started")
        }
        go b.record()
        return nil
    }

    func (b *B) Stop() error {
        if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
            return fmt.Errorf("Cannot stop service B: service already stopped")
        }
        close(b.QuitChan)
        return nil
    }

    func writeMsgToReport(report *csv.Writer, msg string) error {
        ct := time.Now()
        timestamp := fmt.Sprintf("%2d:%2d:%2d", ct.Hour(), ct.Minute(), ct.Second())
        return report.Write([]string{timestamp, msg})
    }

    func (b *B) record() {
        //file_two, err := os.OpenFile(csvTwoPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
        file_two, err := os.Create(csvTwoPath)
        if err != nil {
            fmt.Println(err)
            return
        }
        writer := csv.NewWriter(file_two)
        //writer, closeFileFunc, err := NewCsvWriter(csvTwoPath)
        if err != nil {
            fmt.Println(err)
            return
        }
        header := []string{"this", "is", "a", "second", "test"}
        err = writer.Write(header)
        if err != nil {
            fmt.Println(err)
            return
        }
        ticker := time.NewTicker(1*time.Second)
        ticks := 0
        for {
            select {
            case <-ticker.C:
                if ticks % 15 == 0 {
                    err = writeMsgToReport(writer, "YEET "+strconv.Itoa(ticks))
                    if err != nil {
                        fmt.Println(err)
                        b.QuitChan <- struct{}{}
                    }
                }
                ticks++
            case <-b.QuitChan:
                ticker.Stop()
                writer.Flush()
                file_two.Close()
                fmt.Println("Stopped recording.")
                break
            }
        }
    }

    func main() {
        serviceA := NewA()
        err := serviceA.Start()
        if err != nil {
            fmt.Println(err)
            return
        }
        defer serviceA.Stop()
        serviceB := NewB()
        err = serviceB.Start()
        if err != nil {
            fmt.Println(err)
            return
        }
        defer serviceB.Stop()
        time.Sleep(600*time.Second)
    }

Essentially, I have two different services that each run a record method in their own goroutine. They create and write to different CSV files at different times. When I run this, the CSV files are created but never contain any data. No errors are raised while it runs. I read that I should use a mutex, which I've implemented, but that hasn't worked either. What should I do here?
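A side note on the loops in the code above: in Go, a bare break inside a select leaves only the select, not the enclosing for, so the quit branch does not actually end the loop. The sketch below shows one way to leave the loop, using a labeled break (a plain return works as well). The names countUntilQuit, work, and quit are hypothetical and not taken from the question.

```go
package main

import "fmt"

// countUntilQuit counts values received from work until quit is closed.
func countUntilQuit(work <-chan int, quit <-chan struct{}) int {
	n := 0
loop:
	for {
		select {
		case <-work:
			n++
		case <-quit:
			// A bare "break" here would exit only the select, and the for loop
			// would start another iteration. The label exits the loop itself.
			break loop
		}
	}
	return n
}

func main() {
	work := make(chan int) // unbuffered: each send completes only once received
	quit := make(chan struct{})
	go func() {
		for i := 0; i < 3; i++ {
			work <- i
		}
		close(quit) // signal the receiver after the last value was taken
	}()
	fmt.Println("received:", countUntilQuit(work, quit))
}
```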

Answer 1

Score: 1


As detailed in the comments, the program exits when main() returns; the Go spec states: "It does not wait for other (non-main) goroutines to complete."

This means your goroutines are unlikely to reach the code that flushes and closes the files, so buffered data may never be written.

I created a simplified version of your application in the playground to demonstrate this.

There are a number of ways to fix this, but the simplest is probably to add a WaitGroup so your application can wait for the goroutines to exit before terminating.

Posted by huangapple on November 2, 2021, 02:42:22
Source: https://go.coder-hub.com/69801368.html