英文:
Lossing reference to thread in subprocess
问题
我正在构建一个应用程序,使用multiprocessing
和multithreading
模块来处理不同的子进程和线程。我在启动子进程内的线程时遇到了一些意外的结果。
我有一个子进程服务器,它从套接字读取数据并调用一个API。
ServerSubprocess:
class ServerSubprocess(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
...
self._cnnr = ApiConnector()
self._cnnr.connect()
def run(self) -> None:
while True:
...
req_params = socket.recv()
self.request_data(req_params)
def request_data(self, req_params, max_retries=5):
retries = 0
while retries < max_retries:
try:
resp = self._cnnr.request_api(**req_params)
except (ConnectionError, TimeoutError) as e:
pass
retries += 1
return resp
连接器类充当API和服务器之间的接口:
Connector class
class Connector(ConnectorInterface):
def __init__(self):
self._api = Api(name, host, port)
def connect(self, n_attempts=10):
n = 0
while n < n_attempts:
try:
self._api.connect()
except (asyncio.TimeoutError, ConnectionError):
n += 1
self._logger.info(f"Couldn't connect. Attempting again... attempt {n + 1}")
def request_api(self, sec, time_params, side):
bars = self._api.request_data(req_params)
API由一个客户端(main_thread
)组成,它连接到数据源的另一个套接字,并有一个读取线程(用于处理API请求的响应):
API client:
class Api:
def __init__(self, name: str, host: str, port: int):
self._host = host
self._port = port
self._name = name
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._read_thread = threading.Thread(group=None, target=self,
name="%s-reader" % self._name,
args=(), kwargs={}, daemon=None)
def connect(self) -> None:
self._sock.connect((self._host, self._port))
self._send_connect_message()
self._read_thread.start()
def __call__(self):
while not self._stop.is_set():
if self._read_messages():
self._process_messages()
def _process_messages():
"""Retrieves req_id from message and processes raw message"""
...
req_id, data = self._read_from_socket()
self._data_buff[req_id] = data
self._notify_response(req_id)
def _read_messages(self) -> bool:
"""Checks if socket has data"""
...
def _notify_response(self, req_id) -> bool:
"""Sets the event for a req_id"""
self._req_event[req_id].set()
def request_data(self, data) -> np.array:
req_id = self._get_next_req_id()
self._req_event[req_id] = threading.Event()
self._send_cmd(data)
self._req_event[req_id].wait(timeout=timeout)
data = self._data_buff[req_id]
return data
问题不在于Api或Connector的实现。我包含的代码是一个简化版本,用于展示组件的主要流程。问题在于使用子进程包装服务器。即使Api上的read_thread
在连接器执行其connect
方法后可以正常运行,但在request_data()
方法创建一个新的req_id
并且read_thread
检查这个req_id
的新响应时,read_thread
不会收到更新。结果,当read_thread
尝试检索不存在的键的req_id
时,会引发KeyError
异常。此外,在调试发送新请求时,似乎read_thread
被停止了,尽管它在Api初始化时已启动。这解释了为什么read_thread
没有确认新的req_id
用于新的API请求。
然而,有趣的部分是,当服务器作为线程而不是进程启动时,整个流程正常工作。这对我来说没有道理,因为这两个线程都在相同的pid
下运行。似乎一旦在Api的connect()
上启动了read_thread
,对read_thread
的引用就丢失了。显然,我在启动子进程上的线程方面有所遗漏。如有帮助,欢迎提供任何帮助。
英文:
I am building an application that handles different child processes and threads using the multiprocessing and multithreading modules. I am obtaining some unexpected results from launching threads within a subprocess.
I have a subprocess server that reads from a socket and calls an API.
ServerSubprocess:
class ServerSubprocess(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
...
self._cnnr = ApiConnector()
self._cnnr.connect()
def run(self) -> None:
while True:
...
req_params = socket.recv()
self.request_data(req_params)
def request_data(self, req_params, max_retries=5):
retries = 0
while retries < max_retries:
try:
resp = self._cnnr.request_api(**req_params)
except (ConnectionError, TimeoutError) as e:
pass
reties += 1
return resp
The connector class serves as an interface betwen the API and the server:
Connector class
class Connector(ConnectorInterface):
def __init__(self):
self._api = Api(name, host, port)
def connect(self,, n_attempts=10):
n = 0
while n < n_attempts:
try:
self._api.connect()
except (asyncio.TimeoutError, ConnectionError):
n += 1
self._logger.info(f"Couldn't connect. Attempting again... attempt {n + 1}")
def request_api(self, sec, time_params, side):
bars = self._api.request_data(req_params)
The API consists on a client (main_thread) that connects another socket to the data source and a read_thread (reads for responses to the api requests):
API client:
class Api:
def __init__(self, name: str, host: str, port: int):
self._host = host
self._port = port
self._name = name
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._read_thread = threading.Thread(group=None, target=self,
name="%s-reader" % self._name,
args=(), kwargs={}, daemon=None)
def connect(self) -> None:
self._sock.connect((self._host, self._port))
self._send_connect_message()
self._read_thread.start()
def __call__(self)
while not self._stop.is_set():
if self._read_messages():
self._process_messages()
def _process_messages():
"""Retrieves req_id from message and process raw message"""
....
req_id, data = self._read_from_socket()
self._data_buff[req_id] = data
self_notify_response(req_id)
def _read_messages(self) -> bool:
"""Checks if socket has data"""
....
def _notify_response(self, req_id) -> bool:
"""Sets the event for a req_id"""
self.__req_event[req_id].set()
def request_data(self, data) -> np.array:
req_id = self._get_next_req_id()
self._req_event[req_id] = threading.Event()
self._send_cmd(data)
self._req_event[req_id].wait(timeout=timeout)
data = self._data_buff[req_id]
return data
The issue here is not in the implementation of the Api or Connector. The code that I have included is a reduced version to show the main flow of the components. The problem lies on using a subprocess to wrap the Server. Even though the read_thread on the Api runs with no problem once the connector executes its connect method, when request_data() method creates a new request_id in the _req_event dict and then the read_thread checks for incoming responses for this req_id, no update is acknowledged on the read_thread. As a consequence, an KeyError excep is raised when the read_thread tries to retrieve the req_id for the non-existing key. Furthermore, after debugging the Api on sending a new request, the read_thread appears to be stoppped, even though it was launched on Api init. This explains why the read_thread did not acknowledged the new req_id for the new api request.
However, the interesting part is that when the server is launched as a thread rather than as a process, the whole flow works fine. This does not make sense to me since both threads live under the same pid. It seems like, once the read_thread is launched on Api connect(), the reference to the read_thread is lost. Obviously I am missing something about launching threads on a child process.
Any help is welcomed.
答案1
得分: 0
问题出在我如何连接新进程中的连接器上。连接是在子进程上下文之外创建的。线程read_thread是在父进程中启动的,而不是在子进程中。这就是为什么我失去了引用,我的意思是,我并不是真的失去了引用,因为线程从未在子进程中启动!为了解决这个问题,self._cnnr.connect()
应该在run()方法内部。
这是一个对multiprocessing.Process对象理解不正确的问题。运行时在run()方法中定义,而不是在init中。这意味着线程应该在run方法中启动,而不是在init中。
英文:
The problem was on how I was connecting the connector in the new process. The connection was created outside the context of the child process. The threads read_thread was launched in the parent process not in the child. That's why I was loosing the reference, I mean I wasn't really because the thread was never lauched in the child! To solve the issue the self._cnnr.connect()
should be inside run().
This was a problem of not understanding the multiprocessing.Process object correctly. The runtime is defined in the run() method, not in init. This means that threads should be started in the run method, not in the init.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论