Performance degradation with increasing threads in Python multiprocessing.

huangapple go评论77阅读模式
英文:

Performance degradation with increasing threads in Python multiprocessing

问题

I have a machine with 24 cores and 2 threads per core. I'm trying to optimize the following code for parallel execution. However, I noticed that the code's performance starts to degrade after a certain number of threads.

import argparse
import glob
import h5py
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
import time
import datetime
from multiprocessing import Pool, cpu_count, Lock
import multiprocessing
import cProfile, pstats, io

def process_parcel_file(f, bands, mask):
start_time = time.time()
test = xr.open_dataset(f)
print(f"Elapsed in process_parcel_file for reading dataset: {time.time() - start_time}")

start_time = time.time()
subset = test[bands + ['SCL']].copy()
subset = subset.where(subset != 0, np.nan)
if mask:
    subset = subset.where((subset.SCL >= 3) & (subset.SCL < 7))
subset = subset[bands]

# Adding a new dimension week_year and performing grouping
subset['week_year'] = subset.time.dt.strftime('%Y-%U')
subset = subset.groupby('week_year').mean().sortby('week_year')
subset['id'] = test['id'].copy()

# Store the dates and counting pixels for each parcel
dates = subset.week_year.values
n_pixels = test[['id', 'SCL']].groupby('id').count()['SCL'][:, 0].values.reshape(-1, 1)

# Converting to dataframe
grouped_sum = subset.groupby('id').sum()
ids = grouped_sum.id.values
grouped_sum = grouped_sum.to_array().values
grouped_sum = np.swapaxes(grouped_sum, 0, 1)
grouped_sum = grouped_sum.reshape((grouped_sum.shape[0], -1))
colnames = ["{}_{}".format(b, str(x).split('T')[0]) for b in bands for x in dates] + ['count']
values = np.hstack((grouped_sum, n_pixels))
df = pd.DataFrame(values, columns=colnames)
df.insert(0, 'id', ids)
print(f"Elapsed in process_parcel_file til end: {time.time() - start_time}")
return df

def fs_creation(input_dir, out_file, labels_to_keep=None, th=0.1, n=64, days=5, total_days=180, mask=False,
mode='s2', method='patch', bands=['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B8A', 'B11', 'B12']):
files = glob.glob(input_dir)
times_pool = [] # For storing execution times
times_seq = []
cpu_counts = list(range(2, multiprocessing.cpu_count() + 1, 4)) # The different CPU counts to use

for count in cpu_counts:
    print(f"Executing with {count} threads")
    if method == 'parcel':
        start_pool = time.time()
        with Pool(count) as pool:
            arguments = [(f, bands, mask) for f in files]
            dfs = list(tqdm(pool.starmap(process_parcel_file, arguments), total=len(arguments)))

        end_pool = time.time()
        start_seq = time.time()
        dfs = pd.concat(dfs)
        dfs = dfs.groupby('id').sum()
        counts = dfs['count'].copy()
        dfs = dfs.div(dfs['count'], axis=0)
        dfs['count'] = counts
        dfs.drop(index=-1).to_csv(out_file)
        end_seq = time.time()
        times_pool.append(end_pool - start_pool)
        times_seq.append(end_seq - start_seq)

pd.DataFrame({'CPU_count': cpu_counts, 'Time pool': times_pool, 
              'Time seq' : times_seq}).to_csv('cpu_times.csv', index=False)

return 0

When executing the code, it scales well up to around 7-8 threads, but after that, the performance starts to deteriorate. I have profiled the code, and it seems that each thread takes more time to execute the same code.

For example, with 2 threads:

Elapsed in process_parcel_file for reading dataset: 0.012271404266357422
Elapsed in process_parcel_file til end: 1.6681673526763916
Elapsed in process_parcel_file for reading dataset: 0.014229536056518555
Elapsed in process_parcel_file til end: 1.5836331844329834

However, with 22 threads:

Elapsed in process_parcel_file for reading dataset: 0.17968058586120605
Elapsed in process_parcel_file til end: 12.049026727676392
Elapsed in process_parcel_file for reading dataset: 0.052398681640625
Elapsed in process_parcel_file til end: 6.014119625091553

I'm struggling to understand why the performance degrades with more threads. I've already verified that the system has the required number of cores and threads.

I would appreciate any guidance or suggestions to help me identify the cause of this issue and optimize the code for better performance.

It's really hard for me to provide a minimal working example so take that into account.

Thank you in advance.

Edit:
The files are around 80MB each. I have 451 files.
I added the following code to profile the function:

...
start_time = time.time()
mem_usage_start = memory_usage(-1, interval=0.1, timeout=None)[0]
cpu_usage_start = psutil.cpu_percent(interval=None)
test = xr.open_dataset(f)
times['read_dataset'] = time.time() - start_time
memory['read_dataset'] = memory_usage(-1, interval=0.1, timeout=None)[0] - mem_usage_start
cpu_usage['read_dataset'] = psutil.cpu_percent(interval=None) - cpu_usage_start
...

And more code for each line in a similar fashion.
I used the libraries memory_profiler and psutil, and I have the information for each thread.
CSV's with the results are available here:
CSV Results

The results identify each line in the function with the number of CPUs selected, so each one is a thread.

Edit2:

Here I have a report of a subset of the data, where you can clearly see what each thread is doing, and how some threads are getting less work than others:

Subset Report

英文:

I have a machine with 24 cores and 2 threads per core. I'm trying to optimize the following code for parallel execution. However, I noticed that the code's performance starts to degrade after a certain number of threads.

import argparse
import glob
import h5py
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
import time
import datetime
from multiprocessing import Pool, cpu_count, Lock
import multiprocessing
import cProfile, pstats, io


