How to wait for threads with low latency in Go?

Question

I've been trying to create a simple event loop wrapper in Go, but I'm stumped: how should I keep track of the operations running in the current thread?
I want CurrentTick to run a function and, even if the calling function returns, not start the next tick until every function started by CurrentTick has returned. I thought I might use a mutex to guard a count of running threads, but I realized that checking the count over and over would burn CPU, and calling time.Sleep between checks would add latency. How would you solve this?

package eventloop

import (
	"reflect"
)

type eventLoop struct {
	functions []reflect.Value
	addFunc   chan reflect.Value // capacity 3
	mutex     chan bool          // capacity 1, used as a lock
	threads   int
}

func NewEventLoop() *eventLoop {
	loop := &eventLoop{
		functions: []reflect.Value{},
		addFunc:   make(chan reflect.Value, 3),
		mutex:     make(chan bool, 1),
	}
	go func() {
		for {
			loop.mutex <- true
			if loop.threads == 0 {
				// Unfinished: the next tick should start here.
				// Checking the counter in a loop like this spins the CPU.
			}
			<-loop.mutex
		}
	}()
	return loop
}

func (this *eventLoop) NextTick(f func()) {
	this.addFunc <- reflect.ValueOf(f)
}

func (this *eventLoop) CurrentTick(f func()) {
	this.mutex <- true
	this.threads += 1
	<-this.mutex
	go func() {
		f()
		this.mutex <- true
		this.threads -= 1
		<-this.mutex
	}()
}

Answer 1

Score: 2

If I understand your intent, I think you're overcomplicating things. I'd do it like this:

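package eventloop

// EventLoop runs queued functions one at a time on a single goroutine.
type EventLoop struct {
	nextFunc chan func() // functions to run on upcoming ticks
	curFunc  chan func() // functions to run before the current tick ends
}

func NewEventLoop() *EventLoop {
	el := &EventLoop{
		// Adjust the capacities to taste
		make(chan func(), 3),
		make(chan func(), 3),
	}
	go eventLoop(el)
	return el
}

func (el *EventLoop) NextTick(f func()) {
	el.nextFunc <- f
}

func (el *EventLoop) CurrentTick(f func()) {
	el.curFunc <- f
}

// Quit stops the loop once the already queued next-tick functions have run.
func (el *EventLoop) Quit() {
	close(el.nextFunc)
}

func eventLoop(el *EventLoop) {
	for {
		// Block until the next tick's function arrives; no polling needed.
		f, ok := <-el.nextFunc
		if !ok {
			return
		}
		f()

		// Run everything queued for the current tick, then move on.
	drain:
		for {
			select {
			case f := <-el.curFunc:
				f()
			default:
				break drain
			}
		}
	}
}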

Depending on your use, you may need to add some synchronization to make sure all tasks in the loop finish before your program exits.
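One way to add that synchronization, sketched here under some assumptions, is to have the loop goroutine close a channel when it returns and have Quit block on it. The done field, the run method name, and the runDemo helper are additions for illustration and are not part of the answer above.

```go
package main

// EventLoop mirrors the answer's type. The done channel is an
// assumption added for this sketch.
type EventLoop struct {
	nextFunc chan func()
	curFunc  chan func()
	done     chan struct{} // closed when the loop goroutine returns
}

func NewEventLoop() *EventLoop {
	el := &EventLoop{
		nextFunc: make(chan func(), 3),
		curFunc:  make(chan func(), 3),
		done:     make(chan struct{}),
	}
	go el.run()
	return el
}

func (el *EventLoop) NextTick(f func())    { el.nextFunc <- f }
func (el *EventLoop) CurrentTick(f func()) { el.curFunc <- f }

// Quit stops accepting next-tick functions and blocks until every
// function that was already queued has run.
func (el *EventLoop) Quit() {
	close(el.nextFunc)
	<-el.done
}

func (el *EventLoop) run() {
	defer close(el.done)
	// Ranging over a closed buffered channel still yields the values
	// that were queued before the close.
	for f := range el.nextFunc {
		f()
	drain:
		for {
			select {
			case g := <-el.curFunc:
				g()
			default:
				break drain
			}
		}
	}
}

// runDemo queues work and returns the order in which it ran.
func runDemo() []int {
	el := NewEventLoop()
	var order []int
	el.NextTick(func() {
		order = append(order, 1)
		// Queued from inside the tick, so it runs before the next tick.
		el.CurrentTick(func() { order = append(order, 2) })
	})
	el.NextTick(func() { order = append(order, 3) })
	el.Quit() // returns only after all three functions have run
	return order
}
```

Because Quit waits on done, the caller can read the results right after it returns. Calling NextTick after Quit would panic on the closed channel, so this sketch assumes Quit is the last call made on the loop.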

Answer 2

Score: 1

I figured it out myself, after a lot of problems and random issues, including using 15 as the length instead of the capacity... It seems you just have each thread send a message after it decrements the counter. (The loop.tick part could be inlined, but I'm not worried about that.)

package eventloop

type eventLoop struct {
	functions   []func()
	addFunc     chan func() // capacity 3
	mutex       chan bool   // capacity 1, guards threads
	threads     int
	waitChannel chan bool
	pauseState  chan bool
}

func (this *eventLoop) NextTick(f func()) {
	this.addFunc <- f
}

func (this *eventLoop) tick() {
	// Wait until every CurrentTick goroutine has finished. Receiving from
	// waitChannel blocks, so there is no busy loop and no sleep.
	this.mutex <- true
	for this.threads != 0 {
		<-this.mutex
		<-this.waitChannel
		this.mutex <- true
	}
	<-this.mutex
	// Move any newly queued functions into the pending list.
L1:
	for {
		select {
		case f := <-this.addFunc:
			this.functions = append(this.functions, f)
		default:
			break L1
		}
	}
	if len(this.functions) != 0 {
		// Run the oldest pending function and drop it from the list.
		this.functions[0]()
		if len(this.functions) >= 2 {
			this.functions = this.functions[1:]
		} else {
			this.functions = []func(){}
		}
	} else {
		// Nothing pending: block until a function is added, then run it.
		(<-this.addFunc)()
	}
}

func (this *eventLoop) CurrentTick(f func()) {
	this.mutex <- true
	this.threads += 1
	<-this.mutex
	go func() {
		f()
		this.mutex <- true
		this.threads -= 1
		<-this.mutex
		// Wake the loop after the counter has been decremented.
		this.waitChannel <- true
	}()
}

func NewEventLoop() *eventLoop {
	funcs := make(chan func(), 3)
	loop := &eventLoop{
		make([]func(), 0, 15), /*functions*/
		funcs,                 /*addFunc*/
		make(chan bool, 1),    /*mutex for threads*/
		0,                     /*Number of threads*/
		make(chan bool, 0),    /*The "wait" channel*/
		make(chan bool, 1),
	}
	go func() {
		for {
			loop.tick()
		}
	}()
	return loop
}

Note: this still has lots of other problems.
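For comparison, the same counter-plus-wakeup idea can probably be expressed with sync.WaitGroup from the standard library, which blocks in Wait without spinning or sleeping. This is a minimal sketch and not the code from either answer: tickLoop, newTickLoop, Run, and runTicks are hypothetical names, and it assumes CurrentTick is only called from code that is already running inside a tick, so that Add never races with Wait.

```go
package main

import "sync"

// tickLoop is a hypothetical type for this sketch.
type tickLoop struct {
	next    chan func()
	pending sync.WaitGroup // goroutines started by CurrentTick
}

func newTickLoop() *tickLoop {
	return &tickLoop{next: make(chan func(), 3)}
}

func (l *tickLoop) NextTick(f func()) { l.next <- f }

// CurrentTick runs f concurrently. The loop does not start the next
// tick until f has returned.
func (l *tickLoop) CurrentTick(f func()) {
	l.pending.Add(1)
	go func() {
		defer l.pending.Done()
		f()
	}()
}

// Run executes ticks on the calling goroutine until next is closed.
func (l *tickLoop) Run() {
	for f := range l.next {
		f()
		l.pending.Wait() // blocks until the counter reaches zero
	}
}

// runTicks returns the order in which the pieces of work finished.
func runTicks() []string {
	l := newTickLoop()
	var mu sync.Mutex
	var events []string
	record := func(s string) {
		mu.Lock()
		defer mu.Unlock()
		events = append(events, s)
	}

	l.NextTick(func() {
		release := make(chan struct{})
		l.CurrentTick(func() {
			<-release // outlive the calling function on purpose
			record("worker")
		})
		record("tick 1")
		close(release)
	})
	l.NextTick(func() { record("tick 2") })
	close(l.next)
	l.Run()
	return events
}
```

In runTicks the worker is still running when the first tick's function returns, yet "tick 2" is recorded only after "worker", which is the ordering the question asks for. If CurrentTick had to be callable from arbitrary goroutines, a different primitive such as sync.Cond around an explicit counter would likely be a better fit.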

huangapple
  • Posted on September 26, 2011 at 02:54:26
  • When reposting, please keep the link to this article: https://go.coder-hub.com/7547796.html