Why is data being pushed into the channel but never read from the receiver goroutine?

Question

I am building a daemon with two services that send data to and from each other. Service A produces the data, and Service B is a data buffer service, similar to a queue. In main.go, Service B is instantiated and started. Its Start() method runs the buffer() function as a goroutine, because buffer() waits for data to arrive on a channel and I don't want the main process to block while waiting for it to complete. Service A is then instantiated and started, and it is also "registered" with Service B.

I created a method called RegisterWithBufferService for Service A that creates two new channels. It stores those channels as its own attributes and also provides them to Service B.

func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
	newIncomingChan := make(chan *data.DataFrame, 1)
	newOutgoingChan := make(chan []byte, 1)
	s.IncomingBuffChan = newIncomingChan
	s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
	bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
		IncomingChan: newOutgoingChan, // our outgoing channel is their incoming
		OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
	}
	s.DataBufferService = bufService
	bufService.NewProvider <- s.ServiceName() // the DataBuffer service listens for new services and creates a new goroutine for buffering
	s.Logger.Info().Msg("Registration completed.")
	return nil
}

The buffer function listens for incoming data from Service A, decodes it using Decode(), and appends it to a slice called buf. If the slice is longer than bufferPeriod, it sends the first item in the slice on the outgoing channel back to Service A.

func (b *DataBuffer) buffer(bufferPeriod int) {
	for {
		select {
		case newProvider := <-b.NewProvider:
			b.wg.Add(1)
			/*
				newProvider is a string.
				DataProviders is a map; the value it returns is a struct containing the
				Incoming and Outgoing channels for this service.
			*/
			p := b.DataProviders[newProvider]
			go func(prov string, in chan []byte, out chan *DataFrame) {
				defer b.wg.Done()
				var buf []*DataFrame
				for {
					select {
					case rawData := <-in:
						tmp := Decode(rawData) // custom decoding function. Returns a *DataFrame
						buf = append(buf, tmp)
						if len(buf) < bufferPeriod {
							b.Logger.Info().Msg("Sending decoded data out.")
							out <- buf[0]
							buf = buf[1:] // pop
						}
					case <-b.Quit:
						return
					}
				}
			}(newProvider, p.IncomingChan, p.OutgoingChan)
		case <-b.Quit:
			return
		}
	}
}

Service A has a method called record that periodically pushes data to all the channels in its OutgoingDataChannels attribute.

func (s *ServiceA) record() error {
	...
	if atomic.LoadInt32(&s.Listeners) != 0 {
		s.Logger.Info().Msg("Sending raw data to data buffer")
		for _, outChan := range s.OutgoingDataChannels {
			outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
		}
		s.Logger.Info().Msg("Raw data sent and received") // the logger outputs this, so I know it's not hanging
	}
}

The problem is that Service A seems to push the data successfully using record, but Service B never enters the case rawData := <-in: branch in the buffer sub-goroutine. Is this because I have nested goroutines? In case it's not clear: when Service B is started, it calls buffer, and because that call would otherwise block, I run it as a goroutine. Then, when Service A calls RegisterWithBufferService, the buffer goroutine creates another goroutine that listens for new data from Service A and pushes it back to Service A once the buffer is filled. I hope I explained it clearly.

EDIT 1
I've made a minimal, reproducible example.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	defaultBufferingPeriod int   = 3
	DefaultPollingInterval int64 = 10
)

type DataObject struct {
	Data string
}

type DataProvider interface {
	RegisterWithBufferService(*DataBuffer) error
	ServiceName() string
}

type DataProviderInfo struct {
	IncomingChan chan *DataObject
	OutgoingChan chan *DataObject
}

type DataBuffer struct {
	Running       int32 // used atomically
	DataProviders map[string]DataProviderInfo
	Quit          chan struct{}
	NewProvider   chan string
	wg            sync.WaitGroup
}

func NewDataBuffer() *DataBuffer {
	var (
		wg sync.WaitGroup
	)
	return &DataBuffer{
		DataProviders: make(map[string]DataProviderInfo),
		Quit:          make(chan struct{}),
		NewProvider:   make(chan string),
		wg:            wg,
	}
}

func (b *DataBuffer) Start() error {
	if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
		return fmt.Errorf("Could not start Data Buffer Service.")
	}
	go b.buffer(defaultBufferingPeriod)
	return nil
}

func (b *DataBuffer) Stop() error {
	if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
		return fmt.Errorf("Could not stop Data Buffer Service.")
	}
	for _, p := range b.DataProviders {
		close(p.IncomingChan)
		close(p.OutgoingChan)
	}
	close(b.Quit)
	b.wg.Wait()
	return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
func (b *DataBuffer) buffer(bufferPeriod int) {
	for {
		select {
		case newProvider := <-b.NewProvider:
			fmt.Println("Received new Data provider.")
			if _, ok := b.DataProviders[newProvider]; ok {
				b.wg.Add(1)
				p := b.DataProviders[newProvider]
				go func(prov string, in chan *DataObject, out chan *DataObject) {
					defer b.wg.Done()
					var (
						buf []*DataObject
					)
					fmt.Printf("Waiting for data from: %s\n", prov)
					for {
						select {
						case inData := <-in:
							fmt.Printf("Received data from: %s\n", prov)
							buf = append(buf, inData)
							if len(buf) > bufferPeriod {
								fmt.Printf("Queue is filled, sending data back to %s\n", prov)
								out <- buf[0]
								fmt.Println("Data Sent")
								buf = buf[1:] // pop
							}
						case <-b.Quit:
							return
						}
					}
				}(newProvider, p.IncomingChan, p.OutgoingChan)
			}
		case <-b.Quit:
			return
		}
	}
}

type ServiceA struct {
	Active            int32 // atomic
	Stopping          int32 // atomic
	Recording         int32 // atomic
	Listeners         int32 // atomic
	name              string
	QuitChan          chan struct{}
	IncomingBuffChan  chan *DataObject
	OutgoingBuffChans []chan *DataObject
	DataBufferService *DataBuffer
}

// A compile time check to ensure ServiceA fully implements the DataProvider interface
var _ DataProvider = (*ServiceA)(nil)

func NewServiceA() (*ServiceA, error) {
	var newSliceOutChans []chan *DataObject
	return &ServiceA{
		QuitChan:          make(chan struct{}),
		OutgoingBuffChans: newSliceOutChans,
		name:              "SERVICEA",
	}, nil
}

// Start starts the service. Returns an error if any issues occur
func (s *ServiceA) Start() error {
	atomic.StoreInt32(&s.Active, 1)
	return nil
}

// Stop stops the service. Returns an error if any issues occur
func (s *ServiceA) Stop() error {
	atomic.StoreInt32(&s.Stopping, 1)
	close(s.QuitChan)
	return nil
}

func (s *ServiceA) StartRecording(pol_int int64) error {
	if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
		return fmt.Errorf("Could not start recording. Data recording already started")
	}
	ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
	go func() {
		for {
			select {
			case <-ticker.C:
				fmt.Println("Time to record...")
				err := s.record()
				if err != nil {
					return
				}
			case <-s.QuitChan:
				ticker.Stop()
				return
			}
		}
	}()
	return nil
}

func (s *ServiceA) record() error {
	current_time := time.Now()
	ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
	dataObject := &DataObject{
		Data: ct,
	}
	if atomic.LoadInt32(&s.Listeners) != 0 {
		fmt.Println("Sending data to Data buffer...")
		for _, outChan := range s.OutgoingBuffChans {
			outChan <- dataObject // the receivers should already be listening
		}
		fmt.Println("Data sent.")
	}
	return nil
}

// RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
	if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
		return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
	}
	newIncomingChan := make(chan *DataObject, 1)
	newOutgoingChan := make(chan *DataObject, 1)
	s.IncomingBuffChan = newIncomingChan
	s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
	bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
		IncomingChan: newOutgoingChan, // our outgoing channel is their incoming
		OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
	}
	s.DataBufferService = bufService
	bufService.NewProvider <- s.ServiceName() // the DataBuffer service listens for new services and creates a new goroutine for buffering
	return nil
}

// ServiceName satisfies the DataProvider interface. It returns the name of the service.
func (s ServiceA) ServiceName() string {
	return s.name
}

func main() {
	var BufferedServices []DataProvider
	fmt.Println("Instantiating and Starting Data Buffer Service...")
	bufService := NewDataBuffer()
	err := bufService.Start()
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	defer bufService.Stop()
	fmt.Println("Data Buffer Service successfully started.")

	fmt.Println("Instantiating and Starting Service A...")
	serviceA, err := NewServiceA()
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	BufferedServices = append(BufferedServices, *serviceA)
	err = serviceA.Start()
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	defer serviceA.Stop()
	fmt.Println("Service A successfully started.")

	fmt.Println("Registering services with Data Buffer...")
	for _, s := range BufferedServices {
		_ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
	}
	fmt.Println("Registration complete.")

	fmt.Println("Beginning recording...")
	_ = atomic.AddInt32(&serviceA.Listeners, 1)
	err = serviceA.StartRecording(DefaultPollingInterval)
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	for {
		select {
		case RTD := <-serviceA.IncomingBuffChan:
			fmt.Println(RTD)
		case <-serviceA.QuitChan:
			atomic.StoreInt32(&serviceA.Listeners, 0)
			bufService.Quit <- struct{}{}
		}
	}
}

Running on Go 1.17. When running the example, it should print the following every 10 seconds:

Time to record...
Sending data to Data buffer...
Data sent.

But the Data buffer never enters the inData := <-in case.

Answer 1

Score: 1

To diagnose this, I changed fmt.Println("Sending data to Data buffer...") to fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans), and the output was:

Time to record...
Sending data to Data buffer... []

So you are not actually sending the data to any channels. The reason for this is:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

Because the receiver is not a pointer, the statement s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan) changes s.OutgoingBuffChans in a copy of the ServiceA, and that copy is discarded when the function exits. To fix this, change:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

to

func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

and

BufferedServices = append(BufferedServices, *serviceA)

to

BufferedServices = append(BufferedServices, serviceA)

The amended version outputs:

Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
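
The underlying behaviour can be reproduced in isolation. The following is a minimal sketch, not the code from the question: the `Service` type and its `AddByValue` and `AddByPointer` methods are hypothetical names chosen for illustration. It shows that an `append` made through a value receiver only changes the method's private copy, while the same `append` through a pointer receiver is visible to the caller.

```go
package main

import "fmt"

// Service is a hypothetical stand-in for ServiceA with a single slice field.
type Service struct {
	Chans []chan int
}

// AddByValue has a value receiver: s is a copy of the caller's struct,
// so the appended channel is lost when the method returns.
func (s Service) AddByValue(c chan int) {
	s.Chans = append(s.Chans, c)
}

// AddByPointer has a pointer receiver: the append updates the caller's struct.
func (s *Service) AddByPointer(c chan int) {
	s.Chans = append(s.Chans, c)
}

func main() {
	svc := &Service{}

	svc.AddByValue(make(chan int, 1))
	fmt.Println("after value receiver:", len(svc.Chans)) // prints 0

	svc.AddByPointer(make(chan int, 1))
	fmt.Println("after pointer receiver:", len(svc.Chans)) // prints 1
}
```

A range loop over an empty slice simply does nothing, which is why record in the question printed "Data sent." without blocking or reporting an error.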

So this resolves the reported issue (I would not be surprised if there are other issues, but hopefully this points you in the right direction). I did notice that the code you originally posted does use a pointer receiver, so it might have suffered from a different issue (but it's difficult to comment on code fragments in a case like this).
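
On the second change (appending serviceA instead of *serviceA): once RegisterWithBufferService has a pointer receiver, the value *serviceA no longer has that method in its method set, so it no longer satisfies DataProvider and the compiler rejects it. Even where a value does satisfy an interface, storing it puts a copy of the struct into the interface value. The sketch below illustrates the copy, using hypothetical `Namer` and `Svc` types that are not part of the question's code.

```go
package main

import "fmt"

// Namer is a hypothetical interface used only for this illustration.
type Namer interface {
	Name() string
}

// Svc is a hypothetical struct; Name has a value receiver, so both
// Svc and *Svc satisfy Namer.
type Svc struct {
	name string
}

func (s Svc) Name() string { return s.name }

func main() {
	svc := &Svc{name: "before"}

	var byValue Namer = *svc // the interface holds a copy of the struct
	var byPointer Namer = svc // the interface holds the pointer itself

	svc.name = "after"

	fmt.Println(byValue.Name())   // prints "before": the copy was not updated
	fmt.Println(byPointer.Name()) // prints "after": same underlying struct
}
```

Storing the pointer in the slice keeps every caller working on the same ServiceA instance.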

huangapple
  • Published on October 19, 2021, 01:13:11
  • When reposting, please keep this link: https://go.coder-hub.com/69620018.html