如何确定 Go 通道的哪一侧正在等待?

huangapple go评论124阅读模式
英文:

How to determine which side of go channel is waiting?

问题

如何确定 Go 通道的哪一侧正在等待另一侧?

我想知道这个问题的答案,以便确定我的处理过程受限的地方,并通过分配更多资源来做出响应。

一些选项

我想到的两种方法都需要对记录的值进行移动平均,以便测量结果不会太过嘈杂,但这并不是一个大问题


  1. 使用计时器检查消费者等待的时间百分比

对于单个消费者,我可以在从通道中消费之前启动一个计时器,并在获取记录后停止计时器。我可以跟踪我等待的时间百分比,并在每个获取周期中相应地做出响应。

  1. 采样缓冲通道的长度

如果通道的长度经常为0,那意味着我们的消费速度比发送速度快。同样地,如果缓冲区已满,那意味着我们的发送速度比接收速度快。我们可以随时间检查通道的长度,以确定哪一侧运行缓慢。


是否有充分的理由更喜欢其中一种方法,出于性能或其他原因?是否有更简单的解决方案来解决这个问题?

示例

我有一个服务,它在同时的 W 个 goroutine 中执行 N 个 HTTP 请求来获取内容,并将所有内容通过通道发送到在单个 goroutine 中运行的 processor,然后将数据反馈给客户端。

每个工作任务都会在通道上发送大量的消息。每个 worker 的任务可能需要几分钟才能完成。

以下图表总结了具有 3 个并发工作任务 (W=3) 的数据流程。

    [worker: task 1] -
                      \
    [worker: task 2] - | --- [ channel ] --- [ processor ] -> [ client ]
                      /
    [worker: task 3] -

我想知道在请求期间是否应该运行更多的工作任务(增加 W)或更少的工作任务(减少 W)。这可能因为客户端在速度非常不同的连接上工作而有很大的变化。

英文:

How do I determine which side of a go channel is waiting on the other side?

I'd like to know this so I can figure out where my processing is being limited and respond by allocating more resources.

Some options

The 2 methods I thought of both require something to do a moving average of recorded values so measurements are not too noisy, but that's not a big problem.


  1. Use a timer to check % of time waiting in consumer

In the case of a single consumer, I can start a timer before consuming from the channel, stopping the timer after I get a record. I can keep track of the % of the time I spend waiting and respond accordingly during each fetch cycle.

  1. Sample length of buffered channel

If the channel is regularly 0, that means we are consuming faster than sending. Similarly if the buffer is full, we're sending faster than we can receive. We can check the length of the our channel over time to determine what is running slow.


Is there a good reason to prefer one of there to another, for performance reasons or otherwise? Is there a simpler solution to this problem?

Example

I have a service that is performing N HTTP requests to grab content in up to W goroutines at the same time and sending all that content on a channel to a processor running in a single goroutine, which in turn feeds data back to a client.

Each worker task will result in a large number of messages sent on the channel. Each worker's task can take several minutes to complete.

The following diagram summarizes the flow of data with 3 concurrent workers (W=3).

    [worker: task 1] -
                      \
    [worker: task 2] - | --- [ channel ] --- [ processor ] -> [ client ]
                      /
    [worker: task 3] -

I want to know whether I should run more workers (increase W) or less workers (decrease W) during a request. This can vary a lot per request since client work over connections of very different speeds.

答案1

得分: 4

实现目标的一种方法是使用“有界发送”和“有界接收”操作,如果你能想出合理的轮询超时时间。

当你的任何一个工作线程尝试通过通道发送一个完成的结果时,不要让它永远阻塞(直到通道缓冲区中有空间);相反,只允许它阻塞一段最长的时间。如果超时发生在通道缓冲区中没有空间之前,你可以对这种情况做出反应:计算它发生的次数,调整未来的截止时间,限制或减少工作线程数量等等。

同样地,对于从工作线程接收结果的“处理器”,你可以限制它阻塞的时间。如果超时发生在没有可用的值之前,处理器将被饥饿。创建更多的工作线程以更快地提供结果给处理器(假设工作线程会从这样的并行性中受益)。

这种方法的缺点是为每个发送或接收操作创建定时器的开销。

以下是每个工作线程可以访问的声明的草图:

const minWorkers = 3
var workers uint32

