英文:
Wait until a value in a map becomes available in Go
问题
我有一个程序,基本上有三种情况:设置键的值,如果存在则获取值,或者等待给定键的值可用。我的原始想法是创建一个带有map[string]interface{}
的新类型,用于存储“持久化”值。此外,对于等待值,我计划使用map[string](chan struct{})
。当调用Set()
方法时,我会向该通道写入数据,任何等待它的人都会知道值已经存在。
我不知道键的预先情况-它们是随机的。我不确定如何正确实现Wait()
方法。
问题是-在Wait()
中存在竞争条件-在释放互斥锁之后,在开始监听通道以接收值之前。
如何处理这个问题?欢迎任何其他关于如何实现这个的建议,我相信一定有更好的方法。我不想使用固定间隔轮询值或类似的方法。
英文:
I have a program where I basically have three scenarios - set value for a key, get value if it's there, or wait until the value for a given key becomes available. My original idea - create a new type with a map[string]interface{}
- where "persisted" values are stored. Besides that, for waiting on a value, I planned on using a map[string](chan struct{})
. When a Set()
method is invoked, I would write to that channel and anyone waiting on it would know the value is there.
I don't know the keys in advance - they're random. I'm not sure how to correctly implement the Wait()
method.
type Map struct {
sync.Mutex
m map[string]interface{}
wait map[string]chan (struct{})
}
func (m *Map) Set(key string, value interface{}) {
m.ensureWaitChan(key)
m.Lock()
defer m.Unlock()
m.m[key] = value
// Signal to all waiting.
m.wait[key] <- struct{}{}
}
func (m *Map) Wait(key string) interface{} {
m.ensureWaitChan(key)
m.Lock()
value, ok := m.m[key]
if ok {
m.Unlock()
return value
}
m.Unlock()
// <------ Unlocked state where something might happen.
<-m.wait[key]
value := m.m[key]
return value
}
// If the channel does not exist for those waiting - create it.
func (m *Map) ensureWaitChan(key string) {
m.Lock()
defer m.Unlock()
_, ok := m.wait[key]
if ok {
return
}
m.wait[key] = make(chan struct{}, 100)
}
The problem is - there's a race condition in Wait()
- after I release the mutex, and before I start listening on a channel for incoming value.
What would be the best way to handle this? Open to any other suggestions on how to implement this, I believe there must be a better way to do this. I would refrain from polling the value at fixed intervals or anything like that.
答案1
得分: 2
你要找的是一个同步映射和消息代理之间的混合体。我们可以通过利用通信和同步的通道来实现这一点,这样订阅者就可以在发布消息时立即接收到消息(如果它们尚未在缓存中)。
因为订阅者在等待期间必须解锁映射互斥锁,所以它们无法安全地访问添加到映射中的新消息。我们直接将新值发送到所有订阅者的自己的通道,这样我们就不需要在Set
内部添加更多的同步来确保在解锁映射本身之前满足所有订阅者的要求。提前解锁映射将允许订阅者直接读取它,但也会允许在此期间插入新值,从而导致不一致的结果。
一个运行的版本,还包括一个带有类型参数的通用Map
实现:https://go.dev/play/p/AN7VRSPdGmO
英文:
What you are looking for is a mix between a synchronized map and a message broker. We can do this by leveraging the channels for communication as well as synchronization, so that subscribers can receive the messages as soon as they are published if they are not yet in the cache.
type Map struct {
sync.Mutex
m map[string]any
subs map[string][]chan any
}
func (m *Map) Set(key string, value any) {
m.Lock()
defer m.Unlock()
m.m[key] = value
// Send the new value to all waiting subscribers of the key
for _, sub := range m.subs[key] {
sub <- value
}
delete(m.subs, key)
}
func (m *Map) Wait(key string) any {
m.Lock()
// Unlock cannot be deferred so we can unblock Set() while waiting
value, ok := m.m[key]
if ok {
m.Unlock()
return value
}
// if there is no value yet, subscribe to any new values for this key
ch := make(chan any)
m.subs[key] = append(m.subs[key], ch)
m.Unlock()
return <-ch
}
Because subscribers must unlock the map mutex while they wait, they cannot safely access new messages added to the map. We send the new value directly to all subscribers over their own channel so that we don't need to add more synchronization within Set
to ensure that all subscribers are satisfied before unlocking the map itself. Unlocking the map early would allow subscribers to read it directly, but would also allow new values to be inserted in the meantime causing inconsistent results.
A running version, also including a generic Map
implementation with type parameters: https://go.dev/play/p/AN7VRSPdGmO
答案2
得分: 0
这是一个使用条件变量和context.Context
进行Wait
取消的示例代码。
package main
import (
"context"
"fmt"
"sync"
)
type Map struct {
mu sync.Mutex
cond *sync.Cond
m map[string]interface{}
}
func (d *Map) Set(a string, b interface{}) {
d.mu.Lock()
defer d.mu.Unlock()
d.m[a] = b
d.cond.Broadcast()
}
func (d *Map) Get(a string) interface{} {
d.mu.Lock()
defer d.mu.Unlock()
return d.m[a]
}
func (d *Map) WaitContext(ctx context.Context, a string) interface{} {
quit := make(chan struct{})
defer close(quit)
var done bool
go func() {
select {
case <-quit:
case <-ctx.Done():
d.mu.Lock()
done = true
d.cond.Broadcast()
d.mu.Unlock()
}
}()
d.mu.Lock()
defer d.mu.Unlock()
for !done {
if b, ok := d.m[a]; ok {
return b
}
d.cond.Wait()
}
return nil
}
func (d *Map) Wait(a string) interface{} {
return d.WaitContext(context.Background(), a)
}
func main() {
d := &Map{}
d.m = make(map[string]interface{})
d.cond = sync.NewCond(&d.mu)
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("等待并获取 c=", d.Wait("c"))
wg.Done()
}()
d.Set("a", "apple")
d.Set("b", "banana")
fmt.Println("b=", d.Get("b"))
fmt.Println("a=", d.Get("a"))
fmt.Println("c=", d.Get("c"))
d.Set("c", "cherry")
fmt.Println("c=", d.Get("c"))
wg.Wait()
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
fmt.Println("等待并获取 d=", d.WaitContext(ctx, "d"))
wg.Done()
}()
cancel()
wg.Wait()
}
希望对你有帮助!
英文:
This uses a condition variable, and context.Context
for Wait
cancellation.
package main
import (
"context"
"fmt"
"sync"
)
type Map struct {
mu sync.Mutex
cond *sync.Cond
m map[string]interface{}
}
func (d *Map) Set(a string, b interface{}) {
d.mu.Lock()
defer d.mu.Unlock()
d.m[a] = b
d.cond.Broadcast()
}
func (d *Map) Get(a string) interface{} {
d.mu.Lock()
defer d.mu.Unlock()
return d.m[a]
}
func (d *Map) WaitContext(ctx context.Context, a string) interface{} {
quit := make(chan struct{})
defer close(quit)
var done bool
go func() {
select {
case <-quit:
case <-ctx.Done():
d.mu.Lock()
done = true
d.cond.Broadcast()
d.mu.Unlock()
}
}()
d.mu.Lock()
defer d.mu.Unlock()
for !done {
if b, ok := d.m[a]; ok {
return b
}
d.cond.Wait()
}
return nil
}
func (d *Map) Wait(a string) interface{} {
return d.WaitContext(context.Background(), a)
}
func main() {
d := &Map{}
d.m = make(map[string]interface{})
d.cond = sync.NewCond(&d.mu)
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("waited and got c=", d.Wait("c"))
wg.Done()
}()
d.Set("a", "apple")
d.Set("b", "banana")
fmt.Println("b=", d.Get("b"))
fmt.Println("a=", d.Get("a"))
fmt.Println("c=", d.Get("c"))
d.Set("c", "cherry")
fmt.Println("c=", d.Get("c"))
wg.Wait()
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
fmt.Println("waited and got d=", d.WaitContext(ctx, "d"))
wg.Done()
}()
cancel()
wg.Wait()
}
答案3
得分: -1
不要发送struct{}{}
到通道,而是直接关闭通道。之后对通道的所有读取都将是非阻塞的。
这也解决了你当前代码中的一个问题:如果有多个goroutine在等待相同的值,只有一个会收到信号。
你还应该检查通道是否已经关闭,因为关闭已经关闭的通道会导致恐慌。你可以通过使用带有默认情况的select语句进行非阻塞读取来实现这一点。
英文:
Instead of sending struct{}{}
to the channel, you could just close the channel. All reads from channel after that would be non-blocking.
This also solves a problem, your code currently has: if multiple goroutines are waiting on the same value, only one would get the signal.
You should also check whether the channel has already been closed, since closing the channel twice woud cause panic. You can do that by non-blocking read from channel with select statement with default case.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论