英文:
Why is data being pushed into the channel but never read from the receiver goroutine?
问题
我正在构建一个守护进程,其中有两个服务彼此之间发送数据。服务A是数据的生产者,服务B是数据缓冲服务或队列。所以在main.go
文件中,实例化并启动了服务B。Start()
方法将buffer()
函数作为一个goroutine执行,因为该函数等待数据传递到通道上,我不希望主进程在等待buffer
完成时停止。然后实例化并启动服务A。然后还将其与服务B进行了"注册"。
我为服务A创建了一个名为RegisterWithBufferService
的方法,它创建了两个新的通道。它将这些通道存储为自己的属性,并将它们提供给服务B。
func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
newIncomingChan := make(chan *data.DataFrame, 1)
newOutgoingChan := make(chan []byte, 1)
s.IncomingBuffChan = newIncomingChan
s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
IncomingChan: newOutgoingChan, //我们的输出通道是他们的输入通道
OutgoingChan: newIncomingChan, //我们的输入通道是他们的输出通道
}
s.DataBufferService = bufService
bufService.NewProvider <- s.ServiceName() //DataBuffer服务监听新服务并为缓冲创建一个新的goroutine
s.Logger.Info().Msg("注册完成。")
return nil
}
缓冲区基本上监听来自服务A的传入数据,使用Decode()
进行解码,然后将其添加到名为buf
的切片中。如果切片的长度大于bufferPeriod
,则将切片中的第一项通过输出通道发送回服务A。
func (b *DataBuffer) buffer(bufferPeriod int) {
for {
select {
case newProvider := <-b.NewProvider:
b.wg.Add(1)
/*
newProvider是一个字符串
DataProviders是一个映射,它返回的值是一个包含此服务的输入和输出通道的结构体
*/
p := b.DataProviders[newProvider]
go func(prov string, in chan []byte, out chan *DataFrame) {
defer b.wg.Done()
var buf []*DataFrame
for {
select {
case rawData := <-in:
tmp := Decode(rawData) //自定义解码函数。返回一个*DataFrame
buf = append(buf, tmp)
if len(buf) < bufferPeriod {
b.Logger.Info().Msg("发送解码后的数据。")
out <- buf[0]
buf = buf[1:] //弹出
}
case <-b.Quit:
return
}
}
}(newProvider, p.IncomingChan, p.OutgoingChan)
}
case <-b.Quit:
return
}
}
现在,服务A有一个名为record
的方法,它会定期将数据推送到其OutgoingDataChannels
属性中的所有通道。
func (s *ServiceA) record() error {
...
if atomic.LoadInt32(&s.Listeners) != 0 {
s.Logger.Info().Msg("将原始数据发送到数据缓冲区")
for _, outChan := range s.OutgoingDataChannels {
outChan <- dataBytes //接收方(服务B)已经在监听,这不会阻塞
}
s.Logger.Info().Msg("原始数据已发送并接收") //记录器将输出此消息,以便我知道它没有挂起
}
}
问题是,服务A似乎成功使用record
推送数据,但是服务B从未进入inData := <-in
的情况。这是因为我嵌套了goroutine吗?如果不清楚的话,当启动服务B时,它调用buffer
,但是因为否则会挂起,所以我将对buffer
的调用设置为了一个goroutine。因此,当服务A调用RegisterWithBufferService
时,buffer
的goroutine会创建一个goroutine来监听来自服务B的新数据,并在填充缓冲区后将其推送回服务A。希望我解释清楚了。
编辑1
我创建了一个最小化的、可重现的示例。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
defaultBufferingPeriod int = 3
DefaultPollingInterval int64 = 10
)
type DataObject struct{
Data string
}
type DataProvider interface {
RegisterWithBufferService(*DataBuffer) error
ServiceName() string
}
type DataProviderInfo struct{
IncomingChan chan *DataObject
OutgoingChan chan *DataObject
}
type DataBuffer struct{
Running int32 //用于原子操作
DataProviders map[string]DataProviderInfo
Quit chan struct{}
NewProvider chan string
wg sync.WaitGroup
}
func NewDataBuffer() *DataBuffer{
var (
wg sync.WaitGroup
)
return &DataBuffer{
DataProviders: make(map[string]DataProviderInfo),
Quit: make(chan struct{}),
NewProvider: make(chan string),
wg: wg,
}
}
func (b *DataBuffer) Start() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
return fmt.Errorf("无法启动数据缓冲服务。")
}
go b.buffer(defaultBufferingPeriod)
return nil
}
func (b *DataBuffer) Stop() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
return fmt.Errorf("无法停止数据缓冲服务。")
}
for _, p := range b.DataProviders {
close(p.IncomingChan)
close(p.OutgoingChan)
}
close(b.Quit)
b.wg.Wait()
return nil
}
// buffer为每个传入、传出数据对创建goroutine,并使用传入的字节解码为传出的DataFrame
func (b *DataBuffer) buffer(bufferPeriod int) {
for {
select {
case newProvider := <-b.NewProvider:
fmt.Println("收到新的数据提供者。")
if _, ok := b.DataProviders[newProvider]; ok {
b.wg.Add(1)
p := b.DataProviders[newProvider]
go func(prov string, in chan *DataObject, out chan *DataObject) {
defer b.wg.Done()
var (
buf []*DataObject
)
fmt.Printf("等待来自:%s 的数据\n", prov)
for {
select {
case inData := <-in:
fmt.Printf("从:%s 接收到数据\n", prov)
buf = append(buf, inData)
if len(buf) > bufferPeriod {
fmt.Printf("队列已满,将数据发送回 %s\n", prov)
out <- buf[0]
fmt.Println("数据已发送")
buf = buf[1:] //弹出
}
case <-b.Quit:
return
}
}
}(newProvider, p.IncomingChan, p.OutgoingChan)
}
case <-b.Quit:
return
}
}
}
type ServiceA struct{
Active int32 // 原子操作
Stopping int32 // 原子操作
Recording int32 // 原子操作
Listeners int32 // 原子操作
name string
QuitChan chan struct{}
IncomingBuffChan chan *DataObject
OutgoingBuffChans []chan *DataObject
DataBufferService *DataBuffer
}
// 编译时检查,确保ServiceA完全实现了DataProvider接口
var _ DataProvider = (*ServiceA)(nil)
func NewServiceA() (*ServiceA, error) {
var newSliceOutChans []chan *DataObject
return &ServiceA{
QuitChan: make(chan struct{}),
OutgoingBuffChans: newSliceOutChans,
name: "SERVICEA",
}, nil
}
// Start启动服务。如果出现任何问题,返回错误
func (s *ServiceA) Start() error {
atomic.StoreInt32(&s.Active, 1)
return nil
}
// Stop停止服务。如果出现任何问题,返回错误
func (s *ServiceA) Stop() error {
atomic.StoreInt32(&s.Stopping, 1)
close(s.QuitChan)
return nil
}
func (s *ServiceA) StartRecording(pol_int int64) error {
if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
return fmt.Errorf("无法开始记录。数据记录已经开始")
}
ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
go func() {
for {
select {
case <-ticker.C:
fmt.Println("开始记录...")
err := s.record()
if err != nil {
return
}
case <-s.QuitChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *ServiceA) record() error {
current_time := time.Now()
ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
dataObject := &DataObject{
Data: ct,
}
if atomic.LoadInt32(&s.Listeners) != 0 {
fmt.Println("将数据发送到数据缓冲区...")
for _, outChan := range s.OutgoingBuffChans {
outChan <- dataObject // 接收方应该已经在监听
}
fmt.Println("数据已发送。")
}
return nil
}
// RegisterWithBufferService满足DataProvider接口。它向bufService提供新的传入和传出通道以及轮询间隔
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
return fmt.Errorf("%v 数据提供者已经注册到数据缓冲区。", s.ServiceName())
}
newIncomingChan := make(chan *DataObject, 1)
newOutgoingChan := make(chan *DataObject, 1)
s.IncomingBuffChan = newIncomingChan
s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
IncomingChan: newOutgoingChan, //我们的输出通道是他们的输入通道
OutgoingChan: newIncomingChan, //我们的输入通道是他们的输出通道
}
s.DataBufferService = bufService
bufService.NewProvider <- s.ServiceName() //DataBuffer服务监听新服务并为缓冲创建一个新的goroutine
return nil
}
// ServiceName满足DataProvider接口。它返回服务的名称。
func (s ServiceA) ServiceName() string {
return s.name
}
func main() {
var BufferedServices []DataProvider
fmt.Println("实例化并启动数据缓冲服务...")
bufService := NewDataBuffer()
err := bufService.Start()
if err != nil {
panic(fmt.Sprintf("%v", err))
}
defer bufService.Stop()
fmt.Println("数据缓冲服务成功启动。")
fmt.Println("实例化并启动服务A...")
serviceA, err := NewServiceA()
if err != nil {
panic(fmt.Sprintf("%v", err))
}
BufferedServices = append(BufferedServices, *serviceA)
err = serviceA.Start()
if err != nil {
panic(fmt.Sprintf("%v", err))
}
defer serviceA.Stop()
fmt.Println("服务A成功启动。")
fmt.Println("将服务注册到数据缓冲区...")
for _, s := range BufferedServices {
_ = s.RegisterWithBufferService(bufService) // 忽略基本情况下的错误消息
}
fmt.Println("注册完成。")
fmt.Println("开始记录...")
_ = atomic.AddInt32(&serviceA.Listeners, 1)
err = serviceA.StartRecording(DefaultPollingInterval)
if err != nil {
panic(fmt.Sprintf("%v", err))
}
for {
select {
case RTD := <-serviceA.IncomingBuffChan:
fmt.Println(RTD)
case <-serviceA.QuitChan:
atomic.StoreInt32(&serviceA.Listeners, 0)
bufService.Quit<-struct{}{}
}
}
}
在Go 1.17上运行。运行示例时,它应该每10秒打印以下内容:
开始记录...
将数据发送到数据缓冲区...
数据已发送。
但是,数据缓冲区从未进入inData := <-in
的情况。
英文:
I am building a daemon and I have two services that will be sending data to and from each other. Service A is what produces the data and service B a is Data Buffer service or like a queue. So from the main.go
file, service B is instantiated and started. The Start()
method will perform the buffer()
function as a goroutine because this function waits for data to be passed onto a channel and I don't want the main process to halt waiting for buffer
to complete. Then Service A is instantiated and started. It is then also "registered" with Service B.
I created a method called RegisterWithBufferService
for Service A that creates two new channels. It will store those channels as it's own attributes and also provide them to Service B.
func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
newIncomingChan := make(chan *data.DataFrame, 1)
newOutgoingChan := make(chan []byte, 1)
s.IncomingBuffChan = newIncomingChan
s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
}
s.DataBufferService = bufService
bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
s.Logger.Info().Msg("Registeration completed.")
return nil
}
Buffer essentially listens for incoming data from Service A, decodes it using Decode()
and then adds it to a slice called buf
. If the slice is greater in length than bufferPeriod
then it will send the first item in the slice in the Outgoing channel back to Service A.
func (b* DataBuffer) buffer(bufferPeriod int) {
for {
select {
case newProvider := <- b.NewProvider:
b.wg.Add(1)
/*
newProvider is a string
DataProviders is a map the value it returns is a struct containing the Incoming and
Outgoing channels for this service
*/
p := b.DataProviders[newProvider]
go func(prov string, in chan []byte, out chan *DataFrame) {
defer b.wg.Done()
var buf []*DataFrame
for {
select {
case rawData := <-in:
tmp := Decode(rawData) //custom decoding function. Returns a *DataFrame
buf = append(buf, tmp)
if len(buf) < bufferPeriod {
b.Logger.Info().Msg("Sending decoded data out.")
out <- buf[0]
buf = buf[1:] //pop
}
case <- b.Quit:
return
}
}
}(newProvider, p.IncomingChan, p.OutgoingChan)
}
case <- b.Quit:
return
}
}
Now Service A has a method called record
that will periodically push data to all the channels in it's OutgoingDataChannels
attribute.
func (s *ServiceA) record() error {
...
if atomic.LoadInt32(&s.Listeners) != 0 {
s.Logger.Info().Msg("Sending raw data to data buffer")
for _, outChan := range s.OutgoingDataChannels {
outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
}
s.Logger.Info().Msg("Raw data sent and received") // The logger will output this so I know it's not hanging
}
}
The problem is that Service A seems to push the data successfully using record
but Service B never goes into the case rawData := <-in:
case in the buffer
sub-goroutine. Is this because I have nested goroutines? Incase it's not clear, when Service B is started, it calls buffer
but because it would hang otherwise, I made the call to buffer
a goroutine. So then when Service A calls RegisterWithBufferService
, the buffer
goroutine creates a goroutine to listen for new data from Service B and push it back to Service A once the buffer is filled. I hope I explained it clearly.
EDIT 1
I've made a minimal, reproducible example.
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
defaultBufferingPeriod int = 3
DefaultPollingInterval int64 = 10
)
type DataObject struct{
Data string
}
type DataProvider interface {
RegisterWithBufferService(*DataBuffer) error
ServiceName() string
}
type DataProviderInfo struct{
IncomingChan chan *DataObject
OutgoingChan chan *DataObject
}
type DataBuffer struct{
Running int32 //used atomically
DataProviders map[string]DataProviderInfo
Quit chan struct{}
NewProvider chan string
wg sync.WaitGroup
}
func NewDataBuffer() *DataBuffer{
var (
wg sync.WaitGroup
)
return &DataBuffer{
DataProviders: make(map[string]DataProviderInfo),
Quit: make(chan struct{}),
NewProvider: make(chan string),
wg: wg,
}
}
func (b *DataBuffer) Start() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
return fmt.Errorf("Could not start Data Buffer Service.")
}
go b.buffer(defaultBufferingPeriod)
return nil
}
func (b *DataBuffer) Stop() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
return fmt.Errorf("Could not stop Data Buffer Service.")
}
for _, p := range b.DataProviders {
close(p.IncomingChan)
close(p.OutgoingChan)
}
close(b.Quit)
b.wg.Wait()
return nil
}
// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
func (b *DataBuffer) buffer(bufferPeriod int) {
for {
select {
case newProvider := <- b.NewProvider:
fmt.Println("Received new Data provider.")
if _, ok := b.DataProviders[newProvider]; ok {
b.wg.Add(1)
p := b.DataProviders[newProvider]
go func(prov string, in chan *DataObject, out chan *DataObject) {
defer b.wg.Done()
var (
buf []*DataObject
)
fmt.Printf("Waiting for data from: %s\n", prov)
for {
select {
case inData := <-in:
fmt.Printf("Received data from: %s\n", prov)
buf = append(buf, inData)
if len(buf) > bufferPeriod {
fmt.Printf("Queue is filled, sending data back to %s\n", prov)
out <- buf[0]
fmt.Println("Data Sent")
buf = buf[1:] //pop
}
case <- b.Quit:
return
}
}
}(newProvider, p.IncomingChan, p.OutgoingChan)
}
case <- b.Quit:
return
}
}
}
type ServiceA struct{
Active int32 // atomic
Stopping int32 // atomic
Recording int32 // atomic
Listeners int32 // atomic
name string
QuitChan chan struct{}
IncomingBuffChan chan *DataObject
OutgoingBuffChans []chan *DataObject
DataBufferService *DataBuffer
}
// A compile time check to ensure ServiceA fully implements the DataProvider interface
var _ DataProvider = (*ServiceA)(nil)
func NewServiceA() (*ServiceA, error) {
var newSliceOutChans []chan *DataObject
return &ServiceA{
QuitChan: make(chan struct{}),
OutgoingBuffChans: newSliceOutChans,
name: "SERVICEA",
}, nil
}
// Start starts the service. Returns an error if any issues occur
func (s *ServiceA) Start() error {
atomic.StoreInt32(&s.Active, 1)
return nil
}
// Stop stops the service. Returns an error if any issues occur
func (s *ServiceA) Stop() error {
atomic.StoreInt32(&s.Stopping, 1)
close(s.QuitChan)
return nil
}
func (s *ServiceA) StartRecording(pol_int int64) error {
if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
return fmt.Errorf("Could not start recording. Data recording already started")
}
ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
go func() {
for {
select {
case <-ticker.C:
fmt.Println("Time to record...")
err := s.record()
if err != nil {
return
}
case <-s.QuitChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *ServiceA) record() error {
current_time := time.Now()
ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
dataObject := &DataObject{
Data: ct,
}
if atomic.LoadInt32(&s.Listeners) != 0 {
fmt.Println("Sending data to Data buffer...")
for _, outChan := range s.OutgoingBuffChans {
outChan <- dataObject // the receivers should already be listening
}
fmt.Println("Data sent.")
}
return nil
}
// RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
}
newIncomingChan := make(chan *DataObject, 1)
newOutgoingChan := make(chan *DataObject, 1)
s.IncomingBuffChan = newIncomingChan
s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
}
s.DataBufferService = bufService
bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
return nil
}
// ServiceName satisfies the DataProvider interface. It returns the name of the service.
func (s ServiceA) ServiceName() string {
return s.name
}
func main() {
var BufferedServices []DataProvider
fmt.Println("Instantiating and Starting Data Buffer Service...")
bufService := NewDataBuffer()
err := bufService.Start()
if err != nil {
panic(fmt.Sprintf("%v", err))
}
defer bufService.Stop()
fmt.Println("Data Buffer Service successfully started.")
fmt.Println("Instantiating and Starting Service A...")
serviceA, err := NewServiceA()
if err != nil {
panic(fmt.Sprintf("%v", err))
}
BufferedServices = append(BufferedServices, *serviceA)
err = serviceA.Start()
if err != nil {
panic(fmt.Sprintf("%v", err))
}
defer serviceA.Stop()
fmt.Println("Service A successfully started.")
fmt.Println("Registering services with Data Buffer...")
for _, s := range BufferedServices {
_ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
}
fmt.Println("Registration complete.")
fmt.Println("Beginning recording...")
_ = atomic.AddInt32(&serviceA.Listeners, 1)
err = serviceA.StartRecording(DefaultPollingInterval)
if err != nil {
panic(fmt.Sprintf("%v", err))
}
for {
select {
case RTD := <-serviceA.IncomingBuffChan:
fmt.Println(RTD)
case <-serviceA.QuitChan:
atomic.StoreInt32(&serviceA.Listeners, 0)
bufService.Quit<-struct{}{}
}
}
}
Running on Go 1.17. When running the example, it should print the following every 10 seconds:
Time to record...
Sending data to Data buffer...
Data sent.
But then Data buffer never goes into the inData := <-in
case.
答案1
得分: 1
为了诊断这个问题,我将fmt.Println("Sending data to Data buffer...")
更改为fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans)
,输出结果如下:
Time to record...
Sending data to Data buffer... []
所以实际上你并没有将数据发送到任何通道。原因是:
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
由于接收器不是指针,当你执行s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
时,你实际上是在一个被丢弃的ServiceA
副本中修改了s.OutgoingBuffChans
。为了修复这个问题,将代码修改为:
func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
以及
BufferedServices = append(BufferedServices, *serviceA)
修改为
BufferedServices = append(BufferedServices, serviceA)
修改后的版本输出结果如下:
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
这样解决了报告的问题(如果还有其他问题,我不会感到惊讶,但希望这能指导你朝正确的方向前进)。我注意到你最初发布的代码确实使用了指针接收器,所以可能存在其他问题(但在这种情况下很难对代码片段进行评论)。
英文:
To diagnose this I changed fmt.Println("Sending data to Data buffer...")
to fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans)
and the output was:
Time to record...
Sending data to Data buffer... []
So you are not actually sending the data to any channels. The reason for this is:
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
As the receiver is not a pointer when you do the s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
you are changing s.OutgoingBuffChans
in a copy of the ServiceA
which is discarded when the function exits. To fix this change:
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
to
func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
and
BufferedServices = append(BufferedServices, *serviceA)
to
BufferedServices = append(BufferedServices, serviceA)
The amended version outputs:
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
So this resolves the reported issue (I would not be suprised if there are other issues but hopefully this points you in the right direction). I did notice that the code you originally posted does use a pointer receiver so that might have suffered from another issue (but its difficult to comment on code fragments in a case like this).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论