Python: ProcessPoolExecutor vs ThreadPoolExecutor

huangapple go评论73阅读模式
英文:

Python: ProcessPoolExecutor vs ThreadPoolExecutor

问题

以下是您要翻译的内容:

"I have the following function that randomly shuffle the values of one column of the dataframe and use RandomForestClassifier on the overall dataframe including that column that is being randomly shuffled to get the accuracy score.

And I would like to run this function concurrently to each column of the dataframe, as dataframe is pretty large and contains 500k rows and 1k columns. The key is to only randomly shuffle one column at a time.

However, I am struggling to understand why is ProcessPoolExecutor much slower than ThreadPoolExecutor. I thought ThreadPoolExecutor is only suppose to be faster for I/O task. In this case, it doesn't involve reading from or writing to any files.

Or have I done anything wrong here ? Is there a more efficient or better way to optimize this code to make it do things concurrently and run faster?

def randomShuffle(colname, X, y, fit):
    out = {'col_name': colname}
    X_= X.copy(deep = True)
    np.random.shuffle(X_[colname].values) # permutation of a single column
    pred = fit.predict(X_)
    out['scr'] = accuracy_score(y, pred)
    return out

def runConcurrent(classifier, X,y):
    skf = KFold(n_splits=5, shuffle = False)
    acc_scr0, acc_scr1 = pd.Series(), pd.DataFrame(columns = X.columns)
    # split data to training and validation
    for i, (train_idx, val_idx) in enumerate(skf.split(X,y)):
        X_train, y_train = X.iloc[train_idx,:], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx,:], y.iloc[val_idx]
        
        fit = classifier.fit(X=X_train, y=y_train)
        # accuracy score
        pred = fit.predict(X_val)
        acc_scr0.loc[i] = accuracy_score(y_val, pred)
        
        # with concurrent.futures.ProcessPoolExecutor() as executor:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = [executor.submit(randomShuffle, colname = j, X= X_val, y= y_val, fit = fit, labels = classifier.classes_) for j in X.columns]
            for res in concurrent.futures.as_completed(results):
                acc_scr1.loc[i, res.result()['col_name']] = res.result()['scr']
    return None
英文:

I have the following function that randomly shuffle the values of one column of the dataframe and use RandomForestClassifier on the overall dataframe including that column that is being randomly shuffled to get the accuracy score.

And I would like to run this function concurrently to each column of the dataframe, as dataframe is pretty large and contains 500k rows and 1k columns. The key is to only randomly shuffle one column at a time.

However, I am struggling to understand why is ProcessPoolExecutor much slower than ThreadPoolExecutor. I thought ThreadPoolExecutor is only suppose to be faster for I/O task. In this case, it doesn't involve reading from or writing to any files.

Or have I done anything wrong here ? Is there a more efficient or better way to optimize this code to make it do things concurrently and run faster?

def randomShuffle(colname, X, y, fit):
    out = {'col_name': colname}
    X_= X.copy(deep = True)
    np.random.shuffle(X_[colname].values) # permutation of a single column
    pred = fit.predict(X_)
    out['scr'] = accuracy_score(y, pred)
    return out

def runConcurrent(classifier, X,y):
    skf = KFold(n_splits=5, shuffle = False)
    acc_scr0, acc_scr1 = pd.Series(), pd.DataFrame(columns = X.columns)
    # split data to training and validation
    for i, (train_idx, val_idx) in enumerate(skf.split(X,y)):
        X_train, y_train = X.iloc[train_idx,:], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx,:], y.iloc[val_idx]
        
        fit = classifier.fit(X=X_train, y=y_train)
        # accuracy score
        pred = fit.predict(X_val)
        acc_scr0.loc[i] = accuracy_score(y_val, pred)
        
        # with concurrent.futures.ProcessPoolExecutor() as executor:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = [executor.submit(randomShuffle, colname = j, X= X_val, y= y_val, fit = fit, labels = classifier.classes_) for j in X.columns]
            for res in concurrent.futures.as_completed(results):
                acc_scr1.loc[i, res.result()['col_name']] = res.result()['acc_scr']
    return None

