英文:
Race condition while trying to update a map
问题
简介
我正在尝试使用Go构建一个客户端-服务器应用程序。
服务器只有一个实例,其中包含任务列表,这些任务只是字符串数据。服务器维护一个任务及其分配状态的映射。
另一方面,客户端可以有多个实例,并向服务器请求任务。当任务被分配时,服务器将分配状态更新为true。
客户端和服务器之间的通信是通过Go的RPC进行的。
为了模拟多个客户端,我在单个应用程序中创建了多个Go协程,这些协程向服务器发出请求。
代码
服务器
package main
import (
"fmt"
"log"
"net/http"
"net/rpc"
"sync"
)
type Container struct {
mu sync.Mutex
list map[string]bool
}
var ListOfTasks Container
func (c *Container) UpdateList() string {
var t string
ListOfTasks.mu.Lock()
defer ListOfTasks.mu.Unlock()
for k, v := range c.list {
if !v {
c.list[k] = true
fmt.Println("Task Name", k)
return k
}
}
return t
}
// RPC Code
type RPCServer struct{}
type Input struct{}
type Output struct {
Message string
}
func (s *RPCServer) GetTask(i *Input, o *Output) error {
o.Message = ListOfTasks.UpdateList()
return nil
}
//
func init() {
ListOfTasks = Container{}
ListOfTasks.list = map[string]bool{
"task1": false,
"task2": false,
"task3": false,
}
}
func main() {
some := new(RPCServer)
rpc.Register(some)
rpc.HandleHTTP()
fmt.Println("Listening on port 1234")
err := http.ListenAndServe(":1234", nil)
if err != nil {
log.Fatal("bruh", err.Error())
}
}
客户端
package main
import (
"fmt"
"log"
"net/rpc"
"sync"
)
type Input struct {
Name string
}
type Output struct {
Message string
}
func main() {
client, err := rpc.DialHTTP("tcp", "localhost:1234")
if err != nil {
log.Fatal("bruh!", err.Error())
}
arg := &Input{}
reply := Output{}
var wg sync.WaitGroup
wg.Add(4)
for i := 0; i < 4; i++ {
go func(gori int) {
defer wg.Done()
err = client.Call("RPCServer.GetTask", arg, &reply)
if err != nil {
log.Fatal("bruh!", err.Error())
}
fmt.Printf("#%d: message: %s\n", gori, reply.Message)
}(i)
}
wg.Wait()
}
问题
当多个客户端向服务器请求任务时,我们遇到了竞态条件,这不会被Go语言的竞争检测器检测到。可以在应用程序的输出中观察到这一点。
服务器的输出
Listening on port 1234
Task Name task1
Task Name task2
Task Name task3
客户端的输出
#1: message: task3
#2: message: task3
#0: message: task3
#3: message: task3
服务器为每个工作线程分配了不同的任务,但在工作线程的端口,每个人都收到了相同的任务。
我在整个寻找未分配任务的for循环上加了锁。
我尝试过的其他方法(但没有成功)
-
使用
sync.Map
:仍然遇到了竞态条件,这被Go语言的竞争检测器检测到了。 -
只对
c.list[k] = true
应用锁:导致了我现在面临的相同问题。
我怀疑我应用锁的方式有问题,但我不明白我做错了什么。
我知道问题是由于有多个Go协程尝试从和更新映射中读取数据而引起的。因此,我在读取和写入发生的整个部分上应用了锁。
我阅读的大多数文章或StackOverflow帖子都将服务器和客户端放在同一个应用程序中,并通过使用通道来避免此问题。我不确定如何在我的用例中使用通道。
此外,最终这将成为一个MapReduce应用程序,因此我将不得不存储除了表示分配状态的标志之外的更多数据。
感谢您的帮助!
英文:
Introduction
I am trying to build a client-server application in /go.
There is only one instance for the server which contains list of tasks, which are nothing but string data. The server maintains a map of task and it's allocation status.
On the other hand the client can have multiple instances, and requests the server for the tasks. When a task is allocated the server updates allocation status to true.
The communication between the client and server is over go rpc.
Please refer to this for better understanding.
To mimic multiple clients, I am spinning up multiple go routines in a single application that make request to the server.
Code
Server
package main
import (
"fmt"
"log"
"net/http"
"net/rpc"
"sync"
)
type Container struct {
mu sync.Mutex
list map[string]bool
}
var ListOfTasks Container
func (c *Container) UpdateList() string {
var t string
ListOfTasks.mu.Lock()
defer ListOfTasks.mu.Unlock()
for k, v := range c.list {
if !v {
c.list[k] = true
fmt.Println("Task Name", k)
return k
}
}
return t
}
// RPC Code
type RPCServer struct{}
type Input struct {}
type Output struct {
Message string
}
func (s *RPCServer) GetTask(i *Input, o *Output) error {
o.Message = ListOfTasks.UpdateList()
return nil
}
//
func init() {
ListOfTasks = Container{}
ListOfTasks.list = map[string]bool{
"task1" : false,
"task2" : false,
"task3" : false,
}
}
func main() {
some := new(RPCServer)
rpc.Register(some)
rpc.HandleHTTP()
fmt.Println("Listening on port 1234")
err := http.ListenAndServe(":1234", nil)
if err != nil {
log.Fatal("bruh", err.Error())
}
}
Client
package main
import (
"fmt"
"log"
"net/rpc"
"sync"
)
type Input struct {
Name string
}
type Output struct {
Message string
}
func main() {
client, err := rpc.DialHTTP("tcp", "localhost:1234")
if err != nil {
log.Fatal("bruh!", err.Error())
}
arg := &Input{}
reply := Output{}
var wg sync.WaitGroup
wg.Add(4)
for i := 0; i< 4; i++ {
go func(gori int) {
defer wg.Done()
err = client.Call("RPCServer.GetTask", arg, &reply)
if err != nil {
log.Fatal("bruh!", err.Error())
}
fmt.Printf("#%d: message: %s\n", gori, reply.Message)
}(i)
}
wg.Wait()
}
The Problem
When multiple clients make request to the server asking for a task, we run into a race condition, which is not detected by the golang's race detector. It can be observed in the application's output.
Server's output
Listening on port 1234
Task Name task1
Task Name task2
Task Name task3
Client's output
#1: message: task3
#2: message: task3
#0: message: task3
#3: message: task3
The server allocates different tasks to each worker but on the worker's end, everyone receives the same task.
I have lock over the entire for loop where we are searching for an unallocated task.
Other things that I have tried (but didn't work)
-
Using
sync.Map
: Still ended up with a race condition, which was detected by the golang race detector. -
Applying lock only on
c.list[k] = true
: Which caused the same issue that I am facing now.
I have my suspicion on the way I am applying the lock, but I don't understand what I am doing wrong.
I know the problem is caused since there are multiple go routines trying to read from and update the map. And therefore I am applying the lock on the entire section where the reading and writing happens.
Most of the articles or StackOverflow posts that I read had both the server and client in the same application and avoided this problem by using channels. I am not sure how do I use channels in my use case.
Also, eventually this will become a map-reduce application, so I would have to store much more data other than a flag representing the allocation status.
Any help is appreciated, Thanks!
答案1
得分: 5
服务器是正确的(尽管代码有点混乱--为什么在全局的ListOfTasks
上使用互斥锁,当你有一个指针接收器指向你的Collection
?)。
但是客户端代码是错误的:你在客户端的所有goroutine中都使用了相同的&reply
指针,这意味着当一个客户端goroutine接收到一个回复时,所有的goroutine都会看到它,然后在接收到回复和写入回复之间存在竞争。你需要将reply := Output{}
移到goroutine中。
我相信竞争检测器会检测到这个问题,但你必须在客户端上运行它,而不是在服务器上运行。
英文:
The server is correct (although the code is a little muddled -- why use the mutex on the global ListOfTasks
when you have a pointer receiver to your Collection
?).
But the client code it wrong: you are using the same &reply
pointer in all of your goroutines in the client, meaning that when one client goroutine receives a reply, all the goroutines see it, and then there's a race between receiving the reply and writing it. You need to move the reply := Output{}
into the goroutine.
I believe the race detector will detect this, but you have to run it on the client, not on the server.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论