英文:
ProcessPoolExecutor using a function which is supposed to append to a list not working properly
问题
我开始使用Python来进行预测,之前用的是R。在尝试使用多进程来估计多个模型时,遇到了问题。我试图用for循环来估计相同的模型(ARIMA),成功地实现了这个目标。然而,我尝试应用多进程以提高效率时遇到了一些问题。
这是我尝试运行的函数:
arima_comp = []
def arima_tester(p,q,P,Q):
mod = ARIMA(training_set, order = (p,1,q), seasonal_order = (P,1,Q,12))
modfit = mod.fit()
forecast = modfit.forecast(steps = len(test_set))
error = forecast - test_set
rmse = np.square(error).mean() ** 0.5
aic = modfit.aic
model = "ARIMA("+str(p)+",1,"+str(q)+")("+str(P)+",1,"+str(Q)+")[12]: "
arima_comp.append(
{
"Modelo":model,
"AIC":aic,
"RMSE":rmse
}
)
我的想法是估计多个模型组合,并将它们全部附加到arima_comp
列表中,以便之后进行比较。它在for循环中运行良好,但是当我运行concurrent.futures.ProcessPoolExecutor.map
时:
pq_aux = [0,1,2,3]
PQ_aux = [0,1]
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux)
似乎函数没有被执行,因为如果我调用arima_comp
列表,它会返回一个空列表:
arima_comp
# 返回 []
你们有没有任何想法我做错了什么?当我运行这段代码时,我并没有收到错误,我只是没有得到预期的结果。谢谢你们提前。
我期望的结果与我在arima_tester
函数中针对不同的p,q,P,Q
组合运行for循环时相同,它会附加到arima_comp
列表中,给我类似以下的东西:
arima_tester(0,0,0,0)
arima_tester(0,0,0,1)
arima_tester(0,0,1,1)
arima_comp
# 返回:[{'Modelo': 'ARIMA(0,1,0)(0,1,0)[12]: ',
'AIC': 1198.2900239812564,
'RMSE': 6.653975098794921},
{'Modelo': 'ARIMA(0,1,0)(0,1,1)[12]: ',
'AIC': 1127.5935159029652,
'RMSE': 7.400152967890038},
{'Modelo': 'ARIMA(0,1,0)(1,1,1)[12]: ',
'AIC': 1125.0164173629523,
'RMSE': 7.996184118574912}]
更新
我刚刚按照下面的答案进行了操作,现在我得到了一个“BrokenProcessPool”错误。一个最小的、可重现的导致(至少对我来说)相同错误的示例可以在下面找到:
import concurrent.futures
def function(a,b,c,d):
sum = a+b+c+d
halfsum = sum/2
squaredsum = a ** 2 + b ** 2 + c ** 2 + d ** 2
return {
"Sum":sum,
"Sum Divided by 2":halfsum,
"Squared Sum": squaredsum
}
ab_values = [0,1,2,3]
cd_values = [0,1]
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))
每当我运行这段代码时,我会得到以下错误:
---------------------------------------------------------------------------
BrokenProcessPool Traceback (most recent call last)
Cell In[4], line 6
4 if __name__ == '__main__':
5 with concurrent.futures.ProcessPoolExecutor() as executor:
----> 6 final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))
File c:\Users\lfval\anaconda3\lib\concurrent\futures\process.py:570, in _chain_from_iterable_of_lists(iterable)
564 def _chain_from_iterable_of_lists(iterable):
565 """
566 Specialized implementation of itertools.chain.from_iterable.
567 Each item in *iterable* should be a list. This function is
568 careful not to keep references to yielded objects.
569 """
--> 570 for element in iterable:
571 element.reverse()
572 while element:
File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:621, in Executor.map..result_iterator()
618 while fs:
619 # Careful not to keep a reference to the popped future
620 if timeout is None:
--> 621 yield _result_or_cancel(fs.pop())
622 else,
我试图在Windows 10上使用Python 3.10.9运行此代码。
英文:
I'm starting to use Python for forecasting coming from R, and I bumped into this problem while trying to estimate multiple models using multiprocessing. I'm trying to estimate a same model (ARIMA) with multiple combinations of parameters, and had success doing it using a for-loop. However, I was trying to apply multiprocessing to increase efficiency, and I'm having some issues.
This is the function I'm trying to run:
arima_comp = []
def arima_tester(p,q,P,Q):
mod = ARIMA(training_set, order = (p,1,q), seasonal_order = (P,1,Q,12))
modfit = mod.fit()
forecast = modfit.forecast(steps = len(test_set))
error = forecast - test_set
rmse = np.square(error).mean() ** 0.5
aic = modfit.aic
model = "ARIMA("+str(p)+",1,"+str(q)+")"+"("+str(P)+",1,"+str(Q)+")[12]: "
arima_comp.append(
{
"Modelo":model,
"AIC":aic,
"RMSE":rmse
}
)
The idea was to estimate multiple combinations of models and append all of them to this arima_comp
list, in order to compare them aftwerwards. It worked fine with the for-loop, but when I run the concurrent.futures.ProcessPoolExecutor.map
as follows:
pq_aux = [0,1,2,3]
PQ_aux = [0,1]
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux)
It seems as if the function was not executed, because if I call the arima_comp
list, it returns me an empty list:
arima_comp
#Returns []
Do any of you have any idea what am I doing wrong? It's not as if I'm receiving an error when I run this code, I just don't get the expected result. Thank you in advance.
I expected the same result as if I ran a for-loop over the function for different combinations of p,q,P,Q
in the arima_tester
function, the it would append to arima_comp
list, giving me something like that:
arima_tester(0,0,0,0)
arima_tester(0,0,0,1)
arima_tester(0,0,1,1)
arima_comp
#Returns:[{'Modelo': 'ARIMA(0,1,0)(0,1,0)[12]: ',
'AIC': 1198.2900239812564,
'RMSE': 6.653975098794921},
{'Modelo': 'ARIMA(0,1,0)(0,1,1)[12]: ',
'AIC': 1127.5935159029652,
'RMSE': 7.400152967890038},
{'Modelo': 'ARIMA(0,1,0)(1,1,1)[12]: ',
'AIC': 1125.0164173629523,
'RMSE': 7.996184118574912}]
UPDATE
I just followed the answers below, and now I'm getting a "BrokenProcessPool" error. A minimal, reproducible example that leads (at least for me) to the very same error can be found below:
import concurrent.futures
def function(a,b,c,d):
sum = a+b+c+d
halfsum = sum/2
squaredsum = a ** 2 + b ** 2 + c ** 2 + d ** 2
return {
"Sum":sum,
"Sum Divided by 2":halfsum,
"Squared Sum": squaredsum
}
ab_values = [0,1,2,3]
cd_values = [0,1]
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))
Whenever I run this code I get the following error:
---------------------------------------------------------------------------
BrokenProcessPool Traceback (most recent call last)
Cell In[4], line 6
4 if __name__ == '__main__':
5 with concurrent.futures.ProcessPoolExecutor() as executor:
----> 6 final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))
File c:\Users\lfval\anaconda3\lib\concurrent\futures\process.py:570, in _chain_from_iterable_of_lists(iterable)
564 def _chain_from_iterable_of_lists(iterable):
565 """
566 Specialized implementation of itertools.chain.from_iterable.
567 Each item in *iterable* should be a list. This function is
568 careful not to keep references to yielded objects.
569 """
--> 570 for element in iterable:
571 element.reverse()
572 while element:
File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:621, in Executor.map..result_iterator()
618 while fs:
619 # Careful not to keep a reference to the popped future
620 if timeout is None:
--> 621 yield _result_or_cancel(fs.pop())
622 else:
623 yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:319, in _result_or_cancel(***failed resolving arguments***)
317 try:
318 try:
--> 319 return fut.result(timeout)
320 finally:
321 fut.cancel()
File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:458, in Future.result(self, timeout)
456 raise CancelledError()
457 elif self._state == FINISHED:
--> 458 return self.__get_result()
459 else:
460 raise TimeoutError()
File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:403, in Future.__get_result(self)
401 if self._exception:
402 try:
--> 403 raise self._exception
404 finally:
405 # Break a reference cycle with the exception in self._exception
406 self = None
I'm trying to run this code on Windows 10 using Python 3.10.9.
答案1
得分: 0
你试图追加到的列表 `arima_comp` 在全局范围内定义。当你创建处理池时,每个进程在自己的地址空间内运行(这是将多进程与多线程区分开的特性之一),将会追加到 *它自己的副本* 的 `arima_comp`。换句话说,这个列表在进程之间是不共享的。因此,当池子里的所有任务完成时,在主进程的地址空间中存在的 `arima_comp` 副本仍然是空的,因为它从未被修改过。
最简单的解决方案是:
1. 完全移除 `arima_comp`,并让 `arima_tester` *返回* 它之前附加到 `arima_comp` 的字典。
```python
def arima_tester(p,q,P,Q):
... # 为简洁起见省略未修改的代码
return {
"Modelo":model,
"AIC":aic,
"RMSE":rmse
}
- 你调用的
ProcessPoolExecutor.map
函数实际上返回一个可以迭代以从arima_tester
中检索连续返回值的迭代器。因此,你只需要进行以下小的代码更改:
with concurrent.futures.ProcessPoolExecutor() as executor:
arima_comp = list(executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux))
<details>
<summary>英文:</summary>
The list to which you are trying to append to, `arima_comp`, is defined at global scope. When you create your processing pool, each process, which is running in its own address space (one of characteristics that distinguished multiprocessing from multithreading) will be appending to *its own copy* of `arima_comp`. That is, this list is not sharable across processes. Thus when all the tasks submitted to the pool complete, the copy of `arima_comp` that exists in the main process's address will still be empty since it was never modified.
The simplest solution is to:
1. Remove `arima_comp` altogether and have `arima_tester` *return* the dictionary it was previously appending to `arima_comp`.
```python
def arima_tester(p,q,P,Q):
... # Unmodified code omitted for brevity
return {
"Modelo":model,
"AIC":aic,
"RMSE":rmse
}
- The
ProcessPoolExecutor.map
function you are calling actually returns an iterator that can be iterated to retrieve successive return values fromarima_tester
. Thus you just need the following minor code change:
with concurrent.futures.ProcessPoolExecutor() as executor:
arima_comp = list(executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux))
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论