单写多读的并发性

huangapple go评论107阅读模式
英文:

Concurrency with a SINGLE WRITE by a single writer and multiple readers

问题

注意:这个问题与其他并发性问题不同,因为不仅写入者是单个的,而且写操作仅发生一次。

在Go中,当有多个并发读取器和一个只写一次的单个写入器(例如并发环境中的setter)时,可以使用哪种同步方法?

sync.Mutex适用于这种情况,但由于只有一个写入器,sync.RWMutex比常规互斥锁稍微快一些。

然而,对于整个应用程序运行期间仅设置一次值而言,使用互斥锁进行互斥锁定似乎是一种浪费。

有更快的方法吗?

  1. package main
  2. import (
  3. "sync"
  4. )
  5. type RWMutexErrorNotifier struct {
  6. rwMutex sync.RWMutex
  7. emailSender func(string)
  8. }
  9. func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
  10. a.rwMutex.Lock()
  11. defer a.rwMutex.Unlock()
  12. a.emailSender = emailSender
  13. }
  14. func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
  15. a.rwMutex.RLock()
  16. defer a.rwMutex.RUnlock()
  17. if a.emailSender == nil {
  18. return
  19. }
  20. a.emailSender(errorMessage)
  21. }
  22. func main() {
  23. sender := &RWMutexErrorNotifier{}
  24. errorsCount := 100_000
  25. emailSender := func(emailMessage string) {
  26. // 发送电子邮件...
  27. }
  28. var wg sync.WaitGroup // 仅用于演示目的
  29. wg.Add(errorsCount)
  30. for i := 0; i < errorsCount; i++ {
  31. go func() {
  32. sender.SendErrorMessage("ALARM!")
  33. wg.Done()
  34. }()
  35. }
  36. sender.SetEmailSenderService(emailSender) // 进行一次单独的写入
  37. wg.Wait()
  38. }
英文:

NOTE: This question is different from other concurrency questions because not only the writer is single, but also the write operation happens strictly once.

What method of synchronization in Go to use when one has multiple concurrent readers and a single writer that writes once and only once, for example a setter in a concurrent environment?

sync.Mutex works for this case, however, since there is only one writer, sync.RWMutex is even better as it is marginally faster than a regular mutex.

Still, it feels wasteful to do the mutex locking for the duration of the application run to simply set a value once.

Is there a faster way?

  1. package main
  2. import (
  3. &quot;sync&quot;
  4. )
  5. type RWMutexErrorNotifier struct {
  6. rwMutex sync.RWMutex
  7. emailSender func(string)
  8. }
  9. func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
  10. a.rwMutex.Lock()
  11. defer a.rwMutex.Unlock()
  12. a.emailSender = emailSender
  13. }
  14. func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
  15. a.rwMutex.RLock()
  16. defer a.rwMutex.RUnlock()
  17. if a.emailSender == nil {
  18. return
  19. }
  20. a.emailSender(errorMessage)
  21. }
  22. func main() {
  23. sender := &amp;RWMutexErrorNotifier{}
  24. errorsCount := 100_000
  25. emailSender := func(emailMessage string) {
  26. // sending email...
  27. }
  28. var wg sync.WaitGroup // used only for demo purposes
  29. wg.Add(errorsCount)
  30. for i := 0; i &lt; errorsCount; i++ {
  31. go func() {
  32. sender.SendErrorMessage(&quot;ALARM!&quot;)
  33. wg.Done()
  34. }()
  35. }
  36. sender.SetEmailSenderService(emailSender) // making a single write
  37. wg.Wait()
  38. }

答案1

得分: 1

根据你提供的内容,这是一个关于使用不同锁机制实现错误通知器的示例代码。代码中使用了四种不同的锁机制:sync.Mutexsync.RWMutexatomic.Bool和无锁的解决方案(用于基准测试)。根据基准测试的结果,对于单个写入操作,使用atomic.Bool的速度比使用sync.RWMutex更快,只比无锁的解决方案稍慢一些。

代码中定义了一个ErrorNotifier接口,包含了设置邮件发送服务和发送错误消息的方法。然后分别实现了基于不同锁机制的错误通知器类型:MutexErrorNotifierRWMutexErrorNotifierAtomicBooleanErrorNotifierRacyNoGoodErrorNotifier

在代码的后面部分,定义了一个Run函数,用于模拟发送错误消息的场景。根据不同的运行类型(并发、顺序-所有核心、顺序-单个核心),调用相应的方法发送错误消息。

最后,代码中还包含了一系列基准测试函数,用于测试不同锁机制的性能。

希望以上信息对你有所帮助!如果你有任何其他问题,请随时提问。

