在通道上进行非阻塞的多次接收。

huangapple go评论91阅读模式
英文:

Go Nonblocking multiple receive on channel

问题

到处似乎都在讨论从通道中读取应该是一个阻塞操作。态度似乎是这是 Go 的方式。这有些合理,但我正在努力弄清楚如何从通道中聚合数据。

例如,发送 HTTP 请求。假设我设置了一个生成数据流的管道,所以我有一个产生队列/流数据的通道。然后,我可以有一个 goroutine 监听这个通道,并发送 HTTP 请求将其存储在服务中。这样可以工作,但我为每个数据点创建了一个 HTTP 请求。

我发送到的端点允许我批量发送多个数据点。我想做的是:

  1. 读取尽可能多的值,直到在通道上阻塞。
  2. 将它们组合起来/发送单个 HTTP 请求。
  3. 然后在通道上阻塞,直到我可以再次读取。

这就是我在 C 中做事情的方式,使用线程安全的队列和 select 语句。基本上在可能的时候刷新整个/队列缓冲区。这在 Go 中是一种有效的技术吗?

似乎 Go 的 select 语句确实给了我类似于 C 的 select 的东西,但我仍然不确定是否有通道上的“非阻塞读取”。

编辑:我也愿意接受我打算的方式可能不是 Go 的方式,但是不停地发送非停止的 HTTP 请求对我来说也是错误的,特别是如果它们可以被聚合。如果有人有一个替代架构,那将很酷,但我想避免诸如“神奇地缓冲 N 个项目”或“等待 X 秒钟再发送”之类的事情。

英文:

Everywhere seems to discuss that reading from a channel should always be a blocking operation. The attitude seems to be this is the Go way. This makes some sense but I'm trying to figure out how I would aggregate things from channels.

For example, sending http requests. Say I have a pipeline setup that generates streams of data, so I have a channel that produces queue/stream of points. I could then have a goroutine listen to this channel and send a HTTP Request to store it in a service. This works, but I'm creating a http request for every point.

The endpoint I'm sending it too allows me to send multiple data points in a batch. What I would like to do, is

  1. Read as many values until I would block on channel.
  2. Combine them/send single http request.
  3. Then block on channel until I can read
    one again.

This is how I would've done things in C, with threadsafe queues and select statements. Basically flushing the entire/queue buffer when possible. Is this a valid technique in go?

It seems the go select statement does give me something similar to C's select, but I'm still not sure if there is a 'nonblocking read' on channels.

EDIT: I'm also willing to accept what I'm intending may not be the Go Way, but constantly smashing off non stop http requests also seems wrong to me, especially if they can be aggregated. If someone has an alternative architecture that will be cool, but I want to avoid things like, magically buffering N items, or waiting X seconds until sending.

答案1

得分: 5

以下是如何批处理直到通道为空的方法。变量batch是您的数据点类型的切片。变量ch是您的数据点类型的通道。

var batch []someType
for {
    select {
    case v := <-ch:
        batch = append(batch, v)
    default:
        if len(batch) > 0 {
            sendBatch(batch)
            batch := batch[:0]
        }
        batch = append(batch, <-ch)  // 在此处接收一个值可以防止忙等待。
    }
}

您应该防止批处理无限增长。以下是一种简单的方法:

var batch []someType
for {
    select {
    case v := <-ch:
        batch = append(batch, v)
        if len(batch) >= batchLimit {
            sendBatch(batch)
            batch := batch[:0]
        }
    default:
        if len(batch) > 0 {
            sendBatch(batch)
            batch := batch[:0]
        }
        batch = append(batch, <-ch)
    }
}
英文:

Here's how to batch until the channel is empty. The variable batch is a slice of your data point type. The variable ch is a channel of your data point type.

var batch []someType
for {
    select {
    case v := &lt;-ch:
       batch = append(batch, v)
    default:
       if len(batch) &gt; 0 {
           sendBatch(batch)
           batch := batch[:0]
       }
       batch = append(batch, &lt;-ch)  // receiving a value here prevents busy waiting.
    }
}

You should prevent the batch from growing without limit. Here's a simple way to do it:

var batch []someType
for {
    select {
    case v := &lt;-ch:
       batch = append(batch, v)
       if len(batch) &gt;= batchLimit {
           sendBatch(batch)
           batch := batch[:0]
       }
    default:
       if len(batch) &gt; 0 {
           sendBatch(batch)
           batch := batch[:0]
       }
       batch = append(batch, &lt;-ch)
    }
}

答案2

得分: 2

Dewy Broto提供了一个解决方案来解决你的问题。这是一个直接明了的解决方案,但我想更广泛地评论一下你如何解决不同问题的方法。

