sock.recv()在简单的发布/订阅应用中没有响应。

huangapple go评论49阅读模式
英文:

sock.recv() not responding in simple pub/sub app

问题

I've translated the content you provided:

我已经在Python中编写了一个简单的发布/订阅示例。以下是完整的源代码,但我遇到的问题是,当代理通过send()广播消息时,客户端没有收到recv()。其他一切似乎都正常:listen()accept()正常工作,当客户端调用send()时,代理的recv()接收到消息。

下面是两个客户端和代理之间的交互序列图:

代理                      客户端62863        客户端62867
------------------------------------------------------------
start()
                            start()
(62863) 加入。
                                                start()
(62867) 加入。
                            来自62863的问候
(62863): 来自62863的问候
(62863) => (62867)

在最后一步中,代理调用send('Hello from 62863'),但客户端62867的recv()函数没有接收到它。

有什么建议吗?

以下是完整的代理代码:

import socket
import threading

class Broker(object):

    def __init__(self, host='', port=5000):
        self._host = host
        self._port = port
        self._subscribers = []

    def start(self):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.bind((self._host, self._port))

        while True:
            """
            等待客户端请求连接,并创建一个线程来接收和转发该客户端的消息。
            """
            self._socket.listen()
            subscriber, addr = self._socket.accept()
            print(f'{addr} 加入。', flush=True)
            self._subscribers.append(subscriber)
            threading.Thread(target=self.listen_thread, args=(subscriber,)).start()

    def listen_thread(self, publisher):
        """
        等待来自发布者的消息,然后广播给所有其他订阅者。
        """
        while True:
            msg = publisher.recv(1024).decode()
            if msg is not None:
                print(f'{publisher.getpeername()} 发布: {msg}', end='', flush=True)
                self.broadcast(publisher, msg)
            else:
                print(f'{publisher.getpeername()} 已断开')
                return

    def broadcast(self, publisher, msg):
        for subscriber in self._subscribers:
            if publisher != subscriber:        # 不要发送给自己
                print(f'{publisher.getpeername()} => {subscriber.getpeername()}', flush=True)
                try:
                    subscriber.send(msg) # 注意:请看下面的解决方案!!!
                except:
                    # 损坏的套接字,从订阅者列表中移除
                    self._subscribers.remove(subscriber)

if __name__ == "__main__":
    Broker().start()

以下是相应的客户端代码:

import socket
import threading
import sys

class StdioClient(object):
    """
    一个简单的发布/订阅客户端:
    从stdin接收的任何内容都会发布到代理。
    与此同时,代理广播的任何内容都会在stdout上打印出来。
    """

    def __init__(self, host='localhost', port=5000):
        self._host = host
        self._port = port

    def start(self):
        self._sock = socket.socket()
        self._sock.connect((self._host, self._port))
        threading.Thread(target=self.stdin_to_sock).start()
        threading.Thread(target=self.sock_to_stdout).start()

    def stdin_to_sock(self):
        """
        将从stdin接收到的任何内容发送到代理。
        """
        for msg in sys.stdin:
            self._sock.send(bytes(msg, 'utf-8'))

    def sock_to_stdout(self):
        """
        在stdout上打印从代理接收到的任何内容。
        """
        while True:
            msg = self._sock.recv(1024) # <<< 这里永远不会收到消息
            print(msg.decode('utf-8'), end='', flush=True)

if __name__ == '__main__':
    StdioClient().start()

如果需要进一步的帮助或有其他问题,请告诉我。

英文:

I've written a simple pub/sub sketch in python. Full source code follows, but the issue I'm having is that when the broker broadcasts a message via send(), the client doesn't recv() anything. Everything else seems to be in order: listen() and accept() are working, and when the client calls send(), the broker's recv() gets the message.

Here's a sequence diagram of the interactions between two clients and the broker:

Broker                      Client 62863        Client 62867
------------------------------------------------------------
start()
start()
(62863) joined.
start()
(62867) joined.
Hello from 62863
(62863): Hello from 62863
(62863) =&gt; (62867)