答案1

得分: 2

这段文本中讨论了多进程编程中的一些性能和优化问题,以及如何使用chunksize参数来提高性能。以下是代码部分的翻译:

from functools import partial

...

with ProcessPoolExecutor() as executor:
    chunksize = len(points) // (executor._max_workers * 4)
    randomShuffleWrapper = partial(randomShuffle, X=X_val, y=y_val, fit=fit, labels=classifier.classes_)
    results = list(executor.map(randomShuffleWrapper, X.columns, chunksize=chunksize))

这段代码中使用了ProcessPoolExecutorchunksize参数来进行并行处理,以提高性能。

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

这个函数是用来处理数据块的,它会对每个数据块中的元素调用指定的函数。

希望这些翻译能够帮助你理解代码部分的内容。

英文:

It is hard to see without testing since the speed of multiprocessing is dependent on a lot of things. First the communication overhead, so if you need to send around a lot of data it is slow, but also the amount of tasks created is important.

Creating a task has quite some overhead and has to be seen in relation to how long a method called takes to return. If a method only takes a fraction of a second to finish and you call it thousand of times, the overhead of creating a task is significant. If, on the other hand, the function takes like multiple seconds to return, the overhead is negligible.

I can't really tell how fast the randomShuffle is, but what you can do and see if it speeds up anything using the map function and a setting a chunksize.

from functools import partial

...

with ProcessPoolExecutor() as executor:
    chunksize = len(points) // (executor._max_workers * 4)
    randomShuffleWrapper = partial(randomShuffle, X=X_val, y=y_val, fit=fit, labels=classifier.classes_)
    results = list(executor.map(randomShuffleWrapper, X.columns, chunksize=chunksize))

The only thing which changes in all calls to randomShuffle is the colname. So create a partial function to set all other parameters and your new function only takes the colname as the first argument. Now we also have to set an appropriate chunksize.

This is a bit of a hyper parameter and really there is no general good value and you maybe need to try different ones to find the best. It creates chunks of your iterable and wraps your function, so that one tasks calculates the outputs for all entries in a chunk.

So if you have 1000 entries and a chunksize of 100, only 10 tasks a created, every task is calculating 100 entries. This will lead to way less overhead from creating and finishing a task.

As a starting point, I use multiprocessing.pool.Pool if chunksize isn't given. ProcessPoolExecutor.map() sets the chunksize to 1 as a default, which basically ends up in what you are already doing, creating a task for every element.

I don't have any idea how big all the things you are passing to the function are. Namely X=X_val, y=y_val, fit=fit, labels=classifier.classes_. If they are big, there will be a lot of communication overhead, since all will always be serialized sent over and deserialized. So also check if they are big and if they have to be. You normally want to only send what is absolutely necessary, and the same with the return of the function. It also should be as small as possible.

> This is why you propose using chunksize to chop things up. Is my
> understanding correct?
>
> ...
>
> One other question: say I split the column names into 4 chunks, does it mean 4 processes will be created for these 4 chunks? And for each chunk, how are the data being processed? i.e., for loop or multiprocess / multithread?

So maybe I can explain a bit more what the chunksize actually does, since it is actually quite simple and can be seen directly in the code. I am going to reference code found in Anaconda Python 3.9 python3.9/concurrent/futures/process.py.

It has the following line of code for the ProcessPoolExecutor class.

class ProcessPoolExecutor(_base.Executor):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), timeout=timeout)

The _get_chunks just divides the iterables into equal parts of size chunksize and possibly a smaller part if the length of the iterables is not dividable by chunksize.

partial(_process_chunk, fn) creates a partial function of _process_chunk, which looks like this:

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

So all it does is to iterate over every element in a chunk and call a function, in your case randomShuffle. So it just means one task does not consist of one call to your randomShuffle but chunksize many calls. All results are collected in a list and later combined.

