Golang生产者消费者接收到的消息数量

huangapple go评论92阅读模式
英文:

golang producer consumer number of messages received

问题

我已经在Go语言中编写了生产者-消费者模式,用于读取多个CSV文件并处理记录。我一次性读取CSV文件的所有记录。

我想要在处理过程中,每完成总记录数(包含所有CSV文件)的5%就记录一次处理完成的百分比。例如,我有3个CSV文件要处理,分别有20、30、50行/记录(总共100条记录),希望每处理完5条记录就记录一次进度。

// processData starts one producer goroutine per CSV file and an equal number
// of consumer goroutines, then blocks until every produced record has been
// consumed.
func processData(inputCSVFiles []string) {
	// Buffered so producers can run ahead of consumers by up to 100 records.
	records := make(chan []string, 100)

	var producers, consumers sync.WaitGroup

	for _, path := range inputCSVFiles {
		producers.Add(1)
		go produce(records, path, &producers)
	}

	// One consumer per input file, mirroring the producer count.
	for range inputCSVFiles {
		consumers.Add(1)
		go consume(records, &consumers)
	}

	// Close the channel only after every producer has finished sending, so
	// the consumers' range loops can terminate.
	producers.Wait()
	close(records)
	consumers.Wait()
	fmt.Println("已完成所有CSV数据文件的数据迁移过程。")
}

// produce sends each record returned by readCsvFile(filePath) on link and
// calls wg.Done when it returns.
//
// The number of records is len(records); it is not stored in a separate local
// because Go refuses to compile a declared-but-unused variable.
func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
	defer wg.Done()
	records := readCsvFile(filePath)
	for _, record := range records {
		link <- record
	}
}

// consume receives records from link until the channel is closed and drained,
// then calls wg.Done.
func consume(link <-chan []string, wg *sync.WaitGroup) {
	defer wg.Done()
	for record := range link {
		// Process the CSV record here. The blank assignment keeps this
		// placeholder compiling until real processing uses record.
		_ = record
	}
}

以上是你提供的代码的翻译。

英文:

I have written a producer-consumer pattern in Go that reads multiple CSV files and processes their records. Each CSV file's records are read all at once.

I want to log the percentage of processing completed at intervals of 5% of the total number of records across all CSV files. For example, if I have 3 CSV files to process with 20, 30, and 50 rows/records respectively (100 records in total), I want to log progress each time 5 more records have been processed.

func processData(inputCSVFiles []string) {
	producerCount := len(inputCSVFiles)
	consumerCount := producerCount

	link := make(chan []string, 100)
	wp := &amp;sync.WaitGroup{}
	wc := &amp;sync.WaitGroup{}

	wp.Add(producerCount)
	wc.Add(consumerCount)

	for i := 0; i &lt; producerCount; i++ {
		go produce(link, inputCSVFiles[i], wp)
	}

	for i := 0; i &lt; consumerCount; i++ {
		go consume(link, wc)
	}
	wp.Wait()
	close(link)
	wc.Wait()
	fmt.Println(&quot;Completed data migration process for all CSV data files.&quot;)
}

func produce(link chan&lt;- []string, filePath string, wg *sync.WaitGroup) {
	defer wg.Done()
	records := readCsvFile(filePath)
    totalNumberOfRecords := len(records)
	for _, record := range records {
		link &lt;- record
	}
}

// consume receives records from link until the channel is closed and drained,
// then calls wg.Done.
func consume(link <-chan []string, wg *sync.WaitGroup) {
	defer wg.Done()
	for record := range link {
		// Process the CSV record here. The blank assignment keeps this
		// placeholder compiling until real processing uses record.
		_ = record
	}
}

答案1

得分: 1

我已经使用原子变量和计数通道,其中消费者在记录处理时会推送计数,其他goroutine将从通道中读取并计算总处理记录的百分比。

var (
	// progressPercentageStep is the spacing, in percent, between two
	// consecutive progress log lines.
	progressPercentageStep = 5.0

	// totalRecordsToProcess is the shared record counter; producers update it
	// with atomic.AddInt32.
	totalRecordsToProcess int32
)

// processData starts one producer goroutine per CSV file, an equal number of
// consumer goroutines, and a single progress-reporting goroutine. It returns
// only after every record has been consumed and the progress reporter has
// drained its channel.
func processData(inputCSVFiles []string) {
	producerCount := len(inputCSVFiles)
	consumerCount := producerCount
	link := make(chan []string, 100)
	counter := make(chan int, 100)
	wp := &sync.WaitGroup{}
	wc := &sync.WaitGroup{}

	wp.Add(producerCount)
	wc.Add(consumerCount)

	for i := 0; i < producerCount; i++ {
		go produce(link, inputCSVFiles[i], wp)
	}

	// Run the reporter behind a done channel so we can wait for it; otherwise
	// the last progress lines may never be written before the caller exits.
	statsDone := make(chan struct{})
	go func() {
		defer close(statsDone)
		progressStats(counter)
	}()

	for i := 0; i < consumerCount; i++ {
		// consume needs the counter channel to report each processed record.
		go consume(link, counter, wc)
	}

	wp.Wait()
	close(link)
	wc.Wait()

	// All senders on counter have finished, so closing it is safe and lets
	// progressStats' range loop end.
	close(counter)
	<-statsDone
}