英文:

UPDATED thanks to @peter-cordes - see comments below the answer

Apparently, reading and setting atomic.Bool is faster than sync.RWMutex for a single write by a single writer. It is only marginally slower than a no-lock racy solution (used only for benchmarking)

Benchmark results (sync.Mutex-based solution is also provided for benchmarking purposes only):

  1. $ go test -run=XXX -bench=. -benchmem -benchtime=1000000x
  2. goos: darwin
  3. goarch: arm64
  4. pkg: untitled1
  5. BenchmarkRacyNoGoodConcurrently-12 1000000 3529 ns/op 384 B/op 12 allocs/op
  6. BenchmarkAtomicBooleanConcurrently-12 1000000 3494 ns/op 384 B/op 12 allocs/op
  7. BenchmarkRWMutexConcurrently-12 1000000 3909 ns/op 384 B/op 12 allocs/op
  8. BenchmarkMutexConcurrently-12 1000000 4180 ns/op 384 B/op 12 allocs/op
  9. BenchmarkRacyNoGoodSequentiallyAllCores-12 1000000 3.661 ns/op 0 B/op 0 allocs/op
  10. BenchmarkAtomicBooleanSequentiallyAllCores-12 1000000 3.748 ns/op 0 B/op 0 allocs/op
  11. BenchmarkRWMutexSequentiallyAllCores-12 1000000 1934 ns/op 0 B/op 0 allocs/op
  12. BenchmarkMutexSequentiallyAllCores-12 1000000 1486 ns/op 0 B/op 0 allocs/op
  13. BenchmarkRacyNoGoodSequentiallySingleCore-12 1000000 28.95 ns/op 0 B/op 0 allocs/op
  14. BenchmarkAtomicBooleanSequentiallySingleCore-12 1000000 29.54 ns/op 0 B/op 0 allocs/op
  15. BenchmarkRWMutexSequentiallySingleCore-12 1000000 188.6 ns/op 0 B/op 0 allocs/op
  16. BenchmarkMutexSequentiallySingleCore-12 1000000 187.4 ns/op 0 B/op 0 allocs/op
  17. PASS
  18. ok untitled1 19.093s
  1. package main
  2. import (
  3. &quot;runtime&quot;
  4. &quot;sync&quot;
  5. &quot;sync/atomic&quot;
  6. )
  7. type ErrorNotifier interface {
  8. SetEmailSenderService(func(string))
  9. SendErrorMessage(string)
  10. }
  11. // Mutex
  12. type MutexErrorNotifier struct {
  13. mutex sync.Mutex
  14. emailSender func(string)
  15. }
  16. var _ ErrorNotifier = (*MutexErrorNotifier)(nil)
  17. func (a *MutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
  18. a.mutex.Lock()
  19. defer a.mutex.Unlock()
  20. a.emailSender = emailSender
  21. }
  22. func (a *MutexErrorNotifier) SendErrorMessage(errorMessage string) {
  23. a.mutex.Lock()
  24. defer a.mutex.Unlock()
  25. if a.emailSender != nil {
  26. a.emailSender(errorMessage)
  27. }
  28. }
  29. // RWMutex
  30. type RWMutexErrorNotifier struct {
  31. rwMutex sync.RWMutex
  32. emailSender func(string)
  33. }
  34. var _ ErrorNotifier = (*RWMutexErrorNotifier)(nil)
  35. func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
  36. a.rwMutex.Lock()
  37. defer a.rwMutex.Unlock()
  38. a.emailSender = emailSender
  39. }
  40. func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
  41. a.rwMutex.RLock()
  42. defer a.rwMutex.RUnlock()
  43. if a.emailSender != nil {
  44. a.emailSender(errorMessage)
  45. }
  46. }
  47. // Atomic Boolean
  48. type AtomicBooleanErrorNotifier struct {
  49. emailerIsSet atomic.Bool
  50. emailSender func(string)
  51. }
  52. var _ ErrorNotifier = (*AtomicBooleanErrorNotifier)(nil)
  53. func (a *AtomicBooleanErrorNotifier) SetEmailSenderService(emailSender func(string)) {
  54. defer a.emailerIsSet.Store(true)
  55. a.emailSender = emailSender
  56. }
  57. func (a *AtomicBooleanErrorNotifier) SendErrorMessage(errorMessage string) {
  58. if a.emailerIsSet.Load() {
  59. a.emailSender(errorMessage)
  60. }
  61. }
  62. // NOT A SOLUTION: racy no locking solution - just for benchmarking
  63. type RacyNoGoodErrorNotifier struct {
  64. emailSender func(string)
  65. }
  66. var _ ErrorNotifier = (*RacyNoGoodErrorNotifier)(nil)
  67. func (a *RacyNoGoodErrorNotifier) SetEmailSenderService(emailSender func(string)) {
  68. a.emailSender = emailSender
  69. }
  70. func (a *RacyNoGoodErrorNotifier) SendErrorMessage(errorMessage string) {
  71. if a.emailSender != nil {
  72. a.emailSender(errorMessage)
  73. }
  74. }
  75. // Demo run
  76. const allConcurrent = &quot;all concurrent&quot;
  77. const sequentialSingleCore = &quot;sequential single core&quot;
  78. const sequentialAllCores = &quot;sequential all cores&quot;
  79. func Run(n int, runner ErrorNotifier, runType string) {
  80. emailSender := func(emailMessage string) {
  81. // sending email...
  82. }
  83. var wg sync.WaitGroup
  84. switch runType {
  85. case allConcurrent:
  86. wg.Add(n * runtime.NumCPU())
  87. for i := 0; i &lt; n*runtime.NumCPU(); i++ {
  88. go func() {
  89. runner.SendErrorMessage(&quot;ALARM!&quot;)
  90. wg.Done()
  91. }()
  92. }
  93. case sequentialAllCores:
  94. wg.Add(runtime.NumCPU())
  95. for i := 0; i &lt; runtime.NumCPU(); i++ {
  96. go func() {
  97. for j := 0; j &lt; n; j++ {
  98. runner.SendErrorMessage(&quot;ALARM!&quot;)
  99. }
  100. wg.Done()
  101. }()
  102. }
  103. case sequentialSingleCore:
  104. wg.Add(1)
  105. go func() {
  106. for j := 0; j &lt; n*runtime.NumCPU(); j++ {
  107. runner.SendErrorMessage(&quot;ALARM!&quot;)
  108. }
  109. wg.Done()
  110. }()
  111. default:
  112. panic(&quot;unknown mode&quot;)
  113. }
  114. runner.SetEmailSenderService(emailSender)
  115. wg.Wait()
  116. }

