How to listen to a client continiously using gob in Golang

huangapple go评论98阅读模式
英文:

How to listen to a client continiously using gob in Golang

问题

在我的用例中,我想要持续监听一个TCP连接并接收值。预期的值是一个对象。所以我正在使用gob解码器从连接中接收值。我想要使用Go协程持续监听连接并接收对象。我在这里有一段代码片段[这是应用程序的一部分,代码片段无法编译]。它可以获取第一次的值,但不能接收后续的对象。

func main() {

    //...
    // SOME CODE
    //...

    
    // 所有已连接的主机;一个映射,其中键是IP地址,值是net.Conn对象
    allClients := make(map[string]net.Conn)

    tMaps := make(chan map[string]int64)

    for {
            select {
            // 接受新的客户端连接
            //
            case conn := <-newConnections:
            log.Printf("Accepted new client, #%s", hostIp)

            // 在一个goroutine中不断读取来自该客户端的传入消息,并将其推送到tMaps通道以广播给其他客户端。
            //
            go func(conn net.Conn) {
                    dec := gob.NewDecoder(conn)
                    for {
                            var tMap map[string]int64
                            err := dec.Decode(&tMap)
                            if err != nil {
                                    fmt.Println("Error in decoding ", err)
                                    break
                            }
                            log.Printf("Received values: %+v", tMap)
                            // 根据接收到的值更新节流映射
                            tMaps <- throttleMap
                    }

            }(conn)
    }
}

有人可以帮我解决这个问题吗?

英文:

In my use case I would like to continuously listen to a TCP connection and receive the value. The expected value is an object. So I am using gob decoder to receive the value from the connection. I would like to continuously listen to the connection and receive the object using go routines. I have the code snippet here[It is part of the application. code snippet does not compile]. It is getting value for the first time but not receiving for the subsequent objects.

func main() {

    //...
    // SOME CODE
    //...

    
    // All hosts who are connected; a map wherein
    // the keys are ip addreses and the values are
    //net.Conn objects
    allClients := make(map[string]net.Conn)

    tMaps := make(chan map[string]int64)

    for {
            select {
            // Accept new clients
            //
            case conn := &lt;-newConnections:
            log.Printf(&quot;Accepted new client, #%s&quot;, hostIp)

            // Constantly read incoming messages from this
            // client in a goroutine and push those onto
            // the tMaps channel for broadcast to others.
            //
            go func(conn net.Conn) {
                    dec := gob.NewDecoder(conn)
                    for {
                            var tMap map[string]int64
                            err := dec.Decode(&amp;tMap)
                            if err != nil {
                                    fmt.Println(&quot;Error in decoding &quot;, err)
                                    break
                            }
                            log.Printf(&quot;Received values: %+v&quot;, tMap)
                            //update throttle map based on the received value
                            tMaps &lt;- throttleMap
                    }

            }(conn)
    }
}

Could anyone help me on this?

答案1

得分: 4

让我们来看一下使用Go语言编写TCP服务器的基础知识。

首先,我们需要设置“监听”部分。可以像这样设置:

package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

func main() {
	ln, err := net.Listen("tcp", ":9000")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	for {
		conn, err := ln.Accept()
		if err != nil {
			panic(err)
		}

		io.WriteString(conn, fmt.Sprint("Hello World\n", time.Now(), "\n"))

		conn.Close()
	}
}

请注意无限的for循环。它始终在运行并循环执行该代码。被循环执行的代码是做什么的呢?如果有一个连接进入正在监听的端口,那么该连接将被接受。然后我们对该连接执行一些操作。在这种情况下,我们使用io.WriteString向其写入响应。我们然后关闭连接。如果有更多的连接,我们准备好接受它们。

现在让我们创建一个客户端来连接到TCP服务器。这被称为“拨号”到TCP服务器。

要在您的计算机上运行所有这些代码,请运行上面的TCP服务器代码。要运行该代码,请打开终端并输入:go run main.go

现在将以下代码直接放入另一个文件中。在终端中启动另一个选项卡。通过输入以下命令也运行该代码:go run main.go

下面的代码将“拨号”到您的TCP服务器,连接到服务器,然后TCP服务器将响应并关闭连接。

以下是作为客户端连接到TCP服务器的代码:

package main

import (
	"fmt"
	"io/ioutil"
	"net"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:9000")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	bs, _ := ioutil.ReadAll(conn)
	fmt.Println(string(bs))

}

我们可以利用这些基础知识开始玩耍。

让我们创建一个“回显”服务器。

这将演示接受多个连接。

package main

import (
	"io"
	"net"
)

func main() {
	ln, err := net.Listen("tcp", ":9000")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	for {
		conn, err := ln.Accept()
		if err != nil {
			panic(err)
		}

		// 处理无限连接
		go func() {
			io.Copy(conn, conn)
			conn.Close()
		}()
	}
}

以与之前相同的方式运行上面的文件:go run main.go

如果出现错误,请确保您已关闭我们从前一个示例中运行的TCP服务器。您可以使用终端中的ctrl+c关闭TCP服务器。

现在您的新TCP服务器正在运行,让我们使用Telnet连接到它。

在Windows上,您可以安装Telnet;在Mac上,它应该已经存在。使用以下命令运行Telnet并连接到您的TCP服务器:telnet localhost 9000

现在再来一个例子 - 类似Redis的内存数据库:

package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"net"
	"strings"
)

var data = make(map[string]string)

func handle(conn net.Conn) {
	defer conn.Close()

	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		ln := scanner.Text()
		fs := strings.Fields(ln)

		if len(fs) < 2 {
			io.WriteString(conn, "This is an in-memory database \n"+
				"Use SET, GET, DEL like this: \n"+
				"SET key value \n"+
				"GET key \n"+
				"DEL key \n\n"+
				"For example - try these commands: \n"+
				"SET fav chocolate \n"+
				"GET fav \n\n\n")
			continue
		}

		switch fs[0] {
		case "GET":
			key := fs[1]
			value := data[key]
			fmt.Fprintf(conn, "%s\n", value)
		case "SET":
			if len(fs) != 3 {
				io.WriteString(conn, "EXPECTED VALUE\n")
				continue
			}
			key := fs[1]
			value := fs[2]
			data[key] = value
		case "DEL":
			key := fs[1]
			delete(data, key)
		default:
			io.WriteString(conn, "INVALID COMMAND "+fs[0]+"\n")
		}
	}
}

func main() {
	li, err := net.Listen("tcp", ":9000")
	if err != nil {
		log.Fatalln(err)
	}
	defer li.Close()

	for {
		conn, err := li.Accept()
		if err != nil {
			log.Fatalln(err)
		}
		handle(conn)
	}
}

并添加并发性:

package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"net"
	"strings"
)

