Concurrency with a SINGLE WRITE by a single writer and multiple readers
Question
NOTE: This question differs from other concurrency questions because not only is there a single writer, but the write also happens exactly once.
Which synchronization method should be used in Go when there are multiple concurrent readers and a single writer that writes once and only once, for example a setter in a concurrent environment?
sync.Mutex works for this case; however, since there is a single writer and many readers, sync.RWMutex is even better, as it is marginally faster than a regular mutex.
Still, it feels wasteful to take a lock on every read for the entire run of the application just to set a value once.
Is there a faster way?
package main

import (
	"sync"
)

type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()
	a.emailSender = emailSender
}

func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.rwMutex.RLock()
	defer a.rwMutex.RUnlock()
	if a.emailSender == nil {
		return
	}
	a.emailSender(errorMessage)
}

func main() {
	sender := &RWMutexErrorNotifier{}
	errorsCount := 100_000
	emailSender := func(emailMessage string) {
		// sending email...
	}
	var wg sync.WaitGroup // used only for demo purposes
	wg.Add(errorsCount)
	for i := 0; i < errorsCount; i++ {
		go func() {
			sender.SendErrorMessage("ALARM!")
			wg.Done()
		}()
	}
	sender.SetEmailSenderService(emailSender) // making a single write
	wg.Wait()
}
Answer 1
Score: 1
UPDATED thanks to @peter-cordes - see comments below the answer
Apparently, reading and setting an atomic.Bool is faster than using sync.RWMutex for a single write by a single writer. In the benchmarks below it is at most marginally slower than a racy no-lock solution (included only for benchmarking).
Benchmark results (a sync.Mutex-based solution is also included for benchmarking purposes only):
$ go test -run=XXX -bench=. -benchmem -benchtime=1000000x
goos: darwin
goarch: arm64
pkg: untitled1
BenchmarkRacyNoGoodConcurrently-12 1000000 3529 ns/op 384 B/op 12 allocs/op
BenchmarkAtomicBooleanConcurrently-12 1000000 3494 ns/op 384 B/op 12 allocs/op
BenchmarkRWMutexConcurrently-12 1000000 3909 ns/op 384 B/op 12 allocs/op
BenchmarkMutexConcurrently-12 1000000 4180 ns/op 384 B/op 12 allocs/op
BenchmarkRacyNoGoodSequentiallyAllCores-12 1000000 3.661 ns/op 0 B/op 0 allocs/op
BenchmarkAtomicBooleanSequentiallyAllCores-12 1000000 3.748 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexSequentiallyAllCores-12 1000000 1934 ns/op 0 B/op 0 allocs/op
BenchmarkMutexSequentiallyAllCores-12 1000000 1486 ns/op 0 B/op 0 allocs/op
BenchmarkRacyNoGoodSequentiallySingleCore-12 1000000 28.95 ns/op 0 B/op 0 allocs/op
BenchmarkAtomicBooleanSequentiallySingleCore-12 1000000 29.54 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexSequentiallySingleCore-12 1000000 188.6 ns/op 0 B/op 0 allocs/op
BenchmarkMutexSequentiallySingleCore-12 1000000 187.4 ns/op 0 B/op 0 allocs/op
PASS
ok untitled1 19.093s
package main

import (
	"runtime"
	"sync"
	"sync/atomic"
)

type ErrorNotifier interface {
	SetEmailSenderService(func(string))
	SendErrorMessage(string)
}

// Mutex
type MutexErrorNotifier struct {
	mutex       sync.Mutex
	emailSender func(string)
}

var _ ErrorNotifier = (*MutexErrorNotifier)(nil)

func (a *MutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	a.emailSender = emailSender
}

func (a *MutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// RWMutex
type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

var _ ErrorNotifier = (*RWMutexErrorNotifier)(nil)

func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()
	a.emailSender = emailSender
}

func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	a.rwMutex.RLock()
	defer a.rwMutex.RUnlock()
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}
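// Atomic Boolean
type AtomicBooleanErrorNotifier struct {
	emailerIsSet atomic.Bool
	emailSender  func(string)
}

var _ ErrorNotifier = (*AtomicBooleanErrorNotifier)(nil)

// SetEmailSenderService must be called at most once: a second call would
// write emailSender while readers may be reading it.
func (a *AtomicBooleanErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	// Write the plain field first, then publish it with the atomic store.
	a.emailSender = emailSender
	a.emailerIsSet.Store(true)
}

func (a *AtomicBooleanErrorNotifier) SendErrorMessage(errorMessage string) {
	// A Load that observes true happens after the Store, so the write to
	// emailSender is visible here.
	if a.emailerIsSet.Load() {
		a.emailSender(errorMessage)
	}
}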
// NOT A SOLUTION: racy no locking solution - just for benchmarking
type RacyNoGoodErrorNotifier struct {
	emailSender func(string)
}

var _ ErrorNotifier = (*RacyNoGoodErrorNotifier)(nil)

func (a *RacyNoGoodErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.emailSender = emailSender
}

func (a *RacyNoGoodErrorNotifier) SendErrorMessage(errorMessage string) {
	if a.emailSender != nil {
		a.emailSender(errorMessage)
	}
}

// Demo run
const allConcurrent = "all concurrent"
const sequentialSingleCore = "sequential single core"
const sequentialAllCores = "sequential all cores"

func Run(n int, runner ErrorNotifier, runType string) {
	emailSender := func(emailMessage string) {
		// sending email...
	}
	var wg sync.WaitGroup
	switch runType {
	case allConcurrent:
		wg.Add(n * runtime.NumCPU())
		for i := 0; i < n*runtime.NumCPU(); i++ {
			go func() {
				runner.SendErrorMessage("ALARM!")
				wg.Done()
			}()
		}
	case sequentialAllCores:
		wg.Add(runtime.NumCPU())
		for i := 0; i < runtime.NumCPU(); i++ {
			go func() {
				for j := 0; j < n; j++ {
					runner.SendErrorMessage("ALARM!")
				}
				wg.Done()
			}()
		}
	case sequentialSingleCore:
		wg.Add(1)
		go func() {
			for j := 0; j < n*runtime.NumCPU(); j++ {
				runner.SendErrorMessage("ALARM!")
			}
			wg.Done()
		}()
	default:
		panic("unknown mode")
	}
	runner.SetEmailSenderService(emailSender)
	wg.Wait()
}
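As a variation on the atomic flag approach, the callback itself can be published through an atomic pointer, so there is no separate flag to keep in sync with the field. The following is only a sketch, assuming Go 1.19 or later (for atomic.Pointer and atomic.Int64). The type name AtomicPointerErrorNotifier is hypothetical and was not part of the benchmarks in this answer, and its setter returns a bool, so it does not satisfy the ErrorNotifier interface as written.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// AtomicPointerErrorNotifier is a hypothetical variant that stores a pointer
// to the callback instead of a flag plus a plain field.
type AtomicPointerErrorNotifier struct {
	emailSender atomic.Pointer[func(string)]
}

// SetEmailSenderService publishes the callback. CompareAndSwap makes the
// write-once rule explicit: it reports false if a sender was already set.
// Callers are assumed to pass a non-nil function.
func (a *AtomicPointerErrorNotifier) SetEmailSenderService(emailSender func(string)) bool {
	return a.emailSender.CompareAndSwap(nil, &emailSender)
}

// SendErrorMessage drops the message if no sender has been published yet.
func (a *AtomicPointerErrorNotifier) SendErrorMessage(errorMessage string) {
	if send := a.emailSender.Load(); send != nil {
		(*send)(errorMessage)
	}
}

func main() {
	notifier := &AtomicPointerErrorNotifier{}
	var delivered atomic.Int64

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			notifier.SendErrorMessage("ALARM!")
		}()
	}

	first := notifier.SetEmailSenderService(func(string) { delivered.Add(1) })
	second := notifier.SetEmailSenderService(func(string) {})
	wg.Wait()

	// The first set succeeds and the second is rejected. How many messages
	// were delivered depends on goroutine scheduling.
	fmt.Println("first set:", first, "second set:", second)
	fmt.Println("delivered:", delivered.Load())
}
```

Readers pay for one atomic load per call, the same as with the flag, and the race detector (go run -race) should stay quiet because the function value is only ever reached through the atomic pointer.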
Benchmarks (placed in a _test.go file of the same package):
package main

import "testing"

func BenchmarkRacyNoGoodConcurrently(b *testing.B) {
	Run(b.N, &RacyNoGoodErrorNotifier{}, allConcurrent)
}

func BenchmarkAtomicBooleanConcurrently(b *testing.B) {
	Run(b.N, &AtomicBooleanErrorNotifier{}, allConcurrent)
}

func BenchmarkRWMutexConcurrently(b *testing.B) {
	Run(b.N, &RWMutexErrorNotifier{}, allConcurrent)
}

func BenchmarkMutexConcurrently(b *testing.B) {
	Run(b.N, &MutexErrorNotifier{}, allConcurrent)
}

func BenchmarkRacyNoGoodSequentiallyAllCores(b *testing.B) {
	Run(b.N, &RacyNoGoodErrorNotifier{}, sequentialAllCores)
}

func BenchmarkAtomicBooleanSequentiallyAllCores(b *testing.B) {
	Run(b.N, &AtomicBooleanErrorNotifier{}, sequentialAllCores)
}

func BenchmarkRWMutexSequentiallyAllCores(b *testing.B) {
	Run(b.N, &RWMutexErrorNotifier{}, sequentialAllCores)
}

func BenchmarkMutexSequentiallyAllCores(b *testing.B) {
	Run(b.N, &MutexErrorNotifier{}, sequentialAllCores)
}

func BenchmarkRacyNoGoodSequentiallySingleCore(b *testing.B) {
	Run(b.N, &RacyNoGoodErrorNotifier{}, sequentialSingleCore)
}

func BenchmarkAtomicBooleanSequentiallySingleCore(b *testing.B) {
	Run(b.N, &AtomicBooleanErrorNotifier{}, sequentialSingleCore)
}

func BenchmarkRWMutexSequentiallySingleCore(b *testing.B) {
	Run(b.N, &RWMutexErrorNotifier{}, sequentialSingleCore)
}

func BenchmarkMutexSequentiallySingleCore(b *testing.B) {
	Run(b.N, &MutexErrorNotifier{}, sequentialSingleCore)
}
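One caveat when reading the "Concurrently" numbers: each op in that mode starts runtime.NumCPU() goroutines (hence 12 allocs/op on the 12-core run), so the roughly 3.5 µs per op is probably dominated by goroutine creation rather than by the synchronization itself. A rough sketch of how the steady-state read path could be measured in isolation with b.RunParallel follows. The names flagNotifier and benchmarkReads are hypothetical, and the sender is set before the timer starts, so this measures a different scenario from the Run helper above, where the write races with the reads.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"testing"
)

// flagNotifier mirrors the atomic flag idea from the answer: a plain field
// published by an atomic flag. The name is hypothetical.
type flagNotifier struct {
	isSet  atomic.Bool
	sender func(string)
}

// Set must be called at most once.
func (n *flagNotifier) Set(sender func(string)) {
	n.sender = sender
	n.isSet.Store(true)
}

func (n *flagNotifier) Send(msg string) {
	if n.isSet.Load() {
		n.sender(msg)
	}
}

// benchmarkReads times only reads that happen after the single write.
// RunParallel spreads b.N iterations over GOMAXPROCS goroutines, so no
// goroutine is created per iteration.
func benchmarkReads(b *testing.B) {
	n := &flagNotifier{}
	n.Set(func(string) {})
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			n.Send("ALARM!")
		}
	})
}

func main() {
	// testing.Benchmark runs a benchmark function outside of "go test".
	res := testing.Benchmark(benchmarkReads)
	fmt.Println(res.String())
}
```

The same benchmarkReads body can be moved into a _test.go file as a regular BenchmarkXxx function and repeated for the mutex-based types to compare them on equal terms.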