英文:
How do I wait for lulls in channel activity to trigger something?
问题
我有一个通道,会接收到突发的写入操作。我想要在触发某个动作之前,等待通道上的一连串发送操作完成。
我查看了这个gist,然而,如果缓冲区中有数据,它会每隔interval时间发送一次输出:
func debounceChannel(interval time.Duration, output chan int) chan int {
  input := make(chan int)
  go func() {
    var buffer int
    var ok bool
    // 在至少调用一次之前,我们不会开始等待interval
    buffer, ok = <-input
    // 如果通道关闭,退出,我们也可以关闭输出
    if !ok {
      return
    }
    
    // 我们开始等待一个interval
    for {
      select {
      case buffer, ok = <-input:
        // 如果通道关闭,退出,我们也可以关闭输出
        if !ok {
          return
        }
      case <-time.After(interval):
        // interval已经过去并且我们有数据,所以发送它
        output <- buffer
        // 在开始等待一个interval之前再次等待数据
        buffer, ok = <-input
        if !ok {
          return
        }
        // 如果通道没有关闭,我们有更多的数据并开始等待interval
      }
    }
  }()
  return input
}
在我的情况下,我想要等待直到输入通道上不再有数据发送为止,然后再触发或发送到输出通道。
我该如何实现这个?
英文:
I have a channel that will receive bursts of writes to it. I want to wait until a burst of sends on the channel have finished before triggering an action.
I have looked at this gist, however, it will send on the output every intervalif there is data in the buffer:
func debounceChannel(interval time.Duration, output chan int) chan int {
  input := make(chan int)
  go func() {
    var buffer int
    var ok bool
    // We do not start waiting for interval until called at least once
    buffer, ok = <-input 
    // If channel closed exit, we could also close output
    if !ok {
      return
    }
    
    // We start waiting for an interval
    for {
      select {
      case buffer, ok = <-input:
        // If channel closed exit, we could also close output
        if !ok {
          return
        }
      case <-time.After(interval):
        // Interval has passed and we have data, so send it
        output <- buffer
        // Wait for data again before starting waiting for an interval
        buffer, ok = <-input
        if !ok {
          return
        }
        // If channel is not closed we have more data and start waiting for interval
      }
    }
  }()
  return input
}
In my case, I want to wait until there is no longer any data being sent on the input channel for this burst before triggering or sending on the output.
How do I achieve this?
答案1
得分: 1
听起来你需要在 goroutine 之间进行同步,可能是沿着这个方向。
func main() {
    // 为输入创建一个通道
    input := make(chan int, 1)
    // 创建另一个用于主 goroutine 和分叉 goroutine 之间的同步的通道
    done := make(chan bool)
    go func() {
        // 阻塞等待接收到的值
        <-input
        // 在这里做更多的事情
        // 完成后,向主 goroutine 发送信号
        done <- true
    }()
    // 在等待分叉的 goroutine 时做一些事情
    // 这里会阻塞,直到 `<-done`
    <-done
    close(mychan)
}
这篇文章很清楚地解释了使用通道和同步组进行同步的方法。
英文:
Sounds like you need synchronization between goroutines, perhaps along this line.
func main() {
        // Create a channel for our input
        input := make(chan int, 1)
        // Create another for synchronization between main and forked goroutines
        done := make(chan bool)
	    go func() {
	            // block-wait for received value
	            <-input
	
                // do some more things here
                
                // when done, send signal to the main goroutine
                done <- true
        }()
        // Do something while wait for the forked goroutine
        // this block until `<-done`
        <-done
        close(mychan)
}
This post explains quite clear about synchronization using channels and sync group.
答案2
得分: 0
这是我最终实现的防抖函数代码:
func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
	go func() {
		var last int64 = 0
		for {
			select {
			case <-in:
				last = time.Now().Unix()
			case <-time.Tick(lull):
				if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
					last = 0
					out <- struct{}{}
				}
			}
		}
	}()
}
该函数接受一个时间间隔 lull,用于判断在没有接收到输入时是否存在数据突发的间隙。函数有两个通道,一个输入通道和一个输出通道。数据突发通过输入通道传入,每次突发结束时,我们会向输出通道写入数据。
这个实现非常简单。每次从输入通道接收到数据时,我都会存储当前的 Unix 时间戳。然后,我使用一个定时器,定时器的间隔为 lull。定时器的作用是检查上一次突发是否已经超过了等待时间。如果是,就将 last 重置为 0,并在输出通道上触发一个事件。
下面是使用防抖函数的一些代码,其中 lull 时间设置为 2 秒,它会在输入通道上发送随机的数据突发:
func main() {
	out := make(chan struct{})
	in := make(chan struct{})
	Debounce(2*time.Second, in, out)
	// 生成数据突发
	go func(in chan struct{}) {
		for {
			select {
			case <-time.Tick(1 * time.Second):
				in <- struct{}{}
				fmt.Println("发送!")
				shouldSleep := rand.Intn(2)
				if shouldSleep == 1 {
					time.Sleep(5 * time.Second)
				}
			}
		}
	}(in)
	// 监听输出事件
	go func(out chan struct{}) {
		for _ = range out {
			fmt.Println("收到一个事件!")
		}
	}(out)
	// 防止 main 函数提前终止
	done := make(chan struct{})
	<-done
}
希望对你有帮助!
英文:
This is what I ended up implementing as my debouncer:
func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
	go func() {
		var last int64 = 0
		for {
			select {
			case <-in:
				last = time.Now().Unix()
			case <-time.Tick(lull):
				if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
					last = 0
					out <- struct{}{}
				}
			}
		}
	}()
}
It takes a lull time which is the duration where if we do not receive on the input, then we assume there is a break in the bursts of data. There are 2 channels, 1 input and 1 output. The bursts of data arrives on the input, and for each burst, we write to the output channel at the end of the burst.
The implementation is extremely simplistic. I just store the current unix timestamp every time I receive from the input channel. Then, I have a ticker ticking with a duration of the lull time. All this does is check to see if we've exceeded the wait time for the last burst. If so, it resets last to 0 an emits an event on the output channel.
Here's some code using the debounce function with a lull time of 2 seconds which sends random bursts on the input channel:
func main() {
	out := make(chan struct{})
	in := make(chan struct{})
	Debounce(2*time.Second, in, out)
    
    // Generating bursts of input data
	go func(in chan struct{}) {
		for {
			select {
			case <-time.Tick(1 * time.Second):
				in <- struct{}{}
			
				fmt.Println("Sending!")
			
				shouldSleep := rand.Intn(2)
				if shouldSleep == 1 {
					time.Sleep(5 * time.Second)
				}
			}
		}
	}(in)
    // Listening for output events
	go func(out chan struct{}) {
		for _ = range out {
			fmt.Println("Got an event!")
		}
	}(out)
	// Do not let main terminate.
	done := make(chan struct{})
	<-done
}
答案3
得分: 0
我使用的防抖函数如下:
package pkg
import (
	"context"
	"time"
)
// Debounce接收一个通道,并在静默时间后通过输出通道通知最后接收到的消息。
// 在取消时,它将最后一次检查是否有消息。
// 取消ctx将导致goroutine退出。
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {
	go func() {
		var (
			buffer   *T
			minTimer = time.NewTimer(lull)
			flush    = func() {
				if buffer != nil {
					// 如果无法发送,则不要阻塞
					select {
					case output <- *buffer:
					default:
					}
					buffer = nil
				}
			}
		)
		defer minTimer.Stop()
		hits := 0
		for {
			select {
			case <-ctx.Done():
				// 尝试获取最后一条消息
				select {
				case tmpBuf, ok := <-input:
					if !ok {
						break
					}
					buffer = &tmpBuf
				default:
				}
				flush()
				return
			case tmpBuf, ok := <-input:
				if !ok {
					flush()
					return
				}
				hits++
				buffer = &tmpBuf
			case <-minTimer.C:
				flush()
				minTimer.Reset(lull)
			}
		}
	}()
}
希望对你有帮助!
英文:
What I've used as a debouncer:
package pkg
import (
	"context"
	"time"
)
// Debounce takes a channel, and will notify the output channel with the last received message after a lull duration.
// Upon cancel, it will check one last time for a message.
// Cancelling the ctx will cause the goroutine to exit.
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {
	
	go func() {
		var (
			buffer   *T
			minTimer = time.NewTimer(min)
			flush    = func() {
				if buffer != nil {
					// don't block if we couldn't send
					select {
					case output <- *buffer:
					default:
					}
					buffer = nil
				}
			}
		)
		defer minTimer.Stop()
		hits := 0
		for {
			select {
			case <-ctx.Done():
				// try and get last message
				select {
				case tmpBuf, ok := <-input:
					if !ok {
						break
					}
					buffer = &tmpBuf
				default:
				}
				flush()
				return
			case tmpBuf, ok := <-input:
				if !ok {
                    flush()
					return
				}
				hits++
				buffer = &tmpBuf
			case <-minTimer.C:
				flush()
				minTimer.Reset(min)
			}
		}
	}()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论