type Command struct {
	Fields []string
	Result chan string
}

func redisServer(commands chan Command) {
	var data = make(map[string]string)
	for cmd := range commands {
		if len(cmd.Fields) < 2 {
			cmd.Result <- "Expected at least 2 arguments"
			continue
		}

		fmt.Println("GOT COMMAND", cmd)

		switch cmd.Fields[0] {
		// GET <KEY>
		case "GET":
			key := cmd.Fields[1]
			value := data[key]

			cmd.Result <- value

		// SET <KEY> <VALUE>
		case "SET":
			if len(cmd.Fields) != 3 {
				cmd.Result <- "EXPECTED VALUE"
				continue
			}
			key := cmd.Fields[1]
			value := cmd.Fields[2]
			data[key] = value
			cmd.Result <- ""
		// DEL <KEY>
		case "DEL":
			key := cmd.Fields[1]
			delete(data, key)
			cmd.Result <- ""
		default:
			cmd.Result <- "INVALID COMMAND " + cmd.Fields[0] + "\n"
		}
	}
}

func handle(commands chan Command, conn net.Conn) {
	defer conn.Close()

	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		ln := scanner.Text()
		fs := strings.Fields(ln)

		result := make(chan string)
		commands <- Command{
			Fields: fs,
			Result: result,
		}

		io.WriteString(conn, <-result+"\n")
	}

}

func main() {
	li, err := net.Listen("tcp", ":9000")
	if err != nil {
		log.Fatalln(err)
	}
	defer li.Close()

	commands := make(chan Command)
	go redisServer(commands)

	for {
		conn, err := li.Accept()
		if err != nil {
			log.Fatalln(err)
		}

		go handle(commands, conn)
	}
}

请参阅我在CSUF课程中描述所有这些的讲座。还有一个很棒的资源

英文:

Let's look at the basics of a TCP server in Go.

First, there is the "listening" part. We can set that up like this:

package main
import (
&quot;fmt&quot;
&quot;io&quot;
&quot;net&quot;
&quot;time&quot;
)
func main() {
ln, err := net.Listen(&quot;tcp&quot;, &quot;:9000&quot;)
if err != nil {
panic(err)
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
panic(err)
}
io.WriteString(conn, fmt.Sprint(&quot;Hello World\n&quot;, time.Now(), &quot;\n&quot;))
conn.Close()
}
}

Notice the infinite for loop. It is always running and looping over that code. What does the code that is being looped over do? If a connection comes in on the port which is being listened on, then that connection is accepted. We then do something with that connection. In this case, we write back to it with io.WriteString. To this one connection, we are sending a response. We then close the connection. And if there are more connections, we're ready to accept them.

Now let's create a client to connect to the TCP server. This is known as "dialing" in to the TCP server.

To run all of this code on your machine, run the TCP server code above. To run the code, go to your terminal and enter: go run main.go

Now put the code directly below into another file. Launch another tab in your terminal. Run that code also by entering: go run main.go

The code below which "dials" in to your TCP server will connect to the server and the TCP server will respond, then close the connection.

Here is the code for dialing into a TCP server as a client:

