英文:
How to use RWMutex?
问题
type Stat struct {
counters map[string]*int64
countersLock sync.RWMutex
averages map[string]*int64
averagesLock sync.RWMutex
}
下面是对它的调用:
func (s *Stat) Count(name string) {
s.countersLock.RLock()
counter := s.counters[name]
s.countersLock.RUnlock()
if counter != nil {
atomic.AddInt64(counter, int64(1))
return
}
}
我的理解是,我们首先锁定接收器 s(类型为 Stat),然后如果计数器存在,就向其添加值。
问题:
Q1:为什么我们需要锁定它?RWMutex
是什么意思?
Q2:s.countersLock.RLock()
- 这是锁定整个接收器 s 还是只锁定类型 Stat 中的 counters 字段?
Q3:s.countersLock.RLock()
- 这会锁定 averages 字段吗?
Q4:为什么我们要使用 RWMutex
?我以为在 Golang 中处理并发的首选方式是使用通道(channel)?
Q5:atomic.AddInt64
是什么意思?为什么在这种情况下需要原子操作?
Q6:为什么我们在添加值之前解锁?
英文:
type Stat struct {
counters map[string]*int64
countersLock sync.RWMutex
averages map[string]*int64
averagesLock sync.RWMutex
}
It is called below
func (s *Stat) Count(name string) {
s.countersLock.RLock()
counter := s.counters[name]
s.countersLock.RUnlock()
if counter != nil {
atomic.AddInt64(counter, int64(1))
return
}
}
My understanding is that we first lock the receiver s (which is a type Stat) and then we add to it if the counter does exist.
Questions:
Q1: why do we need to lock it? What does RWMutex
even mean?
Q2: s.countersLock.RLock()
- does this lock up the entire receiver s or only the counters field in type Stat?
Q3: s.countersLock.RLock()
- does this lock up the averages field?
Q4: Why should we use RWMutex
? I thought channel was the preferred way to handle concurrency in Golang?
Q5: What is this atomic.AddInt64
. Why do we need atomic in this case?
Q6: Why would we unlock right before we add to it?
答案1
得分: 184
当多个线程需要改变同一个值时,需要使用锁机制来同步访问。如果没有锁机制,两个或多个线程可能会同时写入同一个值,导致内存损坏,通常会导致崩溃。
atomic包提供了一种快速简便的方式来同步对原始值的访问。对于计数器来说,它是最快的同步方法。它具有一些具有明确定义用例的方法,例如增加、减少、交换等。
sync包提供了一种同步访问更复杂的值的方式,例如映射、切片、数组或一组值。您可以在不在atomic中定义的用例中使用它。
在任何情况下,只有在写入时才需要锁定。多个线程可以安全地读取相同的值而不需要锁定机制。
让我们来看一下您提供的代码。
type Stat struct {
counters map[string]*int64
countersLock sync.RWMutex
averages map[string]*int64
averagesLock sync.RWMutex
}
func (s *Stat) Count(name string) {
s.countersLock.RLock()
counter := s.counters[name]
s.countersLock.RUnlock()
if counter != nil {
atomic.AddInt64(counter, int64(1))
return
}
}
这里缺少的是如何初始化映射本身。到目前为止,映射没有被修改。如果计数器名称是预先确定的,并且不能后期添加,您不需要RWMutex。代码可能如下所示:
type Stat struct {
counters map[string]*int64
}
func InitStat(names... string) Stat {
counters := make(map[string]*int64)
for _, name := range names {
counter := int64(0)
counters[name] = &counter
}
return Stat{counters}
}
func (s *Stat) Count(name string) int64 {
counter := s.counters[name]
if counter == nil {
return -1 // (int64, error) instead?
}
return atomic.AddInt64(counter, 1)
}
(注意:我删除了averages,因为它在原始示例中没有被使用。)
现在,假设您不希望计数器预先确定。在这种情况下,您将需要一个互斥锁来同步访问。
让我们尝试使用一个Mutex。它很简单,因为一次只有一个线程可以持有Lock。如果第二个线程在第一个线程使用Unlock之前尝试Lock,它会等待(或阻塞)直到第一个线程释放锁。
type Stat struct {
counters map[string]*int64
mutex sync.Mutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]*int64)}
}
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
s.mutex.Unlock()
return atomic.AddInt64(counter, 1)
}
上面的代码将正常工作。但是存在两个问题。
- 如果在Lock和Unlock之间发生panic,互斥锁将永远被锁定,即使您从panic中恢复也是如此。这段代码可能不会引发panic,但通常最好假设可能会发生panic。
- 在获取计数器时,会获取独占锁。一次只有一个线程可以从计数器中读取。
问题1很容易解决。使用defer:
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
return atomic.AddInt64(counter, 1)
}
这样可以确保始终调用Unlock。如果由于某种原因您有多个返回语句,您只需要在函数开头指定一次Unlock。
问题2可以使用RWMutex解决。它是如何工作的,为什么有用呢?
RWMutex是Mutex的扩展,添加了两个方法:RLock和RUnlock。关于RWMutex有几点重要的注意事项:
-
RLock是一个共享的读锁。当使用它获取锁时,其他线程也可以使用RLock获取自己的锁。这意味着多个线程可以同时读取。它是半独占的。
-
如果互斥锁被读锁定,调用Lock将被阻塞。如果一个或多个读者持有锁,您不能写入。
一个好的思路是将RWMutex看作是带有读取计数器的Mutex。RLock增加计数器,而RUnlock减少计数器。调用Lock将在计数器大于0时阻塞。
您可能会想:如果我的应用程序以读为主,那么写入者是否可能被无限期地阻塞?不会的。RWMutex还有一个有用的属性:
可以将其视为杂货店收银台上方显示的灯,指示收银员是否开放。排队的人可以等待并得到帮助,但新的人不能排队。只要最后一个顾客得到帮助,收银员就休息了,该收银台要么保持关闭直到他们回来,要么用另一个收银员替换。
让我们使用RWMutex修改之前的示例:
type Stat struct {
counters map[string]*int64
mutex sync.RWMutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]*int64)}
}
func (s *Stat) Count(name string) int64 {
var counter *int64
if counter = getCounter(name); counter == nil {
counter = initCounter(name);
}
return atomic.AddInt64(counter, 1)
}
func (s *Stat) getCounter(name string) *int64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.counters[name]
}
func (s *Stat) initCounter(name string) *int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
return counter
}
通过上面的代码,我将逻辑分离到getCounter
和initCounter
函数中:
- 使代码易于理解。在同一个函数中使用RLock()和Lock()会很困难。
- 尽早释放锁,同时使用defer。
与Mutex示例不同,上面的代码允许同时递增不同的计数器。
我还想指出的另一件事是,所有上面的示例中,映射map[string]*int64
包含指向计数器的指针,而不是计数器本身。如果您将计数器存储在映射map[string]int64
中,您将需要使用Mutex而不是atomic。代码可能如下所示:
type Stat struct {
counters map[string]int64
mutex sync.Mutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]int64)}
}
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
s.counters[name]++
return s.counters[name]
}
您可能希望这样做以减少垃圾回收的次数,但这只有在有数千个计数器的情况下才会有所影响,即使在这种情况下,计数器本身所占用的空间也不多(与字节缓冲区等相比)。
英文:
When more than one thread* needs to mutate the same value, a locking mechanism is needed to synchronizes access. Without it two or more threads* could be writing to the same value at the same time, resulting in corrupt memory that typically results in a crash.
The atomic package provides a fast and easy way to synchronize access to primitive values. For a counter it is the fastest synchronization method. It has methods with well defined use cases, such as incrementing, decrementing, swapping, etc.
The sync package provides a way to synchronize access to more complicated values, such as maps, slices, arrays, or groups of values. You use this for use cases that are not defined in atomic.
In either case locking is only required when writing. Multiple threads* can safely read the same value without a locking mechanism.
Lets take a look at the code you provided.
type Stat struct {
counters map[string]*int64
countersLock sync.RWMutex
averages map[string]*int64
averagesLock sync.RWMutex
}
func (s *Stat) Count(name string) {
s.countersLock.RLock()
counter := s.counters[name]
s.countersLock.RUnlock()
if counter != nil {
atomic.AddInt64(counter, int64(1))
return
}
}
What's missing here is how the map's themselves are initialized. And so far the maps are not being mutated. If the counter names are predetermined and cannot be added to later, you don't need the RWMutex. That code might look something like this:
type Stat struct {
counters map[string]*int64
}
func InitStat(names... string) Stat {
counters := make(map[string]*int64)
for _, name := range names {
counter := int64(0)
counters[name] = &counter
}
return Stat{counters}
}
func (s *Stat) Count(name string) int64 {
counter := s.counters[name]
if counter == nil {
return -1 // (int64, error) instead?
}
return atomic.AddInt64(counter, 1)
}
(Note: I removed averages because it wasn't being used in the original example.)
Now, lets say you didn't want your counters to be predetermined. In that case you would need a mutex to synchronize access.
Lets try it with just a Mutex. It's simple because only one thread* can hold Lock at a time. If a second thread* tries to Lock before the first releases theirs with Unlock, it waits (or blocks)** until then.
type Stat struct {
counters map[string]*int64
mutex sync.Mutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]*int64)}
}
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
s.mutex.Unlock()
return atomic.AddInt64(counter, 1)
}
The code above will work just fine. But there are two problems.
- If there is a panic between Lock() and Unlock() the mutex will be locked forever, even if you were to recover from the panic. This code probably won't panic, but in general it's better practice to assume it might.
- An exclusive lock is taken while fetching the counter. Only one thread* can read from the counter at one time.
Problem #1 is easy to solve. Use defer:
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
return atomic.AddInt64(counter, 1)
}
This ensures that Unlock() is always called. And if for some reason you have more then one return, you only need to specify Unlock() once at the head of the function.
Problem #2 can be solved with RWMutex. How does it work exactly, and why is it useful?
RWMutex is an extension of Mutex and adds two methods: RLock and RUnlock. There are a few points that are important to note about RWMutex:
-
RLock is a shared read lock. When a lock is taken with it, other threads* can also take their own lock with RLock. This means multiple threads* can read at the same time. It's semi-exclusive.
-
If the mutex is read locked, a call to Lock is blocked**. If one or more readers hold a lock, you cannot write.
-
If the mutex is write locked (with Lock), RLock will block**.
A good way to think about it is RWMutex is a Mutex with a reader counter. RLock increments the counter while RUnlock decrements it. A call to Lock will block as long as that counter is > 0.
You may be thinking: If my application is read heavy, would that mean a writer could be blocked indefinitely? No. There is one more useful property of RWMutex:
- If the reader counter is > 0 and Lock is called, future calls to RLock will also block until the existing readers have released their locks, the writer has obtained his lock and later releases it.
Think of it as the light above a register at the grocery store that says a cashier is open or not. The people in line get to stay there and they will be helped, but new people cannot get in line. As soon as the last remaining customer is helped the cashier goes on break, and that register either remains closed until they come back or they are replaced with a different cashier.
Lets modify the earlier example with an RWMutex:
type Stat struct {
counters map[string]*int64
mutex sync.RWMutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]*int64)}
}
func (s *Stat) Count(name string) int64 {
var counter *int64
if counter = getCounter(name); counter == nil {
counter = initCounter(name);
}
return atomic.AddInt64(counter, 1)
}
func (s *Stat) getCounter(name string) *int64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.counters[name]
}
func (s *Stat) initCounter(name string) *int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
return counter
}
With the code above I've separated the logic out into getCounter
and initCounter
functions to:
- Keep the code simple to understand. It would be difficult to RLock() and Lock() in the same function.
- Release the locks as early as possible while using defer.
The code above, unlike the Mutex example, allows you to increment different counters simultaneously.
Another thing I wanted to point out is with all the examples above, the map map[string]*int64
contains pointers to the counters, not the counters themselves. If you were to store the counters in the map map[string]int64
you would need to use Mutex without atomic. That code would look something like this:
type Stat struct {
counters map[string]int64
mutex sync.Mutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]int64)}
}
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
s.counters[name]++
return s.counters[name]
}
You may want to do this to reduce garbage collection - but that would only matter if you had thousands of counters - and even then the counters themselves don't take up a whole lot of space (compared to something like a byte buffer).
*
When I say thread I mean go-routine. A thread in other languages is a mechanism for running one or more sets of code simultaneously. A thread is expensive to create and tear-down. A go-routine is built on top of threads, but re-uses them. When a go-routine sleeps the underlying thread can be used by another go-routine. When a go-routine wakes up, it might be on a different thread. Go handles all this behind the scenes. -- But for all intents and purposes you would treat a go-routine like a thread when it comes to memory access. However, you don't have to be as conservative when using go-routines as you do threads.
**
When a go-routine is blocked by Lock
, RLock
, a channel, or Sleep, the underlying thread might be re-used. No cpu is used by that go-routine - think of it as waiting in line. Like other languages an infinite loop like for {}
would block while keeping the cpu and go-routine busy - think of that as running around in a circle - you'll get dizzy, throw up, and the people around you won't be very happy.
答案2
得分: 50
问题:
Q1:为什么我们需要锁定它?RWMutex
是什么意思?
RW代表读/写。参考文档:http://golang.org/pkg/sync/#RWMutex。
我们需要锁定它,以防止其他例程/线程在我们处理它时更改该值。
Q2:s.countersLock.RLock()
- 这会锁定整个接收器s还是只锁定Stat类型中的counters字段?
作为互斥锁,只有在调用RLock()
函数时才会发生锁定。如果其他任何goroutine已经调用了WLock()
,那么它会阻塞。您可以在同一goroutine中调用任意数量的RLock()
,它不会锁定。
因此,它不会锁定任何其他字段,甚至不会锁定s.counters
。在您的示例中,您锁定了map查找以找到正确的计数器。
Q3:s.countersLock.RLock()
- 这会锁定averages字段吗?
不会,如Q2所述,RLock
只锁定自己。
Q4:为什么我们应该使用RWMutex
?我以为在Golang中处理并发的首选方式是使用通道?
通道非常有用,但有时不够用,有时也不合理。
在这里,由于您锁定了map访问,互斥锁是有意义的。使用通道,您必须有一个缓冲区为1的通道,在发送之前和接收之后进行操作。这不太直观。
Q5:这个atomic.AddInt64
是什么?为什么在这种情况下我们需要原子操作?
这个函数将以原子方式增加给定的变量。在您的情况下,存在竞态条件:counter
是一个指针,锁释放后和调用atomic.AddInt64
之前,实际变量可能被销毁。如果您对这种情况不熟悉,我建议您坚持使用互斥锁,并在锁定/解锁之间进行所有处理。
Q6:为什么我们在添加之前解锁?
你不应该这样做。
我不知道你想做什么,但这是一个(简单)的示例:https://play.golang.org/p/cVFPB-05dw
英文:
> Questions:
>
> Q1: why do we need to lock it? What does RWMutex
even mean?
RW stands for Read/Write. CF doc: http://golang.org/pkg/sync/#RWMutex.
We need to lock it to prevent other routines/thread to change the value while we process it.
> Q2: s.countersLock.RLock()
- does this lock up the entire receiver s
> or only the counters field in type Stat?
As a mutex, the lock occurs only when you call the RLock()
function. If any other goroutine already called the WLock()
, then it blocks. You can call any number of RLock()
within the same goroutine, it won't lock.
So it does not lock any other fields, not even s.counters
. In your example, you lock the map lookup to find the correct counter.
> Q3: s.countersLock.RLock()
- does this lock up the averages field?
No, as said in Q2, a RLock
locks only himself.
> Q4: Why should we use RWMutex
? I thought channel was the preferred way
> to handle concurrency in Golang?
Channel is very useful but sometimes it is not enough and sometimes it does not make sense.
Here, as you lock the map access, a mutex makes sense. With a chan, you'd have to have a buffered chan of 1, send before and receive after. Not very intuitive.
> Q5: What is this atomic.AddInt64
. Why do we need atomic in this case?
This function will increment the given variable in an atomic way. In your case, you have a race condition: counter
is a pointer and the actual variable can be destroyed after the release of the lock and before the call to atomic.AddInt64
.
If you are not familiar with this kind of things, I'd advise you to stick with Mutexes and do all processing you need in between the lock/unlock.
> Q6: Why would we unlock right before we add to it?
You should not.
I don't know what you are trying to do, but here is a (simple) example: https://play.golang.org/p/cVFPB-05dw
答案3
得分: 7
让我们将其与常规的sync.Mutex
进行比较,其中一次只有一个消费者可以持有锁定。并使用一个有趣的类比:想象一个大而美味的草莓奶昔,需要被一群朋友(消费者)分享。
朋友们想要分享奶昔,并决定使用一个独占的吸管(锁),所以一次只有一个朋友可以从吸管中喝。朋友调用m.Lock()
表示他们想要喝。如果没有人在喝,他们就可以继续喝,但如果其他人已经在使用吸管,他们必须等待(阻塞),直到前一个朋友喝完并在他们这边调用m.Unlock()
。
\\ | |
\\ |__|
m.Lock()
m.Unlock()
现在我们来看看sync.RWMutex
(读写互斥锁),在这里任意数量的读者可以持有锁,或者一个写者可以持有锁。
在草莓奶昔的类比中,朋友们决定使用许多“读者”吸管和一个独占的“写者”吸管来分享奶昔。朋友调用m.RLock()
表示他们想要用一个“读者”吸管喝,可以与其他读者同时喝。然而,独占的“写者”吸管的工作方式与之前相同。当有人调用m.Lock()
时,他们表示他们想要独自喝。此时,每个人都被阻塞,直到所有“读者”吸管都喝完(调用m.RUnlock()
)。然后,独占的写者开始独自喝。任何对m.RLock()
或m.Lock()
的其他调用都必须等待,直到拥有独占的“写者”吸管的朋友喝完(直到他们调用m.Unlock()
)。
\\ | | // // // //
\\ |__| // // // // ...
m.Lock() m.RLock()
m.Unlock() m.RUnlock()
术语“读者”和“写者”之所以被使用,是因为这是最常见的情况。并发的内存读取是可以的,但写入必须是顺序的。如果一个进程正在尝试读取一个内存地址,而另一个进程正在写入,那可能会导致内存损坏。
英文:
Let's compare it to the regular sync.Mutex
, where only one consumer can hold the lock at a given time. And use a fun analogy: imagine a big delicious strawberry milkshake, that needs to be shared by a bunch of friends (consumers).
The friends want to share the milkshake and decide to use a single exclusive straw (the lock), so only one friend can drink from the straw at a given time. A friend calling m.Lock()
signals that they want to drink. If no one is drinking they go ahead, but someone else was already using the straw, they have to wait (block) until the previous friend is done drinking and calls m.Unlock()
on their side.
\\ | |
\\ |__|
m.Lock()
m.Unlock()
Let's move into the sync.RWMutex
(Read Write Mutex), where any number of readers can hold the lock, or a single writer can hold the lock.
On the strawberry milkshake analogy, the friends decide to share the milkshake with many "reader" straws, and one single exclusive "writer" straw. A friend calling m.RLock()
signals that they want to drink with one of the "reader" straws, and can star drinking along with other readers at the same time. However, the exclusive "writer" straw works like before. When someone calls m.Lock()
, they signal that they want to drink alone. At that moment, everyone is blocked until all "reader" straws are done drinking (calling m.RUnlock()
). Then, the exclusive writer starts drinking alone. Any other call to either m.RLock()
or m.Lock()
has to wait until the friend with the exclusive "writer" straw is done drinking (until they call m.Unlock()
).
\\ | | // // // //
\\ |__| // // // // ...
m.Lock() m.RLock()
m.Unlock() m.RUnlock()
The terminology "reader" and "writer" is used because that is the most common scenario. Concurrent memory reads are fine, but writes have to be sequential. If one process is trying to read a memory address, while another process is writing, that could cause memory corruption.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论