英文:
Problems with mangos - the nanomsg bus protocol provided by Golang package
问题
我想使用nanomsg
/nng
作为完全分布式对等多节点网络的通信基础,以帮助构建拓扑发现和维护的动态能力。现在我在其Golang包mangos
中遇到了困难。
相同的工作已经在Python和pynng(nanomsg的Python绑定)中完成,但是当我在Go中使用mangos
调用相应的方法时,它们的行为完全不同。主要有以下三个问题:
1)总线类型的Socket的Recv()默认以阻塞模式运行,似乎无法配置为非阻塞模式。文档中说:
OptionRecvDeadline是下一次Recv超时的时间。该值是一个time.Duration。可以传递零值表示不应用超时。负值表示非阻塞操作。默认情况下没有超时。
我尝试了一个负值,但是Recv()
仍然是阻塞的。我还应该做些什么?如何理解"零超时"和"非阻塞"之间的区别?
- 通过
(s *socket) NewDialer(...)
返回的dialer
在调用dialer.Close()
后似乎仍然存在,因为在调用下一个dialer.Dial()
时会报告错误,指示它仍然是"地址正在使用"。但是当我尝试再次Close()
dialer
时,也会报错,指示它已经关闭。我还尝试了以下选项的不同组合,但所有尝试都失败了:
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true // 或者 false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond // 或者零
opts[mangos.OptionKeepAliveTime] = time.Millisecond // 或者更小
opts[mangos.OptionKeepAlive] = false // 或者 true
当我想要完全关闭dialer,或者想要稍后重新使用"伪关闭"的dialer时,我应该怎么做?
- 总线类型的Socket的
Send()
方法很奇怪。在我的代码中,每个节点都应该定期发送一条消息。我关闭了一个节点(称为"Node-X")的物理连接,让它离线一段时间,然后重新连接到网络。我发现当Node-X重新连接时,它会立即重新发送大量的消息。但是我真正希望的是,即使Node-X没有邻居,它也可以将这些消息发送到空中。
我想知道是否有办法解决这些问题。我猜可能缺少一些选项或配置,但我无法找出它们。
以下代码用于重现重新拨号和重新关闭错误。
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/bus"
// 注册传输方式
_ "go.nanomsg.org/mangos/v3/transport/all"
)
var (
sock mangos.Socket
DialerMap map[string]*mangos.Dialer
opts map[string]interface{}
)
func main() {
var err error
opts = make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true
opts[mangos.OptionMaxReconnectTime] = time.Millisecond
// opts[mangos.OptionKeepAliveTime] = time.Millisecond
opts[mangos.OptionKeepAlive] = false
DialerMap = make(map[string]*mangos.Dialer)
if sock, err = bus.NewSocket(); err != nil {
fmt.Println("bus.NewSocket error. ", err)
os.Exit(1)
}
TargetUUID := "node-A"
TargetAddr := "tcp://192.168.0.172:60000" // 这应该更改为可用的地址
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(100 * time.Second)
}
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
_, is_exist := DialerMap[TargetUUID]
var err error
var dialer mangos.Dialer
if !is_exist {
dialer, err = sock.NewDialer(TargetAddr, opts)
if err != nil {
} else {
DialerMap[TargetUUID] = &dialer
}
}
dialer = *DialerMap[TargetUUID]
err = dialer.Dial()
if err != nil {
fmt.Println("Dialer fails to dial()", err)
} else {
fmt.Println("Dialer succeeds to dial()")
}
return dialer, err
}
func MyClose(TargetUUID string, TargetAddr string) {
dialerAddr, is_exist := DialerMap[TargetUUID]
if !is_exist {
fmt.Println("Dialer does not exist")
}
dialer := *dialerAddr
err := dialer.Close()
if err != nil {
fmt.Println("dialer fails to close.", err)
} else {
fmt.Println("dialer succeeds to close")
}
}
控制台输出为
Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed
英文:
I'd like to use nanomsg
/nng
as the communication basis of a fully distributed peer-to-peer multi-node network, to help construct the dynamic ability of topological discovery and maintenance. Now I get stuck in its Golang package mangos
.
The same work has been done in Python and pynng (which is a python binding for nanomsg), but when I use Go and invoke the corresponding methods by mangos instead, their behaviors are totally different. The puzzle is mainly threefold:
-
The bus-type-Socket's Recv() acts in a blocking mode by default and seems not to be configurable to the non-blocking mode. Documents says:
> OptionRecvDeadline is the time until the next Recv times out. The value is a time.Duration. Zero value may be passed to indicate that no timeout should be applied. A negative value indicates a non-blocking operation. By default there is no timeout.I tried a negative value accordingly, but
Recv()
was still blocking. What else should I do? and how to understand the difference between "Zero-timeout" and "non-blocking"?
- The
dialer
returned by(s *socket) NewDialer(...)
seems to linger on after callingdialer.Close()
, since an error will occur when calling a nextdialer.Dial()
reporting it's still "address in use". But when I tried toClose()
thedialer
again, error occurs as well reporting it's already closed. I also tried different combinations of the following options, but all the attempts failed
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond // or zero
opts[mangos.OptionKeepAliveTime] = time.Millisecond // or even smaller
opts[mangos.OptionKeepAlive] = false // or true
What should I do when I want to kill the dialer completely, or want to reuse the "pseudo-closed" dialer some time later?
- The bus-type-Socket's
Send()
is strange. Normally each node is supposed to periodically send a message in my code. I shut down the physical connection of a node (say "Node-X") from the network, keep it offline for a while, and then reconnect it to the network. I found Node-X would re-send lots of messages immediately when it got reconnected. But what I really expect is that Node-X could send those messages to the air even if it has no neighbors.
I wonder if there is any way to come over these problems. I guess it could be missing some options or configurations, but I failed to figure them out.
The following code is used for reproducing the re-dial and re-close errors.
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/bus"
// register transports
_ "go.nanomsg.org/mangos/v3/transport/all"
)
var (
sock mangos.Socket
DialerMap map[string]*mangos.Dialer
opts map[string]interface{}
)
func main() {
var err error
opts = make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true
opts[mangos.OptionMaxReconnectTime] = time.Millisecond
// opts[mangos.OptionKeepAliveTime] = time.Millisecond
opts[mangos.OptionKeepAlive] = false
DialerMap = make(map[string]*mangos.Dialer)
if sock, err = bus.NewSocket(); err != nil {
fmt.Println("bus.NewSocket error. ", err)
os.Exit(1)
}
TargetUUID := "node-A"
TargetAddr := "tcp://192.168.0.172:60000" // this should be changed to a available address
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyDial(TargetUUID, TargetAddr)
time.Sleep(time.Second * 2)
MyClose(TargetUUID, TargetAddr)
time.Sleep(100 * time.Second)
}
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
_, is_exist := DialerMap[TargetUUID]
var err error
var dialer mangos.Dialer
if !is_exist {
dialer, err = sock.NewDialer(TargetAddr, opts)
if err != nil {
} else {
DialerMap[TargetUUID] = &dialer
}
}
dialer = *DialerMap[TargetUUID]
err = dialer.Dial()
if err != nil {
fmt.Println("Dialer fails to dial()", err)
} else {
fmt.Println("Dialer succeeds to dial()")
}
return dialer, err
}
func MyClose(TargetUUID string, TargetAddr string) {
dialerAddr, is_exist := DialerMap[TargetUUID]
if !is_exist {
fmt.Println("Dialer does not exist")
}
dialer := *dialerAddr
err := dialer.Close()
if err != nil {
fmt.Println("dialer fails to close.", err)
} else {
fmt.Println("dialer succeeds to close")
}
}
and console output is
Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed
答案1
得分: 1
我通常不会监视stackoverflow或reddit上的这类问题,我们有一个discord频道(链接在mangos和NNG主页上),以及一个邮件列表。
话虽如此,让我看看我能否帮到你(我是NNG和mangos的作者):
- 对于总线,支持OptionRecvDeadline。但是,你是正确的,它不支持负值的非阻塞模式,相反,负值被视为零,并且作为阻塞操作。这是一个文档错误。要实现逻辑上的非阻塞,使用值"1",表示一纳秒,这在逻辑上等同于非阻塞,尽管粒度可能受到调度器延迟的限制。(在这种情况下,就像执行"go close(channel); <-channel"一样,几乎是非阻塞的。
我会尽快修复文档。
-
在拨号器上调用Close()是正确的做法。它会等待管道关闭,而它会自动关闭。你使用非常短的重拨时间可能会导致这个问题--坦率地说,我没有考虑过非常短的重拨时间--通常这样做是不好的,因为这意味着如果对等方不可用,你的代码将在处理器上旋转,尝试重新连接。我通常建议至少设置10毫秒的重试间隔上限。(mangos.OptionMaxReconnectTime)
-
我认为你看到的是排队的效果,但我不能百分之百确定--我需要看到一个能重现这个问题的测试案例。无论如何,总线协议是尽力而为的传递,如果没有连接的对等方,消息将被丢弃。(我刚刚重新检查了一下,以确保这一点。)
英文:
I don't usually monitor stackoverflow or reddit for questions like this -- we do have a discord channel (link from the mangos and NNG home pages), as well as a mailing list.
Having said that, let me see if I can help (I'm the author for both NNG and mangos):
- OptionRecvDeadline is supported for bus. However, you're correct that it doesn't support non-blocking mode with a negative value, instead the negative value is treated the same as zero, and acts as blocking. This is a documentation bug. To achieve a logical non-blocking, use the value "1", which means one nanosecond, and that will logically equate to non-blocking, although the granularity may be limited by the scheduler latency. (In this case it would be like doing a "go close(channel); <-channel" -- very nearly non-blocking.
I'll see about fixing the documentation.
-
Calling Close() on the dialer is the right thing to do. It will linger until the pipes are closed, which it does automatically. It is possible that your use of a very short redial time might confound this -- I'll be honest in saying that I had not considered tiny redial times -- usually it's bad form to do this because it means that if the peer is not available your code will spin hard on the processor trying to reconnect. I usually recommend at minimum a 10 millisecond retry interval cap. (mangos.OptionMaxReconnectTime)
-
I think you're seeing the effect of queueing, but I'm not 100% certain -- I'd need to see a test case reproducing this. Definitely the bus protocol is best effort delivery, and if there are no connected peers then the message is dropped on the floor. (Just rechecked that to be certain.)
答案2
得分: 0
感谢@Garrett D'Amore的回复,我现在可以用另一种方式解决我的问题了。作为一个对底层通信层知识了解有限的新的Golang爱好者,我为给你带来这样一个初级而愚蠢的问题而道歉。
问题(1)已经得到了作者的很好回答。
问题(3)可能与问题(2)有关,因为作者如下所述的机制消除了发送缓冲区累积的可能性。
显然,总线协议是尽力而为的交付,如果没有连接的对等体,消息就会被丢弃。(刚刚再次检查以确保。)
问题(2),我尝试在第一次将mangos.OptionMaxReconnectTime
设置为100 ms
,但问题仍然存在。第二次,我尝试了各种options
组合来配置套接字和拨号器,但尝试也失败了。
最后,由于作者指出:
在拨号器上调用Close()是正确的做法。它会等待管道关闭,然后自动关闭。你使用非常短的重拨时间可能会使这个过程变得复杂。
我转而采用另一种关闭旧拨号器的方法,通过显式关闭它所拥有的所有管道。为了实现这一点,可以定义一个回调处理程序,如下所示:
var pipe_c chan *mangos.Pipe
func callbackHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
pAddr := &pipe
pipe_c <- pAddr
}
然后将callbackHandler附加到套接字上:
sock.SetPipeEventHook(callbackHandler)
通过这样做,用户可以获取(私有变量)管道。当用户想要关闭拨号连接时,可以执行以下操作:
dialer.Close() // 尽力关闭拨号器
for pAddr := range pipe_c {
(*pAddr).Close() // 显式关闭拨号器的所有管道
}
然后只需将“伪关闭”的拨号器保留即可。当用户想要再次连接到远程地址时,可以创建并使用一个新的拨号器。
我不知道旧的“伪关闭”的拨号器是否会在内存中累积。但这已经是我能找到的唯一解决方案了。
英文:
Thanks to the reply by @Garrett D'Amore
, I could now solve my problems in an alternative way, and I (as a a new Golang fan with little knowledge on underlying communication layers) would like to apologize for troubling you with such an elementary and stupid question.
Problem (1) is well answered by the author.
Problem (3) might be coupled with Problem (2), since the author told the mechanism as below and thus eliminated the possibility of send buffering accumulation.
>Definitely the bus protocol is best effort delivery, and if there are no connected peers then the message is dropped on the floor. (Just rechecked that to be certain.)
Problem (2), I tried to set mangos.OptionMaxReconnectTime
to 100 ms
in a first time, but the problem still existed. In a second time, I tried out all kinds of options
combinations to configure the socket and the dialer, but attempts failed as well.
Finally, since the author pointed out that
>Calling Close() on the dialer is the right thing to do. It will linger until the pipes are closed, which it does automatically. It is possible that your use of a very short redial time might confound this.
I turn to an alternative way to shutdown an old dialer, by explicitly closing all the pipes it has. To achieve this, one could define a callback handler like
var pipe_c chan
func callbackHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
pAddr := &pipe
pipe_c <- pAddr
}
Then attach the callbackHandler to the socket
sock.SetPipeEventHook(callbackHandler)
By doing this, the (private var) pipes can be obtained by the user. When one wants to shutdown the dialing connection, he or she can do
dialer.Close() // try best to close a dialer automatically
for pAddr, num := range pipeSet {
(*pAddr).Close() // explicitly close all the pipes of the dialer
}
And just leave the "pseudo-closed" dialer alone. When one wants to connect to a remote address once again, a new dialer could be created and used.
I don't know whether the old "pseudo-closed" dialer would be accumulated in-memory. But this is already the only solution I could find.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论