// produce adds the number of records returned by readCsvFile(filePath) to
// totalRecordsToProcess, sends each of those records on link, and calls
// wg.Done when it returns.
func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
	defer wg.Done()

	rows := readCsvFile(filePath)

	// Several producers run concurrently, so the shared total is updated
	// atomically.
	rowCount := int32(len(rows))
	atomic.AddInt32(&totalRecordsToProcess, rowCount)

	for i := range rows {
		link <- rows[i]
	}
}

// consume receives records from link until the channel is closed and drained,
// sending 1 on counter for every record it has handled, then calls wg.Done.
func consume(link <-chan []string, counter chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for record := range link {
		// Process the CSV record here. The blank assignment keeps this
		// placeholder compiling until real processing uses record.
		_ = record
		counter <- 1
	}
}

// progressStats receives per-record completion counts from counter and logs
// overall progress each time the processed share reaches the next multiple of
// progressPercentageStep. It returns when counter is closed and drained.
//
// The processed count is kept locally and totalRecordsToProcess is only read,
// never decremented, so the denominator stays the real total. Producers add to
// that total concurrently, so until every file has been read the percentage is
// relative to the records registered so far.
func progressStats(counter <-chan int) {
	feedbackThreshold := progressPercentageStep
	processed := 0
	for count := range counter {
		processed += count

		total := atomic.LoadInt32(&totalRecordsToProcess)
		if total <= 0 {
			// No records registered yet; skip to avoid dividing by zero.
			continue
		}

		donePercent := 100.0 * float64(processed) / float64(total)
		// Log progress once the next threshold has been reached.
		if donePercent >= feedbackThreshold {
			log.Printf("进度 ************** 总记录数:%d,已处理记录数:%d,已处理百分比:%.2f **************\n", total, processed, donePercent)
			feedbackThreshold += progressPercentageStep
		}
	}
}

注意:我只翻译了代码部分,不包括注释。

英文:

I used an atomic variable and a counter channel: each consumer pushes a count onto the channel when a record has been processed, and a separate goroutine reads from the channel and calculates the percentage of records processed so far.

var (
	// progressPercentageStep is the spacing, in percent, between two
	// consecutive progress log lines.
	progressPercentageStep = 5.0

	// totalRecordsToProcess is the shared record counter; producers update it
	// with atomic.AddInt32.
	totalRecordsToProcess int32
)
func processData(inputCSVFiles []string) {
producerCount := len(inputCSVFiles)
consumerCount := producerCount
link := make(chan []string, 100)
counter := make(chan int, 100)
defer close(counter)
wp := &amp;sync.WaitGroup{}
wc := &amp;sync.WaitGroup{}
wp.Add(producerCount)
wc.Add(consumerCount)
for i := 0; i &lt; producerCount; i++ {
go produce(link, inputCSVFiles[i], wp)
}
go progressStats(counter)
for i := 0; i &lt; consumerCount; i++ {
go consume(link, wc)
}
wp.Wait()
close(link)
wc.Wait()
}
func produce(link chan&lt;- []string, filePath string, wg *sync.WaitGroup) {
defer wg.Done()
records := readCsvFile(filePath)
atomic.AddInt32(&amp;totalRecordsToProcess, int32(len(records)))
for _, record := range records {
link &lt;- record
}
}
// consume receives records from link until the channel is closed and drained,
// sending 1 on counter for every record it has handled, then calls wg.Done.
func consume(link <-chan []string, counter chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for record := range link {
		// Process the CSV record here. The blank assignment keeps this
		// placeholder compiling until real processing uses record.
		_ = record
		counter <- 1
	}
}
func progressStats(counter &lt;-chan int) {
var feedbackThreshold = progressPercentageStep
for count := range counter {
totalRemaining := atomic.AddInt32(&amp;totalRecordsToProcess, -count)
donePercent := 100.0 * processed / totalRemaining
// log progress
if donePercent &gt;= feedbackThreshold {
log.Printf(&quot;Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n&quot;, totalRecordsToProcess, processed, donePercent)
feedbackThreshold += progressPercentageStep
}
}
}

huangapple
  • 本文由 发表于 2022年1月13日 21:10:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/70697174.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定