Go语言使用通信顺序进程代数(CSP)作为其通道、选择和轻量级进程('goroutines')的基础。CSP保证事件的顺序;只有在你通过选择(也称为select)来引入非确定性时,它才会引入非确定性。这种有序性有时被称为"happens-before",它使编码比另一种(广泛流行的)非阻塞风格更简单。它还为创建组件提供了更多的空间:通过通道以可预测的方式与外部世界交互的长期功能单元。

也许对通道的阻塞的讨论会给学习Go语言的人带来心理障碍。我们在I/O上阻塞,但我们在通道上等待。在通道上等待是可以接受的,前提是整个系统有足够的并行弹性(即其他活动的goroutines)来保持CPU繁忙。

可视化组件

所以,回到你的问题。让我们从组件的角度来考虑,你有许多需要探索的点的来源。假设每个来源都是一个goroutine,那么它就形成了你设计中的一个组件,带有一个输出通道。Go语言允许通道端点共享,因此许多来源可以安全地按顺序将它们的点插入到单个通道中。你不需要做任何事情-这就是通道的工作方式。

Dewy Broto描述的批处理函数本质上是另一个组件。作为学习练习,以这种方式表达它是一件好事。批处理组件有一个点的输入通道和一个批次的输出通道。

最后,HTTP I/O行为也可以是一个组件,它有一个输入通道和没有输出通道,仅用于接收整个批次的点,然后通过HTTP发送它们。

以只有一个来源的简单情况为例,可以这样描述:

+--------+     点      +---------+     批次     +-------------+
| 来源   +-------&gt;-------+ 批处理器 +-------&gt;-------+ HTTP输出   |
+--------+               +---------+               +-------------+

这里的意图是以它们的基本层次来描述不同的活动。这有点像数字电路图,这不是巧合。

你确实可以在Go语言中实现这个,它会工作。它甚至可能工作得足够好,但实际上你可能更喜欢通过组合成对的组件来优化它。在这种情况下,很容易将批处理器和HTTP输出组合在一起,这样做就得到了Dewy Broto的解决方案。

重要的是,Go语言的并发最容易发生在以下情况下:

  • (a) 不要事先担心阻塞;
  • (b) 以相当细粒度的级别描述需要发生的活动(在简单情况下,你可以在脑海中做到这一点);
  • (c) 如果需要,通过将函数组合在一起进行优化。

我将把将通道端点可视化的更高级主题(Pi-Calculus)作为一个挑战,其中通道用于将通道端点发送给其他goroutines。

英文:

Dewy Broto has given a good solution to your problem. This is a straightforward direct solution, but I wanted to comment more broadly on how you might go about finding solutions for different problems.

Go uses Communicating Sequential Process algebra (CSP) as its basis for channels, selection and lightweight processes ('goroutines'). CSP guarantees the order of events; it only introduces non-determinism when you make it do so by making a choice (a.k.a. select). The guaranteed ordering is sometimes called "happens-before" - it makes coding so much simpler than the alternative (widely popular) non-blocking style. It also gives more scope for creating components: units of long-lived functionality that interact with the outside world through channels in a predictable way.

Perhaps talk of blocking on channels puts a mental hurdle in the way of people learning Go. We block on I/O, but we wait on channels. Waiting on channels is not to be frowned on, provided that the system as a whole has enough parallel slackness (i.e. other active goroutines) to get on keeping the CPU busy.

Visualising Components

So, back to your problem. Let's think about it in terms of components, you have many sources of points you need to explore. Suppose each source is a goroutine, it then forms a component in your design with an output channel. Go lets channel-ends be shared, therefore many sources can safely interleave, in order, their points onto a single channel. You don't have to do anything - it's just how channels work.

The batching function described by Dewy Broto is, at essence, another component. As a learning exercise, it's a good thing to express it this way. The batching component has one input channel of points and one output channel of batches.

Finally the HTTP i/o behaviour could also be a component with one input channel and no output channels, serving merely to receive whole batches of points then send them via HTTP.

Taking the simple case of only one source, this might be depicted like this:

+--------+     point     +---------+     batch     +-------------+
| source +-------&gt;-------+ batcher +-------&gt;-------+ http output |
+--------+               +---------+               +-------------+

The intention here is to depict the different activities at their fundamental level. It's a bit like a digital circuit diagram and that's not a coincidence.

You could indeed implement this in Go and it would work. It might even work well enough, but you may in practice prefer to optimise it by combining pairs of components, repeatedly as necessary. In this case, it's easy to combine the batcher and http output and doing so ends up with Dewy Broto's solution.

The important point is that Go concurrency happens easiest by

  • (a) do not worry up front about blocking;
  • (b) depict the activities that need to happen at a fairly fine-grained level (you can do this in your head in simple cases);
  • (c) if necessary, optimise by combining functions together.

I'll leave as a challenge the more advanced topic of visualising mobile channel ends (Pi-Calculus) where channels are used to send channel-ends to other goroutines.

huangapple
  • 本文由 发表于 2014年10月7日 10:17:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/26227620.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定