如何等待频道活动的间隙来触发某个操作?

huangapple go评论79阅读模式
英文:

How do I wait for lulls in channel activity to trigger something?

问题

我有一个通道,会接收到突发的写入操作。我想要在触发某个动作之前,等待通道上的一连串发送操作完成。

我查看了这个gist,然而,如果缓冲区中有数据,它会每隔interval时间发送一次输出:

func debounceChannel(interval time.Duration, output chan int) chan int {
  input := make(chan int)

  go func() {
    var buffer int
    var ok bool

    // 在至少调用一次之前,我们不会开始等待interval
    buffer, ok = <-input
    // 如果通道关闭,退出,我们也可以关闭输出
    if !ok {
      return
    }
    
    // 我们开始等待一个interval
    for {
      select {
      case buffer, ok = <-input:
        // 如果通道关闭,退出,我们也可以关闭输出
        if !ok {
          return
        }

      case <-time.After(interval):
        // interval已经过去并且我们有数据,所以发送它
        output <- buffer
        // 在开始等待一个interval之前再次等待数据
        buffer, ok = <-input
        if !ok {
          return
        }
        // 如果通道没有关闭,我们有更多的数据并开始等待interval
      }
    }
  }()

  return input
}

在我的情况下,我想要等待直到输入通道上不再有数据发送为止,然后再触发或发送到输出通道。

我该如何实现这个?

英文:

I have a channel that will receive bursts of writes to it. I want to wait until a burst of sends on the channel have finished before triggering an action.

I have looked at this gist, however, it will send on the output every intervalif there is data in the buffer:

func debounceChannel(interval time.Duration, output chan int) chan int {
  input := make(chan int)

  go func() {
    var buffer int
    var ok bool

    // We do not start waiting for interval until called at least once
    buffer, ok = &lt;-input 
    // If channel closed exit, we could also close output
    if !ok {
      return
    }
    
    // We start waiting for an interval
    for {
      select {
      case buffer, ok = &lt;-input:
        // If channel closed exit, we could also close output
        if !ok {
          return
        }

      case &lt;-time.After(interval):
        // Interval has passed and we have data, so send it
        output &lt;- buffer
        // Wait for data again before starting waiting for an interval
        buffer, ok = &lt;-input
        if !ok {
          return
        }
        // If channel is not closed we have more data and start waiting for interval
      }
    }
  }()

  return input
}

In my case, I want to wait until there is no longer any data being sent on the input channel for this burst before triggering or sending on the output.

How do I achieve this?

答案1

得分: 1

听起来你需要在 goroutine 之间进行同步,可能是沿着这个方向。

func main() {

    // 为输入创建一个通道
    input := make(chan int, 1)
    // 创建另一个用于主 goroutine 和分叉 goroutine 之间的同步的通道
    done := make(chan bool)

    go func() {
        // 阻塞等待接收到的值
        <-input

        // 在这里做更多的事情

        // 完成后,向主 goroutine 发送信号
        done <- true
    }()

    // 在等待分叉的 goroutine 时做一些事情

    // 这里会阻塞,直到 `<-done`
    <-done
    close(mychan)
}

这篇文章很清楚地解释了使用通道和同步组进行同步的方法。

英文:

Sounds like you need synchronization between goroutines, perhaps along this line.

func main() {

        // Create a channel for our input
        input := make(chan int, 1)
        // Create another for synchronization between main and forked goroutines
        done := make(chan bool)

	    go func() {
	            // block-wait for received value
	            &lt;-input
	
                // do some more things here
                
                // when done, send signal to the main goroutine
                done &lt;- true
        }()

        // Do something while wait for the forked goroutine

        // this block until `&lt;-done`
        &lt;-done
        close(mychan)
}

This post explains quite clear about synchronization using channels and sync group.

答案2

得分: 0

这是我最终实现的防抖函数代码:

func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {

	go func() {

		var last int64 = 0

		for {
			select {
			case <-in:
				last = time.Now().Unix()

			case <-time.Tick(lull):
				if last != 0 && time.Now().Unix() >= last+int64(lull.Seconds()) {
					last = 0
					out <- struct{}{}
				}
			}
		}
	}()
}

该函数接受一个时间间隔 lull,用于判断在没有接收到输入时是否存在数据突发的间隙。函数有两个通道,一个输入通道和一个输出通道。数据突发通过输入通道传入,每次突发结束时,我们会向输出通道写入数据。

这个实现非常简单。每次从输入通道接收到数据时,我都会存储当前的 Unix 时间戳。然后,我使用一个定时器,定时器的间隔为 lull。定时器的作用是检查上一次突发是否已经超过了等待时间。如果是,就将 last 重置为 0,并在输出通道上触发一个事件。

下面是使用防抖函数的一些代码,其中 lull 时间设置为 2 秒,它会在输入通道上发送随机的数据突发:

