在这种情况下,如何正确地循环遍历缓冲通道?

huangapple go评论79阅读模式
英文:

How to properly loop through buffered channel in this case?

问题

我正在尝试用Go语言的标准库(stdlib)编写一个端口扫描器。这更多只是一种练习,所以请不要对其中的逻辑进行评论。

看一下下面的代码:

package main

import (
	"flag"
	"fmt"
	"net"
	"time"
	"strings"
	"strconv"
	"log"
	"sync"
)

// commonPorts maps well-known TCP port numbers to the service label that
// worker appends, in parentheses, to an open port in the scan report.
var commonPorts = map[int]string{
	21:  "ftp",
	22:  "sftp",
	80:  "http",
	110: "pop3",
	143: "imap",
	443: "https",
	631: "ipp",
	993: "imaps",
	995: "pop3s",
}

// OP accumulates the labels of the ports that the scan goroutines found open.
type OP struct {
	mu    sync.Mutex // guards ports
	ports []string   // open-port labels, in the order they were added
}

// SafeAdd appends port to the collected list while holding the mutex, so it
// can be called from several goroutines at once.
func (o *OP) SafeAdd(port string) {
	o.mu.Lock()
	o.ports = append(o.ports, port)
	o.mu.Unlock()
}


// worker probes a single TCP port on host.
//
// It returns "" when the dial fails or does not complete within three
// seconds. Otherwise it returns the port number as a string, followed by the
// service name in parentheses when the port is listed in commonPorts, e.g.
// "80(http)".
func worker(host string, port int) string {
	portStr := strconv.Itoa(port)

	// JoinHostPort brackets IPv6 literals ("[::1]:80"). Formatting with
	// "%s:%d" yields "::1:80", which the dialer rejects, so every port on an
	// IPv6 host would be reported as closed.
	address := net.JoinHostPort(host, portStr)

	conn, err := net.DialTimeout("tcp", address, 3*time.Second)
	if err != nil {
		return "" // could not connect: treat the port as not open
	}
	// Only reachability matters; an error while closing the probe connection
	// does not change the result.
	_ = conn.Close()

	label := portStr
	if name, ok := commonPorts[port]; ok {
		label += fmt.Sprintf("(%s)", name)
	}
	return label
}

// processWithChannels probes every TCP port (1-65535) on host concurrently and
// returns a channel that yields the label of each open port as it is found.
// The channel is closed once all probes have finished, so callers can simply
// range over it to know when processing is done.
func processWithChannels(host string) <-chan string {
	openPort := make(chan string, 1000)

	var wg sync.WaitGroup
	for i := 1; i <= 65535; i++ {
		wg.Add(1)
		go func(port int) {
			defer wg.Done()
			if label := worker(host, port); label != "" {
				openPort <- label
			}
		}(i)
	}

	// Wait and close in the background. Waiting here, before the channel is
	// returned, deadlocks as soon as more than cap(openPort) ports are open:
	// the senders block on the full buffer while nobody is receiving yet.
	go func() {
		wg.Wait()
		close(openPort)
	}()

	return openPort
}

// main parses the -host and -type flags, verifies that the host resolves, scans
// all TCP ports with the selected strategy and prints the open ones.
//
// -type 1 collects results through processWithChannels; any other value starts
// one goroutine per port that adds its result directly to the shared OP.
func main() {
	hostFlag := flag.String("host", "127.0.0.1", "please insert the host")
	modeFlag := flag.Int("type", 2, "please insert the type")
	flag.Parse()

	target := *hostFlag
	fmt.Printf("Scanning: %s...\n", target)

	// Abort early when the host name cannot be resolved.
	if _, err := net.LookupHost(target); err != nil {
		log.Fatal(err)
	}

	found := &OP{ports: []string{}}

	switch *modeFlag {
	case 1:
		for label := range processWithChannels(target) {
			found.SafeAdd(label)
		}
	default:
		var pending sync.WaitGroup
		for n := 1; n <= 65535; n++ {
			pending.Add(1)
			go func(sink *OP, addr string, n int) {
				defer pending.Done()
				label := worker(addr, n)
				if label == "" {
					return
				}
				sink.SafeAdd(label)
			}(found, target, n)
		}
		pending.Wait()
	}

	if len(found.ports) == 0 {
		fmt.Printf("No open port on the host: %s!\n", target)
		return
	}
	fmt.Printf("Following ports are opened: %s\n", strings.Join(found.ports, ", "))
}

有两种启动扫描的方式,一种是使用带缓冲的通道,另一种是使用sync.WaitGroup,在所有扫描完成后退出。

在这种情况下,我认为使用sync.WaitGroup比使用带缓冲的通道并循环读取直到通道为空更合理。然而,在使用带缓冲的通道时,除非再使用一个sync.WaitGroup,否则我看不到有什么办法能检测到通道上已经没有更多内容、应该退出for循环。

