在for循环中取消上下文(Context):

huangapple go评论88阅读模式
英文:

Go : Cancel Context inside a For loop

问题

我正在尝试在Golang中创建一个UDP服务器,以便监听一个端口,例如1234。我有一个客户端向该服务器发送启动/停止消息。

当接收到"start"消息时,服务器将开始向该客户端发送随机数据,并在接收到"stop"消息时停止发送。

为此,我使用上下文创建了一个goroutine来发送数据,并在接收到"stop"消息时取消它。

我遇到的问题是,程序对于一个客户端运行良好,但如果我再次启动客户端,数据就不会再次发送。

任何帮助将不胜感激。

UDP服务器代码:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"time"
)

// generateMessageToUDP dials a new UDP connection to addr, starts a goroutine
// that writes a random integer from rand.Intn(100) to it once per second, and
// then blocks until ctx is cancelled.
//
// NOTE(review): done is written by this function and read by the sender
// goroutine with no synchronization, which is a data race. If DialUDP fails
// the error is only printed, so conn is nil when it is used below. The
// deferred Close runs as soon as this function returns, while the sender
// goroutine may still be sleeping or about to write.
func generateMessageToUDP(ctx context.Context, addr *net.UDPAddr) {
	// Flag polled by the sender goroutine; set to true to stop writing to UDP.
	done := false
	fmt.Println("向UDP客户端生成消息", addr)
	conn, err := net.DialUDP("udp", nil, addr)
	if err != nil {
		fmt.Println("错误:", err)
	}
	defer func(conn *net.UDPConn) {
		err := conn.Close()
		if err != nil {
			fmt.Println("关闭UDP连接时出错:", err)
		}
	}(conn)
	// Write to the address using the dialed UDP connection. The write error is
	// assigned to the outer err but never inspected.
	go func() {
		for i := 0; !done; i++ {
			RandomInt := rand.Intn(100)
			fmt.Println("随机整数:", RandomInt)
			_, err = conn.Write([]byte(fmt.Sprintf("%d", RandomInt)))
			fmt.Println("发送", RandomInt, "到", addr)
			time.Sleep(time.Second * 1)
		}
	}()
	// Block the caller until the context is cancelled, then signal the sender
	// goroutine to stop at its next loop check.
	<-ctx.Done()
	fmt.Println("停止向UDP客户端写入消息", addr)
	done = true
}

// main listens for UDP datagrams on port 5010 and reacts to the text commands
// "start" and "stop"; anything else is reported as an unknown command.
//
// NOTE(review): a single ctx/cancel pair is created once, before the loop.
// After the first "stop" calls cancel, ctx stays cancelled for the rest of the
// program, so every later generateMessageToUDP call sees an already-finished
// context. generateMessageToUDP is also called synchronously, so this loop does
// not read another datagram until that call returns.
func main() {
	fmt.Println("你好,这是一个UDP服务器")
	udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
	if err != nil {
		// The error is only printed; execution continues with udpServer unset.
		fmt.Println("错误:", err)
	}
	defer func(udpServer *net.UDPConn) {
		err := udpServer.Close()
		if err != nil {
			fmt.Println("关闭UDP连接时出错:", err)
		}
	}(udpServer)
	// Create the buffer that incoming datagrams are read into.
	buffer := make([]byte, 1024)
	ctx, cancel := context.WithCancel(context.Background())
	for {
		// Read the next incoming datagram into the buffer.
		n, addr, err := udpServer.ReadFromUDP(buffer)
		// This line logs the datagram before err is checked; the same message
		// is printed a second time after the check below.
		fmt.Println("从", addr, "接收到", string(buffer[0:n]))
		if err != nil {
			fmt.Println("错误:", err)
		}
		fmt.Println("从", addr, "接收到", string(buffer[0:n]))
		if string(buffer[0:n]) == "stop" {
			fmt.Println("停止监听")
			// Cancels the one shared context, regardless of which client sent "stop".
			cancel()
			continue
		} else if string(buffer[0:n]) == "start" {
			// Send a response back to the client.
			_, err = udpServer.WriteToUDP([]byte("你好,我是一个UDP服务器"), addr)
			if err != nil {
				fmt.Println("错误:", err)
			}
			// Generate messages for the client. This is a plain call, not a
			// go statement, so it blocks here until ctx is cancelled.
			generateMessageToUDP(ctx, addr)
		} else {
			fmt.Println("未知命令")
		}
	}
}

客户端代码:

package main

import (
	"fmt"
	"net"
	"time"
)