func main() {

	out := make(chan struct{})
	in := make(chan struct{})

	Debounce(2*time.Second, in, out)

	// 生成数据突发
	go func(in chan struct{}) {

		for {
			select {
			case <-time.Tick(1 * time.Second):
				in <- struct{}{}

				fmt.Println("发送!")

				shouldSleep := rand.Intn(2)
				if shouldSleep == 1 {
					time.Sleep(5 * time.Second)
				}
			}
		}
	}(in)

	// 监听输出事件
	go func(out chan struct{}) {

		for _ = range out {
			fmt.Println("收到一个事件!")
		}
	}(out)

	// 防止 main 函数提前终止
	done := make(chan struct{})
	<-done
}

希望对你有帮助!

英文:

This is what I ended up implementing as my debouncer:

func Debounce(lull time.Duration, in chan struct{}, out chan struct{}) {

	go func() {

		var last int64 = 0

		for {
			select {
			case &lt;-in:
				last = time.Now().Unix()

			case &lt;-time.Tick(lull):
				if last != 0 &amp;&amp; time.Now().Unix() &gt;= last+int64(lull.Seconds()) {
					last = 0
					out &lt;- struct{}{}
				}
			}
		}
	}()
}

It takes a lull time which is the duration where if we do not receive on the input, then we assume there is a break in the bursts of data. There are 2 channels, 1 input and 1 output. The bursts of data arrives on the input, and for each burst, we write to the output channel at the end of the burst.

The implementation is extremely simplistic. I just store the current unix timestamp every time I receive from the input channel. Then, I have a ticker ticking with a duration of the lull time. All this does is check to see if we've exceeded the wait time for the last burst. If so, it resets last to 0 an emits an event on the output channel.

Here's some code using the debounce function with a lull time of 2 seconds which sends random bursts on the input channel:

func main() {

	out := make(chan struct{})
	in := make(chan struct{})

	Debounce(2*time.Second, in, out)
    
    // Generating bursts of input data
	go func(in chan struct{}) {

		for {
			select {
			case &lt;-time.Tick(1 * time.Second):
				in &lt;- struct{}{}
			
				fmt.Println(&quot;Sending!&quot;)
			
				shouldSleep := rand.Intn(2)
				if shouldSleep == 1 {
					time.Sleep(5 * time.Second)
				}
			}
		}
	}(in)

    // Listening for output events
	go func(out chan struct{}) {

		for _ = range out {
			fmt.Println(&quot;Got an event!&quot;)
		}
	}(out)

	// Do not let main terminate.
	done := make(chan struct{})
	&lt;-done
}

答案3

得分: 0

我使用的防抖函数如下:

package pkg

import (
	"context"
	"time"
)

// Debounce接收一个通道,并在静默时间后通过输出通道通知最后接收到的消息。
// 在取消时,它将最后一次检查是否有消息。
// 取消ctx将导致goroutine退出。
func Debounce[T any](ctx context.Context, lull time.Duration, input <-chan T, output chan<- T) {

	go func() {
		var (
			buffer   *T
			minTimer = time.NewTimer(lull)
			flush    = func() {
				if buffer != nil {
					// 如果无法发送,则不要阻塞
					select {
					case output <- *buffer:
					default:
					}
					buffer = nil
				}
			}
		)
		defer minTimer.Stop()

		hits := 0
		for {
			select {
			case <-ctx.Done():
				// 尝试获取最后一条消息
				select {
				case tmpBuf, ok := <-input:
					if !ok {
						break
					}
					buffer = &tmpBuf
				default:
				}
				flush()
				return
			case tmpBuf, ok := <-input:
				if !ok {
					flush()
					return
				}
				hits++
				buffer = &tmpBuf
			case <-minTimer.C:
				flush()
				minTimer.Reset(lull)
			}
		}
	}()
}

希望对你有帮助!

英文:

What I've used as a debouncer:

package pkg

import (
	&quot;context&quot;
	&quot;time&quot;
)

// Debounce takes a channel, and will notify the output channel with the last received message after a lull duration.
// Upon cancel, it will check one last time for a message.
// Cancelling the ctx will cause the goroutine to exit.
func Debounce[T any](ctx context.Context, lull time.Duration, input &lt;-chan T, output chan&lt;- T) {

	
	go func() {
		var (
			buffer   *T
			minTimer = time.NewTimer(min)
			flush    = func() {
				if buffer != nil {
					// don&#39;t block if we couldn&#39;t send
					select {
					case output &lt;- *buffer:
					default:
					}
					buffer = nil
				}
			}
		)
		defer minTimer.Stop()

		hits := 0
		for {
			select {
			case &lt;-ctx.Done():
				// try and get last message
				select {
				case tmpBuf, ok := &lt;-input:
					if !ok {
						break
					}
					buffer = &amp;tmpBuf
				default:
				}
				flush()
				return
			case tmpBuf, ok := &lt;-input:
				if !ok {
                    flush()
					return
				}
				hits++
				buffer = &amp;tmpBuf
			case &lt;-minTimer.C:
				flush()
				minTimer.Reset(min)
			}
		}
	}()
}

huangapple
  • 本文由 发表于 2016年3月14日 13:01:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/35979863.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定