package main
import (
&quot;fmt&quot;
&quot;io/ioutil&quot;
&quot;net&quot;
)
func main() {
conn, err := net.Dial(&quot;tcp&quot;, &quot;localhost:9000&quot;)
if err != nil {
panic(err)
}
defer conn.Close()
bs, _ := ioutil.ReadAll(conn)
fmt.Println(string(bs))
}

We can take these basics and start having fun.

Let's create an "echo" server.

This will illustrate accepting many connections.

package main
import (
&quot;io&quot;
&quot;net&quot;
)
func main() {
ln, err := net.Listen(&quot;tcp&quot;, &quot;:9000&quot;)
if err != nil {
panic(err)
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
panic(err)
}
// handles unlimited connections
go func() {
io.Copy(conn, conn)
conn.Close()
}()
}
}

Run the file above the same way as before: go run main.go

If you get an error, make sure you have closed the TCP server we were running from the previous example. You close the TCP server with ctrl+c in the terminal.

Now that your new TCP server is running, let's connect to it using Telnet.

On windows you can install telnet; on Mac, it should already be there. Use this command to run telnet and connect to your TCP server: telnet localhost 9000

Now for one more example - an in-memory database like Redis:

package main
import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;io&quot;
&quot;log&quot;
&quot;net&quot;
&quot;strings&quot;
)
var data = make(map[string]string)
func handle(conn net.Conn) {
defer conn.Close()
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
ln := scanner.Text()
fs := strings.Fields(ln)
if len(fs) &lt; 2 {
io.WriteString(conn, &quot;This is an in-memory database \n&quot; +
&quot;Use SET, GET, DEL like this: \n&quot; +
&quot;SET key value \n&quot; +
&quot;GET key \n&quot; +
&quot;DEL key \n\n&quot; +
&quot;For example - try these commands: \n&quot; +
&quot;SET fav chocolate \n&quot; +
&quot;GET fav \n\n\n&quot;)
continue
}
switch fs[0] {
case &quot;GET&quot;:
key := fs[1]
value := data[key]
fmt.Fprintf(conn, &quot;%s\n&quot;, value)
case &quot;SET&quot;:
if len(fs) != 3 {
io.WriteString(conn, &quot;EXPECTED VALUE\n&quot;)
continue
}
key := fs[1]
value := fs[2]
data[key] = value
case &quot;DEL&quot;:
key := fs[1]
delete(data, key)
default:
io.WriteString(conn, &quot;INVALID COMMAND &quot;+fs[0]+&quot;\n&quot;)
}
}
}
func main() {
li, err := net.Listen(&quot;tcp&quot;, &quot;:9000&quot;)
if err != nil {
log.Fatalln(err)
}
defer li.Close()
for {
conn, err := li.Accept()
if err != nil {
log.Fatalln(err)
}
handle(conn)
}
}

And adding in concurrency:

package main
import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;io&quot;
&quot;log&quot;
&quot;net&quot;
&quot;strings&quot;
)
type Command struct {
Fields []string
Result chan string
}
func redisServer(commands chan Command) {
var data = make(map[string]string)
for cmd := range commands {
if len(cmd.Fields) &lt; 2 {
cmd.Result &lt;- &quot;Expected at least 2 arguments&quot;
continue
}
fmt.Println(&quot;GOT COMMAND&quot;, cmd)
switch cmd.Fields[0] {
// GET &lt;KEY&gt;
case &quot;GET&quot;:
key := cmd.Fields[1]
value := data[key]
cmd.Result &lt;- value
// SET &lt;KEY&gt; &lt;VALUE&gt;
case &quot;SET&quot;:
if len(cmd.Fields) != 3 {
cmd.Result &lt;- &quot;EXPECTED VALUE&quot;
continue
}
key := cmd.Fields[1]
value := cmd.Fields[2]
data[key] = value
cmd.Result &lt;- &quot;&quot;
// DEL &lt;KEY&gt;
case &quot;DEL&quot;:
key := cmd.Fields[1]
delete(data, key)
cmd.Result &lt;- &quot;&quot;
default:
cmd.Result &lt;- &quot;INVALID COMMAND &quot; + cmd.Fields[0] + &quot;\n&quot;
}
}
}
func handle(commands chan Command, conn net.Conn) {
defer conn.Close()
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
ln := scanner.Text()
fs := strings.Fields(ln)
result := make(chan string)
commands &lt;- Command{
Fields: fs,
Result: result,
}
io.WriteString(conn, &lt;-result+&quot;\n&quot;)
}
}
func main() {
li, err := net.Listen(&quot;tcp&quot;, &quot;:9000&quot;)
if err != nil {
log.Fatalln(err)
}
defer li.Close()
commands := make(chan Command)
go redisServer(commands)
for {
conn, err := li.Accept()
if err != nil {
log.Fatalln(err)
}
go handle(commands, conn)
}
}

See my lectures from my CSUF class describing all of this here. And one more great resource.

huangapple
  • 本文由 发表于 2015年12月10日 15:04:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/34195658.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定