英文:
Selecting data from a channel
问题
type Reader struct {
sync.RWMutex
logger *zerolog.Logger
wg *sync.WaitGroup
ticker *time.Ticker
messages chan *TransactionData
}
type TransactionData struct {
ID string
messages []*Message
}
func NewReader(logger zerolog.Logger) *Reader {
return &Reader{
wg: &sync.WaitGroup{},
logger: &logger,
ticker: time.NewTicker(1 * time.Second),
messages: make(chan *TransactionData),
}
}
func (r *Reader) Write(ID string, messages []*Message) error {
r.wg.Add(1)
...
go func(ID string, messages []*Message) {
defer r.wg.Done()
r.add(&TransactionData{
ID: ID,
messages: messages,
})
}(ID, messages)
return nil
}
func (r *Reader) add(data *TransactionData) {
r.Lock()
defer r.Unlock()
r.messages <- data
}
func (r *Reader) get() {
r.Lock()
defer func() {
r.Unlock()
}()
for {
select {
case <-r.ticker.C:
// how to select all the data from the channel?
for {
select {
case data := <-r.messages:
// process the data for the transaction
default:
// no more data in the channel, exit the loop
return
}
}
}
}
}
我正在研究通道。借助通道,我正在尝试解决一个问题。
任务:用户阅读聊天记录,选择用户阅读的所有消息,将这些消息放入通道,第二个通道选择数据并创建事务,通过goroutine使用Write
方法将数据添加到通道中。我想创建一个第二个goroutine,每秒钟(使用ticker.C)从通道中选择所有数据并形成一个事务。但是我无法弄清楚如何选择我成功添加到通道中的所有数据,例如:
user-1: Write(&TransactionData{...}),
user-2: Write(&TransactionData{...}),
....
user-n: Write(&TransactionData{...})
我如何选择绝对所有的数据,我需要选择所有的数据还是一次只需要选择一个数据?
解决方案的简要描述:
使用goroutine在无限循环中从通道中读取数据
- 将读取的任务放入
sync.Map
线程安全的位置 - 使用
time.Ticker.C
运行第二个goroutine,当触发时,选择用于事务的数据。
第一个问题:我如何从通道中选择所有的数据,以便我可以收集一个事务并更新更多的数据,前提是,我不使用任何键?(我附上了读取方法的大纲)
第二个问题:我应该获取通道中的所有数据还是一次只获取一个TransactionData
并形成一个事务?
英文:
type Reader struct {
sync.RWMutex
logger *zerolog.Logger
wg *sync.WaitGroup
ticker *time.Ticker
messages chan *TransactionData
}
type TransactionData struct {
ID string
messages []*Message
}
func NewReader(logger zerolog.Logger) *Reader {
return &Reader{
wg: &sync.WaitGroup{},
logger: &l,
ticker: time.NewTicker(1 * time.Second),
messages: make(chan *TransactionData),
}
}
func (r *Reader) Write(ID string, messages []*Message) error {
r.wg.Add(1)
....
go func(ID string, messages []*domain.Message) {
defer r.wg.Done()
r.add(&TransactionData{
chatID: ID,
messages: messages,
})
}(ID, messages)
return nil
}
func (r *Reader) add(data *TransactionData) {
r.Lock()
defer r.Unlock()
r.messages <- data
}
func (r *Reader) get() {
r.Lock()
defer func() {
r.Unlock()
}()
for {
select {
case r.ticker.C:
// how to select all the data from the channel?
}
}
}
I'm exploring channels right now. And with the help of channels I'm trying to solve one problem.
Task: user reads chat history, selects all messages that user read, put these messages into channel, second channel selects data and creates transaction, through goroutine I add data into channel using Write
method. I want to create a second goroutine which every second (using ticker.C) will select all the data from the channel and form a transaction. But I can't figure out how I can select all the data from the channel that I've managed to add there, example:
user-1: Write(&TransactionData{...}),
user-2: Write(&TransactionData{...}),
....
user-n: Write(&TransactionData{...})
How do I select absolutely all data and do I need to select all data or do I need to select one at a time?
Brief description of the solution:
Read from the channel in an infinite loop using goroutine
- Put the subtracted tasks into the
sync.Map
thread-safe place - Run a second goroutine with
time.Ticker.C
and when it is triggered, select data for the transaction.
First question: How do I select all the data from the channel so that I collect one transaction and can update more data, provided that, I don't use any keys?(I attached an outline of the read method)
Second question: Should I get ALL the data from the channel or should I get only one TransactionData
at a time and form one transaction?
答案1
得分: 2
你的方法和实现存在几个问题。
首先:发送到通道时不需要锁定互斥锁。
其次:从通道接收时不需要锁定互斥锁。
至于实现方面:由于通道是无缓冲的,只有在有监听的goroutine时通道发送才会起作用。如果在ticker触发后从通道接收,那么所有的发送操作都会在ticker触发后发生。发送操作将一直阻塞,直到那时为止。
因此,你应该摒弃那个ticker,在一个for循环中从通道中读取。当所有的写入完成后,你应该关闭通道,这将终止for循环,以便你可以写入。关闭通道是向接收方发出所有数据已发送的惯用方式。
英文:
There are several problems with your approach and implementation.
First: you don't need to lock a mutex to send to a channel.
Second: you don't need to lock a mutex to receive from a channel.
As for the implementation: since the channel is unbuffered, channel send will only work when there is a listening goroutine. If you receive from a channel when a ticker hits, then all the sends will happen after that ticker hits. Send operations will block until then.
So, you should get rid of that ticker, and read from the channel in a for-loop. When all the writes are completed, you should close the channel, which will terminate the for-loop, so you can write. Closing the channel is the idiomatic way of signaling to the receiver that all the data is sent.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论