
ProcessPoolExecutor using a function which is supposed to append to a list not working properly

Question


I'm starting to use Python for forecasting, coming from R, and I ran into this problem while trying to estimate multiple models using multiprocessing. I'm trying to estimate the same model (ARIMA) with multiple combinations of parameters, and I succeeded in doing it with a for-loop. However, when I tried to apply multiprocessing to make it more efficient, I ran into some issues.

This is the function I'm trying to run:

```python
import numpy as np
from statsmodels.tsa.arima.model import ARIMA  # assumed: statsmodels' ARIMA class

# training_set and test_set are defined elsewhere (not shown)
arima_comp = []

def arima_tester(p,q,P,Q):
    mod = ARIMA(training_set, order = (p,1,q), seasonal_order = (P,1,Q,12))
    modfit = mod.fit()
    forecast = modfit.forecast(steps = len(test_set))
    error = forecast - test_set
    rmse = np.square(error).mean() ** 0.5
    aic = modfit.aic
    model = "ARIMA("+str(p)+",1,"+str(q)+")"+"("+str(P)+",1,"+str(Q)+")[12]: "
    arima_comp.append(
        {
            "Modelo":model,
            "AIC":aic,
            "RMSE":rmse
        }
    )
```

The idea was to estimate multiple combinations of models and append all of them to this arima_comp list, in order to compare them afterwards. It worked fine with the for-loop, but when I run concurrent.futures.ProcessPoolExecutor.map as follows:

```python
pq_aux = [0,1,2,3]
PQ_aux = [0,1]

with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux)
```

It seems as if the function was never executed, because when I inspect the arima_comp list afterwards, it is empty:

```python
arima_comp
# Returns []
```

Does anyone have an idea what I am doing wrong? I don't get any error when I run this code; I just don't get the expected result. Thank you in advance.

I expected the same result as running a for-loop over arima_tester with different combinations of p,q,P,Q: each call would append to the arima_comp list, giving me something like this:

```python
arima_tester(0,0,0,0)
arima_tester(0,0,0,1)
arima_tester(0,0,1,1)
arima_comp

# Returns:
# [{'Modelo': 'ARIMA(0,1,0)(0,1,0)[12]: ',
#   'AIC': 1198.2900239812564,
#   'RMSE': 6.653975098794921},
#  {'Modelo': 'ARIMA(0,1,0)(0,1,1)[12]: ',
#   'AIC': 1127.5935159029652,
#   'RMSE': 7.400152967890038},
#  {'Modelo': 'ARIMA(0,1,0)(1,1,1)[12]: ',
#   'AIC': 1125.0164173629523,
#   'RMSE': 7.996184118574912}]
```
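A side note on the expected result: like the built-in `map`, `Executor.map` pairs its iterables element by element and stops at the shortest one, so `executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux)` only makes two calls, `(0,0,0,0)` and `(1,1,1,1)`, rather than the combinations listed here. Assuming the goal is every combination, a minimal sketch is to build the grid with `itertools.product` and transpose it into four argument sequences. `toy_tester`, `build_grid` and `run_grid` are hypothetical names; `toy_tester` stands in for `arima_tester` without fitting any model, and returns its dict instead of appending to a global list.

```python
import concurrent.futures
import itertools

pq_aux = [0, 1, 2, 3]
PQ_aux = [0, 1]


def toy_tester(p, q, P, Q):
    # Hypothetical stand-in for arima_tester: returns a dict instead of
    # appending to a global list, and skips the actual model fitting.
    model = f"ARIMA({p},1,{q})({P},1,{Q})[12]: "
    return {"Modelo": model, "params": (p, q, P, Q)}


def build_grid(pq_values, PQ_values):
    # Every (p, q, P, Q) combination, like four nested for-loops.
    return list(itertools.product(pq_values, pq_values, PQ_values, PQ_values))


def run_grid(grid):
    # zip(*grid) transposes the list of 4-tuples into four parallel sequences,
    # one per parameter, which is the shape Executor.map expects.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        return list(executor.map(toy_tester, *zip(*grid)))


if __name__ == "__main__":
    # Positional pairing stops at the shortest list: only 2 calls.
    print(list(zip(pq_aux, pq_aux, PQ_aux, PQ_aux)))  # [(0, 0, 0, 0), (1, 1, 1, 1)]

    grid = build_grid(pq_aux, PQ_aux)
    results = run_grid(grid)
    print(len(results))          # 64 = 4 * 4 * 2 * 2
    print(results[1]["Modelo"])  # ARIMA(0,1,0)(0,1,1)[12]:
```

Passing the grid directly, as `executor.map(func, grid)`, would also work if the worker took a single `(p, q, P, Q)` tuple; the transpose just keeps the four-argument signature unchanged.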

UPDATE

I just followed the answers below, and now I'm getting a "BrokenProcessPool" error. A minimal, reproducible example that leads (at least for me) to the very same error can be found below:

```python
import concurrent.futures

def function(a,b,c,d):
    sum = a+b+c+d
    halfsum = sum/2
    squaredsum = a ** 2 + b ** 2 + c ** 2 + d ** 2
    return {
        "Sum":sum,
        "Sum Divided by 2":halfsum,
        "Squared Sum": squaredsum
    }

ab_values = [0,1,2,3]
cd_values = [0,1]

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))
```

Whenever I run this code I get the following error:

```
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
Cell In[4], line 6
      4 if __name__ == '__main__':
      5     with concurrent.futures.ProcessPoolExecutor() as executor:
----> 6         final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))

File c:\Users\lfval\anaconda3\lib\concurrent\futures\process.py:570, in _chain_from_iterable_of_lists(iterable)
    564 def _chain_from_iterable_of_lists(iterable):
    565     """
    566     Specialized implementation of itertools.chain.from_iterable.
    567     Each item in *iterable* should be a list.  This function is
    568     careful not to keep references to yielded objects.
    569     """
--> 570     for element in iterable:
    571         element.reverse()
    572         while element:

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:621, in Executor.map.<locals>.result_iterator()
    618 while fs:
    619     # Careful not to keep a reference to the popped future
    620     if timeout is None:
--> 621         yield _result_or_cancel(fs.pop())
    622     else:
    623         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:319, in _result_or_cancel(***failed resolving arguments***)
    317 try:
    318     try:
--> 319         return fut.result(timeout)
    320     finally:
    321         fut.cancel()

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None
```

I'm trying to run this code on Windows 10 using Python 3.10.9.

Answer 1

Score: 0


The list you are trying to append to, `arima_comp`, is defined at global scope. When you create your process pool, each worker process runs in its own address space (one of the characteristics that distinguishes multiprocessing from multithreading), so each one appends to *its own copy* of `arima_comp`. In other words, the list is not shared across processes. Thus, when all the tasks submitted to the pool complete, the copy of `arima_comp` in the main process's address space is still empty, because it was never modified.
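A minimal sketch of this behaviour, assuming a hypothetical module-level list `collected` and a toy worker `append_in_worker` in place of `arima_comp` and `arima_tester`: each worker appends to its own copy of the list, and the parent's copy is still empty after the pool shuts down. The `if __name__ == "__main__":` guard is kept because on Windows the workers re-import the main module.

```python
import concurrent.futures
import os

# Hypothetical module-level list, playing the role of arima_comp.
collected = []


def append_in_worker(x):
    # Runs in a worker process: this appends to the worker's own copy of
    # `collected`, not to the list that lives in the parent process.
    collected.append(x)
    return os.getpid(), len(collected)


def run_demo():
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        reports = list(executor.map(append_in_worker, range(4)))
    # Return a snapshot of the parent's list after the pool has shut down.
    return reports, list(collected)


if __name__ == "__main__":
    reports, parent_copy = run_demo()
    print(reports)      # (worker pid, length of that worker's copy) per task
    print(parent_copy)  # prints []: the parent's list was never appended to
```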

The simplest solution is to:

1. Remove `arima_comp` altogether and have `arima_tester` *return* the dictionary it was previously appending to `arima_comp`.

```python
def arima_tester(p,q,P,Q):
    ... # Unmodified code omitted for brevity
    return {
        "Modelo":model,
        "AIC":aic,
        "RMSE":rmse
    }
```

2. The `ProcessPoolExecutor.map` method you are calling actually returns an iterator that yields the successive return values of `arima_tester`, in the same order as the arguments. Thus you just need the following minor code change:

```python
if __name__ == "__main__":  # needed on Windows, where worker processes re-import the main module
    with concurrent.futures.ProcessPoolExecutor() as executor:
        arima_comp = list(executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux))
```

huangapple
  • Posted on July 4, 2023, 23:59:32
  • If you repost this article, please keep the link: https://go.coder-hub.com/76614292.html