Handling read/write UDP connections in Go

Question

I need to create a UDP connection through which I can write and read packets simultaneously (using different goroutines, with GOMAXPROCS(n) where n > 1). My first attempt was something like this:

func new_conn(port, chan_buf int) (conn *net.UDPConn, inbound chan Packet, err error) {
    inbound = make(chan Packet, chan_buf)

    conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: port})
    if err != nil {
        return
    }

    // Reader goroutine: forwards every received datagram to inbound.
    go func() {
        for {
            b := make([]byte, UDP_PACKET_SIZE)
            n, addr, err := conn.ReadFromUDP(b)
            if err != nil {
                log.Printf("Error: UDP read error: %v", err)
                continue
            }
            inbound <- Packet{addr, b[:n]}
        }
    }()
    return
}

So to read a packet I used packet := <-inbound, and to write one I used conn.WriteTo(data_bytes, remote_addr). But the race detector issues warnings about simultaneous reads and writes on the connection. So I rewrote the code to something like this:

func new_conn(port, chan_buf int) (inbound, outbound chan Packet, err error) {
    inbound = make(chan Packet, chan_buf)
    outbound = make(chan Packet, chan_buf)

    conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port})
    if err != nil {
        return
    }

    // A single goroutine alternates between writing and reading.
    go func() {
        for {
            select {
            case packet := <-outbound:
                _, err := conn.WriteToUDP(packet.data, packet.addr)
                if err != nil {
                    log.Printf("Error: UDP write error: %v", err)
                    continue
                }
            default:
                // Nothing to send: block until a datagram arrives.
                b := make([]byte, UDP_PACKET_SIZE)
                n, addr, err := conn.ReadFromUDP(b)
                if err != nil {
                    log.Printf("Error: UDP read error: %v", err)
                    continue
                }
                inbound <- Packet{addr, b[:n]}
            }
        }
    }()
    return
}

This code no longer triggers the race warning, but the goroutine risks blocking in ReadFromUDP when there are no inbound packets, which also stalls outbound writes. The only solution I see is to call something like SetReadDeadline(time.Now().Add(10*time.Millisecond)) before calling ReadFromUDP. That would probably work, but I don't like it much. Is there a more elegant way to solve this problem?

Update: the warning message:

==================
WARNING: DATA RACE
Read by goroutine 553:
  net.ipToSockaddr()
      /usr/local/go/src/pkg/net/ipsock_posix.go:150 +0x18a
  net.(*UDPAddr).sockaddr()
      /usr/local/go/src/pkg/net/udpsock_posix.go:45 +0xd9
  net.(*UDPConn).WriteToUDP()
      /usr/local/go/src/pkg/net/udpsock_posix.go:123 +0x4df
  net.(*UDPConn).WriteTo()
      /usr/local/go/src/pkg/net/udpsock_posix.go:139 +0x2f6
  <traceback which points to the conn.WriteTo call>

Previous write by goroutine 556:
  syscall.anyToSockaddr()
      /usr/local/go/src/pkg/syscall/syscall_linux.go:383 +0x336
  syscall.Recvfrom()
      /usr/local/go/src/pkg/syscall/syscall_unix.go:223 +0x15c
  net.(*netFD).ReadFrom()
      /usr/local/go/src/pkg/net/fd_unix.go:227 +0x33c
  net.(*UDPConn).ReadFromUDP()
      /usr/local/go/src/pkg/net/udpsock_posix.go:67 +0x164
  <traceback which points to the conn.ReadFromUDP call>

Goroutine 553 (running) created at:
  <traceback>

Goroutine 556 (running) created at:
  <traceback>
==================

Answer 1

Score: 3

According to the trace from the race detector, the detected race appears to be caused by reusing a UDPAddr returned by a read call in a subsequent write. Specifically, the race is on the data that its IP field references.

It's not clear that this is really a problem, though, since syscall.Recvfrom allocates a new address structure on every call and doesn't hold on to that structure long term. You could try copying the address before sending it to your outbound goroutine. For example:

// Deep-copy addr so the writer shares no memory with the reader.
newAddr := new(net.UDPAddr)
*newAddr = *addr
newAddr.IP = make(net.IP, len(addr.IP))
copy(newAddr.IP, addr.IP)

But without knowing more about your program, it is difficult to tell why this is being flagged as a race. Perhaps this is enough to point you in the right direction, though. I wasn't able to reproduce the race with this test program, which is based on what you've posted: http://play.golang.org/p/suDG6hCYYP

Answer 2

Score: 2

Why not start two goroutines, one for writing and one for reading, and run full duplex? For example:

func new_conn(port, chan_buf int) (inbound, outbound chan Packet, err error) {
	inbound = make(chan Packet, chan_buf)
	outbound = make(chan Packet, chan_buf)

	conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port})
	if err != nil {
		return
	}

	// Writer goroutine: sends every packet queued on outbound.
	go func() {
		for packet := range outbound {
			_, err := conn.WriteToUDP(packet.data, packet.addr)
			if err != nil {
				log.Printf("Error: UDP write error: %v", err)
				continue
			}
		}
	}()

	// Reader goroutine: reuses one buffer and forwards a copy of each datagram.
	go func() {
		b := make([]byte, UDP_PACKET_SIZE)
		for {
			n, addr, err := conn.ReadFromUDP(b)
			if err != nil {
				log.Printf("Error: UDP read error: %v", err)
				continue
			}
			b2 := make([]byte, n)
			copy(b2, b[:n])
			inbound <- Packet{addr, b2}
		}
	}()

	return
}

huangapple
  • Posted on February 23, 2014 at 20:36:52
  • When reposting, please keep the link to this article: https://go.coder-hub.com/21968266.html