如何“尝试发送”到一个通道,并在通道已满时中止?

huangapple go评论84阅读模式
英文:

How to "try send" to a channel, and abort if channel is full?

问题

我有一个变种的经典的“生产者-消费者”问题。在我的程序中,有10个生产者并行工作,它们的目标是总共生产N个产品。

我考虑使用带缓冲的通道:

products := make([]int, 100) // 总共生产100个产品

// 生产者
for i := 0; i < 10; i++ {
    go func() {
        products <- 1 // !!
    }()
}

然而,这样做不起作用:

  • 协程没有意识到目标已经达到,通道发送操作会阻塞,函数永远不会返回。
  • if len(products) < 100 { products <- 1 } 不是原子操作,因此它没有帮助。

那么还有其他方法吗?

英文:

I have a variation of classic "producer-consumer" problem. In my program, there are 10 producers working in parallel, and their goal is to produce N products in total.

I thought about using buffered channel:

products := make([]int, 100) // In total, produce 100 products

// The producers
for i := 0; i &lt; 10; i++ {
    go func() {
        products &lt;- 1 // !!
    }()
}

However, it will not work:

  • The goroutine does not realize that the goal is reached, channel send blocks, and the function never returns.
  • if len(products) &lt; 100 { products &lt;- 1 } is not an atomic operation, therefore it does not help.

So is there any other approach?

答案1

得分: 11

products := make([]int, 100) 创建的是一个切片(slice),而不是通道(chan)。你需要使用以下代码来创建通道:

products := make(chan int, 100)

如果你真的想要进行非阻塞发送,可以使用 select 语句:

select {
case products <- 1:
default:
}

这将首先尝试在 products 上发送数据,如果通道已满,则执行 default 代码(空操作)并继续执行。

英文:

products := make([]int, 100) makes a slice, not a chan. You want:

products := make(chan int, 100)

If you really want to do a non-blocking send, you can use select:

select {
case products &lt;- 1:
default:
}

This will first try to send on products and if it is full run the default code (no-op) and continue.

答案2

得分: 1

这样做的最佳方式可能是在生产者中使用一个退出通道和一个选择语句。Go语言保证关闭的通道将始终注册为可读而不会阻塞。

以下是在playground上的一个可工作版本:

package main

import "fmt"
import "time"

func main() {
    productionChan := make(chan int)
    quit := make(chan struct{})
    for i := 0; i < 110; i++ {
        go produce(productionChan, quit)
    }

    consume(productionChan, quit)
    time.Sleep(5 * time.Second) // 只是为了观察多余的生产通道是否正确退出
}

func consume(productionChan <-chan int, quit chan<- struct{}) {
    payload := make([]int, 100)

    for i := range payload {
        payload[i] = <-productionChan
    }

    close(quit)
    fmt.Println("完整的数据接收完成,payload切片的长度为:", len(payload))
}

func produce(productionChan chan<- int, quit <-chan struct{}) {
    select {
    case <-quit:
        fmt.Println("无需生产,退出!")
    case productionChan <- 1:
    }
}

这个思路是,单个消费者goroutine迭代一个所需大小的payload切片,当切片填满后,循环终止并关闭退出通道。所有的生产者都在一个选择语句中被阻塞,选择是发送还是从退出通道接收。当退出通道关闭时,使用该退出通道启动的每个生产者都会立即退出。

如果你有较少数量的生产者每个发送多个值,这个习惯用法也应该相对容易修改。

英文:

The best way to do this is probably to use a quit channel and a select statement in the producer. Go guarantees that a closed channel will always register as a read without blocking.

Here's a working version on the playground

package main

import &quot;fmt&quot;
import &quot;time&quot;

func main() {
	productionChan := make(chan int)
	quit := make(chan struct{})
	for i := 0; i &lt; 110; i++ {
		go produce(productionChan, quit)
	}

	consume(productionChan, quit)
	time.Sleep(5 * time.Second) // Just so we can observe the excess production channels quitting correctly
}

func consume(productionChan &lt;-chan int, quit chan&lt;- struct{}) {
	payload := make([]int, 100)

	for i := range payload {
		payload[i] = &lt;-productionChan
	}

	close(quit)
	fmt.Println(&quot;Complete payload received, length of payload slice: &quot;, len(payload))
}

func produce(productionChan chan&lt;- int, quit &lt;-chan struct{}) {
	select {
	case &lt;-quit:
		fmt.Println(&quot;No need to produce, quitting!&quot;)
	case productionChan &lt;- 1:
	}
}