// main is the test client: it binds local UDP port 5011, sends "start" to the
// server at 127.0.0.1:5010, waits five seconds, and then sends "stop". It never
// reads from the socket, so it does not observe anything the server sends.
func main() {
	fmt.Println("你好,我是一个客户端")

	// Create a new client bound to local port 5011.
	// NOTE(review): the err returned by ResolveUDPAddr is overwritten by the
	// DialUDP call on the next line without being checked.
	localAddr, err := net.ResolveUDPAddr("udp", ":5011")
	client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client3.Close()
	// Ask the server to start streaming.
	_, err = client3.Write([]byte("start"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("消息已发送。等待5秒")
	time.Sleep(time.Second * 5)
	fmt.Println("发送停止消息")
	// Ask the server to stop streaming.
	_, err = client3.Write([]byte("stop"))
	if err != nil {
		fmt.Println(err)
	}
}

希望对你有所帮助!

英文:

I am trying to create a UDP server in Golang to Listen at a port for eg. 1234. I have a client which sends the start/stop message to this server.

On receiving of message "start", the server will start sending random data to this client and on the stop, the server will stop sending to the client.

For this purpose, I am using context to create a goroutine to send the data and cancel it when it gets "stop".

The problem is that the program works fine for one client, but if I start the client again, the data is not sent again.

Any help would be appreciated.

UDP server Code:

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;net&quot;
&quot;time&quot;
)
func generateMessageToUDP(ctx context.Context, addr *net.UDPAddr) {
// stop writing to UDP
done := false
fmt.Println(&quot;Generating message to UDP client&quot;, addr)
conn, err := net.DialUDP(&quot;udp&quot;, nil, addr)
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
defer func(conn *net.UDPConn) {
err := conn.Close()
if err != nil {
fmt.Println(&quot;Error in closing the UDP Connection: &quot;, err)
}
}(conn)
// write to address using UDP connection
go func() {
for i := 0; !done; i++ {
RandomInt := rand.Intn(100)
fmt.Println(&quot;Random Int: &quot;, RandomInt)
_, err = conn.Write([]byte(fmt.Sprintf(&quot;%d&quot;, RandomInt)))
fmt.Println(&quot;Sent &quot;, RandomInt, &quot; to &quot;, addr)
time.Sleep(time.Second * 1)
}
}()
&lt;-ctx.Done()
fmt.Println(&quot;Stopping writing to UDP client&quot;, addr)
done = true
}
//var addr *net.UDPAddr
//var conn *net.UDPConn
func main() {
fmt.Println(&quot;Hi this is a UDP server&quot;)
udpServer, err := net.ListenUDP(&quot;udp&quot;, &amp;net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
defer func(udpServer *net.UDPConn) {
err := udpServer.Close()
if err != nil {
fmt.Println(&quot;Error in closing the UDP Connection: &quot;, err)
}
}(udpServer)
// create a buffer to read data into
buffer := make([]byte, 1024)
ctx, cancel := context.WithCancel(context.Background())
for {
// read the incoming connection into the buffer
n, addr, err := udpServer.ReadFromUDP(buffer)
fmt.Println(&quot;Recieved &quot;, string(buffer[0:n]), &quot; from &quot;, addr)
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
fmt.Println(&quot;Received &quot;, string(buffer[0:n]), &quot; from &quot;, addr)
if string(buffer[0:n]) == &quot;stop&quot; {
fmt.Println(&quot;Stopped listening&quot;)
cancel()
continue
} else if string(buffer[0:n]) == &quot;start&quot; {
// send a response back to the client
_, err = udpServer.WriteToUDP([]byte(&quot;Hi, I am a UDP server&quot;), addr)
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
// start a routine to generate messages to the client
generateMessageToUDP(ctx, addr)
} else {
fmt.Println(&quot;Unknown command&quot;)
}
}
}

Client Code:

package main
import (
&quot;fmt&quot;
&quot;net&quot;
&quot;time&quot;
)
func main() {
fmt.Println(&quot;Hello, I am a client&quot;)
// Create a new client
localAddr, err := net.ResolveUDPAddr(&quot;udp&quot;, &quot;:5011&quot;)
client3, err := net.DialUDP(&quot;udp&quot;, localAddr, &amp;net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
if err != nil {
fmt.Println(err)
return
}
defer client3.Close()
_, err = client3.Write([]byte(&quot;start&quot;))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(&quot;Message sent. Sleeping for 5 seconds&quot;)
time.Sleep(time.Second * 5)
fmt.Println(&quot;Sending stop message&quot;)
_, err = client3.Write([]byte(&quot;stop&quot;))
if err != nil {
fmt.Println(err)
}
}

答案1

得分: 1

你必须注意你正在做什么。

  • 避免数据竞争(done变量在没有同步机制的情况下由两个不同的例程读写)https://go.dev/doc/articles/race_detector

  • 不要在每次程序开始向新客户端发送消息时创建一个新的拨号器。这将打开一个新的本地地址并将其用于发送给客户端。客户端将从另一个地址接收消息,通常应该忽略该地址,因为它没有启动与该远程地址的任何交换。

  • 不要混淆客户端的生命周期与程序上下文的生命周期。在提供的代码中,发送停止消息的客户端将触发整个程序的取消函数,它将停止所有客户端。为每个客户端创建一个新的上下文,派生自程序上下文,在接收到停止消息时取消相关的客户端上下文。

  • UDP连接由所有客户端共享,不能因为程序正在为一个客户端提供服务而停止监听传入的数据包。即generateMessageToUDP的调用应该在另一个例程中执行。

以下是经过修订的版本,考虑了这些评论。

添加了 var peers map[string]peer,用于将远程地址与上下文对应起来。类型 peer 被定义为 struct {stop func(); since time.Time}。在接收到 start 消息时,使用派生的上下文 pctx, pcancel := context.WithCancel(ctx) 将该 peer 添加到 map 中。然后在另一个例程中为新客户端提供服务:go generateMessageToUDP(pctx, udpServer, addr),该例程与新创建的上下文和服务器套接字绑定。在接收到 stop 消息时,程序执行查找 peer, ok := peers[addr.String()],然后取消关联的 peer 上下文 peer.stop(); delete(peers, addr.String()),并忘记该 peer。

package main

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"time"
)

// generateMessageToUDP sends a random integer (as decimal text) to addr over
// the shared server socket conn once per second until ctx is cancelled.
//
// The first message is sent immediately. The function blocks for the whole
// lifetime of the stream, so callers that must keep serving other peers should
// run it with a go statement. When ctx is cancelled the function returns and
// no further datagrams are written; nothing is left running in the background.
//
// conn is not closed here: it belongs to the caller and is shared by all peers.
func generateMessageToUDP(ctx context.Context, conn *net.UDPConn, addr *net.UDPAddr) {
	fmt.Println("Generating message to UDP client", addr)
	defer fmt.Println("Stopping writing to UDP client", addr)

	// A ticker paces the stream and, unlike time.Sleep, can be waited on
	// together with ctx.Done() so cancellation is noticed right away.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		payload := []byte(fmt.Sprintf("%d", rand.Intn(100)))
		if _, err := conn.WriteTo(payload, addr); err != nil {
			// Keep going on a failed write: the peer stays registered with
			// the caller until it sends "stop" or its context is cancelled.
			fmt.Println("Error: ", err)
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

//var addr *net.UDPAddr
//var conn *net.UDPConn

// main runs the UDP control server on port 5010.
//
// Each remote address is tracked as a peer with its own cancellable context
// derived from the program context. The text commands are:
//
//	start - register the sender and begin streaming to it in a new goroutine
//	stop  - cancel the sender's context and forget it
//	ping  - refresh the sender's last-seen time
//
// After every handled datagram, peers that have been silent for more than a
// minute are cancelled and removed.
func main() {
	fmt.Println("Hi this is a UDP server")
	udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
	if err != nil {
		// Without a socket there is nothing to serve; carrying on would use
		// a nil *net.UDPConn in the read loop below.
		fmt.Println("Error: ", err)
		return
	}
	defer func() {
		if err := udpServer.Close(); err != nil {
			fmt.Println("Error in closing the UDP Connection: ", err)
		}
	}()

	// peer is the per-client bookkeeping kept by the server.
	type peer struct {
		stop  func()    // cancels the context driving this client's stream
		since time.Time // when the client last sent "start" or "ping"
	}
	peers := map[string]peer{}

	// Buffer that incoming datagrams are read into.
	buffer := make([]byte, 1024)

	// Program-wide context; every peer context is derived from it.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		n, addr, err := udpServer.ReadFromUDP(buffer)
		if err != nil {
			// Nothing valid was received, so do not interpret the buffer.
			fmt.Println("Error: ", err)
			continue
		}
		cmd := string(buffer[:n])
		key := addr.String()
		fmt.Println("Received ", cmd, " from ", addr)

		switch cmd {
		case "stop":
			fmt.Println("Stopped listening")
			if p, ok := peers[key]; ok {
				p.stop()
				delete(peers, key)
			}
		case "start":
			if _, ok := peers[key]; ok {
				// Already streaming to this address; ignore the duplicate.
				break
			}
			pctx, pcancel := context.WithCancel(ctx)
			peers[key] = peer{stop: pcancel, since: time.Now()}
			// Send a response back to the client.
			if _, err := udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr); err != nil {
				fmt.Println("Error: ", err)
			}
			// Stream in a separate goroutine so this loop keeps reading.
			go generateMessageToUDP(pctx, udpServer, addr)
		case "ping":
			if p, ok := peers[key]; ok {
				p.since = time.Now()
				peers[key] = p
			}
		default:
			fmt.Println("Unknown command")
		}

		// Drop peers that have not been heard from for over a minute.
		// Deleting from a map while ranging over it is permitted in Go.
		for k, p := range peers {
			if time.Since(p.since) > time.Minute {
				fmt.Println("Peer timedout")
				p.stop()
				delete(peers, k)
			}
		}
	}
}
-- go.mod --
module play.ground
-- client.go --
package main

import (
	"fmt"
	"log"
	"net"
	"time"
)

// main is the test client. It binds 127.0.0.1:5011, sends "start" to the
// server at 127.0.0.1:5010, prints the datagrams it receives from the server
// for ten seconds, and then sends "stop".
func main() {
	fmt.Println("Hello, I am a client")

	// Create a new client.
	localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5011")
	if err != nil {
		fmt.Println(err)
		return
	}
	serverAddr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010}
	client3, err := net.DialUDP("udp", localAddr, serverAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client3.Close()

	n, err := client3.Write([]byte("start"))
	if err != nil {
		fmt.Println(err)
		return
	}
	log.Println(n)

	// Bound the listening window with a read deadline. Without it ReadFrom
	// blocks indefinitely once the server goes quiet and the loop condition
	// is never re-evaluated, so "stop" would never be sent.
	deadline := time.Now().Add(10 * time.Second)
	if err := client3.SetReadDeadline(deadline); err != nil {
		fmt.Println(err)
		return
	}

	b := make([]byte, 2048)
	for time.Now().Before(deadline) {
		n, addr, err := client3.ReadFrom(b)
		if err != nil {
			fmt.Println(err)
			if ne, ok := err.(net.Error); ok && ne.Timeout() {
				// The listening window is over.
				break
			}
			continue
		}
		fmt.Println(n, addr)
		// Only messages coming from the server's own address are accepted.
		if addr.String() == serverAddr.String() {
			fmt.Println("message:", string(b[:n]))
		}
	}

	fmt.Println("Sending stop message")
	if _, err := client3.Write([]byte("stop")); err != nil {
		fmt.Println(err)
	}
}

    go func() {
        for i := 0; ; i++ {
            RandomInt := rand.Intn(100)
            d := []byte(fmt.Sprintf("%d", RandomInt))
            conn.WriteTo(d, addr)
            time.Sleep(time.Second * 1)
        }
    }()

我留给读者的练习是在上下文通道上编写缺失的select,以确定例程是否应该退出。

英文:

You must take care to what you are doing.

  • Avoid data races: the done variable is read and written by two different goroutines without any synchronization mechanism. See https://go.dev/doc/articles/race_detector

  • Don't dial a new connection every time the program starts sending messages to a new client. Doing so opens a new local address and uses it to send to the client. The client then receives messages from a different address, which it should normally ignore because it did not initiate any exchange with that remote.

  • Don't mix up a client's lifetime with the lifetime of the program context. In the code provided, a client sending a stop message triggers the cancel function of the whole program, which stops all clients. Make a new context for each client, derived from the program context, and cancel that client's context upon receiving its stop message.

  • The UDP conn is shared by all clients; it must not stop reading incoming packets just because the program is serving one client. That is, the call to generateMessageToUDP should be executed in another goroutine.

Following is a revised version accounting for those comments.

A var peers map[string]peer is added to match a remote address with a context. The type peer is defined as struct {stop func(); since time.Time}. Upon receiving a start message, the peer is added to the map with a derived context, pctx, pcancel := context.WithCancel(ctx). The new client is then served in a different goroutine, go generateMessageToUDP(pctx, udpServer, addr), which is bound to the newly created context and the server socket. Upon receiving a stop message, the program performs a lookup, peer, ok := peers[addr.String()]; it then cancels the associated peer context with peer.stop(); delete(peers, addr.String()) and forgets the peer.

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;net&quot;
&quot;time&quot;
)
func generateMessageToUDP(ctx context.Context, conn *net.UDPConn, addr *net.UDPAddr) {
fmt.Println(&quot;Generating message to UDP client&quot;, addr)
go func() {
for i := 0; ; i++ {
RandomInt := rand.Intn(100)
d := []byte(fmt.Sprintf(&quot;%d&quot;, RandomInt))
conn.WriteTo(d, addr)
time.Sleep(time.Second * 1)
}
}()
&lt;-ctx.Done()
fmt.Println(&quot;Stopping writing to UDP client&quot;, addr)
}
//var addr *net.UDPAddr
//var conn *net.UDPConn
func main() {
fmt.Println(&quot;Hi this is a UDP server&quot;)
udpServer, err := net.ListenUDP(&quot;udp&quot;, &amp;net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
defer func(udpServer *net.UDPConn) {
err := udpServer.Close()
if err != nil {
fmt.Println(&quot;Error in closing the UDP Connection: &quot;, err)
}
}(udpServer)
// create a buffer to read data into
type peer struct {
stop  func()
since time.Time
}
peers := map[string]peer{}
buffer := make([]byte, 1024)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
// read the incoming connection into the buffer
n, addr, err := udpServer.ReadFromUDP(buffer)
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
fmt.Println(&quot;Received &quot;, string(buffer[0:n]), &quot; from &quot;, addr)
if string(buffer[0:n]) == &quot;stop&quot; {
fmt.Println(&quot;Stopped listening&quot;)
peer, ok := peers[addr.String()]
if !ok {
continue
}
peer.stop()
delete(peers, addr.String())
continue
} else if string(buffer[0:n]) == &quot;start&quot; {
peer, ok := peers[addr.String()]
if ok {
continue
}
pctx, pcancel := context.WithCancel(ctx)
peer.stop = pcancel
peer.since = time.Now()
peers[addr.String()] = peer
// send a response back to the client
_, err = udpServer.WriteToUDP([]byte(&quot;Hi, I am a UDP server&quot;), addr)
if err != nil {
fmt.Println(&quot;Error: &quot;, err)
}
// start a routine to generate messages to the client
go generateMessageToUDP(pctx, udpServer, addr)
} else if string(buffer[0:n]) == &quot;ping&quot; {
peer, ok := peers[addr.String()]
if !ok {
continue
}
peer.since = time.Now()
peers[addr.String()] = peer
} else {
fmt.Println(&quot;Unknown command&quot;)
}
for addr, p := range peers {
if time.Since(p.since) &gt; time.Minute {
fmt.Println(&quot;Peer timedout&quot;)
p.stop()
delete(peers, addr)
}
}
}
}
-- go.mod --
module play.ground
-- client.go --
package main
import (
&quot;fmt&quot;
&quot;log&quot;
&quot;net&quot;
&quot;time&quot;
)
func main() {
fmt.Println(&quot;Hello, I am a client&quot;)
// Create a new client
localAddr, err := net.ResolveUDPAddr(&quot;udp&quot;, &quot;127.0.0.1:5011&quot;)
client3, err := net.DialUDP(&quot;udp&quot;, localAddr, &amp;net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
if err != nil {
fmt.Println(err)
return
}
defer client3.Close()
var n int
n, err = client3.Write([]byte(&quot;start&quot;))
if err != nil {
fmt.Println(err)
return
}
log.Println(n)
now := time.Now()
b := make([]byte, 2048)
for time.Since(now) &lt; time.Second*10 {
n, addr, err := client3.ReadFrom(b)
fmt.Println(n, addr, err)
if err != nil {
fmt.Println(err)
continue
}
if addr.String() == &quot;127.0.0.1:5010&quot; {
m := b[:n]
fmt.Println(&quot;message:&quot;, string(m))
}
}
fmt.Println(&quot;Sending stop message&quot;)
_, err = client3.Write([]byte(&quot;stop&quot;))
if err != nil {
fmt.Println(err)
}
}

In

    go func() {
        for i := 0; ; i++ {
            RandomInt := rand.Intn(100)
            d := []byte(fmt.Sprintf("%d", RandomInt))
            conn.WriteTo(d, addr)
            time.Sleep(time.Second * 1)
        }
    }()

I left as an exercise to the reader the writing of the missing select on the context channel to figure out if the routine should exit.

答案2

得分: 0

好的,我在服务器上做了一个简单的变通处理:在创建上下文之前添加了一个标签 Start,并在取消上下文时添加了 goto 跳转到该标签。这意味着当任务被取消时,程序会重新创建上下文并再次开始工作。

英文:

Okay, I did a simple hack on the server: I added a label Start before creating the context, and when I cancel the context I added a goto to that label. This means that when the task gets cancelled, the program creates the context again and starts doing its job again.

huangapple
  • 本文由 发表于 2022年2月21日 20:17:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/71206167.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定