英文:
How do I wait for lulls in channel activity to trigger something?
问题
我有一个通道,会接收到突发的写入操作。我想要在触发某个动作之前,等待通道上的一连串发送操作完成。
我查看了这个gist,然而,如果缓冲区中有数据,它会每隔interval
时间发送一次输出:
func debounceChannel(interval time.Duration, output chan int) chan int {
input := make(chan int)
go func() {
var buffer int
var ok bool
// 在至少调用一次之前,我们不会开始等待interval
buffer, ok = <-input
// 如果通道关闭,退出,我们也可以关闭输出
if !ok {
return
}
// 我们开始等待一个interval
for {
select {
case buffer, ok = <-input:
// 如果通道关闭,退出,我们也可以关闭输出
if !ok {
return
}
case <-time.After(interval):
// interval已经过去并且我们有数据,所以发送它
output <- buffer
// 在开始等待一个interval之前再次等待数据
buffer, ok = <-input
if !ok {
return
}
// 如果通道没有关闭,我们有更多的数据并开始等待interval
}
}
}()
return input
}
在我的情况下,我想要等待直到输入通道上不再有数据发送为止,然后再触发或发送到输出通道。
我该如何实现这个?
英文:
I have a channel that will receive bursts of writes to it. I want to wait until a burst of sends on the channel have finished before triggering an action.
I have looked at this gist, however, it will send on the output every interval
if there is data in the buffer:
func debounceChannel(interval time.Duration, output chan int) chan int {
input := make(chan int)
go func() {
var buffer int
var ok bool
// We do not start waiting for interval until called at least once
buffer, ok = <-input
// If channel closed exit, we could also close output
if !ok {
return
}
// We start waiting for an interval
for {
select {
case buffer, ok = <-input:
// If channel closed exit, we could also close output
if !ok {
return
}
case <-time.After(interval):
// Interval has passed and we have data, so send it
output <- buffer
// Wait for data again before starting waiting for an interval
buffer, ok = <-input
if !ok {
return
}
// If channel is not closed we have more data and start waiting for interval
}
}
}()
return input
}
In my case, I want to wait until there is no longer any data being sent on the input channel for this burst before triggering or sending on the output.
How do I achieve this?
答案1
得分: 1
听起来你需要在 goroutine 之间进行同步,可能是沿着这个方向。
func main() {
// 为输入创建一个通道
input := make(chan int, 1)
// 创建另一个用于主 goroutine 和分叉 goroutine 之间的同步的通道
done := make(chan bool)
go func() {
// 阻塞等待接收到的值
<-input
// 在这里做更多的事情
// 完成后,向主 goroutine 发送信号
done <- true
}()
// 在等待分叉的 goroutine 时做一些事情
// 这里会阻塞,直到 `<-done`
<-done
close(mychan)
}
这篇文章很清楚地解释了使用通道和同步组进行同步的方法。
英文:
Sounds like you need synchronization between goroutines, perhaps along this line.
func main() {
// Create a channel for our input
input := make(chan int, 1)
// Create another for synchronization between main and forked goroutines
done := make(chan bool)
go func() {
// block-wait for received value
<-input
// do some more things here
// when done, send signal to the main goroutine
done <- true
}()
// Do something while wait for the forked goroutine
// this block until `<-done`
<-done
close(mychan)
}
This post explains quite clear about synchronization using channels and sync group.
答案2
得分: 0
这是我最终实现的防抖函数代码:
func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
go func() {
var last int64 = 0
for {
select {
case <-in:
last = time.Now().Unix()
case <-time.Tick(lull):
if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
last = 0
out <- struct{}{}
}
}
}
}()
}
该函数接受一个时间间隔 lull
,用于判断在没有接收到输入时是否存在数据突发的间隙。函数有两个通道,一个输入通道和一个输出通道。数据突发通过输入通道传入,每次突发结束时,我们会向输出通道写入数据。
这个实现非常简单。每次从输入通道接收到数据时,我都会存储当前的 Unix 时间戳。然后,我使用一个定时器,定时器的间隔为 lull
。定时器的作用是检查上一次突发是否已经超过了等待时间。如果是,就将 last
重置为 0,并在输出通道上触发一个事件。
下面是使用防抖函数的一些代码,其中 lull
时间设置为 2 秒,它会在输入通道上发送随机的数据突发:
func main() {
out := make(chan struct{})
in := make(chan struct{})
Debounce(2*time.Second, in, out)
// 生成数据突发
go func(in chan struct{}) {
for {
select {
case <-time.Tick(1 * time.Second):
in <- struct{}{}
fmt.Println("发送!")
shouldSleep := rand.Intn(2)
if shouldSleep == 1 {
time.Sleep(5 * time.Second)
}
}
}
}(in)
// 监听输出事件
go func(out chan struct{}) {
for _ = range out {
fmt.Println("收到一个事件!")
}
}(out)
// 防止 main 函数提前终止
done := make(chan struct{})
<-done
}
希望对你有帮助!
英文:
This is what I ended up implementing as my debouncer:
func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {
go func() {
var last int64 = 0
for {
select {
case <-in:
last = time.Now().Unix()
case <-time.Tick(lull):
if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
last = 0
out <- struct{}{}
}
}
}
}()
}
It takes a lull time which is the duration where if we do not receive on the input, then we assume there is a break in the bursts of data. There are 2 channels, 1 input and 1 output. The bursts of data arrives on the input, and for each burst, we write to the output channel at the end of the burst.
The implementation is extremely simplistic. I just store the current unix timestamp every time I receive from the input channel. Then, I have a ticker ticking with a duration of the lull time. All this does is check to see if we've exceeded the wait time for the last burst. If so, it resets last
to 0 an emits an event on the output channel.
Here's some code using the debounce function with a lull time of 2 seconds which sends random bursts on the input channel:
func main() {
out := make(chan struct{})
in := make(chan struct{})
Debounce(2*time.Second, in, out)
// Generating bursts of input data
go func(in chan struct{}) {
for {
select {
case <-time.Tick(1 * time.Second):
in <- struct{}{}
fmt.Println("Sending!")
shouldSleep := rand.Intn(2)
if shouldSleep == 1 {
time.Sleep(5 * time.Second)
}
}
}
}(in)
// Listening for output events
go func(out chan struct{}) {
for _ = range out {
fmt.Println("Got an event!")
}
}(out)
// Do not let main terminate.
done := make(chan struct{})
<-done
}
答案3
得分: 0
我使用的防抖函数如下:
package pkg
import (
"context"
"time"
)
// Debounce接收一个通道,并在静默时间后通过输出通道通知最后接收到的消息。
// 在取消时,它将最后一次检查是否有消息。
// 取消ctx将导致goroutine退出。
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {
go func() {
var (
buffer *T
minTimer = time.NewTimer(lull)
flush = func() {
if buffer != nil {
// 如果无法发送,则不要阻塞
select {
case output <- *buffer:
default:
}
buffer = nil
}
}
)
defer minTimer.Stop()
hits := 0
for {
select {
case <-ctx.Done():
// 尝试获取最后一条消息
select {
case tmpBuf, ok := <-input:
if !ok {
break
}
buffer = &tmpBuf
default:
}
flush()
return
case tmpBuf, ok := <-input:
if !ok {
flush()
return
}
hits++
buffer = &tmpBuf
case <-minTimer.C:
flush()
minTimer.Reset(lull)
}
}
}()
}
希望对你有帮助!
英文:
What I've used as a debouncer:
package pkg
import (
"context"
"time"
)
// Debounce takes a channel, and will notify the output channel with the last received message after a lull duration.
// Upon cancel, it will check one last time for a message.
// Cancelling the ctx will cause the goroutine to exit.
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {
go func() {
var (
buffer *T
minTimer = time.NewTimer(min)
flush = func() {
if buffer != nil {
// don't block if we couldn't send
select {
case output <- *buffer:
default:
}
buffer = nil
}
}
)
defer minTimer.Stop()
hits := 0
for {
select {
case <-ctx.Done():
// try and get last message
select {
case tmpBuf, ok := <-input:
if !ok {
break
}
buffer = &tmpBuf
default:
}
flush()
return
case tmpBuf, ok := <-input:
if !ok {
flush()
return
}
hits++
buffer = &tmpBuf
case <-minTimer.C:
flush()
minTimer.Reset(min)
}
}
}()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论