单写多读的并发性

huangapple go评论74阅读模式
英文:

Concurrency with a SINGLE WRITE by a single writer and multiple readers

问题

注意:这个问题与其他并发性问题不同,因为不仅写入者是单个的,而且写操作仅发生一次。

在Go中,当有多个并发读取器和一个只写一次的单个写入器(例如并发环境中的setter)时,可以使用哪种同步方法?

sync.Mutex适用于这种情况,但由于只有一个写入器,sync.RWMutex比常规互斥锁稍微快一些。

然而,对于整个应用程序运行期间仅设置一次值而言,使用互斥锁进行互斥锁定似乎是一种浪费。

有更快的方法吗?

package main

import (
	"sync"
)

type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()

	a.emailSender = emailSender
}

func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.rwMutex.RLock()
	defer a.rwMutex.RUnlock()

	if a.emailSender == nil {
		return
	}

	a.emailSender(errorMessage)
}

func main() {
	sender := &RWMutexErrorNotifier{}

	errorsCount := 100_000

	emailSender := func(emailMessage string) {
		// 发送电子邮件...
	}

	var wg sync.WaitGroup // 仅用于演示目的

	wg.Add(errorsCount)

	for i := 0; i < errorsCount; i++ {
		go func() {
			sender.SendErrorMessage("ALARM!")
			wg.Done()
		}()
	}

	sender.SetEmailSenderService(emailSender) // 进行一次单独的写入

	wg.Wait()
}
英文:

NOTE: This question is different from other concurrency questions because not only the writer is single, but also the write operation happens strictly once.

What method of synchronization in Go to use when one has multiple concurrent readers and a single writer that writes once and only once, for example a setter in a concurrent environment?

sync.Mutex works for this case, however, since there is only one writer, sync.RWMutex is even better as it is marginally faster than a regular mutex.

Still, it feels wasteful to do the mutex locking for the duration of the application run to simply set a value once.

Is there a faster way?

package main

import (
	&quot;sync&quot;
)

type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()

	a.emailSender = emailSender
}

func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.rwMutex.RLock()
	defer a.rwMutex.RUnlock()

	if a.emailSender == nil {
		return
	}

	a.emailSender(errorMessage)
}

func main() {
	sender := &amp;RWMutexErrorNotifier{}

	errorsCount := 100_000

	emailSender := func(emailMessage string) {
		// sending email...
	}

	var wg sync.WaitGroup // used only for demo purposes

	wg.Add(errorsCount)

	for i := 0; i &lt; errorsCount; i++ {
		go func() {
			sender.SendErrorMessage(&quot;ALARM!&quot;)
			wg.Done()
		}()
	}

	sender.SetEmailSenderService(emailSender) // making a single write

	wg.Wait()
}

答案1

得分: 1

根据你提供的内容,这是一个关于使用不同锁机制实现错误通知器的示例代码。代码中使用了四种不同的锁机制:sync.Mutexsync.RWMutexatomic.Bool和无锁的解决方案(用于基准测试)。根据基准测试的结果,对于单个写入操作,使用atomic.Bool的速度比使用sync.RWMutex更快,只比无锁的解决方案稍慢一些。

代码中定义了一个ErrorNotifier接口,包含了设置邮件发送服务和发送错误消息的方法。然后分别实现了基于不同锁机制的错误通知器类型:MutexErrorNotifierRWMutexErrorNotifierAtomicBooleanErrorNotifierRacyNoGoodErrorNotifier

在代码的后面部分,定义了一个Run函数,用于模拟发送错误消息的场景。根据不同的运行类型(并发、顺序-所有核心、顺序-单个核心),调用相应的方法发送错误消息。

最后,代码中还包含了一系列基准测试函数,用于测试不同锁机制的性能。

希望以上信息对你有所帮助!如果你有任何其他问题,请随时提问。

英文:

UPDATED thanks to @peter-cordes - see comments below the answer

Apparently, reading and setting atomic.Bool is faster than sync.RWMutex for a single write by a single writer. It is only marginally slower than a no-lock racy solution (used only for benchmarking)

Benchmark results (sync.Mutex-based solution is also provided for benchmarking purposes only):

$ go test -run=XXX -bench=. -benchmem -benchtime=1000000x
goos: darwin
goarch: arm64
pkg: untitled1
BenchmarkRacyNoGoodConcurrently-12                       1000000              3529 ns/op             384 B/op         12 allocs/op
BenchmarkAtomicBooleanConcurrently-12                    1000000              3494 ns/op             384 B/op         12 allocs/op
BenchmarkRWMutexConcurrently-12                          1000000              3909 ns/op             384 B/op         12 allocs/op
BenchmarkMutexConcurrently-12                            1000000              4180 ns/op             384 B/op         12 allocs/op
BenchmarkRacyNoGoodSequentiallyAllCores-12               1000000                 3.661 ns/op           0 B/op          0 allocs/op
BenchmarkAtomicBooleanSequentiallyAllCores-12            1000000                 3.748 ns/op           0 B/op          0 allocs/op
BenchmarkRWMutexSequentiallyAllCores-12                  1000000              1934 ns/op               0 B/op          0 allocs/op
BenchmarkMutexSequentiallyAllCores-12                    1000000              1486 ns/op               0 B/op          0 allocs/op
BenchmarkRacyNoGoodSequentiallySingleCore-12             1000000                28.95 ns/op            0 B/op          0 allocs/op
BenchmarkAtomicBooleanSequentiallySingleCore-12          1000000                29.54 ns/op            0 B/op          0 allocs/op
BenchmarkRWMutexSequentiallySingleCore-12                1000000               188.6 ns/op             0 B/op          0 allocs/op
BenchmarkMutexSequentiallySingleCore-12                  1000000               187.4 ns/op             0 B/op          0 allocs/op
PASS
ok      untitled1       19.093s
package main

