Why do the client and server cause a data race?
Question
I'm trying to create a TCP relay using channels as a basic PubSub. My goal is to relay one TCP stream to many clients (one to many). I haven't been able to fix a data race between the client and server connections. I would be grateful for any insight into why the data race occurs between the client and server connections.
I think the pubsub part is OK. It was adapted from the following blog:
https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/
According to the data race warning, the race occurs in the main function code block below. I have marked the lines that cause the data race with comments. I thought it would be possible to run a server and a client concurrently. Am I mistaken?
```go
package main

import (
	"flag"
	"net"
	"os"
	"sync"
)

var (
	laddr = flag.String("l", "", "listen address (:port)")
	raddr = flag.String("r", "", "remote address (host:port)")
)

type Sub struct {
	topic string
	id    int64
}

type Pubsub struct {
	mu      sync.RWMutex
	subs    map[Sub]chan []byte
	closed  bool
	counter int64
}

func NewPubsub() *Pubsub {
	ps := &Pubsub{}
	ps.subs = make(map[Sub]chan []byte)
	ps.closed = false
	return ps
}

func (ps *Pubsub) Close() {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	if !ps.closed {
		ps.closed = true
		for _, sub := range ps.subs {
			close(sub)
		}
	}
}

func (ps *Pubsub) Subscribe(topic string) (<-chan []byte, Sub) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	// Initialize the subscription
	sub := Sub{topic: topic, id: ps.counter}
	// Add the subscription to the map
	ch := make(chan []byte, 1)
	ps.subs[sub] = ch
	// Increment the counter
	ps.counter++
	return ch, sub
}

func (ps *Pubsub) Unsubscribe(s Sub) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	delete(ps.subs, s)
}

func (ps *Pubsub) Publish(topic string, msg []byte) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	for sub, ch := range ps.subs {
		if sub.topic == topic {
			ch <- msg
		}
	}
}

func main() {
	flag.Parse()
	if *laddr == "" || *raddr == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}
	ps := NewPubsub()

	publisher := func(topic string) {
		remote, err := net.Dial("tcp", *raddr)
		if err != nil {
			return
		}
		buf := make([]byte, 2048)
		for {
			n, _ := remote.Read(buf) // *** RACE HERE ***
			ps.Publish(topic, buf[:n])
		}
	}
	go publisher("relay")

	subscriber := func(conn net.Conn, ch <-chan []byte) {
		for i := range ch {
			conn.Write([]byte(i)) // *** RACE HERE ***
		}
	}

	ln, err := net.Listen("tcp", *laddr)
	if err != nil {
		return
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			continue
		}
		ch, _ := ps.Subscribe("relay")
		go subscriber(conn, ch)
	}
}
```
The data race output when using "go run -race pubsub.go" is shown below.
The data race warning does not occur until the first client connects to the listening server's port.
I have not seen any other types of data races while this program runs. But when I relay binary data, bytes are occasionally either corrupted or missing, which suggests that there may be other issues with my naïve implementation.
```
==================
WARNING: DATA RACE
Write at 0x00c0000f8000 by goroutine 7:
  internal/race.WriteRange()
      /usr/local/go/src/internal/race/race.go:49 +0xaa
  syscall.Read()
      /usr/local/go/src/syscall/syscall_unix.go:190 +0x89
  internal/poll.ignoringEINTRIO()
      /usr/local/go/src/internal/poll/fd_unix.go:581 +0x1c8
  internal/poll.(*FD).Read()
      /usr/local/go/src/internal/poll/fd_unix.go:162 +0x17c
  net.(*netFD).Read()
      /usr/local/go/src/net/fd_posix.go:55 +0x68
  net.(*conn).Read()
      /usr/local/go/src/net/net.go:183 +0xeb
  net.(*TCPConn).Read()
      <autogenerated>:1 +0x69
  main.main.func1()
      /pubsub/pubsub.go:101 +0x154

Previous read at 0x00c0000f8000 by goroutine 9:
  internal/race.ReadRange()
      /usr/local/go/src/internal/race/race.go:45 +0xb0
  syscall.Write()
      /usr/local/go/src/syscall/syscall_unix.go:215 +0x94
  internal/poll.ignoringEINTRIO()
      /usr/local/go/src/internal/poll/fd_unix.go:581 +0x16e
  internal/poll.(*FD).Write()
      /usr/local/go/src/internal/poll/fd_unix.go:274 +0x294
  net.(*netFD).Write()
      /usr/local/go/src/net/fd_posix.go:73 +0x68
  net.(*conn).Write()
      /usr/local/go/src/net/net.go:195 +0xeb
  net.(*TCPConn).Write()
      <autogenerated>:1 +0x69
  main.main.func2()
      /pubsub/pubsub.go:110 +0x84

Goroutine 7 (running) created at:
  main.main()
      /pubsub/pubsub.go:106 +0x288

Goroutine 9 (running) created at:
  main.main()
      /pubsub/pubsub.go:125 +0x38f
==================
```
Answer 1
Score: 2
Quick fix:
```go
// buf := make([]byte, 2048) // <- move this ...
for {
	buf := make([]byte, 2048) // <- ... to here
	n, _ := remote.Read(buf)
	ps.Publish(topic, buf[:n])
}
```
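Why is this broken? A single (constant) `buf` is passed via a channel to multiple subscribers (readers). When the next `for` iteration calls `remote.Read(buf)`, it writes into that same backing array while those readers may still be writing the previous message out, so they get corrupt, racy data.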
Allocating a new buffer on every iteration ensures that a new read can never overwrite an older message that has already been sent to subscribers and is still being processed.