我想问的问题是,如果我只想使用带缓冲的通道解决方案,如何正确实现它,以便我知道处理何时完成,以便我可以继续执行其他代码?(请不要建议使用超时)。

此外,以下是两种类型的小型基准测试,以供参考:

MacBook-Pro:PortScanner c$ time ./PortScanner -host yahoo.com -type 1
Scanning: yahoo.com...
Following ports are opened: 80(http), 143(imap), 110(pop3), 995(pop3s), 993(imaps)

real    0m4.620s
user    0m1.193s
sys     0m1.284s
MacBook-Pro:PortScanner c$ time ./PortScanner -host yahoo.com -type 2
Scanning: yahoo.com...
Following ports are opened: 110(pop3), 80(http), 143(imap), 995(pop3s), 993(imaps)

real    0m4.055s
user    0m1.051s
sys     0m0.946s
英文:

I am trying to play around with go to make some kind of port scanner using the stdlib. This is more of an exercise than anything else, so please don't comment on the logic involved.

Looking at the following code:

package main
import (
	"flag"
	"fmt"
	"net"
	"time"
	"strings"
	"strconv"
	"log"
	"sync"
)
// commonPorts maps well-known TCP port numbers to the service label that
// worker appends, in parentheses, to an open port in the scan report.
var commonPorts = map[int]string{
	21:  "ftp",
	22:  "sftp",
	80:  "http",
	110: "pop3",
	143: "imap",
	443: "https",
	631: "ipp",
	993: "imaps",
	995: "pop3s",
}
// OP accumulates the labels of the ports that the scan goroutines found open.
type OP struct {
	mu    sync.Mutex // guards ports
	ports []string   // open-port labels, in the order they were added
}

// SafeAdd appends port to the collected list while holding the mutex, so it
// can be called from several goroutines at once.
func (o *OP) SafeAdd(port string) {
	o.mu.Lock()
	o.ports = append(o.ports, port)
	o.mu.Unlock()
}
func worker(host string, port int) string {
address := fmt.Sprintf(&quot;%s:%d&quot;, host, port)
conn, err := net.DialTimeout(&quot;tcp&quot;, address, time.Second * 3)
if err != nil {
return &quot;&quot;; // is offline, cannot connect
}
conn.Close()
stringI := strconv.Itoa(port)
if name, ok := commonPorts[port]; ok {
stringI += fmt.Sprintf(&quot;(%s)&quot;, name)
}
return stringI
}
func processWithChannels(host string) &lt;-chan string{
openPort := make(chan string, 1000)
var wg sync.WaitGroup
for i := 1; i &lt;= 65535; i++ {
wg.Add(1)
go func(openPort chan string, host string, i int) {
defer wg.Done()
port := worker(host, i)
if port != &quot;&quot; {
openPort &lt;- port
}
}(openPort, host, i)
}
wg.Wait()
close(openPort)
return openPort
}
func main() {
var host = flag.String(&quot;host&quot;, &quot;127.0.0.1&quot;, &quot;please insert the host&quot;)
var pType = flag.Int(&quot;type&quot;, 2, &quot;please insert the type&quot;)
flag.Parse()
fmt.Printf(&quot;Scanning: %s...\n&quot;, *host)
if _, err := net.LookupHost(*host); err != nil {
log.Fatal(err)
}
openPorts := &amp;OP{ports: []string{}};
if *pType == 1 {
ports := processWithChannels(*host);
for port := range ports {
openPorts.SafeAdd(port)
}
} else {
var wg sync.WaitGroup
for i := 1; i &lt;= 65535; i++ {
wg.Add(1)
go func(o *OP, host string, i int){
defer wg.Done()
if port := worker(host, i); port != &quot;&quot; {
o.SafeAdd(port)
}
}(openPorts, *host, i)
}
wg.Wait()
}
if len(openPorts.ports) &gt; 0 {
fmt.Printf(&quot;Following ports are opened: %s\n&quot;, strings.Join(openPorts.ports, &quot;, &quot;))
} else {
fmt.Printf(&quot;No open port on the host: %s!\n&quot;, *host)
}
}

There are two ways of starting a scan: either by using a buffered channel, or by using sync.WaitGroup and bailing out once all the scans are done.

It seems to me that in this case, using sync.WaitGroup makes more sense than using a buffered channel and looping through it until it's empty. However, when using a buffered channel here, I don't see a way to detect that there's nothing else on the channel and that I should bail out of the for loop, except by using another sync.WaitGroup block.

