等待直到Go中的映射中的某个值可用

huangapple go评论97阅读模式
英文:

Wait until a value in a map becomes available in Go

问题

我有一个程序,基本上有三种情况:设置键的值,如果存在则获取值,或者等待给定键的值可用。我的原始想法是创建一个带有map[string]interface{}的新类型,用于存储“持久化”值。此外,对于等待值,我计划使用map[string](chan struct{})。当调用Set()方法时,我会向该通道写入数据,任何等待它的人都会知道值已经存在。

我不知道键的预先情况-它们是随机的。我不确定如何正确实现Wait()方法。

问题是-在Wait()中存在竞争条件-在释放互斥锁之后,在开始监听通道以接收值之前。

如何处理这个问题?欢迎任何其他关于如何实现这个的建议,我相信一定有更好的方法。我不想使用固定间隔轮询值或类似的方法。

英文:

I have a program where I basically have three scenarios - set value for a key, get value if it's there, or wait until the value for a given key becomes available. My original idea - create a new type with a map[string]interface{} - where "persisted" values are stored. Besides that, for waiting on a value, I planned on using a map[string](chan struct{}). When a Set() method is invoked, I would write to that channel and anyone waiting on it would know the value is there.

I don't know the keys in advance - they're random. I'm not sure how to correctly implement the Wait() method.

type Map struct {
	sync.Mutex

	m    map[string]interface{}
	wait map[string]chan (struct{})
}


func (m *Map) Set(key string, value interface{}) {
	m.ensureWaitChan(key)

	m.Lock()
	defer m.Unlock()

	m.m[key] = value

	// Signal to all waiting.
	m.wait[key] <- struct{}{}
}


func (m *Map) Wait(key string) interface{} {
	m.ensureWaitChan(key)

	m.Lock()
	
	value, ok := m.m[key]
	if ok {
		m.Unlock()
		return value
	}

	m.Unlock()
	// <------ Unlocked state where something might happen.
	<-m.wait[key]

	value := m.m[key]

	return value	
}

// If the channel does not exist for those waiting - create it.
func (m *Map) ensureWaitChan(key string) {
	m.Lock()
	defer m.Unlock()

	_, ok := m.wait[key]
	if ok {
		return
	}

	m.wait[key] = make(chan struct{}, 100)
}

The problem is - there's a race condition in Wait() - after I release the mutex, and before I start listening on a channel for incoming value.

What would be the best way to handle this? Open to any other suggestions on how to implement this, I believe there must be a better way to do this. I would refrain from polling the value at fixed intervals or anything like that.

答案1

得分: 2

你要找的是一个同步映射和消息代理之间的混合体。我们可以通过利用通信和同步的通道来实现这一点,这样订阅者就可以在发布消息时立即接收到消息(如果它们尚未在缓存中)。

因为订阅者在等待期间必须解锁映射互斥锁,所以它们无法安全地访问添加到映射中的新消息。我们直接将新值发送到所有订阅者的自己的通道,这样我们就不需要在Set内部添加更多的同步来确保在解锁映射本身之前满足所有订阅者的要求。提前解锁映射将允许订阅者直接读取它,但也会允许在此期间插入新值,从而导致不一致的结果。

一个运行的版本,还包括一个带有类型参数的通用Map实现:https://go.dev/play/p/AN7VRSPdGmO

英文:

What you are looking for is a mix between a synchronized map and a message broker. We can do this by leveraging the channels for communication as well as synchronization, so that subscribers can receive the messages as soon as they are published if they are not yet in the cache.

type Map struct {
	sync.Mutex

	m    map[string]any
	subs map[string][]chan any
}

func (m *Map) Set(key string, value any) {
	m.Lock()
	defer m.Unlock()

	m.m[key] = value

	// Send the new value to all waiting subscribers of the key
	for _, sub := range m.subs[key] {
		sub <- value
	}
	delete(m.subs, key)
}

func (m *Map) Wait(key string) any {
	m.Lock()
	// Unlock cannot be deferred so we can unblock Set() while waiting

	value, ok := m.m[key]
	if ok {
		m.Unlock()
		return value
	}

	// if there is no value yet, subscribe to any new values for this key
	ch := make(chan any)
	m.subs[key] = append(m.subs[key], ch)
	m.Unlock()

	return <-ch
}

Because subscribers must unlock the map mutex while they wait, they cannot safely access new messages added to the map. We send the new value directly to all subscribers over their own channel so that we don't need to add more synchronization within Set to ensure that all subscribers are satisfied before unlocking the map itself. Unlocking the map early would allow subscribers to read it directly, but would also allow new values to be inserted in the meantime causing inconsistent results.

A running version, also including a generic Map implementation with type parameters: https://go.dev/play/p/AN7VRSPdGmO

答案2

