Python multicore processing with Dask progress bar not showing

Question
I have a database table 'Sensor_Data' with sensor data from different locations, with columns id, timestamp, and value. Each id corresponds to a sensor in a particular location.

I am trying to use multicore processing with Dask to compute results for two different algorithms and save the results in a parquet file per sensor.

The code below runs fine on multiple cores, however I can't seem to get the progress bar to display in Jupyter Notebook.

I have omitted the get_data, compute_algo1 and compute_algo2 functions, as they just load data and perform simple calculations on the DataFrame rows.
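For reference, hypothetical stand-ins for those omitted functions (the names match the snippet below; the bodies are invented placeholders, not the real logic) might look like:

```python
import numpy as np
import pandas as pd

# Hypothetical placeholders for the omitted functions, just so the
# snippet below can be exercised end to end.
def get_data(pid):
    # Fake a date-indexed frame of sensor readings for this id.
    idx = pd.date_range('2023-01-01', periods=10, freq='D')
    return pd.DataFrame({'value': np.random.rand(len(idx))}, index=idx)

def compute_algo1(row):
    return row.value * 2   # placeholder calculation

def compute_algo2(row):
    return row.value ** 2  # placeholder calculation
```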
```python
import os
import pandas as pd
import dask
from dask.distributed import Client, LocalCluster
from dask import bag
from dask import dataframe as dd
from tqdm.auto import tqdm
from dask.diagnostics import ProgressBar

id_list = [1, 2, 3, 4, 5, ..., 10000]
out_folder = 'output' + os.path.sep

def compute_parallel_dask(pid, out_folder):
    df_data = get_data(pid)
    fname = out_folder + f'{pid}.parquet'
    if not os.path.exists(fname):
        df_computed = pd.DataFrame(columns=['DateTime', 'raw_value', 'algo1_value', 'algo2_value'])
        for indx, row in df_data.iterrows():
            raw_value = row.value
            algo1_val = compute_algo1(row)
            algo2_val = compute_algo2(row)
            df_computed.loc[len(df_computed)] = [pd.to_datetime(indx), raw_value, algo1_val, algo2_val]
        df_computed.to_parquet(fname, index=False)
    else:
        df_computed = pd.read_parquet(fname)
    return dd.from_pandas(df_computed, npartitions=1)

results = []
num_cores = 6
folder_list = [out_folder for pid in id_list]

# Use dask.bag to parallelize the function calls with multiple arguments
b = bag.from_sequence(zip(id_list, folder_list)).map(lambda x: compute_parallel_dask(*x))

# Set up the local Dask cluster and client
cluster = LocalCluster(n_workers=num_cores, scheduler_port=0)
client = Client(cluster)

# Register dask.diagnostics.ProgressBar to display the progress bar
ProgressBar().register()

# Use tqdm to manually display the progress bar
for result in tqdm(b.compute(), desc="Processing"):
    results.append(result)

# Since the computations are now executed, the Parquet files should have been written
print('All Done')
```
Any help in fixing this would be greatly appreciated. Thanks
Answer 1

Score: 0
See https://docs.dask.org/en/stable/diagnostics-distributed.html#progress-bar:
> The dask.distributed progress bar differs from the ProgressBar used for local diagnostics. The progress function takes a Dask object that is executing in the background.

Since you are using the Dask distributed scheduler, the local ProgressBar from dask.diagnostics will not report anything; use the progress function from dask.distributed described at that link instead.
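A minimal, self-contained sketch of that approach (a small threaded cluster and a toy bag stand in for the question's six-worker setup and sensor computation):

```python
from dask import bag
from dask.distributed import Client, LocalCluster, progress

# A small threaded cluster stands in for the question's
# LocalCluster(n_workers=6, scheduler_port=0).
cluster = LocalCluster(processes=False, scheduler_port=0)
client = Client(cluster)

# A toy computation in place of compute_parallel_dask.
b = bag.from_sequence(range(100)).map(lambda x: x * x)

persisted = b.persist()        # start the computation in the background
progress(persisted)            # distributed-aware progress bar (a widget in notebooks)
results = persisted.compute()  # gather the finished results

client.close()
cluster.close()
```

Unlike dask.diagnostics.ProgressBar, which only hooks into the local threaded/multiprocessing schedulers, dask.distributed.progress is wired to the scheduler the Client talks to, so it updates while the persisted collection runs in the background.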