Why is data being pushed into the channel but never read from the receiver goroutine?

Question

I am building a daemon with two services that send data to each other. Service A produces the data, and Service B is a data buffer service, something like a queue. In main.go, Service B is instantiated and started. Its Start() method runs the buffer() function as a goroutine, because that function waits for data to arrive on a channel and I don't want the main process to block while it waits. Service A is then instantiated, started, and "registered" with Service B.

I created a method called RegisterWithBufferService for Service A that creates two new channels. It stores those channels as its own attributes and also provides them to Service B.

    func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
        newIncomingChan := make(chan *data.DataFrame, 1)
        newOutgoingChan := make(chan []byte, 1)
        s.IncomingBuffChan = newIncomingChan
        s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
        bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
            IncomingChan: newOutgoingChan, // our outgoing channel is their incoming
            OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
        }
        s.DataBufferService = bufService
        bufService.NewProvider <- s.ServiceName() // the DataBuffer service listens for new services and creates a new goroutine for buffering
        s.Logger.Info().Msg("Registration completed.")
        return nil
    }

buffer essentially listens for incoming data from Service A, decodes it using Decode(), and appends it to a slice called buf. If the slice is longer than bufferPeriod, it sends the first item in the slice back to Service A on the outgoing channel.

    func (b *DataBuffer) buffer(bufferPeriod int) {
        for {
            select {
            case newProvider := <-b.NewProvider:
                b.wg.Add(1)
                /*
                   newProvider is a string.
                   DataProviders is a map; the value it returns is a struct containing
                   the Incoming and Outgoing channels for this service.
                */
                p := b.DataProviders[newProvider]
                go func(prov string, in chan []byte, out chan *DataFrame) {
                    defer b.wg.Done()
                    var buf []*DataFrame
                    for {
                        select {
                        case rawData := <-in:
                            tmp := Decode(rawData) // custom decoding function; returns a *DataFrame
                            buf = append(buf, tmp)
                            if len(buf) < bufferPeriod {
                                b.Logger.Info().Msg("Sending decoded data out.")
                                out <- buf[0]
                                buf = buf[1:] // pop
                            }
                        case <-b.Quit:
                            return
                        }
                    }
                }(newProvider, p.IncomingChan, p.OutgoingChan)
            case <-b.Quit:
                return
            }
        }
    }

Service A has a method called record that periodically pushes data to all the channels in its OutgoingDataChannels attribute.

    func (s *ServiceA) record() error {
        ...
        if atomic.LoadInt32(&s.Listeners) != 0 {
            s.Logger.Info().Msg("Sending raw data to data buffer")
            for _, outChan := range s.OutgoingDataChannels {
                outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
            }
            s.Logger.Info().Msg("Raw data sent and received") // the logger outputs this, so I know it's not hanging
        }
    }

The problem is that Service A seems to push the data successfully using record, but Service B never enters the case rawData := <-in: branch in the buffer sub-goroutine. Is this because I have nested goroutines? In case it's not clear: when Service B is started, it calls buffer, and because that call would otherwise block, I run buffer as a goroutine. Then, when Service A calls RegisterWithBufferService, the buffer goroutine creates another goroutine that listens for new data from Service A and pushes it back to Service A once the buffer is filled. I hope I explained it clearly.

EDIT 1
I've made a minimal, reproducible example.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    var (
        defaultBufferingPeriod int   = 3
        DefaultPollingInterval int64 = 10
    )

    type DataObject struct {
        Data string
    }

    type DataProvider interface {
        RegisterWithBufferService(*DataBuffer) error
        ServiceName() string
    }

    type DataProviderInfo struct {
        IncomingChan chan *DataObject
        OutgoingChan chan *DataObject
    }

    type DataBuffer struct {
        Running       int32 // used atomically
        DataProviders map[string]DataProviderInfo
        Quit          chan struct{}
        NewProvider   chan string
        wg            sync.WaitGroup
    }

    func NewDataBuffer() *DataBuffer {
        var (
            wg sync.WaitGroup
        )
        return &DataBuffer{
            DataProviders: make(map[string]DataProviderInfo),
            Quit:          make(chan struct{}),
            NewProvider:   make(chan string),
            wg:            wg,
        }
    }

    func (b *DataBuffer) Start() error {
        if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
            return fmt.Errorf("Could not start Data Buffer Service.")
        }
        go b.buffer(defaultBufferingPeriod)
        return nil
    }

    func (b *DataBuffer) Stop() error {
        if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
            return fmt.Errorf("Could not stop Data Buffer Service.")
        }
        for _, p := range b.DataProviders {
            close(p.IncomingChan)
            close(p.OutgoingChan)
        }
        close(b.Quit)
        b.wg.Wait()
        return nil
    }

    // buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
    func (b *DataBuffer) buffer(bufferPeriod int) {
        for {
            select {
            case newProvider := <-b.NewProvider:
                fmt.Println("Received new Data provider.")
                if _, ok := b.DataProviders[newProvider]; ok {
                    b.wg.Add(1)
                    p := b.DataProviders[newProvider]
                    go func(prov string, in chan *DataObject, out chan *DataObject) {
                        defer b.wg.Done()
                        var (
                            buf []*DataObject
                        )
                        fmt.Printf("Waiting for data from: %s\n", prov)
                        for {
                            select {
                            case inData := <-in:
                                fmt.Printf("Received data from: %s\n", prov)
                                buf = append(buf, inData)
                                if len(buf) > bufferPeriod {
                                    fmt.Printf("Queue is filled, sending data back to %s\n", prov)
                                    out <- buf[0]
                                    fmt.Println("Data Sent")
                                    buf = buf[1:] // pop
                                }
                            case <-b.Quit:
                                return
                            }
                        }
                    }(newProvider, p.IncomingChan, p.OutgoingChan)
                }
            case <-b.Quit:
                return
            }
        }
    }

    type ServiceA struct {
        Active            int32 // atomic
        Stopping          int32 // atomic
        Recording         int32 // atomic
        Listeners         int32 // atomic
        name              string
        QuitChan          chan struct{}
        IncomingBuffChan  chan *DataObject
        OutgoingBuffChans []chan *DataObject
        DataBufferService *DataBuffer
    }

    // A compile time check to ensure ServiceA fully implements the DataProvider interface
    var _ DataProvider = (*ServiceA)(nil)

    func NewServiceA() (*ServiceA, error) {
        var newSliceOutChans []chan *DataObject
        return &ServiceA{
            QuitChan:          make(chan struct{}),
            OutgoingBuffChans: newSliceOutChans,
            name:              "SERVICEA",
        }, nil
    }

    // Start starts the service. Returns an error if any issues occur
    func (s *ServiceA) Start() error {
        atomic.StoreInt32(&s.Active, 1)
        return nil
    }

    // Stop stops the service. Returns an error if any issues occur
    func (s *ServiceA) Stop() error {
        atomic.StoreInt32(&s.Stopping, 1)
        close(s.QuitChan)
        return nil
    }

    func (s *ServiceA) StartRecording(pol_int int64) error {
        if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
            return fmt.Errorf("Could not start recording. Data recording already started")
        }
        ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
        go func() {
            for {
                select {
                case <-ticker.C:
                    fmt.Println("Time to record...")
                    err := s.record()
                    if err != nil {
                        return
                    }
                case <-s.QuitChan:
                    ticker.Stop()
                    return
                }
            }
        }()
        return nil
    }

    func (s *ServiceA) record() error {
        current_time := time.Now()
        ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
        dataObject := &DataObject{
            Data: ct,
        }
        if atomic.LoadInt32(&s.Listeners) != 0 {
            fmt.Println("Sending data to Data buffer...")
            for _, outChan := range s.OutgoingBuffChans {
                outChan <- dataObject // the receivers should already be listening
            }
            fmt.Println("Data sent.")
        }
        return nil
    }

    // RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
    func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
        if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
            return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
        }
        newIncomingChan := make(chan *DataObject, 1)
        newOutgoingChan := make(chan *DataObject, 1)
        s.IncomingBuffChan = newIncomingChan
        s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
        bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
            IncomingChan: newOutgoingChan, // our outgoing channel is their incoming
            OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
        }
        s.DataBufferService = bufService
        bufService.NewProvider <- s.ServiceName() // the DataBuffer service listens for new services and creates a new goroutine for buffering
        return nil
    }

    // ServiceName satisfies the DataProvider interface. It returns the name of the service.
    func (s ServiceA) ServiceName() string {
        return s.name
    }

    func main() {
        var BufferedServices []DataProvider
        fmt.Println("Instantiating and Starting Data Buffer Service...")
        bufService := NewDataBuffer()
        err := bufService.Start()
        if err != nil {
            panic(fmt.Sprintf("%v", err))
        }
        defer bufService.Stop()
        fmt.Println("Data Buffer Service successfully started.")
        fmt.Println("Instantiating and Starting Service A...")
        serviceA, err := NewServiceA()
        if err != nil {
            panic(fmt.Sprintf("%v", err))
        }
        BufferedServices = append(BufferedServices, *serviceA)
        err = serviceA.Start()
        if err != nil {
            panic(fmt.Sprintf("%v", err))
        }
        defer serviceA.Stop()
        fmt.Println("Service A successfully started.")
        fmt.Println("Registering services with Data Buffer...")
        for _, s := range BufferedServices {
            _ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
        }
        fmt.Println("Registration complete.")
        fmt.Println("Beginning recording...")
        _ = atomic.AddInt32(&serviceA.Listeners, 1)
        err = serviceA.StartRecording(DefaultPollingInterval)
        if err != nil {
            panic(fmt.Sprintf("%v", err))
        }
        for {
            select {
            case RTD := <-serviceA.IncomingBuffChan:
                fmt.Println(RTD)
            case <-serviceA.QuitChan:
                atomic.StoreInt32(&serviceA.Listeners, 0)
                bufService.Quit <- struct{}{}
            }
        }
    }

Running on Go 1.17. When running the example, it should print the following every 10 seconds:

    Time to record...
    Sending data to Data buffer...
    Data sent.

But the Data buffer never enters the inData := <-in case.

Answer 1

Score: 1

To diagnose this I changed fmt.Println("Sending data to Data buffer...") to fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans) and the output was:

    Time to record...
    Sending data to Data buffer... []

So you are not actually sending the data to any channels. The reason for this is:

    func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

Because the receiver is not a pointer, the statement s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan) changes s.OutgoingBuffChans in a copy of the ServiceA, and that copy is discarded when the function exits. To fix this, change:

    func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

to

    func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

and

    BufferedServices = append(BufferedServices, *serviceA)

to

    BufferedServices = append(BufferedServices, serviceA)
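The effect of the receiver type can be reproduced in isolation. The following is a minimal sketch, not the asker's code: the type `registry` and the methods `addByValue` and `addByPointer` are hypothetical names chosen for illustration, standing in for ServiceA and RegisterWithBufferService.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for ServiceA: it owns a slice
// that a registration method is supposed to grow.
type registry struct {
	chans []chan int
}

// addByValue has a value receiver, so r is a copy of the caller's struct.
// The append only updates the slice header stored in that copy.
func (r registry) addByValue() {
	r.chans = append(r.chans, make(chan int, 1))
}

// addByPointer has a pointer receiver, so the append is visible to the caller.
func (r *registry) addByPointer() {
	r.chans = append(r.chans, make(chan int, 1))
}

func main() {
	r := &registry{}

	r.addByValue() // Go dereferences r and passes a copy of the struct
	fmt.Println(len(r.chans)) // 0

	r.addByPointer()
	fmt.Println(len(r.chans)) // 1
}
```

The second change follows from the first: once RegisterWithBufferService has a pointer receiver, only *ServiceA has that method in its method set, so a ServiceA value no longer satisfies DataProvider and the slice has to hold the pointer.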

With both changes applied, the example outputs:

    Time to record...
    Sending data to Data buffer... [0xc0000d8060]
    Data sent.
    Received data from: SERVICEA
    Time to record...
    Sending data to Data buffer... [0xc0000d8060]
    Data sent.
    Received data from: SERVICEA

So this resolves the reported issue (I would not be surprised if there are other issues, but hopefully this points you in the right direction). I did notice that the code you originally posted does use a pointer receiver, so it may have suffered from a different issue (but it's difficult to comment on code fragments in a case like this).
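On the nested-goroutine part of the question: Go has no parent/child relationship between goroutines, so a goroutine started from inside another goroutine receives from a channel like any other. A minimal sketch of that, assuming a hypothetical helper named `relay` (not part of the posted code) whose outer goroutine plays the role of buffer and whose inner goroutine plays the role of the per-provider worker:

```go
package main

import (
	"fmt"
	"sync"
)

// relay starts an outer goroutine, which starts an inner goroutine that
// receives from in and sends each value, doubled, on the returned channel.
func relay(in <-chan int) <-chan int {
	out := make(chan int)
	go func() { // outer goroutine
		var wg sync.WaitGroup
		wg.Add(1)
		go func() { // inner (nested) goroutine
			defer wg.Done()
			for v := range in {
				out <- v * 2
			}
		}()
		wg.Wait() // wait for the inner goroutine before closing out
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int, 1)
	out := relay(in)

	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()

	for v := range out {
		fmt.Println(v) // prints 2, 4, 6 in that order
	}
}
```

If a nested receiver never fires, the usual things to check are whether the sender and receiver hold the same channel value and whether the sender is iterating over an empty slice of channels, which is what happened here.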

huangapple
  • Posted on October 19, 2021, 01:13:11
  • When reposting, please keep the link to this article: https://go.coder-hub.com/69620018.html