Splitting a file and writing the chunks concurrently to servers over TCP gives a "broken pipe" error

Question

My client splits a file into chunks (128 MB each) and then uploads the chunks to multiple servers concurrently using goroutines.

However, when I use more than one goroutine, my client program reports this error:

write tcp [::1]:49324->[::1]:2001: write: broken pipe

On the server, the error is:

EOF

Note that the broken pipe error and the EOF error occur in different chunks. For example, the broken pipe error might happen while the client is writing chunk 1, while the EOF error might happen while the server is receiving chunk 2.

Below is the client code:

// set the maximum number of goroutines running in the background
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)

var sentByte int64

for i := 0; i < chunkCount; i += 1 {
	guard <- struct{}{}

	go func(i int) {
		index := i % len(serverList)
		vsConnection, _ := net.Dial("tcp", serverList[index])

		sentByte = 0
		file, _ := os.Open(fileName)
		file.Seek(int64(i)*CHUNKSIZE, 0) // CHUNKSIZE is 134217728
		for {
			n, _ := file.Read(sendBuffer)

			n2, err2 := vsConnection.Write(sendBuffer[:n])
			if err2 != nil {
				fmt.Println("err2", err2, chunkName)
			}
			if n2 != 65536 { // 65536 is the size of sendBuffer
				fmt.Println("n2", n2)
			}
			sentByte = sentByte + int64(n)
			if sentByte == CHUNKSIZE {
				break
			}
		}
		vsConnection.Close()
		file.Close()
		<-guard
	}(i)
}

Below is the server code:

func main() {
	mapping := cmap.New()
	server, error := net.Listen("tcp", ":2001")
	if error != nil {
		fmt.Println("There was an error starting the server" + error.Error())
		return
	}

	for {
		connection, error := server.Accept()
		if error != nil {
			fmt.Println("There was an error with the connection" + error.Error())
			return
		}
		// one goroutine per connection
		go ConnectionHandler(connection, mapping)
	}
}

func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
	fmt.Println("Connected")
	// make a buffer to hold data
	var bufferFile bytes.Buffer
	writer := bufio.NewWriter(&bufferFile)

	var receivedBytes int64
	receivedBytes = 0
	for {

		if CHUNKSIZE <= receivedBytes {
			break
		}
		n, err := io.CopyN(writer, connection, BUFFERSIZE)
		receivedBytes += n
		if err != nil {
			fmt.Println("err", err.Error(), fileName)
			break
		}
	}
	mapping.Set(fileName, bufferFile.Bytes())
	connection.Close()

}

Big thanks in advance.

Answer 1

Score: 3

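In your client, sentByte should be a variable local to the sender goroutine. Because you declared it outside the goroutine, all goroutines share one counter, so there is a race condition in your code. Bytes sent by one goroutine are counted against another goroutine's chunk, so the sentByte == CHUNKSIZE check can fire too early (the connection is closed before the server has the whole chunk, which would likely explain the EOF) or never match (the client keeps writing after the server has closed the connection, which would likely explain the broken pipe). Try the fix below: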

go func(i int) {
    index := i % len(serverList)
    vsConnection, _ := net.Dial("tcp", serverList[index])

    // make sentByte a local variable, so each goroutine has its own copy;
    // it stays an int64, as in the question, so sentByte+int64(n) still compiles
    var sentByte int64
    file, _ := os.Open(fileName)
    file.Seek(int64(i)*CHUNKSIZE, 0) // CHUNKSIZE is 134217728
    for {
        n, _ := file.Read(sendBuffer)
        // ...
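
To make the idea of per-goroutine state concrete, here is a minimal, self-contained sketch. It is not the asker's program: the names sendChunk, uploadChunks, receiveTotal and demo are hypothetical, the file is replaced by an in-memory bytes.Reader, and the server is a throwaway local listener that only counts bytes. It also swaps the manual Seek/Read/Write loop for io.NewSectionReader plus io.Copy, so there is no counter or send buffer left to share. If sendBuffer in the question is also declared outside the goroutine (the snippet does not show it), it would be shared in the same way as sentByte.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net"
	"sync"
)

// sendChunk streams bytes [off, off+size) of src to addr. The connection and
// the section reader are local to this call, so concurrent calls share nothing
// except the read-only source.
func sendChunk(src io.ReaderAt, off, size int64, addr string) (int64, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer conn.Close()
	// io.Copy stops at the end of the section, or earlier if the source ends
	// first (a short last chunk).
	return io.Copy(conn, io.NewSectionReader(src, off, size))
}

// uploadChunks sends every chunk of src, running at most maxGoroutines
// senders at a time, and returns the first error seen (nil if none).
func uploadChunks(src io.ReaderAt, total, chunkSize int64, addrs []string, maxGoroutines int) error {
	guard := make(chan struct{}, maxGoroutines)
	errs := make(chan error, int((total+chunkSize-1)/chunkSize))
	var wg sync.WaitGroup
	for i, off := 0, int64(0); off < total; i, off = i+1, off+chunkSize {
		guard <- struct{}{}
		wg.Add(1)
		go func(i int, off int64) {
			defer wg.Done()
			defer func() { <-guard }()
			if _, err := sendChunk(src, off, chunkSize, addrs[i%len(addrs)]); err != nil {
				errs <- fmt.Errorf("chunk %d: %w", i, err)
			}
		}(i, off)
	}
	wg.Wait()
	close(errs)
	return <-errs // a closed, empty channel yields nil
}

// receiveTotal accepts n connections and returns the total number of bytes
// received. Each connection is read until the client closes it.
func receiveTotal(ln net.Listener, n int) int64 {
	var (
		mu    sync.Mutex
		total int64
		wg    sync.WaitGroup
	)
	for k := 0; k < n; k++ {
		conn, err := ln.Accept()
		if err != nil {
			break
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer conn.Close()
			got, _ := io.Copy(io.Discard, conn)
			mu.Lock()
			total += got
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

// demo uploads size bytes to a local listener and reports how many arrived.
func demo(size int, chunkSize int64, maxGoroutines int) (int64, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer ln.Close()
	data := bytes.Repeat([]byte{'x'}, size)
	chunks := int((int64(size) + chunkSize - 1) / chunkSize)
	done := make(chan int64, 1)
	go func() { done <- receiveTotal(ln, chunks) }()
	addrs := []string{ln.Addr().String()}
	if err := uploadChunks(bytes.NewReader(data), int64(size), chunkSize, addrs, maxGoroutines); err != nil {
		return 0, err
	}
	return <-done, nil
}

func main() {
	got, err := demo(1<<20, 256<<10, 3)
	fmt.Println(got, err)
}
```

With an *os.File in place of the bytes.Reader, the same sendChunk should work unchanged, since *os.File also implements io.ReaderAt. Running the original client with the race detector enabled (go run -race) is a quick way to confirm that the shared counter is reported as a data race.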

huangapple
  • Posted on April 1, 2017 at 17:22:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/43155017.html