英文:
How to wait for threads with low latency in go?
问题
我一直在尝试在Go中创建一个简单的事件循环包装器。但是我被卡住了,我应该如何跟踪当前线程中的操作呢?
我希望CurrentTick运行一个函数,即使调用函数退出,也不会开始下一个tick,直到所有由CurrentTick运行的函数退出。我想我可以使用互斥锁来监视线程数,但我意识到如果我一直反复检查它,那么会限制CPU。如果我使用time.Sleep,那么会有延迟。你会如何解决这个问题?
package eventloop
import (
"reflect"
)
type eventLoop *struct{
functions []reflect.Value
addFunc chan<- reflect.Value
mutex chan bool
threads int
}
func NewEventLoop() eventLoop {
var funcs chan reflect.Value
loop := eventLoop{
[]reflect.Value{},
funcs = make(chan reflect.Value, 3),
make(chan bool, 1),
0,
}
go func(){
for {
this.mutex <- true
if threads == 0 {
}
}
}
}
func (this eventLoop) NextTick(f func()) {
this.addFunc <- reflect.ValueOf(f)
}
func (this eventLoop) CurrentTick(f func()) {
this.mutex <- true
threads += 1
<-this.mutex
go func() {
f()
this.mutex <- true
threads -= 1
<-this.mutex
}()
}
英文:
I've been trying to create a simple event loop wrapper in Go. But I got stumped, how was I supposed to keep track of operations in the current thread?
I wanted CurrentTick to run a function, and even if the calling function quits, not start the next tick until all functions run by CurrentTick quit. I thought I might use a mutex to monitor the number of threads, but I realized if I kept checking that over and over it would throttle the CPU. If I used time.Sleep it would be latent. How would you solve the problem?
package eventloop
import (
"reflect"
)
type eventLoop *struct{
functions []reflect.Value
addFunc chan<-/*3*/ reflect.Value
mutex chan/*1*/ bool
threads int
}
func NewEventLoop() eventLoop {
var funcs chan reflect.Value
loop := eventLoop{
[]Reflect.Value{},
funcs = make(chan reflect.Value, 3),
make(chan bool, 1),
0,
}
go func(){
for {
this.mutex <- 1
if threads == 0 {
}
}
}
}
func (this eventLoop) NextTick(f func()) {
this.addFunc <- reflect.ValueOf(f)
}
func (this eventLoop) CurrentTick(f func()) {
this.mutex <- 1
threads += 1
<-this.mutex
go func() {
f()
this.mutex <- 1
threads -= 1
<-this.mutex
}()
}
答案1
得分: 2
如果我理解你的意图,我认为你把事情搞得太复杂了。我会这样做:
package eventloop
type EventLoop struct {
nextFunc chan func()
curFunc chan func()
}
func NewEventLoop() *EventLoop {
el := &EventLoop{
// 根据需要调整容量
make(chan func(), 3),
make(chan func(), 3),
}
go eventLoop(el)
return el
}
func (el *EventLoop) NextTick(f func()) {
el.nextFunc <- f
}
func (el *EventLoop) CurrentTick(f func()) {
el.curFunc <- f
}
func (el *EventLoop) Quit() {
close(el.nextFunc)
}
func eventLoop(el *EventLoop) {
for {
f, ok := <-el.nextFunc
if !ok {
return
}
f()
drain: for {
select {
case f := <-el.curFunc:
f()
default:
break drain
}
}
}
}
根据你的使用情况,你可能需要添加一些同步来确保循环中的所有任务在程序退出之前完成。
英文:
If I understand your intent, I think you're overcomplicating things. I'd do it like this:
package eventloop
type EventLoop struct {
nextFunc chan func()
curFunc chan func()
}
func NewEventLoop() *EventLoop {
el := &EventLoop{
// Adjust the capacities to taste
make(chan func(), 3),
make(chan func(), 3),
}
go eventLoop(el)
return el
}
func (el *EventLoop) NextTick(f func()) {
el.nextFunc <- f
}
func (el *EventLoop) CurrentTick(f func()) {
el.curFunc <- f
}
func (el *EventLoop) Quit() {
close(el.nextFunc)
}
func eventLoop(el *EventLoop) {
for {
f, ok := <-el.nextFunc
if !ok {
return
}
f()
drain: for {
select {
case f := <-el.curFunc:
f()
default:
break drain
}
}
}
}
Depending on your use, you may need to add some synchronization to make sure all tasks in the loop finish before your program exits.
答案2
得分: 1
我自己解决了,经过了很多问题和随机问题,包括使用长度为15而不是容量... 看起来你只需要在减少计数器后发送一条消息的线程。(循环的tick部分可以内联,但我不担心这个)
package eventloop
type eventLoop struct{
functions []func()
addFunc chan/*3*/ func()
mutex chan/*1*/ bool
threads int
waitChannel chan bool
pauseState chan bool
}
func (this *eventLoop) NextTick (f func()) {
this.addFunc <- f
}
func (this *eventLoop) tick () {
this.mutex <- true
for this.threads != 0 {
<-this.mutex
<-this.waitChannel
this.mutex <- true
}
<-this.mutex
L1: for {
select {
case f := <-this.addFunc:
this.functions = append(this.functions,f)
default: break L1
}
}
if len(this.functions) != 0 {
this.functions[0]()
if len(this.functions) >= 2 {
this.functions = this.functions[1:]
} else {
this.functions = []func(){}
}
} else {
(<-this.addFunc)()
}
}
func (this *eventLoop) CurrentTick (f func()) {
this.mutex <- true
this.threads += 1
<-this.mutex
go func() {
f()
this.mutex <- true
this.threads -= 1
<-this.mutex
this.waitChannel <- true
}()
}
func NewEventLoop () *eventLoop {
funcs := make(chan func(),3)
loop := &eventLoop{
make([]func(),0,15), /*functions*/
funcs, /*addFunc*/
make(chan bool, 1), /*mutex for threads*/
0, /*Number of threads*/
make(chan bool,0), /*The "wait" channel*/
make(chan bool,1),
}
go func(){
for { loop.tick() }
}()
return loop
}
注意:这仍然有很多其他问题。
英文:
I figured it out myself, after a lot of problems and random issues including using 15 as length instead of capacity... Seems you just have a thread send a message after you decrement the counter. (the loop.tick part could be inlined, but I'm not worried about that)
package eventloop
type eventLoop struct{
functions []func()
addFunc chan/*3*/ func()
mutex chan/*1*/ bool
threads int
waitChannel chan bool
pauseState chan bool
}
func (this *eventLoop) NextTick (f func()) {
this.addFunc <- f
}
func (this *eventLoop) tick () {
this.mutex <- true
for this.threads != 0 {
<-this.mutex
<-this.waitChannel
this.mutex <- true
}
<-this.mutex
L1: for {
select {
case f := <-this.addFunc:
this.functions = append(this.functions,f)
default: break L1
}
}
if len(this.functions) != 0 {
this.functions[0]()
if len(this.functions) >= 2 {
this.functions = this.functions[1:]
} else {
this.functions = []func(){}
}
} else {
(<-this.addFunc)()
}
}
func (this *eventLoop) CurrentTick (f func()) {
this.mutex <- true
this.threads += 1
<-this.mutex
go func() {
f()
this.mutex <- true
this.threads -= 1
<-this.mutex
this.waitChannel <- true
}()
}
func NewEventLoop () *eventLoop {
funcs := make(chan func(),3)
loop := &eventLoop{
make([]func(),0,15), /*functions*/
funcs, /*addFunc*/
make(chan bool, 1), /*mutex for threads*/
0, /*Number of threads*/
make(chan bool,0), /*The "wait" channel*/
make(chan bool,1),
}
go func(){
for { loop.tick() }
}()
return loop
}
Note: this still has lots of other problems.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论