In that last step, the Broker calls send(&#39;Hello from 62863&#39;), but Client 62867's recv() function doesn't receive it.

Any suggestions?

Here's the complete Broker code:

import socket
import threading
class Broker(object):
def __init__(self, host=&#39;&#39;, port=5000):
self._host = host
self._port = port
self._subscribers = []
def start(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.bind((self._host, self._port))
while True:
&quot;&quot;&quot;
Wait for a client to request a connection and spawn a thread to
receive and forward messages from that client.
&quot;&quot;&quot;
self._socket.listen()
subscriber, addr = self._socket.accept()
print(f&#39;{addr} joined.&#39;, flush=True)
self._subscribers.append(subscriber)
threading.Thread(target=self.listen_thread, args=(subscriber,)).start()
def listen_thread(self, publisher):
&quot;&quot;&quot;
Wait for a message to arrive from a publisher, broadcast to all other
subscribers.
&quot;&quot;&quot;
while True:
msg = publisher.recv(1024).decode()
if msg is not None:
print(f&#39;{publisher.getpeername()} published: {msg}&#39;, end=&#39;&#39;, flush=True)
self.broadcast(publisher, msg)
else:
print(f&#39;{publisher.getpeername()} has disconnected&#39;)
return
def broadcast(self, publisher, msg):
for subscriber in self._subscribers:
if publisher != subscriber:        # don&#39;t send to yourself
print(f&#39;{publisher.getpeername()} =&gt; {subscriber.getpeername()}&#39;, flush=True)
try:
subscriber.send(msg) # NOTE: See solution below!!!
except:
# broken socket, remove from subscriber list
self._subscribers.remove(subscriber)
if __name__ == &quot;__main__&quot;:
Broker().start()

And here's the corresponding Client code:

import socket
import threading
import sys
class StdioClient(object):
&quot;&quot;&quot;
A simple pub/sub client:
Anything received on stdin is published to the broker.
Concurrently, anything broadcast by the broker is printed on stdout.
&quot;&quot;&quot;
def __init__(self, host=&#39;localhost&#39;, port=5000):
self._host = host
self._port = port
def start(self):
self._sock = socket.socket()
self._sock.connect((self._host, self._port))
threading.Thread(target=self.stdin_to_sock).start()
threading.Thread(target=self.sock_to_stdout).start()
def stdin_to_sock(self):
&quot;&quot;&quot;
Send anything received on stdin to the broker.
&quot;&quot;&quot;
for msg in sys.stdin:
self._sock.send(bytes(msg, &#39;utf-8&#39;))
def sock_to_stdout(self):
&quot;&quot;&quot;
Print anything received from the broker on stdout.
&quot;&quot;&quot;
while True:
msg = self._sock.recv(1024) # &lt;&lt;&lt;= This never gets a message
print(msg.decode(&#39;utf-8&#39;), eol=&#39;&#39;, flush=True)
if __name__ == &#39;__main__&#39;:
StdioClient().start()

答案1

得分: 1

Solved. And it was a stupid, preventable, "I should have known better" error. Right here:

                try:
                    subscriber.send(msg)
                except:
                    self._subscribers.remove(subscriber)

Never ever use a blanket except: to catch errors, unless you really know what you're doing or log any errors (thank you @user207421). In this case, send() was raising an error because msg was a string, not bytes, but with the blanket except:, there was no visible error.

Moral: Always qualify your except: clauses.

(In case you're curious, the fix is this):

                    subscriber.send(bytes(msg, 'utf-8'))
英文:

Solved. And it was a stupid, preventable, "I should have known better" error. Right here:

                try:
subscriber.send(msg)
except:
self._subscribers.remove(subscriber)

Never ever use a blanket except: to catch errors, unless you <s>really know what you're doing</s>log any errors (thank you @user207421). In this case, send() was raising an error because msg was a string, not bytes, but with the blanket except:, there was no visible error.

Moral: Always qualify your except: clauses.

(In case you're curious, the fix is this):

                    subscriber.send(bytes(msg, &#39;utf-8&#39;))

huangapple
  • 本文由 发表于 2023年5月22日 23:18:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/76307659.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定