import (
	&quot;runtime&quot;
	&quot;sync&quot;
	&quot;sync/atomic&quot;
)

type ErrorNotifier interface {
	SetEmailSenderService(func(string))
	SendErrorMessage(string)
}

// Mutex

type MutexErrorNotifier struct {
	mutex       sync.Mutex
	emailSender func(string)
}

var _ ErrorNotifier = (*MutexErrorNotifier)(nil)

func (a *MutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.mutex.Lock()
	defer a.mutex.Unlock()

	a.emailSender = emailSender
}

func (a *MutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.mutex.Lock()
	defer a.mutex.Unlock()

	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// RWMutex

type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

var _ ErrorNotifier = (*RWMutexErrorNotifier)(nil)

func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()

	a.emailSender = emailSender
}

func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.rwMutex.RLock()
	defer a.rwMutex.RUnlock()

	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// Atomic Boolean

type AtomicBooleanErrorNotifier struct {
	emailerIsSet atomic.Bool
	emailSender  func(string)
}

var _ ErrorNotifier = (*AtomicBooleanErrorNotifier)(nil)

func (a *AtomicBooleanErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	defer a.emailerIsSet.Store(true)

	a.emailSender = emailSender
}

func (a *AtomicBooleanErrorNotifier) SendErrorMessage(errorMessage string) {
	if a.emailerIsSet.Load() {
		a.emailSender(errorMessage)
	}
}

// NOT A SOLUTION: racy no locking solution - just for benchmarking

type RacyNoGoodErrorNotifier struct {
	emailSender func(string)
}

var _ ErrorNotifier = (*RacyNoGoodErrorNotifier)(nil)

func (a *RacyNoGoodErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.emailSender = emailSender
}

func (a *RacyNoGoodErrorNotifier) SendErrorMessage(errorMessage string) {
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// Demo run

const allConcurrent = &quot;all concurrent&quot;
const sequentialSingleCore = &quot;sequential single core&quot;
const sequentialAllCores = &quot;sequential all cores&quot;

func Run(n int, runner ErrorNotifier, runType string) {
	emailSender := func(emailMessage string) {
		// sending email...
	}

	var wg sync.WaitGroup

	switch runType {
	case allConcurrent:
		wg.Add(n * runtime.NumCPU())

		for i := 0; i &lt; n*runtime.NumCPU(); i++ {
			go func() {
				runner.SendErrorMessage(&quot;ALARM!&quot;)
				wg.Done()
			}()
		}
	case sequentialAllCores:
		wg.Add(runtime.NumCPU())

		for i := 0; i &lt; runtime.NumCPU(); i++ {
			go func() {
				for j := 0; j &lt; n; j++ {
					runner.SendErrorMessage(&quot;ALARM!&quot;)
				}
				wg.Done()
			}()
		}
	case sequentialSingleCore:
		wg.Add(1)

		go func() {
			for j := 0; j &lt; n*runtime.NumCPU(); j++ {
				runner.SendErrorMessage(&quot;ALARM!&quot;)
			}
			wg.Done()
		}()
	default:
		panic(&quot;unknown mode&quot;)
	}

	runner.SetEmailSenderService(emailSender)

	wg.Wait()
}

Benchmarks:

package main

import &quot;testing&quot;

func BenchmarkRacyNoGoodConcurrently(b *testing.B) {
	Run(b.N, &amp;RacyNoGoodErrorNotifier{}, allConcurrent)
}

func BenchmarkAtomicBooleanConcurrently(b *testing.B) {
	Run(b.N, &amp;AtomicBooleanErrorNotifier{}, allConcurrent)
}

func BenchmarkRWMutexConcurrently(b *testing.B) {
	Run(b.N, &amp;RWMutexErrorNotifier{}, allConcurrent)
}

func BenchmarkMutexConcurrently(b *testing.B) {
	Run(b.N, &amp;MutexErrorNotifier{}, allConcurrent)
}

func BenchmarkRacyNoGoodSequentiallyAllCores(b *testing.B) {
	Run(b.N, &amp;RacyNoGoodErrorNotifier{}, sequentialAllCores)
}

func BenchmarkAtomicBooleanSequentiallyAllCores(b *testing.B) {
	Run(b.N, &amp;AtomicBooleanErrorNotifier{}, sequentialAllCores)
}

func BenchmarkRWMutexSequentiallyAllCores(b *testing.B) {
	Run(b.N, &amp;RWMutexErrorNotifier{}, sequentialAllCores)
}

func BenchmarkMutexSequentiallyAllCores(b *testing.B) {
	Run(b.N, &amp;MutexErrorNotifier{}, sequentialAllCores)
}

func BenchmarkRacyNoGoodSequentiallySingleCore(b *testing.B) {
	Run(b.N, &amp;RacyNoGoodErrorNotifier{}, sequentialSingleCore)
}

func BenchmarkAtomicBooleanSequentiallySingleCore(b *testing.B) {
	Run(b.N, &amp;AtomicBooleanErrorNotifier{}, sequentialSingleCore)
}

func BenchmarkRWMutexSequentiallySingleCore(b *testing.B) {
	Run(b.N, &amp;RWMutexErrorNotifier{}, sequentialSingleCore)
}

func BenchmarkMutexSequentiallySingleCore(b *testing.B) {
	Run(b.N, &amp;MutexErrorNotifier{}, sequentialSingleCore)
}

huangapple
  • 本文由 发表于 2023年4月1日 05:52:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/75903010.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定