io.Pipe() causes WaitGroup to get stuck

Question

I am processing a huge data file of approximately 100 GB. Each line in the file is a piece of JSON data that I'd like to read, compress, and store in an in-memory database.

var wg sync.WaitGroup
for {
	line, err := reader.ReadString('\n')
	if err != nil {
		break
	}
	go func(index int) {
		wg.Add(1)
		pr, pw := io.Pipe()
		zw := lzw.NewWriter(pw, lzw.LSB, 8)
		_, err := io.Copy(zw, strings.NewReader(line))
		pw.Close()
		zw.Close()
		if err != nil {
			fmt.Println(err.Error())
		}
		b, err := io.ReadAll(pr)
		if err != nil {
			fmt.Println(err.Error())
		}
		client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
		pr.Close()
		wg.Done()
	}(index)
	if index%10000 == 0 {
		fmt.Println(index)
		wg.Wait()
	}
	index += 1
}

However, this code stops after processing the first 10000 lines. When I move wg.Add(1) down, after zw.Close(), it keeps processing the rest of the lines (but becomes unstable). When I store the exact values uncompressed, without lzw and io.Pipe(), everything works without any issue.

I am not sure whether I am using the WaitGroup incorrectly or whether there is something about io.Pipe() that I am not aware of yet.

Answer 1

Score: 1

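TLDR:
1- Removing pr, pw := io.Pipe() makes the code simpler, since the pipe is superfluous. An io.Pipe is synchronous: a Write blocks until a Read consumes the data, so writing and then reading in the same goroutine can deadlock. Try this: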

line, err := reader.ReadString('\n')
if err == io.EOF {
	wg.Wait()
	break
}
if err != nil {
	log.Fatal(err)
}
wg.Add(1)
go func(index int) {
	var buf bytes.Buffer
	{ // lexical scoping (static scoping)
		zw := lzw.NewWriter(&buf, lzw.LSB, 8)
		n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line))
		if err != nil {
			log.Fatal(err)
		}
		if int(n) != len(line) {
			log.Fatal(n, len(line))
		}
		// It is the caller's responsibility to call Close on the WriteCloser when finished writing.
		if err = zw.Close(); err != nil {
			log.Fatal(err)
		}
	}
	ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
	client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour)
	cancelFunc()
	wg.Done()
}(index)
if index%tenThousand == 0 {
	wg.Wait()
}

2- You need to put the wg.Add(1) before go func(index int) {. Otherwise wg.Wait() can run before the new goroutine has called Add, and return too early:

	wg.Add(1)
	go func(index int) {

3- The wg.Wait() logic:

if index%10000 == 0 {
	fmt.Println(index)
	wg.Wait()
}

What happens on the last iteration if index%10000 != 0? The goroutines started after the last multiple of 10000 are never waited for.
So when err == io.EOF you need to call wg.Wait() so that all goroutines can finish:

if err == io.EOF {
	wg.Wait()
	fmt.Println("\n**** All done **** index =", index)
	break
}

4- You may use lexical scoping (static scoping) to limit the scope of some variables and make the code more manageable, and to make it clear when to Close the writer returned by lzw.NewWriter:

{ // lexical scoping (static scoping)
	zw := lzw.NewWriter(bufio.NewWriter(&buf), lzw.LSB, 8)
	n, err := io.Copy(zw, strings.NewReader(line))
	if err != nil {
		log.Fatal(err)
	}
	if int(n) != len(line) {
		log.Fatal(n, len(line))
	}
	// It is the caller's responsibility to call Close on the WriteCloser when finished writing.
	if err = zw.Close(); err != nil {
		log.Fatal(err)
	}
}

5- Always check the errors, e.g.:

if err = zw.Close(); err != nil {
	log.Fatal(err)
}
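
Putting points 1 to 5 together, here is a minimal self-contained sketch of the same idea. The names compressLine, decompress, and processAll are made up for illustration, and a mutex-guarded map stands in for the database client, so treat it as an outline under those assumptions rather than a drop-in replacement:

```go
package main

import (
	"bytes"
	"compress/lzw"
	"fmt"
	"io"
	"strings"
	"sync"
)

// compressLine LZW-compresses one line into an in-memory buffer.
// No io.Pipe is needed because a bytes.Buffer never blocks on Write.
func compressLine(line string) ([]byte, error) {
	var buf bytes.Buffer
	zw := lzw.NewWriter(&buf, lzw.LSB, 8)
	if _, err := io.WriteString(zw, line); err != nil {
		zw.Close()
		return nil, err
	}
	// Close flushes the pending bits; the output is incomplete without it.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compressLine; it is used here only to check the round trip.
func decompress(b []byte) (string, error) {
	zr := lzw.NewReader(bytes.NewReader(b), lzw.LSB, 8)
	defer zr.Close()
	out, err := io.ReadAll(zr)
	return string(out), err
}

// processAll compresses the lines in batches of batchSize goroutines and
// returns the results keyed by line index. The map is a hypothetical
// stand-in for the database client.
func processAll(lines []string, batchSize int) map[int][]byte {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		store = make(map[int][]byte)
	)
	for i, line := range lines {
		wg.Add(1) // before the go statement, so Wait cannot miss it
		go func(i int, line string) {
			defer wg.Done()
			b, err := compressLine(line)
			if err != nil {
				return // a real program would report this error
			}
			mu.Lock()
			store[i] = b
			mu.Unlock()
		}(i, line)
		if (i+1)%batchSize == 0 {
			wg.Wait()
		}
	}
	wg.Wait() // also wait for the final, partial batch
	return store
}

func main() {
	lines := []string{"{\"a\":1}\n", "{\"b\":2}\n", "{\"c\":3}\n"}
	store := processAll(lines, 2)
	s, err := decompress(store[2])
	fmt.Println(len(store), strings.TrimSpace(s), err)
}
```

Using defer wg.Done() means the counter is decremented even if the goroutine returns early on an error, which an explicit wg.Done() at the end of the function would not guarantee.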

This is a working version close to your code. Use it only to experiment with the concurrency logic and see what happens; it is not recommended, since it has superfluous goroutines and io.Pipe. It merely works:

package main

import (
	"bufio"
	"compress/lzw"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"
	"time"
)

func main() {
	index := 0
	client := &myClient{}
	reader := bufio.NewReader(file)
	// your code:
	var wg sync.WaitGroup
	for {
		index++
		line, err := reader.ReadString('\n')
		if err != nil {
			msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now())
			wg.Wait() // break waiting // if index%tenThousand != 0
			break
		}
		wg.Add(1)
		go func(i int) {
			msg <- fmt.Sprint(i, " Enter running ... ", time.Now())
			asyncReader, asyncWriter := io.Pipe() // make it async to read and write
			zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8)
			go func() { // async
				_, err := io.Copy(zipWriter, strings.NewReader(line))
				if err != nil {
					log.Fatal(err)
				}
				_ = zipWriter.Close()
				_ = asyncWriter.Close() // for io.ReadAll
			}()
			b, err := io.ReadAll(asyncReader)
			if err != nil {
				log.Fatal(err)
			}
			client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
			asyncReader.Close()
			time.Sleep(1 * time.Second)
			msg <- fmt.Sprint(i, " Exit running ... ", time.Now())
			wg.Done()
		}(index)

		msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call")
		if index%tenThousand == 0 {
			wg.Wait()
			msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now())
		}
	}
	msg <- "Bye forever."

	wg.Wait()
	close(msg)
	wgMsg.Wait()
}

// just for the Go Playground:
const tenThousand = 2

type myClient struct {
}

func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {
	// fmt.Println("a =", a, ", b =", b, ", t =", t)
	if ctx.Err() != nil {
		fmt.Println(ctx.Err())
	}
}

var file, myw = io.Pipe()

func init() {
	go func() {
		for i := 1; i <= tenThousand+1; i++ {
			fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i)
		}
		myw.Close()
	}()
	wgMsg.Add(1)
	go func() {
		defer wgMsg.Done()
		for s := range msg {
			fmt.Println(s)
		}
	}()
}

var msg = make(chan string, 100)
var wgMsg sync.WaitGroup

Output:

1 false after go call
2 true after go call
1 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001
2 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001
1 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
2 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
..2 Done waiting after go call. 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
3 false after go call
3 Enter running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
4 Done not waiting with err: EOF 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
3 Exit running ... 2009-11-10 23:00:02 +0000 UTC m=+2.000000001
Bye forever.

huangapple
  • Posted on May 26, 2022 at 12:23:35
  • When reposting, please keep the link to this article: https://go.coder-hub.com/72386665.html