The idea is that the single consumer goroutine iterates over a payload slice of the desired size, after that slice is filled, the loop terminates and it closes the quit channel. All producers are blocked in a select statement about whether to send, or receive from the quit channel. When the quit channel is closed, every producer launched with that quit channel will immediately exit.

This idiom should also be fairly easy to modify if you have a smaller number of producers that send multiple values each.

答案3

得分: 1

免责声明:这段代码并不是 Go 语言中最合乎习惯的写法,我不建议在实际应用中使用这段代码。话虽如此...

Go 最近引入了 reflect.Value.TryRecvreflect.Value.TrySend。它们正好符合你的需求。

products := make(chan int, 100)

for i := 0; i < 10; i++ {
    go func() {
        for {
            if !reflect.ValueOf(products).TrySend(1) {
                return
            }
        }
    }()
}

go playground 上运行这段代码。

英文:

Disclaimer: This is not idiomatic Go in the slightest, and I don't suggest actually using this code in practice. That said...

Go recently introduced reflect.Value.TryRecv and reflect.Value.TrySend. They do exactly what you're looking for.

products := make(chan int, 100)

for i := 0; i &lt; 10; i++ {
    go func() {
        for {
            if !reflect.ValueOf(products).TrySend(1) {
                return
            }
        }
    }()
}

See the code run on the go playground.

答案4

得分: 0

你可以使用select语句尝试向带缓冲的通道发送数据:如果在发送时选择了通道,并且通道缓冲已满,你将进入default分支:

// 这个 goroutine 不断向通道发送数据,直到无法发送为止
func f(c chan int, wg *sync.WaitGroup) {
    for i := 0; ; i++ {
        select {
        case c <- i: // 发送成功
            continue
        default:
            fmt.Println("无法发送,中止!")
            wg.Done()
            return
        }
    }
}

func main() {
    c := make(chan int, 100)
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go f(c, &wg)
    }
    wg.Wait()
    fmt.Println("完成!")
}

这种方法的缺点是,如果消费者在生产者完成之前开始消费,你将进入无限循环。

你还可以从消费者端关闭通道,导致生产者发生恐慌,并捕获该恐慌:

func f(c chan int) {
    defer func() {
        _ = recover()
        fmt.Println("完成!")
    }()
    for i := 0; ; i++ {
        c <- i
    }
}

func main() {
    c := make(chan int)
    for i := 0; i < 10; i++ {
        go f(c)
    }
    n := 0
    for {
        <-c
        n++
        if n > 100 {
            close(c)
            break
        }
    }
}

但是,如果你只想生产不超过给定数量的项目,为什么不生成N个 goroutine,每个 goroutine 生产1个项目?或者生成K个 goroutine,每个 goroutine 生产N/K个项目?最好事先知道这一点。

英文:

You can use select to try and send to a buffered channel: if you select on sending and the channel buffer is full, you'll reach the default case:

//this goroutine sends to the channel until it can&#39;t
func f(c chan int, wg *sync.WaitGroup) {

	for i := 0; ; i++ {
		select {
		case c &lt;- i: //sent successfully
			continue
		default:
			fmt.Println(&quot;Can&#39;t send, aborting!&quot;)
			wg.Done()
			return
		}
	}
}

func main() {

	c := make(chan int, 100)
	wg := sync.WaitGroup{}
	for i := 0; i &lt; 10; i++ {
		wg.Add(1)
		go f(c, &amp;wg)
	}

	wg.Wait()
	fmt.Println(&quot;Done!&quot;)

}

The drawback of this approach is that if the consumer starts consuming before you're done producing, you'll enter an infinite loop.

You can also close the channel from the consumer side causing the producers to panic, and catching this panic:

func f(c chan int) {

	defer func() {
		_ = recover()
		fmt.Println(&quot;Done!&quot;)
	}()

	for i := 0; ; i++ {
		c &lt;- i
	}
}

func main() {

	c := make(chan int)

	for i := 0; i &lt; 10; i++ {

		go f(c)
	}

	n := 0
	for {
		&lt;-c
		n++
		if n &gt; 100 {
			close(c)
			break
		}
	}

But if you just want to not produce more items than a given amount, why not spawn N goroutines, each producing 1 item? or spawning K goroutines each producing N/K items? It's better to know this in advance.

huangapple
  • 本文由 发表于 2014年1月6日 12:47:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/20943170.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定