GoLang 顺序协程

huangapple go评论79阅读模式
英文:

GoLang sequential goroutines

问题

我是新手学习golang,有一个使用场景,其中对一种类型的值的操作必须按顺序运行,而对另一种类型的值的操作可以并发运行。

  1. 假设数据是从流连接中传输的(有序的):
    key_name_1, value_1
    key_name_2, value_2
    key_name_1, value_1
    
  2. 现在,key_name_1key_name_2 可以由 goroutine 并发操作。
  3. 但是,由于下一个流式传输的值(第三行)仍然是 key_name_1,所以只有在之前的操作(第一行)完成后,才能由 goroutine 处理这个操作。否则,它应该等待第一次操作完成后再应用该操作。对于讨论的目的,我们可以假设操作只是将新值添加到先前的值中。

在golang中,如何以最高的性能实现这一点?


确切的使用场景是数据库更改以队列的形式进行流式传输,现在如果一个值正在发生变化,重要的是在另一个数据库上按照相同的顺序应用该操作,否则一致性将受到影响。冲突很少发生,但可能会发生。

英文:

I am new to golang and have a use case where operations on a value of a type have to run in a sequential manner where as operation on value of other type can be run concurrently.

  1. Imagine data is coming from a streaming connection (In-order)
    key_name_1, value_1 
    key_name_2, value_2
    key_name_1, value_1
    
  2. Now, key_name_1, and key_name_2 can be operated by goroutine concurrently.
  3. But as next streamed value (3rd row) is key_name_1 again, so this operation should only be processed by goroutine if the earlier operation (1st row) has finished otherwise it should wait for the 1st operation to finish before it can apply the operation.
    For the sake of discussion we can assume that operation is simply
    adding the new value to previous value.

What would be the right way to achieve this in golang with highest possible performance ?


The exact use case is database changes are streamed on a queue, now if a value is getting changed its important that onto another database that operation is applied on the same sequence otherwise consistency will get impacted. Conflicts are rare, but can happen.

答案1

得分: 1

作为给定键的互斥性的简单解决方案,您可以使用一个带有引用计数锁的锁定映射。对于高负载来说可能不是最优的,但在您的情况下可能足够。

type processLock struct {
    mtx      sync.Mutex
    refcount int
}

type locker struct {
    mtx   sync.Mutex
    locks map[string]*processLock
}

func (l *locker) acquire(key string) {
    l.mtx.Lock()
    lk, found := l.locks[key]
    if !found {
        lk = &processLock{}
        l.locks[key] = lk
    }
    lk.refcount++
    l.mtx.Unlock()
    lk.mtx.Lock()
}

func (l *locker) release(key string) {
    l.mtx.Lock()
    lk := l.locks[key]
    lk.refcount--
    if lk.refcount == 0 {
        delete(l.locks, key)
    }
    l.mtx.Unlock()
    lk.mtx.Unlock()
}

在处理键之前调用acquire(key),在完成后调用release(key)

Live demo

警告!上述代码保证了互斥性,但不保证顺序。要顺序化解锁,您需要一个FIFO互斥锁

英文:

As a simple solution for mutual exclusivity for a given key you can just use a locked map of ref-counted locks. It's not the most optimal for high loads, but might just suffice in your case.

type processLock struct {
	mtx      sync.Mutex
	refcount int
}

type locker struct {
	mtx   sync.Mutex
	locks map[string]*processLock
}

func (l *locker) acquire(key string) {
	l.mtx.Lock()
	lk, found := l.locks[key]
	if !found {
		lk = &processLock{}
		l.locks[key] = lk
	}
	lk.refcount++
	l.mtx.Unlock()
	lk.mtx.Lock()
}

func (l *locker) release(key string) {
	l.mtx.Lock()
	lk := l.locks[key]
	lk.refcount--
	if lk.refcount == 0 {
		delete(l.locks, key)
	}
	l.mtx.Unlock()
	lk.mtx.Unlock()
}

Just call acquire(key) before processing a key and release(key) when done with it.

Live demo.

Warning! The code above guarantees exclusivity, but not sequence. To sequentialize the unlocking you need a FIFO mutex.

huangapple
  • 本文由 发表于 2021年12月13日 04:12:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/70327324.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定