如何在Golang中使函数线程安全

huangapple go评论155阅读模式
英文:

How make a function thread safe in golang

问题

如何在golang中防止两个线程同时调用函数或函数体?

我的使用场景是,我有一个Web服务器正在调用一个串行接口,该接口一次只能有一个调用者,两个调用将相互干扰,从而在串行线上产生噪声。

英文:

How to lock a function or the body of a function from being called by two threads in golang?

My use case is that I have a webserver that is calling a serial interface which can only have one caller at a time, two calls will cancel each other out by creating noise for one another on the serial line.

答案1

得分: 20

最简单的方法是使用sync.Mutex

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.Mutex

func main() {
    go importantFunction("foo")
    go importantFunction("bar")
    time.Sleep(3 * time.Second)
}

func importantFunction(name string) {
    lock.Lock()
    defer lock.Unlock()
    fmt.Println(name)
    time.Sleep(1 * time.Second)
}

在这个例子中,你会看到"foo"和"bar"被打印出来,尽管它们是并发执行的,但它们之间相隔了一秒。

Go playground: https://play.golang.org/p/mXKl42zRW8

英文:

Easiest way is to use sync.Mutex:

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.Mutex

func main() {
    go importantFunction("foo")
    go importantFunction("bar")
    time.Sleep(3 * time.Second)
}


func importantFunction(name string) {
    lock.Lock()
    defer lock.Unlock()
    fmt.Println(name)
    time.Sleep(1 * time.Second)
}

Here you'll see that "foo" and "bar" is printed one second apart even though they are go routines.

Go playground: https://play.golang.org/p/mXKl42zRW8

答案2

得分: 6

实现非可重入函数有两种方法:

  • 阻塞:第一个调用者运行函数,后续的调用者会被阻塞并等待函数退出后再运行函数。
  • 放弃:第一个调用者运行函数,后续的调用者在函数执行时会中止。

这两种方法有不同的优点:

  • 阻塞的非可重入函数保证会执行与尝试次数相同的次数。然而,如果执行时间很长,可能会导致积压,然后是一连串的执行。
  • 放弃的非可重入函数保证不会拥塞和出现一连串的执行,并且可以保证最大的执行速率。

通过使用mutex,可以最简单地实现阻塞的非可重入函数,如@Pylinux的回答所述。
通过原子比较和交换,可以实现放弃的非可重入函数,代码如下:

import (
    "sync/atomic"
    "time"
)

func main() {
    tick := time.Tick(time.Second)
    var reentranceFlag int64
    go func() {
        for range tick {
            go CheckSomeStatus()
            go func() {
                if atomic.CompareAndSwapInt64(&reentranceFlag, 0, 1) {
                    defer atomic.StoreInt64(&reentranceFlag, 0)
                } else {
                    return
                }
                CheckAnotherStatus()
            }()
        }
    }()
}

在上述代码中,CheckAnotherStatus() 被保护起来,以防止重入。第一个调用者将 reentranceFlag 设置为 1,后续的调用者无法做到这一点,并且会退出。

请参考我的博文 在 Golang 中实现非可重入函数 以获取更详细的讨论。

英文:

There are two approaches to implementing non-reentrant functions:

  • Blocking: first caller runs the function, subsequent caller(s) block and wait till function exits, then run the function
  • Yielding: first caller runs the function, subsequent caller(s) abort while function is being executed

The two approaches have different merits:

  • Blocking non-reentrant functions are guaranteed to execute as many times as were attempted. However, they can be a backlog in case of long execution times, then bursts of executions following.
  • Yielding non-reentrant functions guarantee non congestion and no bursts, and can guarantee a maximum of execution rate.

Blocking non-reentrant functions are most easily implemented via mutex, as described in @Pylinux's answer.
Yielding non-reentrant functions can be implemented via atomic compare & swap, as follows:

import (
    "sync/atomic"
    "time"
)

func main() {
    tick := time.Tick(time.Second)
    var reentranceFlag int64
    go func() {
        for range tick {
            go CheckSomeStatus()
            go func() {
                if atomic.CompareAndSwapInt64(&reentranceFlag, 0, 1) {
                    defer atomic.StoreInt64(&reentranceFlag, 0)
                } else {
                    return
                }
                CheckAnotherStatus()
            }()
        }
    }()
}

In the above, CheckAnotherStatus() is protected against re-entry such that the first caller sets reentranceFlag to 1, and subsequent callers fail to do the same, and quit.

Please consider my blog post, Implementing non re-entrant functions in Golang for a more elaborate discussion.

答案3

得分: 5

Pylinux的解决方案使用互斥锁(Mutex),就像他所说的,在你的情况下可能是最简单的。我在这里提供另一种替代方案。它可能适用于你的情况,也可能不适用。

你可以使用一个单独的goroutine在串行接口上执行所有操作,并使用一个通道来序列化它需要执行的工作。示例代码如下:

package main

import (
	"fmt"
	"sync"
)

// handleCommands将以串行方式处理命令
func handleCommands(opChan <-chan string) {
	for op := range opChan {
		fmt.Printf("command: %s\n", op)
	}
}

// produceCommands将并发生成多个命令
func produceCommands(opChan chan<- string) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { opChan <- "cmd1"; wg.Done() }()
	go func() { opChan <- "cmd2"; wg.Done() }()
	wg.Wait()
	close(opChan)
}

