英文:
Strip file and write the chunks concurrently to server through TCP shows broken pipe error
问题
我的客户将一个文件分成多个块(每个块为128MB),然后使用goroutines并发地将这些块上传到多个服务器。
然而,当我使用多个goroutine时,我的客户端程序出现错误。
write tcp [::1]:49324->[::1]:2001: write: broken pipe
而在我的服务器上,错误是
EOF
请注意,断开的管道错误和EOF错误发生在不同的块中。例如,写入块1时可能会发生断开的管道错误,而在服务器接收块2时可能会发生EOF错误。
以下是客户端代码:
//设置后台运行的goroutine的最大数量
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)
var sentByte int64
for i:= 0; i < chunkCount; i += 1{
guard <- struct{}{}
go func(i int){
index := i%len(serverList)
vsConnection, _ := net.Dial("tcp", serverList[index])
sentByte=0
file, _ := os.Open(fileName)
file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE为134217728
for {
n, _ := file.Read(sendBuffer)
n2, err2 := vsConnection.Write(sendBuffer[:n])
if err2 != nil {
fmt.Println("err2",err2,chunkName)
}
if(n2!=65536){ //65536为sendBuffer的大小
fmt.Println("n2",n2)
}
sentByte = sentByte+int64(n)
if(sentByte == CHUNKSIZE){
break;
}
}
vsConnection.Close()
file.Close()
<-guard
}(i)
}
以下是服务器代码:
func main() {
mapping := cmap.New()
server, error := net.Listen("tcp", ":2001")
if error != nil {
fmt.Println("启动服务器时出错" + error.Error())
return
}
for {
connection, error := server.Accept()
if error != nil {
fmt.Println("连接出错" + error.Error())
return
}
//每个连接一个goroutine
go ConnectionHandler(connection,mapping)
}
}
func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
fmt.Println("已连接")
//创建一个缓冲区来保存数据
var bufferFile bytes.Buffer
writer := bufio.NewWriter(&bufferFile)
var receivedBytes int64
receivedBytes=0
for {
if(CHUNKSIZE<=receivedBytes){
break
}
n,err := io.CopyN(writer, connection, BUFFERSIZE)
receivedBytes += n
if err != nil {
fmt.Println("err", err.Error(), fileName)
break
}
}
mapping.Set(fileName,bufferFile.Bytes())
connection.Close()
}
非常感谢。
英文:
My client divides a file into multiple amount of chunks (128mb each), then it will upload the chunks to multiple servers concurrently using goroutines.
However, when I use more than 1 goroutine, I got an error from the my client program.
write tcp [::1]:49324->[::1]:2001: write: broken pipe
And in my server, the error is
EOF
Note that the broken pipe error and EOF error occurs in different chunks. For example, broken pipe error might happen when writing chunk 1 while EOF error might happen when server is receiving chunk 2.
Below is the client code:
//set maximum no. of goroutine running in the back
maxGoroutines := 3
guard := make(chan struct{}, maxGoroutines)
var sentByte int64
for i:= 0; i < chunkCount; i += 1{
guard <- struct{}{}
go func(i int){
index := i%len(serverList)
vsConnection, _ := net.Dial("tcp", serverList[index])
sentByte=0
file, _ := os.Open(fileName)
file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
for {
n, _ := file.Read(sendBuffer)
n2, err2 := vsConnection.Write(sendBuffer[:n])
if err2 != nil {
fmt.Println("err2",err2,chunkName)
}
if(n2!=65536){ //65536 is size of sendBuffer
fmt.Println("n2",n2)
}
sentByte = sentByte+int64(n)
if(sentByte == CHUNKSIZE){
break;
}
}
vsConnection.Close()
file.Close()
<-guard
}(i)
}
Below is the server code:
func main() {
mapping := cmap.New()
server, error := net.Listen("tcp", ":2001")
if error != nil {
fmt.Println("There was an error starting the server" + error.Error())
return
}
for {
connection, error := server.Accept()
if error != nil {
fmt.Println("There was am error with the connection" + error.Error())
return
}
//one goroutine per connection
go ConnectionHandler(connection,mapping)
}
}
func ConnectionHandler(connection net.Conn, mapping cmap.ConcurrentMap) {
fmt.Println("Connected")
//make a buffer to hold data
var bufferFile bytes.Buffer
writer := bufio.NewWriter(&bufferFile)
var receivedBytes int64
receivedBytes=0
for {
if(CHUNKSIZE<=receivedBytes){
break
}
n,err := io.CopyN(writer, connection, BUFFERSIZE)
receivedBytes += n
if err != nil {
fmt.Println("err", err.Error(), fileName)
break
}
}
mapping.Set(fileName,bufferFile.Bytes())
connection.Close()
}
Big thanks in advance.
答案1
得分: 3
在你的客户端中,sentByte
应该是发送协程的一个局部变量。由于你将其声明为全局变量,所以你的代码存在竞态条件。尝试以下修复方法:
go func(i int){
index := i%len(serverList)
vsConnection, _ := net.Dial("tcp", serverList[index])
sentByte := 0 // 将 sentByte 声明为局部变量,这样每个协程都有自己的副本
file, _ := os.Open(fileName)
file.Seek(int64(i)*CHUNKSIZE, 0) // CHUNKSIZE 是 134217728
for {
n, _ := file.Read(sendBuffer)
// ...
请返回翻译好的部分。
英文:
In your client sentByte
should be a local variable to the sender goroutine. Since you have declared it as a global, there is a race condition in your code. Try the below fix:
go func(i int){
index := i%len(serverList)
vsConnection, _ := net.Dial("tcp", serverList[index])
sentByte := 0 // make sentByte a local variable, so each goroutine
// has its own copy
file, _ := os.Open(fileName)
file.Seek(int64(i)*CHUNKSIZE,0) //CHUNKSIZE is 134217728
for {
n, _ := file.Read(sendBuffer)
// ...
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论