得分: 0

这是一个使用条件变量和context.Context进行Wait取消的示例代码。

package main

import (
	"context"
	"fmt"
	"sync"
)

type Map struct {
	mu   sync.Mutex
	cond *sync.Cond
	m    map[string]interface{}
}

func (d *Map) Set(a string, b interface{}) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.m[a] = b
	d.cond.Broadcast()
}

func (d *Map) Get(a string) interface{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.m[a]
}

func (d *Map) WaitContext(ctx context.Context, a string) interface{} {
	quit := make(chan struct{})
	defer close(quit)
	var done bool
	go func() {
		select {
		case <-quit:
		case <-ctx.Done():
			d.mu.Lock()
			done = true
			d.cond.Broadcast()
			d.mu.Unlock()
		}
	}()
	d.mu.Lock()
	defer d.mu.Unlock()
	for !done {
		if b, ok := d.m[a]; ok {
			return b
		}
		d.cond.Wait()
	}
	return nil
}

func (d *Map) Wait(a string) interface{} {
	return d.WaitContext(context.Background(), a)
}

func main() {
	d := &Map{}
	d.m = make(map[string]interface{})
	d.cond = sync.NewCond(&d.mu)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		fmt.Println("等待并获取 c=", d.Wait("c"))
		wg.Done()
	}()

	d.Set("a", "apple")
	d.Set("b", "banana")
	fmt.Println("b=", d.Get("b"))
	fmt.Println("a=", d.Get("a"))
	fmt.Println("c=", d.Get("c"))
	d.Set("c", "cherry")
	fmt.Println("c=", d.Get("c"))
	wg.Wait()

	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		fmt.Println("等待并获取 d=", d.WaitContext(ctx, "d"))
		wg.Done()
	}()
	cancel()
	wg.Wait()
}

希望对你有帮助!

英文:

This uses a condition variable, and context.Context for Wait cancellation.

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;sync&quot;
)

type Map struct {
	mu   sync.Mutex
	cond *sync.Cond
	m    map[string]interface{}
}

func (d *Map) Set(a string, b interface{}) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.m[a] = b
	d.cond.Broadcast()
}

func (d *Map) Get(a string) interface{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.m[a]
}

func (d *Map) WaitContext(ctx context.Context, a string) interface{} {
	quit := make(chan struct{})
	defer close(quit)
	var done bool
	go func() {
		select {
		case &lt;-quit:
		case &lt;-ctx.Done():
			d.mu.Lock()
			done = true
			d.cond.Broadcast()
			d.mu.Unlock()
		}
	}()
	d.mu.Lock()
	defer d.mu.Unlock()
	for !done {
		if b, ok := d.m[a]; ok {
			return b
		}
		d.cond.Wait()
	}
	return nil
}

func (d *Map) Wait(a string) interface{} {
	return d.WaitContext(context.Background(), a)
}

func main() {
	d := &amp;Map{}
	d.m = make(map[string]interface{})
	d.cond = sync.NewCond(&amp;d.mu)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		fmt.Println(&quot;waited and got c=&quot;, d.Wait(&quot;c&quot;))
		wg.Done()
	}()

	d.Set(&quot;a&quot;, &quot;apple&quot;)
	d.Set(&quot;b&quot;, &quot;banana&quot;)
	fmt.Println(&quot;b=&quot;, d.Get(&quot;b&quot;))
	fmt.Println(&quot;a=&quot;, d.Get(&quot;a&quot;))
	fmt.Println(&quot;c=&quot;, d.Get(&quot;c&quot;))
	d.Set(&quot;c&quot;, &quot;cherry&quot;)
	fmt.Println(&quot;c=&quot;, d.Get(&quot;c&quot;))
	wg.Wait()

	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		fmt.Println(&quot;waited and got d=&quot;, d.WaitContext(ctx, &quot;d&quot;))
		wg.Done()
	}()
	cancel()
	wg.Wait()
}

答案3

得分: -1

不要发送struct{}{}到通道,而是直接关闭通道。之后对通道的所有读取都将是非阻塞的。
这也解决了你当前代码中的一个问题:如果有多个goroutine在等待相同的值,只有一个会收到信号。

你还应该检查通道是否已经关闭,因为关闭已经关闭的通道会导致恐慌。你可以通过使用带有默认情况的select语句进行非阻塞读取来实现这一点。

英文:

Instead of sending struct{}{} to the channel, you could just close the channel. All reads from channel after that would be non-blocking.
This also solves a problem, your code currently has: if multiple goroutines are waiting on the same value, only one would get the signal.

You should also check whether the channel has already been closed, since closing the channel twice woud cause panic. You can do that by non-blocking read from channel with select statement with default case.

huangapple
  • 本文由 发表于 2023年3月1日 21:31:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/75604402.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定