I think my question is: in case I want to use the buffered channel solution only, how do I implement it properly so that I know when the processing is done and can proceed with the rest of the code? (Please don't suggest timeouts.)

Here's also a small benchmark of the two types, in case anyone is interested:

MacBook-Pro:PortScanner c$ time ./PortScanner -host yahoo.com -type 1
Scanning: yahoo.com...
Following ports are opened: 80(http), 143(imap), 110(pop3), 995(pop3s), 993(imaps)
real    0m4.620s
user    0m1.193s
sys     0m1.284s
MacBook-Pro:PortScanner c$ time ./PortScanner -host yahoo.com -type 2
Scanning: yahoo.com...
Following ports are opened: 110(pop3), 80(http), 143(imap), 995(pop3s), 993(imaps)
real    0m4.055s
user    0m1.051s
sys     0m0.946s

答案1

得分: 2

调用processWithChannels会在需要将超过1000个项目放入通道时挂起。如果要使用缓冲通道来保存所有值直到处理完成,通道的容量必须足够接受所有值。

如果要将所有值收集到单个切片中,那么没有必要使用通道,你的第二个解决方案就可以了。

如果你想尽快“流式”返回端口,那么需要在这两个解决方案之间加入一些东西。

// Streaming variant: results are printed as soon as each probe reports them.
// The channel is unbuffered, so every send is handed straight to the range
// loop at the bottom.
ports := make(chan string)

var wg sync.WaitGroup
for i := 1; i <= 65535; i++ {
    wg.Add(1)
    // One goroutine per port; i is passed as an argument so each goroutine
    // probes its own port number.
    go func(i int) {
        defer wg.Done()
        if port := worker(*host, i); port != "" {
            ports <- port
        }
    }(i)
}

// Close the channel from a separate goroutine once every probe has finished.
// Calling wg.Wait() inline here would block before the receive loop starts,
// leaving the senders stuck on the unbuffered channel.
go func() {
    wg.Wait()
    close(ports)
}()

// Receives results as they arrive; the loop ends when ports is closed.
for port := range ports {
    fmt.Println("PORT:", port)
}

然而,这种方法很可能会遇到问题,比如当你同时拨号所有65535个端口时可能会丢失打开的端口。下面是一种可能的模式,使用一个工作池并发拨号:

// Worker-pool variant: a fixed number of goroutines dial concurrently instead
// of one goroutine per port. ports carries results, toScan carries the port
// numbers still to be probed.
ports := make(chan string)
toScan := make(chan int)
var wg sync.WaitGroup

// make 100 workers for dialing
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Each worker keeps pulling port numbers until toScan is closed.
        // Every result is forwarded, including "" for a failed dial.
        for p := range toScan {
            ports <- worker(*host, p)
        }
    }()
}

// close our receiving ports channel once all workers are done
go func() {
    wg.Wait()
    close(ports)
}()

// feed the ports to the worker pool
go func() {
    for i := 1; i <= 65535; i++ {
        toScan <- i
    }
    // signal the workers to stop
    close(toScan)
}()

// Drain the results until ports is closed; empty strings are skipped here.
for port := range ports {
    if port != "" {
        fmt.Println("PORT:", port)
    }
}
英文:

The call to processWithChannels will hang if you need to put more than 1000 items into the channel. If you're going to use a buffered channel to hold all values until processing, there has to be enough capacity to accept all values.

If you are going to collect all values into a single slice, then there's no reason to use a channel, and your second solution is just fine.

If you want to "stream" the ports back as soon as possible, then you need something in between the two solutions

// Streaming variant: results are printed as soon as each probe reports them.
// The channel is unbuffered, so every send is handed straight to the range
// loop at the bottom.
ports := make(chan string)
var wg sync.WaitGroup
for i := 1; i <= 65535; i++ {
	wg.Add(1)
	// One goroutine per port; i is passed as an argument so each goroutine
	// probes its own port number.
	go func(i int) {
		defer wg.Done()
		if port := worker(*host, i); port != "" {
			ports <- port
		}
	}(i)
}
// Close the channel from a separate goroutine once every probe has finished.
// Calling wg.Wait() inline here would block before the receive loop starts,
// leaving the senders stuck on the unbuffered channel.
go func() {
	wg.Wait()
	close(ports)
}()
// Receives results as they arrive; the loop ends when ports is closed.
for port := range ports {
	fmt.Println("PORT:", port)
}

This however is likely to run into problems, like missing open ports when you dial all 65535 ports at the same time. Here is one possible pattern to use a pool of workers to dial concurrently:

// Worker-pool variant: a fixed number of goroutines dial concurrently instead
// of one goroutine per port. ports carries results, toScan carries the port
// numbers still to be probed.
ports := make(chan string)
toScan := make(chan int)
var wg sync.WaitGroup
// make 100 workers for dialing
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Each worker keeps pulling port numbers until toScan is closed.
		// Every result is forwarded, including "" for a failed dial.
		for p := range toScan {
			ports <- worker(*host, p)
		}
	}()
}
// close our receiving ports channel once all workers are done
go func() {
	wg.Wait()
	close(ports)
}()
// feed the ports to the worker pool
go func() {
	for i := 1; i <= 65535; i++ {
		toScan <- i
	}
	// signal the workers to stop
	close(toScan)
}()
// Drain the results until ports is closed; empty strings are skipped here.
for port := range ports {
	if port != "" {
		fmt.Println("PORT:", port)
	}
}

huangapple
  • 本文由 发表于 2015年7月23日 03:52:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/31572753.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定