从频道中选择数据

huangapple go评论79阅读模式
英文:

Selecting data from a channel

问题

type Reader struct {
	sync.RWMutex
	logger   *zerolog.Logger
	wg       *sync.WaitGroup
	ticker   *time.Ticker
	messages chan *TransactionData
}

type TransactionData struct {
	ID       string
	messages []*Message
}

func NewReader(logger zerolog.Logger) *Reader {
	return &Reader{
		wg:       &sync.WaitGroup{},
		logger:   &logger,
		ticker:   time.NewTicker(1 * time.Second),
		messages: make(chan *TransactionData),
	}
}

func (r *Reader) Write(ID string, messages []*Message) error {
	r.wg.Add(1)
	...

	go func(ID string, messages []*Message) {
		defer r.wg.Done()
		r.add(&TransactionData{
			ID:       ID,
			messages: messages,
		})
	}(ID, messages)

	return nil
}

func (r *Reader) add(data *TransactionData) {
	r.Lock()
	defer r.Unlock()
	r.messages <- data
}

func (r *Reader) get() {
	r.Lock()
	defer func() {
		r.Unlock()
	}()
	for {
		select {
		case <-r.ticker.C:
			// how to select all the data from the channel?
			for {
				select {
				case data := <-r.messages:
					// process the data for the transaction
				default:
					// no more data in the channel, exit the loop
					return
				}
			}
		}
	}
}

我正在研究通道。借助通道,我正在尝试解决一个问题。
任务:用户阅读聊天记录,选择用户阅读的所有消息,将这些消息放入通道,第二个通道选择数据并创建事务,通过goroutine使用Write方法将数据添加到通道中。我想创建一个第二个goroutine,每秒钟(使用ticker.C)从通道中选择所有数据并形成一个事务。但是我无法弄清楚如何选择我成功添加到通道中的所有数据,例如:

  user-1: Write(&TransactionData{...}),
user-2: Write(&TransactionData{...}),
....
user-n: Write(&TransactionData{...})

我如何选择绝对所有的数据,我需要选择所有的数据还是一次只需要选择一个数据?

解决方案的简要描述:

使用goroutine在无限循环中从通道中读取数据

  • 将读取的任务放入sync.Map线程安全的位置
  • 使用time.Ticker.C运行第二个goroutine,当触发时,选择用于事务的数据。

第一个问题:我如何从通道中选择所有的数据,以便我可以收集一个事务并更新更多的数据,前提是,我不使用任何键?(我附上了读取方法的大纲)

第二个问题:我应该获取通道中的所有数据还是一次只获取一个TransactionData并形成一个事务?

英文:
type Reader struct {
	sync.RWMutex
	logger   *zerolog.Logger
	wg       *sync.WaitGroup
	ticker   *time.Ticker
	messages chan *TransactionData
}

type TransactionData struct {
	ID       string
	messages []*Message
}

func NewReader(logger zerolog.Logger) *Reader {
	return &amp;Reader{
		wg:       &amp;sync.WaitGroup{},
		logger:   &amp;l,
		ticker:   time.NewTicker(1 * time.Second),
		messages: make(chan *TransactionData),
	}
}

func (r *Reader) Write(ID string, messages []*Message) error {
	r.wg.Add(1)
    ....

	go func(ID string, messages []*domain.Message) {
		defer r.wg.Done()
		r.add(&amp;TransactionData{
			chatID:   ID,
			messages: messages,
		})
	}(ID, messages)

	return nil
}

func (r *Reader) add(data *TransactionData) {
	r.Lock()
	defer r.Unlock()
	r.messages &lt;- data
}

func (r *Reader) get() {
	r.Lock()
	defer func() {
		r.Unlock()
	}()
	for {
		select {
		case r.ticker.C:
			// how to select all the data from the channel?
		}
	}
}

I'm exploring channels right now. And with the help of channels I'm trying to solve one problem.
Task: user reads chat history, selects all messages that user read, put these messages into channel, second channel selects data and creates transaction, through goroutine I add data into channel using Write method. I want to create a second goroutine which every second (using ticker.C) will select all the data from the channel and form a transaction. But I can't figure out how I can select all the data from the channel that I've managed to add there, example:

  user-1: Write(&amp;TransactionData{...}),
user-2: Write(&amp;TransactionData{...}),
....
user-n: Write(&amp;TransactionData{...})

How do I select absolutely all data and do I need to select all data or do I need to select one at a time?

Brief description of the solution:

Read from the channel in an infinite loop using goroutine

  • Put the subtracted tasks into the sync.Map thread-safe place
  • Run a second goroutine with time.Ticker.C and when it is triggered, select data for the transaction.

First question: How do I select all the data from the channel so that I collect one transaction and can update more data, provided that, I don't use any keys?(I attached an outline of the read method)

Second question: Should I get ALL the data from the channel or should I get only one TransactionData at a time and form one transaction?

答案1

得分: 2

你的方法和实现存在几个问题。

首先:发送到通道时不需要锁定互斥锁。

其次:从通道接收时不需要锁定互斥锁。

至于实现方面:由于通道是无缓冲的,只有在有监听的goroutine时通道发送才会起作用。如果在ticker触发后从通道接收,那么所有的发送操作都会在ticker触发后发生。发送操作将一直阻塞,直到那时为止。

因此,你应该摒弃那个ticker,在一个for循环中从通道中读取。当所有的写入完成后,你应该关闭通道,这将终止for循环,以便你可以写入。关闭通道是向接收方发出所有数据已发送的惯用方式。

英文:

There are several problems with your approach and implementation.

First: you don't need to lock a mutex to send to a channel.

Second: you don't need to lock a mutex to receive from a channel.

As for the implementation: since the channel is unbuffered, channel send will only work when there is a listening goroutine. If you receive from a channel when a ticker hits, then all the sends will happen after that ticker hits. Send operations will block until then.

So, you should get rid of that ticker, and read from the channel in a for-loop. When all the writes are completed, you should close the channel, which will terminate the for-loop, so you can write. Closing the channel is the idiomatic way of signaling to the receiver that all the data is sent.

huangapple
  • 本文由 发表于 2023年2月16日 05:18:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/75465516.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定