英文:
Golang - concurrent SSH connections to multiple nodes
问题
我有一组服务器,我正在尝试建立SSH连接,并且为每个新的SSH连接生成一个新的goroutine。然后,我将该连接的结果(以及错误(如果有))发送到一个通道中,然后从通道中读取。这个程序在某种程度上可以工作,但即使我关闭了通道,它最后也会冻结。
这是我目前的代码:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
wg.Add(1)
go func(hostname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", host+":22")
if err != nil {
cr <- ConnectionResult{host, "failed"}
} else {
cr <- ConnectionResult{host, "succeeded"}
}
}(host, cnres)
}
}
}
}
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
close(cnres)
defer wg.Wait()
}
我做错了什么?在Go中有更好的并发SSH连接的方法吗?
英文:
I have a fleet of servers that I'm trying to establish SSH connections to, and I'm spawning a new goroutine for every new SSH connection I have to establish. I then send the results of that connection (along with the error(s) (if any)) down a channel, and then read from the channel. This program sort of works, but it freezes in the end even though I close the channel.
This is what I have so far:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
wg.Add(1)
go func(hostname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", host+":22")
if err != nil {
cr <- ConnectionResult{host, "failed"}
} else {
cr <- ConnectionResult{host, "succeeded"}
}
}(host, cnres)
}
}
}
}
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
close(cnres)
defer wg.Wait()
}
What am I doing wrong? Is there a better way of doing concurrent SSH connections in Go?
答案1
得分: 3
上面的代码在range cnres
的for
循环中出现了问题。正如在优秀的'Go by Example'中指出的那样,range
只会在关闭的通道上退出。
解决这个问题的一种方法是在另一个goroutine中运行range cnres
的迭代。然后你可以使用wg.Wait()
等待,然后使用close()
关闭通道,代码如下:
...
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
close(cnres)
另外,独立于代码被卡住的问题,我认为意图是在Dial()
函数和后续的通道写入中使用hostname
,而不是host
。
英文:
The code above is stuck in the range cnres
for
loop. As pointed out in the excellent 'Go by Example', range
will only exit on a closed channel.
One way to address that difficulty, is to run the range cnres
iteration in another goroutine. You could then wg.Wait()
, and then close()
the channel, as such:
...
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
close(cnres)
On a tangential note (independently of the code being stuck), I think the intention was to use hostname
in the Dial()
function, and subsequent channel writes, rather than host
.
答案2
得分: 1
感谢Frederik的帮助,我成功地运行了这个程序:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult 容器
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
publicdnsname := *inst.PublicDNSName
wg.Add(1)
go func(ec2name, cbname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", ec2name+":22")
if err != nil {
cr <- ConnectionResult{cbname, "failed"}
} else {
cr <- ConnectionResult{cbname, "succeeded"}
}
}(publicdnsname, host, cnres)
}
}
}
}
go func() {
for cr := range cnres {
fmt.Println("连接到" + cr.host + " " + cr.message)
}
}()
wg.Wait()
}
以上是要翻译的内容。
英文:
Thanks to Frederik, I was able to get this running successfully:
package main
import (
"fmt"
"net"
"sync"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
)
// ConnectionResult container
type ConnectionResult struct {
host string
message string
}
func main() {
cnres := make(chan ConnectionResult)
ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
wg := sync.WaitGroup{}
params := &ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
&ec2.Filter{
Name: aws.String("instance-state-name"),
Values: []*string{
aws.String("running"),
},
},
},
}
resp, err := ec2svc.DescribeInstances(params)
if err != nil {
panic(err)
}
for _, res := range resp.Reservations {
for _, inst := range res.Instances {
for _, tag := range inst.Tags {
if *tag.Key == "Name" {
host := *tag.Value
publicdnsname := *inst.PublicDNSName
wg.Add(1)
go func(ec2name, cbname string, cr chan ConnectionResult) {
defer wg.Done()
_, err := net.Dial("tcp", ec2name+":22")
if err != nil {
cr <- ConnectionResult{cbname, "failed"}
} else {
cr <- ConnectionResult{cbname, "succeeded"}
}
}(publicdnsname, host, cnres)
}
}
}
}
go func() {
for cr := range cnres {
fmt.Println("Connection to " + cr.host + " " + cr.message)
}
}()
wg.Wait()
}
答案3
得分: 0
Frederik的解决方案基本上是有效的,但有一些例外情况。如果命令组例程(从写入通道的循环中)执行的命令响应时间稍长,处理例程(根据Frederik的提示)将在最后一个命令例程完成之前处理并关闭通道,因此可能会发生一些数据丢失。
在我的情况下,我使用它来执行对多个服务器的远程SSH命令并打印响应。对我来说,有效的解决方案是使用两个单独的WaitGroup,一个用于命令组例程,另一个用于处理例程。这样,处理例程将等待所有命令例程完成,然后处理响应并关闭通道以退出循环:
// 创建waitgroup、通道并使用并发(goroutine)执行命令
outchan := make(chan CommandResult)
var wg_command sync.WaitGroup
var wg_processing sync.WaitGroup
for _, t := range validNodes {
wg_command.Add(1)
target := t + " (" + user + "@" + nodes[t] + ")"
go func(dst, user, ip, command string, out chan CommandResult) {
defer wg_command.Done()
result := remoteExec(user, ip, cmdCommand)
out <- CommandResult{dst, result}
}(target, user, nodes[t], cmdCommand, outchan)
}
wg_processing.Add(1)
go func() {
defer wg_processing.Done()
for o := range outchan {
bBlue.Println(o.target, "=>", cmdCommand)
fmt.Println(o.cmdout)
}
}()
// 等待所有goroutine完成并关闭通道
wg_command.Wait()
close(outchan)
wg_processing.Wait()
英文:
Frederik's solution works fine but with some exceptions. If command group routines (from loop which write to to the channel) execute command with a bit longer response time, processing routine (Frederik's hint) will process and close the channel, before last command routine to finish, so some data loss may occur.
In my case I'm using it to execute remote SSH command to multiple servers and to print response. Working solution for me is to use 2 separate WaitGroups, one for command group routines and second for processing routine. This way, processing routine will wait all command routines to be completed, then process response and close channel to exit for loop:
// Create waitgroup, channel and execute command with concurrency (goroutine)
outchan := make(chan CommandResult)
var wg_command sync.WaitGroup
var wg_processing sync.WaitGroup
for _, t := range validNodes {
wg_command.Add(1)
target := t + " (" + user + "@" + nodes[t] + ")"
go func(dst, user, ip, command string, out chan CommandResult) {
defer wg_command.Done()
result := remoteExec(user, ip, cmdCommand)
out <- CommandResult{dst, result}
}(target, user, nodes[t], cmdCommand, outchan)
}
wg_processing.Add(1)
go func() {
defer wg_processing.Done()
for o := range outchan {
bBlue.Println(o.target, "=>", cmdCommand)
fmt.Println(o.cmdout)
}
}()
// wait untill all goroutines to finish and close the channel
wg_command.Wait()
close(outchan)
wg_processing.Wait()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论