Golang producer-consumer: number of messages received

Question

I have written a producer-consumer pattern in Go that reads multiple CSV files and processes their records. All records of a CSV file are read in one go.

I want to log the completion percentage at intervals of 5% of the total number of records across all CSV files. For example, if I have 3 CSV files with 20, 30, and 50 rows/records (100 records in total), I want to log progress each time another 5 records have been processed.

    func processData(inputCSVFiles []string) {
        producerCount := len(inputCSVFiles)
        consumerCount := producerCount

        link := make(chan []string, 100)
        wp := &sync.WaitGroup{}
        wc := &sync.WaitGroup{}
        wp.Add(producerCount)
        wc.Add(consumerCount)

        for i := 0; i < producerCount; i++ {
            go produce(link, inputCSVFiles[i], wp)
        }
        for i := 0; i < consumerCount; i++ {
            go consume(link, wc)
        }

        wp.Wait()   // all producers have sent their records
        close(link) // lets the consumers' range loops end
        wc.Wait()
        fmt.Println("Completed data migration process for all CSV data files.")
    }

    func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
        defer wg.Done()
        records := readCsvFile(filePath)
        totalNumberOfRecords := len(records)
        _ = totalNumberOfRecords // not used yet; this is the per-file count needed for progress
        for _, record := range records {
            link <- record
        }
    }

    func consume(link <-chan []string, wg *sync.WaitGroup) {
        defer wg.Done()
        for record := range link {
            _ = record // process csv record
        }
    }

Answer 1

Score: 1

I used an atomic variable and a counter channel: each consumer pushes a count onto the channel when it has processed a record, and a separate goroutine reads from the channel and computes the percentage of records processed so far.

    import (
        "log"
        "sync"
        "sync/atomic"
    )

    var progressPercentageStep float64 = 5.0

    // Total number of records read so far by all producers (updated atomically).
    var totalRecordsToProcess int32

    func processData(inputCSVFiles []string) {
        producerCount := len(inputCSVFiles)
        consumerCount := producerCount

        link := make(chan []string, 100)
        counter := make(chan int32, 100)
        statsDone := make(chan struct{})

        wp := &sync.WaitGroup{}
        wc := &sync.WaitGroup{}
        wp.Add(producerCount)
        wc.Add(consumerCount)

        for i := 0; i < producerCount; i++ {
            go produce(link, inputCSVFiles[i], wp)
        }
        go func() {
            defer close(statsDone)
            progressStats(counter)
        }()
        for i := 0; i < consumerCount; i++ {
            go consume(link, counter, wc)
        }

        wp.Wait()
        close(link)
        wc.Wait()
        close(counter) // no consumer can send anymore
        <-statsDone    // wait until the last progress line has been logged
    }

    func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
        defer wg.Done()
        records := readCsvFile(filePath) // helper from the question, not shown
        atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))
        for _, record := range records {
            link <- record
        }
    }

    func consume(link <-chan []string, counter chan<- int32, wg *sync.WaitGroup) {
        defer wg.Done()
        for record := range link {
            _ = record // process csv record
            counter <- 1
        }
    }

    func progressStats(counter <-chan int32) {
        feedbackThreshold := progressPercentageStep
        var processed int32
        for count := range counter {
            processed += count
            // The total keeps growing until every producer has read its file,
            // so early percentages can be higher than the final figure.
            total := atomic.LoadInt32(&totalRecordsToProcess)
            donePercent := 100.0 * float64(processed) / float64(total)
            // log progress
            if donePercent >= feedbackThreshold {
                log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", total, processed, donePercent)
                for feedbackThreshold <= donePercent {
                    feedbackThreshold += progressPercentageStep
                }
            }
        }
    }
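
If the total record count is known before the consumers start (for example, because every file is read fully up front, as the question describes), the counter channel may not be needed at all. The following is only a sketch under that assumption, not the answer's method: `progressTracker` and its `done` method are hypothetical names, and the three slices stand in for the 20, 30, and 50 records that `readCsvFile` would return. Each consumer atomically increments a shared counter and reports only when its own increment crosses a 5% boundary.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// progressTracker is a hypothetical helper: it counts processed records
// and tells the caller when another `step` percent of `total` is reached.
type progressTracker struct {
	processed int64 // updated atomically; kept first for 64-bit alignment
	total     int64 // total records across all files, known up front
	step      int64 // reporting interval in percent, e.g. 5
}

// done records one processed record. It returns the highest multiple of
// step reached and true if this record crossed a reporting boundary.
func (t *progressTracker) done() (int64, bool) {
	n := atomic.AddInt64(&t.processed, 1)
	cur := n * 100 / t.total / t.step
	prev := (n - 1) * 100 / t.total / t.step
	if cur > prev {
		return cur * t.step, true
	}
	return 0, false
}

func main() {
	// Stand-ins for the records of three CSV files (20, 30, 50 rows).
	files := [][]string{make([]string, 20), make([]string, 30), make([]string, 50)}
	total := 0
	for _, f := range files {
		total += len(f)
	}
	tr := &progressTracker{total: int64(total), step: 5}

	link := make(chan string, 100)
	var wp, wc sync.WaitGroup
	for _, f := range files {
		wp.Add(1)
		go func(records []string) {
			defer wp.Done()
			for _, r := range records {
				link <- r
			}
		}(f)
	}
	for i := 0; i < len(files); i++ {
		wc.Add(1)
		go func() {
			defer wc.Done()
			for range link {
				// process the record here, then update progress
				if pct, ok := tr.done(); ok {
					fmt.Printf("progress: %d%% of %d records\n", pct, tr.total)
				}
			}
		}()
	}
	wp.Wait()
	close(link)
	wc.Wait()
}
```

Because every value returned by `atomic.AddInt64` is seen by exactly one goroutine, each boundary is reported once without extra locking. Lines printed by different consumers can still appear slightly out of order, and with very few records a single record may skip several boundaries, in which case only the highest one is reported.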

huangapple
  • Posted on January 13, 2022, 21:10:00
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70697174.html