func main() {
	var opChan = make(chan string)
	go produceCommands(opChan)
	handleCommands(opChan)
}

相对于互斥锁,这种方法的优势在于你对等待队列有更多的控制。使用互斥锁时,队列在Lock()时隐式存在,并且没有限制。而使用通道,你可以限制等待调用者的最大数量,并在同步调用点超载时做出适当的反应。你还可以使用len(opChan)来检查队列中有多少个goroutine。

补充说明:

上面示例的一个限制(如评论中所指出的)是它不能将计算结果返回给原始发送者。在保持使用通道的方法的同时,可以通过为每个命令引入一个结果通道来解决这个问题。因此,不再将字符串发送到命令通道,而是发送以下格式的结构体:

type operation struct {
	command string
	result  chan string
}

命令将按以下方式排队到命令通道:

func enqueueCommand(opChan chan<- operation, cmd string) <-chan string {
	var result = make(chan string)
	opChan <- operation{command: cmd, result: result}
	return result
}

这样,命令处理程序就可以向命令的发起者发送一个值。完整的示例代码可以在playground上查看这里

英文:

Pylinux's solution using a Mutex is, like he says, probably the simplest in your case. I'll add another one here as an alternative, though. It may or may not apply in your case.

Instead of using a Mutex, you could have a single goroutine perform all the operations on the serial interface, and use a channel to serialise the work it needs to perform. Example:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

// handleCommands will handle commands in a serialized fashion
func handleCommands(opChan &lt;-chan string) {
	for op := range opChan {
		fmt.Printf(&quot;command: %s\n&quot;, op)
	}
}

// produceCommands will generate multiple commands concurrently
func produceCommands(opChan chan&lt;- string) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { opChan &lt;- &quot;cmd1&quot;; wg.Done() }()
	go func() { opChan &lt;- &quot;cmd2&quot;; wg.Done() }()
	wg.Wait()
	close(opChan)
}

func main() {
	var opChan = make(chan string)
	go produceCommands(opChan)
	handleCommands(opChan)
}

The advantage of this relative to a Mutex is that you have more control over the wait queue. With the Mutex, the queue exists implicitly at Lock(), and is unbounded. Using a channel, on the other hand, you can limit the maximum number of callers waiting and react appropriately if the synchronised call site is overloaded. You can also do things like checking how many goroutines are in the queue with len(opChan).

Edit to add:

A limitation with the above example (as noted in the comments) is that it doesn't handle returning results from the computation back to the original sender. One way to do that, while keeping the approach of using channels, is to introduce a result channel for each command. So instead of sending strings over the command channel, one can send structs of the following format:

type operation struct {
	command string
	result  chan string
}

Commands would be enqueued onto the command channel as follows:

func enqueueCommand(opChan chan&lt;- operation, cmd string) &lt;-chan string {
	var result = make(chan string)
	opChan &lt;- operation{command: cmd, result: result}
	return result
}

This allows the command handler to send a value back to the originator of the command. Full example on the playground here.

答案4

得分: -1

type Semafor struct {
sync.RWMutex
semafor int
}

var mySemafor *Semafor

func (m *Semafor) get() int { //读取锁
if m != nil {
m.RLock()
defer m.RUnlock()
return m.semafor
} else {
panic("错误:信号量未初始化,IntSemafor()")
}
}

func (m *Semafor) set(val int) bool { //写入锁
ok := false
if m != nil {
if val != 0 {
m.Lock()
if m.semafor == 0 {
m.semafor = val
ok = true
}
m.Unlock()
} else {
m.Lock()
m.semafor = val
ok = true
m.Unlock()
}
}
return ok
}

func InitSemafor() {
if mySemafor == nil {
mySemafor = &Semafor{}
mySemafor.set(0)
}
}

func OnSemafor() bool {
if mySemafor != nil {
for !mySemafor.set(1) {
for mySemafor.get() == 1 {
SleepM(2)
}
}
return true
} else {
panic("错误:信号量未初始化,InitSemafor()")
}
}

func OffSemafor() {
mySemafor.set(0)
}

英文:
type Semafor struct {
	sync.RWMutex
	semafor int
}

var mySemafor *Semafor

func (m *Semafor) get() int { //read lock
	if m != nil {
		m.RLock()
		defer m.RUnlock()
		return m.semafor
	} else {
		panic (&quot;Error : The semaphore is not initialized, IntSemafor()&quot;)
	}
}

func (m *Semafor) set(val int) bool { //write lock
	ok := false
	if m != nil {
		if val != 0 {
			m.Lock()
			if m.semafor == 0 {
				m.semafor = val
				ok = true
			}
			m.Unlock()
		} else {
			m.Lock()
			m.semafor = val
			ok = true
			m.Unlock()
		}
	}
	return ok
}

func InitSemafor() {
	if mySemafor == nil {
		mySemafor = &amp;Semafor{}
		mySemafor.set(0)
	}
}

func OnSemafor() bool {
	if mySemafor != nil {
		for !mySemafor.set(1) {
			for mySemafor.get() == 1 {
				SleepM(2)
			}
		}
	  return true	
	} else {
	  panic(&quot;Error : The semaphore is not initialized, InitSemafor()&quot;)
	}
}

func OffSemafor() {
	mySemafor.set(0)
}

huangapple
  • 本文由 发表于 2017年9月10日 15:39:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/46138395.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定