sock.recv() not responding in simple pub/sub app
Question
I've written a simple pub/sub sketch in Python. Full source code follows, but the issue I'm having is that when the broker broadcasts a message via send(), the client doesn't recv() anything. Everything else seems to be in order: listen() and accept() are working, and when the client calls send(), the broker's recv() gets the message.
Here's a sequence diagram of the interactions between two clients and the broker:
    Broker                      Client 62863        Client 62867
    ------------------------------------------------------------
    start()
                                start()
    (62863) joined.
                                                    start()
    (62867) joined.
                                Hello from 62863
    (62863): Hello from 62863
    (62863) => (62867)
In that last step, the Broker calls send('Hello from 62863'), but Client 62867's recv() function doesn't receive it.
Any suggestions?
Here's the complete Broker code:
    import socket
    import threading

    class Broker(object):

        def __init__(self, host='', port=5000):
            self._host = host
            self._port = port
            self._subscribers = []

        def start(self):
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket.bind((self._host, self._port))
            while True:
                # Wait for a client to request a connection and spawn a thread to
                # receive and forward messages from that client.
                self._socket.listen()
                subscriber, addr = self._socket.accept()
                print(f'{addr} joined.', flush=True)
                self._subscribers.append(subscriber)
                threading.Thread(target=self.listen_thread, args=(subscriber,)).start()

        def listen_thread(self, publisher):
            """
            Wait for a message to arrive from a publisher, broadcast to all other
            subscribers.
            """
            while True:
                msg = publisher.recv(1024).decode()
                if msg is not None:
                    print(f'{publisher.getpeername()} published: {msg}', end='', flush=True)
                    self.broadcast(publisher, msg)
                else:
                    print(f'{publisher.getpeername()} has disconnected')
                    return

        def broadcast(self, publisher, msg):
            for subscriber in self._subscribers:
                if publisher != subscriber:  # don't send to yourself
                    print(f'{publisher.getpeername()} => {subscriber.getpeername()}', flush=True)
                    try:
                        subscriber.send(msg)  # NOTE: See solution below!!!
                    except:
                        # broken socket, remove from subscriber list
                        self._subscribers.remove(subscriber)

    if __name__ == "__main__":
        Broker().start()
And here's the corresponding Client code:
    import socket
    import threading
    import sys

    class StdioClient(object):
        """
        A simple pub/sub client:
        Anything received on stdin is published to the broker.
        Concurrently, anything broadcast by the broker is printed on stdout.
        """

        def __init__(self, host='localhost', port=5000):
            self._host = host
            self._port = port

        def start(self):
            self._sock = socket.socket()
            self._sock.connect((self._host, self._port))
            threading.Thread(target=self.stdin_to_sock).start()
            threading.Thread(target=self.sock_to_stdout).start()

        def stdin_to_sock(self):
            """
            Send anything received on stdin to the broker.
            """
            for msg in sys.stdin:
                self._sock.send(bytes(msg, 'utf-8'))

        def sock_to_stdout(self):
            """
            Print anything received from the broker on stdout.
            """
            while True:
                msg = self._sock.recv(1024)  # <<<= This never gets a message
                print(msg.decode('utf-8'), end='', flush=True)

    if __name__ == '__main__':
        StdioClient().start()
Answer 1
Score: 1
Solved. And it was a stupid, preventable, "I should have known better" error. Right here:
    try:
        subscriber.send(msg)
    except:
        self._subscribers.remove(subscriber)
Never ever use a blanket except: to catch errors unless you log any errors (thank you @user207421). In this case, send() was raising a TypeError because msg was a string, not bytes, but with the blanket except:, there was no visible error. The handler simply removed the subscriber from the list, so nothing was ever delivered.
Moral: Always qualify your except: clauses.
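As a rough illustration of that moral, here is a minimal sketch of a broadcast helper with a qualified except: clause. It is not the code from the question: the standalone broadcast() function, the "broker" logger name, and the use of sendall() instead of send() are assumptions made for this example. Only OSError (a network-level failure) is caught and logged, so a programming mistake such as passing a str still surfaces as a TypeError.

```python
import logging
import socket

# Hypothetical logger name, chosen for this sketch only.
logger = logging.getLogger("broker")


def broadcast(subscribers, payload):
    """Send payload (bytes) to every socket in subscribers.

    Sockets that fail with a network-level error are logged, removed
    from the list, and returned to the caller.
    """
    failed = []
    for sock in list(subscribers):  # iterate over a copy; we may remove items
        try:
            sock.sendall(payload)
        except OSError as exc:
            # Broken or closed socket: record why, then drop the subscriber.
            logger.warning("dropping subscriber: %s", exc)
            subscribers.remove(sock)
            failed.append(sock)
        # TypeError is deliberately not caught: passing a str is a bug in
        # the caller and should be visible immediately.
    return failed
```

With this shape, calling broadcast(subs, "hello") raises TypeError right away instead of silently emptying the subscriber list.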
(In case you're curious, the fix is this:)

    subscriber.send(bytes(msg, 'utf-8'))
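To see the failure and the fix side by side without running the broker, the following sketch uses socket.socketpair() to create two connected sockets in one process. The demo() function and its variable names are made up for this illustration; msg.encode('utf-8') is used here as an equivalent spelling of bytes(msg, 'utf-8').

```python
import socket


def demo():
    """Return (error_text, received_text) for a str send and a bytes send."""
    left, right = socket.socketpair()
    error = None
    try:
        try:
            left.send("Hello")  # str payload: rejected before any data is sent
        except TypeError as exc:
            error = str(exc)
        # Encoding to bytes first is what makes the send succeed.
        left.send("Hello".encode("utf-8"))
        received = right.recv(1024).decode("utf-8")
        return error, received
    finally:
        left.close()
        right.close()


if __name__ == "__main__":
    error, received = demo()
    print("send(str) failed with:", error)
    print("send(bytes) delivered:", received)
```

The first send() never reaches the peer, which matches the symptom in the question: the client's recv() kept waiting because the broker's send() had failed.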