在每个工作线程的goroutine中:

atomic.AddUint32(&workers, 1)
for {
    result, ok := produce()
    if !ok {
        break
    }
    // 检测通道“p”的缓冲区是否已满。
    select {
    case p <- result:
    case <-time.After(500 * time.Millisecond):
        // 无论需要多长时间,都将挂起的结果交给通道。
        p <- result
        // 如果当前工作线程数超过最小值,则减少工作线程数。
        if current := atomic.LoadUint32(&workers); current > minWorkers &&
            atomic.CompareAndSwapUint32(&workers, current, current-1) {
            return
        }
        // 如果仍然超过最小值,考虑再次尝试减少工作线程数。
        // 可能另一个工作线程也自愿退出,改变了计数。
    }
}
atomic.AddUint32(&workers, -1)

请注意,如上所述,你可以通过计时发送到通道p完成所需的时间,并在发送超时时做出反应,而不是进行一次有界发送,然后进行潜在的阻塞发送,从而实现相同的效果。然而,我以这种方式草图化它,因为我怀疑这样的代码会在超时到期时包括日志记录和仪表计数增加。

同样地,在处理器的goroutine中,你可以限制从工作线程接收值时阻塞的时间:

for {
    select {
    case result <- p:
        consume(result)
    case <-time.After(500 * time.Millisecond):
        maybeStartAnotherWorker()
    }
}

显然,你可以为这个装置添加许多旋钮。你将生产者的调度与消费者和生产者本身耦合在一起。引入一个不透明的“监听器”,让生产者和消费者“抱怨”延迟,可以打破这种循环关系,并更容易地改变管理拥塞反应策略的策略。

英文:

One way to reach your goal is to use "bounded send" and "bounded receive" operations—if you're able to come up with reasonable polling timeouts.

When any one of your workers attempts to send a completed result over the channel, don't let it block "forever" (until there's space in the channel's buffer); instead, only allow it to block some maximum amount of time. If the timeout occurs before there was space in the channel's buffer, you can react to that condition: count how many times it occurs, adjust future deadlines, throttle or reduce the worker count, and so on.

Similarly, for your "processor" receiving results from the workers, you can limit the amount of time it blocks. If the timeout occurs before there was a value available, the processor is starved. Create more workers to feed it more quickly (assuming the workers will benefit from such parallelism).

The downside to this approach is the overhead in creating timers for each send or receive operation.

Sketching, with these declarations accessible to each of your workers:

const minWorkers = 3
var workers uint32

In each worker goroutine:

atomic.AddUint32(&amp;workers, 1)
for {
	result, ok := produce()
	if !ok {
		break
	}
	// Detect when channel &quot;p&quot;&#39;s buffer is full.
	select {
	case p &lt;- result:
	case &lt;-time.After(500 * time.Millisecond):
		// Hand over the pending result, no matter how long it takes.
		p &lt;- result
		// Reduce worker count if above minimum.
		if current := atomic.LoadUint32(&amp;workers); current &gt; minWorkers &amp;&amp;
			atomic.CompareAndSwapUint32(&amp;workers, current, current-1) {
			return
		}
		// Consider whether to try decrementing the working count again
		// if we&#39;re still above the minimum. It&#39;s possible another one
		// of the workers also exited voluntarily, changing the count.
	}
}
atomic.AddUint32(&amp;workers, -1)

Note that as written above, you could achieve the same effect by timing how long it takes for the send to channel p to complete, and reacting to it having taken too long, as opposed to doing one bounded send followed by a potential blocking send. However, I sketched it that way because I suspect that such code would mature to include logging and instrumentation counter bumps when the timeout expires.

Similarly, in your processor goroutine, you could limit the amount of time you block receiving a value from the workers:

for {
	select {
	case result &lt;- p:
    	consume(result)
	case &lt;-time.After(500 * time.Millisecond):
		maybeStartAnotherWorker()
	}
}

Obviously, there are many knobs you can attach to this contraption. You wind up coupling the scheduling of the producers to both the consumer and the producers themselves. Introducing an opaque "listener" to which the producers and consumer "complain" about delays allows you to break this circular relationship and more easily vary the policy that governs how you react to congestion.

huangapple
  • 本文由 发表于 2016年12月28日 07:33:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/41353508.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定