Benchmarks:

  1. package main
  2. import &quot;testing&quot;
  3. func BenchmarkRacyNoGoodConcurrently(b *testing.B) {
  4. Run(b.N, &amp;RacyNoGoodErrorNotifier{}, allConcurrent)
  5. }
  6. func BenchmarkAtomicBooleanConcurrently(b *testing.B) {
  7. Run(b.N, &amp;AtomicBooleanErrorNotifier{}, allConcurrent)
  8. }
  9. func BenchmarkRWMutexConcurrently(b *testing.B) {
  10. Run(b.N, &amp;RWMutexErrorNotifier{}, allConcurrent)
  11. }
  12. func BenchmarkMutexConcurrently(b *testing.B) {
  13. Run(b.N, &amp;MutexErrorNotifier{}, allConcurrent)
  14. }
  15. func BenchmarkRacyNoGoodSequentiallyAllCores(b *testing.B) {
  16. Run(b.N, &amp;RacyNoGoodErrorNotifier{}, sequentialAllCores)
  17. }
  18. func BenchmarkAtomicBooleanSequentiallyAllCores(b *testing.B) {
  19. Run(b.N, &amp;AtomicBooleanErrorNotifier{}, sequentialAllCores)
  20. }
  21. func BenchmarkRWMutexSequentiallyAllCores(b *testing.B) {
  22. Run(b.N, &amp;RWMutexErrorNotifier{}, sequentialAllCores)
  23. }
  24. func BenchmarkMutexSequentiallyAllCores(b *testing.B) {
  25. Run(b.N, &amp;MutexErrorNotifier{}, sequentialAllCores)
  26. }
  27. func BenchmarkRacyNoGoodSequentiallySingleCore(b *testing.B) {
  28. Run(b.N, &amp;RacyNoGoodErrorNotifier{}, sequentialSingleCore)
  29. }
  30. func BenchmarkAtomicBooleanSequentiallySingleCore(b *testing.B) {
  31. Run(b.N, &amp;AtomicBooleanErrorNotifier{}, sequentialSingleCore)
  32. }
  33. func BenchmarkRWMutexSequentiallySingleCore(b *testing.B) {
  34. Run(b.N, &amp;RWMutexErrorNotifier{}, sequentialSingleCore)
  35. }
  36. func BenchmarkMutexSequentiallySingleCore(b *testing.B) {
  37. Run(b.N, &amp;MutexErrorNotifier{}, sequentialSingleCore)
  38. }

huangapple
  • 本文由 发表于 2023年4月1日 05:52:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/75903010.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定