Why do the client and server cause a data race?

Question

I'm trying to create a TCP relay that uses channels as a basic PubSub. My goal is to relay one TCP stream to many clients (one to many). I haven't been able to fix a data race between the client and server connections, and I would be grateful for any insight into why it occurs.

I think the pubsub part is OK. It was adapted from the following blog:

https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/

According to the data race warning, the race occurs in the main function code below. I have commented the lines that trigger it. I thought it would be possible to run a server and a client concurrently; am I mistaken?

package main

import (
    "flag"
    "net"
    "os"
    "sync"
)

var (
    laddr = flag.String("l", "", "listen address (:port)")
    raddr = flag.String("r", "", "remote address (host:port)")
)

type Sub struct {
    topic string
    id    int64
}

type Pubsub struct {
    mu      sync.RWMutex
    subs    map[Sub]chan []byte
    closed  bool
    counter int64
}

func NewPubsub() *Pubsub {
    ps := &Pubsub{}
    ps.subs = make(map[Sub]chan []byte)
    ps.closed = false
    return ps
}

func (ps *Pubsub) Close() {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    if !ps.closed {
        ps.closed = true
        for _, sub := range ps.subs {
            close(sub)
        }
    }
}

func (ps *Pubsub) Subscribe(topic string) (<-chan []byte, Sub) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    // Initialize the subscription
    sub := Sub{topic: topic, id: ps.counter}
    // Add the subscription to the map
    ch := make(chan []byte, 1)
    ps.subs[sub] = ch
    // Increment the counter
    ps.counter++
    return ch, sub
}

func (ps *Pubsub) Unsubscribe(s Sub) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    delete(ps.subs, s)
}

func (ps *Pubsub) Publish(topic string, msg []byte) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    for sub, ch := range ps.subs {
        if sub.topic == topic {
            ch <- msg
        }
    }
}

func main() {
    flag.Parse()
    if *laddr == "" || *raddr == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }
    ps := NewPubsub()
    publisher := func(topic string) {
        remote, err := net.Dial("tcp", *raddr)
        if err != nil {
            return
        }
        buf := make([]byte, 2048)
        for {
            n, _ := remote.Read(buf) // *** RACE HERE ***
            ps.Publish(topic, buf[:n])
        }
    }
    go publisher("relay")
    subscriber := func(conn net.Conn, ch <-chan []byte) {
        for i := range ch {
            conn.Write([]byte(i)) // *** RACE HERE ***
        }
    }
    ln, err := net.Listen("tcp", *laddr)
    if err != nil {
        return
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            continue
        }
        ch, _ := ps.Subscribe("relay")
        go subscriber(conn, ch)
    }
}

The data race output when using "go run -race pubsub.go" is shown below.

The data race warning does not occur until the first client connects to the listening server's port.

I have not seen any other types of data races while this program runs. But when I relay binary data, bytes are occasionally corrupted or missing, which suggests there may be other issues with my naïve implementation.

==================
WARNING: DATA RACE
Write at 0x00c0000f8000 by goroutine 7:
  internal/race.WriteRange()
      /usr/local/go/src/internal/race/race.go:49 +0xaa
  syscall.Read()
      /usr/local/go/src/syscall/syscall_unix.go:190 +0x89
  internal/poll.ignoringEINTRIO()
      /usr/local/go/src/internal/poll/fd_unix.go:581 +0x1c8
  internal/poll.(*FD).Read()
      /usr/local/go/src/internal/poll/fd_unix.go:162 +0x17c
  net.(*netFD).Read()
      /usr/local/go/src/net/fd_posix.go:55 +0x68
  net.(*conn).Read()
      /usr/local/go/src/net/net.go:183 +0xeb
  net.(*TCPConn).Read()
      <autogenerated>:1 +0x69
  main.main.func1()
      /pubsub/pubsub.go:101 +0x154

Previous read at 0x00c0000f8000 by goroutine 9:
  internal/race.ReadRange()
      /usr/local/go/src/internal/race/race.go:45 +0xb0
  syscall.Write()
      /usr/local/go/src/syscall/syscall_unix.go:215 +0x94
  internal/poll.ignoringEINTRIO()
      /usr/local/go/src/internal/poll/fd_unix.go:581 +0x16e
  internal/poll.(*FD).Write()
      /usr/local/go/src/internal/poll/fd_unix.go:274 +0x294
  net.(*netFD).Write()
      /usr/local/go/src/net/fd_posix.go:73 +0x68
  net.(*conn).Write()
      /usr/local/go/src/net/net.go:195 +0xeb
  net.(*TCPConn).Write()
      <autogenerated>:1 +0x69
  main.main.func2()
      /pubsub/pubsub.go:110 +0x84

Goroutine 7 (running) created at:
  main.main()
      /pubsub/pubsub.go:106 +0x288

Goroutine 9 (running) created at:
  main.main()
      /pubsub/pubsub.go:125 +0x38f
==================

Answer 1

Score: 2

Quick fix:

// buf := make([]byte, 2048) // <- move this ...
for {
    buf := make([]byte, 2048) // <- ... to here
    n, _ := remote.Read(buf)
    ps.Publish(topic, buf[:n])
}

Why is this broken? Because a single, reused buf is passed via channels to multiple subscribers (readers); when the next for iteration overwrites it, those readers see corrupt, racy data.
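
For illustration, here is a minimal, self-contained sketch (hypothetical, not taken from the program above) that reproduces the same pattern: a sender keeps overwriting one slice's backing array while a receiver may still be reading it through a channel. Running it with go run -race should flag the same kind of read/write conflict.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan []byte, 1)
    buf := make([]byte, 5) // one shared backing array, like the 2048-byte buf above

    go func() {
        for msg := range ch {
            fmt.Println(string(msg)) // this read races with the copy below
        }
    }()

    for _, word := range []string{"hello", "world"} {
        copy(buf, word) // overwrites the array the receiver may still be reading
        ch <- buf       // every message sent aliases that same array
    }
    time.Sleep(100 * time.Millisecond) // crude wait so the receiver can drain
    close(ch)
}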

Creating a unique buffer per iteration ensures that no new write can corrupt an old message that has been sent and is still being processed by subscribers.
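
An alternative sketch, assuming you would rather keep the single reusable read buffer: copy each chunk into a fresh slice before publishing. Checking the Read error here is also advisable, because the original loop spins forever once the remote connection closes.

buf := make([]byte, 2048)
for {
    n, err := remote.Read(buf)
    if n > 0 {
        msg := make([]byte, n)
        copy(msg, buf[:n]) // subscribers receive an independent copy
        ps.Publish(topic, msg)
    }
    if err != nil {
        return // includes io.EOF when the remote side closes
    }
}

Either way, each published message must own its bytes: once a slice has been handed to subscribers, nothing else may write to its backing array.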