The super().map()call means the map function from the parent class Executor is used:

class Executor(object)
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...

As you can see, at this point, also only the submit function is called for all iterables. At this point, the fn is the partial function created earlier partial(_process_chunk, fn) and iterables is what _get_chunks(*iterables, chunksize=chunksize) returned (the equally-sized chunks of the original iterables). So all the map function of the ProcessPoolExecutor does is wrap your function and divide your iterables into chunks for you, before the submit is called.

All is done with the goal to reduce the number of tasks created (submit calls) by having tasks do more, in this case calling a given function for every element of some iterables.

So how do tasks actually map to processes? By creating a ProcessPoolExecutor, you create a pool of processes. The number is defined by the number of your cores on your system or it is what you define via the max_workers argument.

When the submit is called, a worker is assigned to it, so the worker receives all data necessary to run the function and returns the output of the function to the main process. This data transfer is done by serializing and deserializing the data, usually with the pickle module. This is also where a lot of overhead comes from, since transferring data between processes is slow.

So if you created ProcessPoolExecutor with max_workers=10, you can have 10 tasks executed in parallel in theory (if you got 10 cores of course). The abstraction of the pool and tasks is so you do not have to worry what tasks runs where. You just submit all that has to be done and let the ProcessPoolExecutor figure out how to best assign tasks to processes.

答案2

得分: -3

ProcessPoolExecutor和ThreadPoolExecutor之间的性能差异可以归因于Python中的全局解释器锁(GIL)。GIL只允许一个线程同时执行Python字节码,即使在多核系统上也是如此。这意味着在像训练机器学习模型这样的CPU绑定任务中,使用ProcessPoolExecutor的多个进程实际上可能会因为进程间通信的开销而减慢执行速度。

另一方面,ThreadPoolExecutor可以更有效,因为它在单个进程内利用多个线程,允许更好地利用CPU资源。特别是在任务涉及I/O操作或阻塞调用时,这一点尤为明显。

在你的情况下,由于你正在使用scikit-learn的RandomForestClassifier来训练模型,这是CPU绑定的,因此使用ThreadPoolExecutor是一个更好的选择。ProcessPoolExecutor中的进程间通信开销可能会抵消并行执行的好处。

为了进一步优化代码,你可以考虑以下建议:

  1. 在KFold中使用较小的折数以减少迭代次数。

  2. 不要在每个列的循环中创建新的fit对象,而是在循环之外创建一次,并将其作为参数传递给randomShuffle函数。

  3. 考虑使用更高效的算法或模型,可以更有效地处理大型数据集,如梯度提升算法。

英文:

The performance difference between ProcessPoolExecutor and ThreadPoolExecutor can be attributed to the Global Interpreter Lock (GIL) in Python. The GIL allows only one thread to execute Python bytecode at a time, even on multi-core systems. This means that in CPU-bound tasks like training machine learning models, using multiple processes with ProcessPoolExecutor can actually slow down the execution due to the overhead of inter-process communication.

On the other hand, ThreadPoolExecutor can be more efficient since it utilizes multiple threads within a single process, allowing for better utilization of CPU resources. This is especially true when the tasks involve I/O operations or blocking calls.

In your case, since you are using scikit-learn's RandomForestClassifier for training the model, which is CPU-bound, using ThreadPoolExecutor is a better choice. The overhead of inter-process communication in ProcessPoolExecutor can outweigh the benefits of parallel execution.

To optimize the code further, you can consider the following suggestions:

  1. Use a smaller number of folds in KFold to reduce the number of iterations.

  2. Instead of creating a new fit object for each column, you can create it once outside the loop and pass it as an argument to the randomShuffle function.

  3. Consider using a more efficient algorithm or model that can handle large datasets more effectively, such as gradient boosting algorithms.

huangapple
  • 本文由 发表于 2023年6月26日 11:01:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/76553283.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定