synchronization issue in this Golang program
Question
I'm trying to create a program that functions as a proxy server and can switch over to a new endpoint dynamically. But I'm facing a problem: after calling switchOverToNewEndpoint(), there are still some proxy objects connected to the original endpoint 8.8.8.8, which should have been closed.
package main
import (
"net"
"sync"
"sync/atomic"
"time"
)
type Proxy struct {
ID int32
From, To *net.TCPConn
}
var switchOver int32 = 0
func SetSwitchOver() {
atomic.StoreInt32((*int32)(&switchOver), 1)
}
func SwitchOverEnabled() bool {
return atomic.LoadInt32((*int32)(&switchOver)) == 1
}
var proxies map[int32]*Proxy = make(map[int32]*Proxy, 0)
var proxySeq int32 = 0
var mu sync.RWMutex
func addProxy(from *net.TCPConn) {
mu.Lock()
proxySeq += 1
proxy := &Proxy{ID: proxySeq, From: from}
proxies[proxySeq] = proxy
mu.Unlock()
var toAddr string
if SwitchOverEnabled() {
toAddr = "1.1.1.1"
} else {
toAddr = "8.8.8.8"
}
tcpAddr, _ := net.ResolveTCPAddr("tcp4", toAddr)
toConn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
panic(err)
}
proxy.To = toConn
}
func switchOverToNewEndpoint() {
mu.RLock()
closedProxies := proxies
mu.RUnlock()
SetSwitchOver()
for _, proxy := range closedProxies {
proxy.From.Close()
proxy.To.Close()
mu.Lock()
delete(proxies, proxy.ID)
mu.Unlock()
}
}
func main() {
tcpAddr, _ := net.ResolveTCPAddr("tcp4", "0.0.0.0:5432")
ln, _ := net.ListenTCP("tcp", tcpAddr)
go func() {
time.Sleep(time.Second * 30)
switchOverToNewEndpoint()
}()
for {
clientConn, err := ln.AcceptTCP()
if err != nil {
panic(err)
}
go addProxy(clientConn)
}
}
After thinking for a while, I guess the problem is here:
mu.RLock()
closedProxies := proxies
mu.RUnlock()
But I'm not sure whether this is the root cause, or whether I can fix it by replacing it with the following:
closedProxies := make([]*Proxy, 0)
mu.RLock()
for _, proxy := range proxies {
closedProxies = append(closedProxies, proxy)
}
mu.RUnlock()
Since the case is hard to reproduce, could someone with expertise provide an idea or a hint? Any comments are welcome. Thanks in advance.
Answer 1
Score: 2
The issues
The change is necessary. In the original implementation, closedProxies refers to the same map as proxies, so any proxy added after the "snapshot" is still visible through closedProxies. See this demo:
package main
import "fmt"
func main() {
proxies := make(map[int]int, 0)
for i := 0; i < 10; i++ {
proxies[i] = i
}
closeProxies := proxies
proxies[10] = 10
proxies[11] = 11
for k := range closeProxies {
delete(proxies, k)
}
fmt.Printf("items left: %d\n", len(proxies))
// Output:
// items left: 0
}
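For contrast, the copy-based version proposed in the question decouples the snapshot from the live map, so entries added afterwards are not touched by the delete loop. A minimal counterpart sketch (illustration only, not the final fix):

package main

import "fmt"

func main() {
	proxies := make(map[int]int, 0)
	for i := 0; i < 10; i++ {
		proxies[i] = i
	}
	// Copy the current keys into a slice instead of aliasing the map.
	closeProxies := make([]int, 0, len(proxies))
	for k := range proxies {
		closeProxies = append(closeProxies, k)
	}
	proxies[10] = 10
	proxies[11] = 11
	for _, k := range closeProxies {
		delete(proxies, k)
	}
	fmt.Printf("items left: %d\n", len(proxies))
	// Output:
	// items left: 2
}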
But that is not the root cause. A new proxy could be added after closedProxies is copied but before SetSwitchOver is called. In that case, the new proxy connects to the old address but is not in closedProxies, so it is never closed. I think this race is the root cause.
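The window can be shown with a small, self-contained sketch. The sleeps below exist only to force the interleaving deterministically, and the names are illustrative rather than taken from the original program:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var (
		mu         sync.RWMutex
		proxies    = map[int]string{1: "8.8.8.8"} // one proxy already connected
		switchOver int32
	)

	go func() {
		// Copy the current proxies (the slice-based fix from the question).
		mu.RLock()
		closed := make([]int, 0, len(proxies))
		for id := range proxies {
			closed = append(closed, id)
		}
		mu.RUnlock()

		time.Sleep(20 * time.Millisecond) // the window: a new proxy can be added here

		atomic.StoreInt32(&switchOver, 1) // SetSwitchOver happens after the copy
		mu.Lock()
		for _, id := range closed {
			delete(proxies, id) // only the copied proxies get "closed"
		}
		mu.Unlock()
	}()

	time.Sleep(10 * time.Millisecond) // a client arrives inside the window
	addr := "1.1.1.1"
	if atomic.LoadInt32(&switchOver) == 0 {
		addr = "8.8.8.8" // the flag is still unset, so it dials the old endpoint
	}
	mu.Lock()
	proxies[2] = addr
	mu.Unlock()

	time.Sleep(30 * time.Millisecond)
	mu.RLock()
	fmt.Println(proxies) // prints map[2:8.8.8.8]: a proxy to the old endpoint is left behind
	mu.RUnlock()
}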
And there is another issue. A new proxy is added to proxies before the To field is set. It could happen that the program tries to close this proxy before the To field is set, which results in a nil pointer panic.
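One way to avoid that particular panic while keeping the original structure is to dial first and publish the proxy only once both ends exist. This is just a sketch reusing the globals from the question, and it does not close the switch-over window described above:

func addProxy(from *net.TCPConn) {
	toAddr := "8.8.8.8"
	if SwitchOverEnabled() {
		toAddr = "1.1.1.1"
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp4", toAddr)
	if err != nil {
		from.Close()
		return
	}
	toConn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		from.Close()
		return
	}
	// Publish the proxy only after To is set, so a concurrent close
	// can never observe a half-initialized Proxy.
	mu.Lock()
	proxySeq++
	proxies[proxySeq] = &Proxy{ID: proxySeq, From: from, To: toConn}
	mu.Unlock()
}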
A design that works reliably
The idea is to put all the endpoints into a slice, and let each endpoint manage its own list of proxies. So we only need to keep track of the index of the current endpoint. When we want to switch to another endpoint, we just need to change the index, and tell the outdated endpoint to clear its proxies. The only complicated thing left is to make sure an outdated endpoint can clear all its proxies. See the implementation below:
manager.go
This is the implementation of the idea.
package main
import (
"sync"
)
// Conn is an abstraction of a connection to make Manager easy to test.
type Conn interface {
Close() error
}
// Dialer is an abstraction of a dialer to make Manager easy to test.
type Dialer interface {
Dial(addr string) (Conn, error)
}
type Manager struct {
// muCurrent protects the "current" member.
muCurrent sync.RWMutex
current int // When current is -1, the manager is shut down.
endpoints []*endpoint
// mu protects the whole Switch action.
mu sync.Mutex
}
func NewManager(dialer Dialer, addresses ...string) *Manager {
if len(addresses) < 2 {
panic("a manger should handle at least 2 addresses")
}
endpoints := make([]*endpoint, len(addresses))
for i, addr := range addresses {
endpoints[i] = &endpoint{
address: addr,
dialer: dialer,
}
}
return &Manager{
endpoints: endpoints,
}
}
func (m *Manager) AddProxy(from Conn) {
// 1. AddProxy will wait when the write lock of m.muCurrent is taken.
// Once the write lock is released, AddProxy will connect to the new endpoint.
// Switch only holds the write lock for a short time, and Switch is not called
// very frequently, so AddProxy won't wait long.
// 2. Switch will wait if there is any AddProxy holding the read lock of
// m.muCurrent. That means Switch waits longer. The advantage is that when
// e.clear is called in Switch, all AddProxy requests to the old endpoint
// are done. So it's safe to call e.clear then.
m.muCurrent.RLock()
defer m.muCurrent.RUnlock()
current := m.current
// Do not accept any new connections when m has been shut down.
if current == -1 {
from.Close()
return
}
m.endpoints[current].addProxy(from)
}
func (m *Manager) Switch() {
// In the real world, Switch is not called very frequently.
// So it's ok to add a lock here.
// And it's necessary to make sure the old endpoint is cleared and ready
// for use in the future.
m.mu.Lock()
defer m.mu.Unlock()
// Take the write lock of m.muCurrent.
// It waits for all the AddProxy requests holding the read lock to finish.
m.muCurrent.Lock()
old := m.current
// Do nothing when m has been shut down.
if old == -1 {
m.muCurrent.Unlock()
return
}
next := old + 1
if next >= len(m.endpoints) {
next = 0
}
m.current = next
m.muCurrent.Unlock()
// When it reaches here, all AddProxy requests to the old endpoint are done.
// And it's safe to call e.clear now.
m.endpoints[old].clear()
}
func (m *Manager) Shutdown() {
m.mu.Lock()
defer m.mu.Unlock()
m.muCurrent.Lock()
current := m.current
m.current = -1
m.muCurrent.Unlock()
m.endpoints[current].clear()
}
type proxy struct {
from, to Conn
}
type endpoint struct {
address string
dialer Dialer
mu sync.Mutex
proxies []*proxy
}
func (e *endpoint) clear() {
for _, p := range e.proxies {
p.from.Close()
p.to.Close()
}
// Assign a new slice to e.proxies, and the GC will collect the old one.
e.proxies = []*proxy{}
}
func (e *endpoint) addProxy(from Conn) {
toConn, err := e.dialer.Dial(e.address)
if err != nil {
// Close the from connection so that the client will reconnect?
from.Close()
return
}
e.mu.Lock()
defer e.mu.Unlock()
e.proxies = append(e.proxies, &proxy{from: from, to: toConn})
}
main.go
This demo shows how to use the Manager type implemented above:
package main
import (
"net"
"time"
)
type realDialer struct{}
func (d realDialer) Dial(addr string) (Conn, error) {
tcpAddr, err := net.ResolveTCPAddr("tcp4", addr)
if err != nil {
return nil, err
}
return net.DialTCP("tcp", nil, tcpAddr)
}
func main() {
manager := NewManager(realDialer{}, "1.1.1.1", "8.8.8.8")
tcpAddr, _ := net.ResolveTCPAddr("tcp4", "0.0.0.0:5432")
ln, _ := net.ListenTCP("tcp", tcpAddr)
go func() {
for range time.Tick(30 * time.Second) {
manager.Switch()
}
}()
for {
clientConn, err := ln.AcceptTCP()
if err != nil {
panic(err)
}
go manager.AddProxy(clientConn)
}
}
manager_test.go
Run the test with this command: go test ./... -race -count 10
package main
import (
"errors"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
)
func TestManager(t *testing.T) {
addresses := []string{"1.1.1.1", "8.8.8.8"}
dialer := newDialer(addresses...)
manager := NewManager(dialer, addresses...)
ch := make(chan int, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for range ch {
manager.Switch()
}
wg.Done()
}()
count := 1000
total := count * 10
wg.Add(total)
fromConn := &fakeFromConn{}
for i := 0; i < total; i++ {
if i%count == count-1 {
ch <- 0
}
go func() {
manager.AddProxy(fromConn)
wg.Done()
}()
}
close(ch)
wg.Wait()
manager.Shutdown()
for _, s := range dialer.servers {
left := len(s.conns)
if left != 0 {
t.Errorf("server %s, unexpected connections left: %d", s.addr, left)
}
}
closedCount := fromConn.closedCount.Load()
if closedCount != int32(total) {
t.Errorf("want closed count: %d, got: %d", total, closedCount)
}
}
type fakeFromConn struct {
closedCount atomic.Int32
}
func (c *fakeFromConn) Close() error {
c.closedCount.Add(1)
return nil
}
type fakeToConn struct {
id uuid.UUID
server *fakeServer
}
func (c *fakeToConn) Close() error {
if c.id == uuid.Nil {
return nil
}
c.server.removeConn(c.id)
return nil
}
type fakeServer struct {
addr string
mu sync.Mutex
conns map[uuid.UUID]bool
}
func (s *fakeServer) addConn() (uuid.UUID, error) {
s.mu.Lock()
defer s.mu.Unlock()
id, err := uuid.NewRandom()
if err == nil {
s.conns[id] = true
}
return id, err
}
func (s *fakeServer) removeConn(id uuid.UUID) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.conns, id)
}
type fakeDialer struct {
servers map[string]*fakeServer
}
func newDialer(addresses ...string) *fakeDialer {
servers := make(map[string]*fakeServer)
for _, addr := range addresses {
servers[addr] = &fakeServer{
addr: addr,
conns: make(map[uuid.UUID]bool),
}
}
return &fakeDialer{
servers: servers,
}
}
func (d *fakeDialer) Dial(addr string) (Conn, error) {
n := rand.Intn(100)
if n == 0 {
return nil, errors.New("fake network error")
}
// Simulate network latency.
time.Sleep(time.Duration(n) * time.Millisecond)
s := d.servers[addr]
id, err := s.addConn()
if err != nil {
return nil, err
}
conn := &fakeToConn{
id: id,
server: s,
}
return conn, nil
}