def process_parcel_file(f, bands, mask):
    start_time = time.time()
    test = xr.open_dataset(f)
    print(f&quot;Elapsed in process_parcel_file for reading dataset: {time.time() - start_time}&quot;)

    start_time = time.time()
    subset = test[bands + [&#39;SCL&#39;]].copy()
    subset = subset.where(subset != 0, np.nan)
    if mask:
        subset = subset.where((subset.SCL &gt;= 3) &amp; (subset.SCL &lt; 7))
    subset = subset[bands]

    # Adding a new dimension week_year and performing grouping
    subset[&#39;week_year&#39;] = subset.time.dt.strftime(&#39;%Y-%U&#39;)
    subset = subset.groupby(&#39;week_year&#39;).mean().sortby(&#39;week_year&#39;)
    subset[&#39;id&#39;] = test[&#39;id&#39;].copy()

    # Store the dates and counting pixels for each parcel
    dates = subset.week_year.values
    n_pixels = test[[&#39;id&#39;, &#39;SCL&#39;]].groupby(&#39;id&#39;).count()[&#39;SCL&#39;][:, 0].values.reshape(-1, 1)

    # Converting to dataframe
    grouped_sum = subset.groupby(&#39;id&#39;).sum()
    ids = grouped_sum.id.values
    grouped_sum = grouped_sum.to_array().values
    grouped_sum = np.swapaxes(grouped_sum, 0, 1)
    grouped_sum = grouped_sum.reshape((grouped_sum.shape[0], -1))
    colnames = [&quot;{}_{}&quot;.format(b, str(x).split(&#39;T&#39;)[0]) for b in bands for x in dates] + [&#39;count&#39;]
    values = np.hstack((grouped_sum, n_pixels))
    df = pd.DataFrame(values, columns=colnames)
    df.insert(0, &#39;id&#39;, ids)
    print(f&quot;Elapsed in process_parcel_file til end: {time.time() - start_time}&quot;)
    return df


def fs_creation(input_dir, out_file, labels_to_keep=None, th=0.1, n=64, days=5, total_days=180, mask=False,
                mode=&#39;s2&#39;, method=&#39;patch&#39;, bands=[&#39;B02&#39;, &#39;B03&#39;, &#39;B04&#39;, &#39;B05&#39;, &#39;B06&#39;, &#39;B07&#39;, &#39;B08&#39;, &#39;B8A&#39;, &#39;B11&#39;, &#39;B12&#39;]):
    files = glob.glob(input_dir)
    times_pool = []  # For storing execution times
    times_seq = []
    cpu_counts = list(range(2, multiprocessing.cpu_count() + 1, 4))  # The different CPU counts to use

    for count in cpu_counts:
        print(f&quot;Executing with {count} threads&quot;)
        if method == &#39;parcel&#39;:
            start_pool = time.time()
            with Pool(count) as pool:
                arguments = [(f, bands, mask) for f in files]
                dfs = list(tqdm(pool.starmap(process_parcel_file, arguments), total=len(arguments)))


            end_pool = time.time()
            start_seq = time.time()
            dfs = pd.concat(dfs)
            dfs = dfs.groupby(&#39;id&#39;).sum()
            counts = dfs[&#39;count&#39;].copy()
            dfs = dfs.div(dfs[&#39;count&#39;], axis=0)
            dfs[&#39;count&#39;] = counts
            dfs.drop(index=-1).to_csv(out_file)
            end_seq = time.time()
            times_pool.append(end_pool - start_pool)  
            times_seq.append(end_seq - start_seq)

    pd.DataFrame({&#39;CPU_count&#39;: cpu_counts, &#39;Time pool&#39;: times_pool, 
                  &#39;Time seq&#39; : times_seq}).to_csv(&#39;cpu_times.csv&#39;, index=False)

    return 0

When executing the code, it scales well up to around 7-8 threads, but after that, the performance starts to deteriorate. I have profiled the code, and it seems that each thread takes more time to execute the same code.

For example, with 2 threads:

Elapsed in process_parcel_file for reading dataset: 0.012271404266357422
Elapsed in process_parcel_file til end: 1.6681673526763916
Elapsed in process_parcel_file for reading dataset: 0.014229536056518555
Elapsed in process_parcel_file til end: 1.5836331844329834

However, with 22 threads:

Elapsed in process_parcel_file for reading dataset: 0.17968058586120605
Elapsed in process_parcel_file til end: 12.049026727676392
Elapsed in process_parcel_file for reading dataset: 0.052398681640625
Elapsed in process_parcel_file til end: 6.014119625091553

I'm struggling to understand why the performance degrades with more threads. I've already verified that the system has the required number of cores and threads.

I would appreciate any guidance or suggestions to help me identify the cause of this issue and optimize the code for better performance.

It's really hard for me to provide a minimal working example so take that into account.

Thank you in advance.

Edit:
The files are around 80MB each. I have 451 files.
I added the following code to profile the function:

...
    start_time = time.time()
    mem_usage_start = memory_usage(-1, interval=0.1, timeout=None)[0]
    cpu_usage_start = psutil.cpu_percent(interval=None)
    test = xr.open_dataset(f)
    times[&#39;read_dataset&#39;] = time.time() - start_time
    memory[&#39;read_dataset&#39;] = memory_usage(-1, interval=0.1, timeout=None)[0] - mem_usage_start
    cpu_usage[&#39;read_dataset&#39;] = psutil.cpu_percent(interval=None) - cpu_usage_start
...

And more code for each line in a similar fashion.
I used the libraries memory_profiler and psutil, and I have the information for each thread.
CSV's with the results are available here:
https://wetransfer.com/downloads/44df14ea831da7693300a29d8e0d4e7a20230703173536/da04a0
The results identify each line in the function with the number of cpus selected, so each one is a thread.

Edit2:

Here I have a report of a subset of the data, where you can clearly see what each thread is doing, and how some threads are getting less work than others:

https://wetransfer.com/downloads/259b4e42aae6dd9cda5a22d576aba29520230717135248/ae3f88

答案1

得分: 4

从磁盘上并行读取数据是不好的

不仅磁盘不适合进行多个并发读取,而且使用多个进程会产生通信成本,如序列化的需求

也许最好的方法是减少读取数据的线程数,但在已读取的文件上利用并行计算(甚至是它们的分块,如果适用)。

还请参阅此文章,其中对各种并行加载文件的方式进行了基准测试

一些实践:首先尝试使用多线程读取文件。对于每个包含 500,000 行的 100 个文件,在我的机器上看到以下结果:

import multiprocessing as mp
import time

def job(filename):
    _ = pd.read_csv(filename)
    return 'OK'

from glob import glob
tasks = glob('tasks/*csv')

for i in range(1, 20):
    start_time = time.time()
    with mp.pool.ThreadPool(i) as pool:
        outs = pool.map(job, tasks)
    print(f'Threads={i} finished in time {time.time() - start_time}')
Threads=1 finished in time 17.437994480133057
Threads=2 finished in time 9.725012063980103
Threads=3 finished in time 6.816091060638428
Threads=4 finished in time 5.413954973220825
Threads=5 finished in time 4.8136584758758545
Threads=6 finished in time 4.713972806930542
Threads=7 finished in time 4.915326356887817
Threads=8 finished in time 4.91536545753479
Threads=9 finished in time 4.6191277503967285
Threads=10 finished in time 4.731768369674683
Threads=11 finished in time 4.846496105194092
Threads=12 finished in time 5.274311542510986
Threads=13 finished in time 4.9633073806762695
Threads=14 finished in time 4.85762357711792
Threads=15 finished in time 4.9740986824035645
Threads=16 finished in time 5.085602760314941
Threads=17 finished in time 4.990613222122192
Threads=18 finished in time 4.979219436645508
Threads=19 finished in time 5.094106435775757
英文:

Reading from disk with many parallel processes is bad.

Not only the disk is not well-suited to multiple concurent reads, but using multiple processes incurs communication costs such as the need of serialization.

Perhaps it is better to have less threads reading the data, but benefit from parallel computing on already-read files (or even their chunks, if that's applicable).

See also this article on benchmarking various ways of parallelizing files loading.


Some practice: try first to just read files with multithreading. For 100 files of 500,000 lines each I see the following on my machine:

import multiprocessing as mp
import time


def job(filename):
    _ = pd.read_csv(filename)
    return &#39;OK&#39;

from glob import glob
tasks = glob(&#39;tasks/*csv&#39;)

for i in range(1,20):
    start_time=time.time()
    with mp.pool.ThreadPool(i) as pool:
        outs = pool.map(job, tasks)
    print(f&#39;Threads={i} finished in time {time.time() - start_time}&#39;)
Threads=1 finished in time 17.437994480133057
Threads=2 finished in time 9.725012063980103
Threads=3 finished in time 6.816091060638428
Threads=4 finished in time 5.413954973220825
Threads=5 finished in time 4.8136584758758545
Threads=6 finished in time 4.713972806930542
Threads=7 finished in time 4.915326356887817
Threads=8 finished in time 4.91536545753479
Threads=9 finished in time 4.6191277503967285
Threads=10 finished in time 4.731768369674683
Threads=11 finished in time 4.846496105194092
Threads=12 finished in time 5.274311542510986
Threads=13 finished in time 4.9633073806762695
Threads=14 finished in time 4.85762357711792
Threads=15 finished in time 4.9740986824035645
Threads=16 finished in time 5.085602760314941
Threads=17 finished in time 4.990613222122192
Threads=18 finished in time 4.979219436645508
Threads=19 finished in time 5.094106435775757

答案2

得分: 0

我的第一想法也是IO瓶颈。然而,由于您报告在提前将所有内容加载到内存中时出现类似行为,我将基于没有IO瓶颈的假设来回答。

您正在使用的许多库在实际数据处理时会在后台使用多个CPU核心。您观察到在大约7-8个线程时性能开始下降。我怀疑CPU在大约7-8个线程时已经饱和,并且超过那个线程数会导致线程争用影响性能。

您可以进行测试。使用只有1个线程运行,并监视每个核心的CPU利用率(例如,在Linux上使用top或等效工具)。查看是否有多个核心被利用。然后运行一系列不同线程数的附加测试。如果我的假设是正确的,那么一旦您达到大约7-8个线程,那么所有核心都将以100%的利用率运行。

英文:

My first thought was also IO bottleneck. However since you report similar behavior when you load everything into memory ahead of time, I'm going to answer under the assumption that there is no IO bottleneck.

Many of the libraries you are using to do the actual data processing will use multiple CPU cores behind the scenes. You observe that around 7-8 threads performance starts to degrade. I suspect that the CPU is saturated at around 7-8 threads, and beyond that thread contention starts to harm performance.

You can test this. Run with just 1 thread, and monitor CPU utilization per core (e.g. if on Linux, use top or equivalent). See if more than one core being utilized. Then run a series of additional tests with different numbers of threads. If my hypothesis is correct, then once you reach about 7-8 threads, then all cores will be at 100% utilization.

答案3

得分: 0

xarray.load_dataset替换xarray.open_dataset,立即将数据加载到内存中;前者相对较懒惰,而后者稍微更渴望(也称为贪婪、严格)。
[如果此答案被接受]

解释:

[冒着重复以前的评论/答案的风险,] 这似乎是一个I/O吞吐量限制问题,其中太多的线程/进程试图同时从同一设备读取数据,具有讽刺意味的是,这减慢了所有兄弟线程/进程的速度。

至于造成这种情况的原因,我认为是在第17行调用了xarrayopen_datasettest = xr.open_dataset(f),根据文档的某种具体解释,它可能会被解释为懒加载数据:
>数据总是从netCDF文件中懒加载的。您可以操作、切片和子集化Dataset和DataArray对象,直到您尝试执行某种实际计算之前,不会将任何数组值加载到内存中。

如果这是真的,它可以解释你正在展示的症状 - 451个文件最初由"空"对象表示(这几乎不会花费多少时间),因为这些对象的数据几乎同时被读取(或操作) - 存储设备受到成千上万(或数百万,取决于块大小)的读取请求的冲击。

有一个提示,在第四段中:
>Xarray对远程或磁盘上的数据集的懒加载通常是可取的,但并不总是如此。在执行计算密集型操作之前,通常最好通过调用Dataset.load()方法将Dataset(或DataArray)完全加载到内存中。

或者,尝试使用load_dataset

英文:

TL;DR

Replace xarray.open_dataset with xarray.load_dataset to immediately load the data into memory; the former is quite lazy, while the latter is a bit more eager (a.k.a greedy, strict).
[if this answer is accepted]

Explanation:

[At the risk of repeating previous comments/answers,] This appears to be an I/O throughput limitation, where too many threads/processes attempt to simultaneously read from the same device, ironically slowing things down for all sibling threads/processes.

As for what's causing this, I believe it to be the call to xarray's open_dataset on line 17: test = xr.open_dataset(f), which by some specific interpretation of the docs may appear to be loading the data lazily:
>Data is always loaded lazily from netCDF files. You can manipulate, slice and subset Dataset and DataArray objects, and no array values are loaded into memory until you try to perform some sort of actual computation.

In case this is true, it can explain the symptoms you're exhibiting - 451 files initially get represented by "empty" objects (which takes an almost insignificant amount of time), and as these objects' data is read (or manipulated) almost simultaneously - the storage device gets stormed by tens of thousands (or millions, depending on blocksize) of read requests.

There is a tip, four paragraphs down :
>Xarray’s lazy loading of remote or on-disk datasets is often but not always desirable. Before performing computationally intense operations, it is often a good idea to load a Dataset (or DataArray) entirely into memory by invoking the Dataset.load() method.

Alternatively, try using load_dataset.

huangapple
  • 本文由 发表于 2023年6月29日 05:57:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/76576942.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定