Python multicore processing with Dask progress bar not showing


Question

I have a database table 'Sensor_Data' with sensor data from different locations, with columns id, timestamp, and value. Each id corresponds to a sensor at a particular location.

I am trying to use multicore processing with Dask to compute results for two different algorithms and save the results to a Parquet file for each sensor.

The code below runs perfectly on multiple cores, but I can't seem to get the progress bar to display in a Jupyter notebook.

I have omitted the get_data, compute_algo1, and compute_algo2 functions, as they just load data and perform simple calculations on the DataFrame rows.

import os

import pandas as pd
import dask
from dask.distributed import Client, LocalCluster
from dask import bag
from dask import dataframe as dd
from tqdm.auto import tqdm
from dask.diagnostics import ProgressBar

id_list = [1, 2, 3, 4, 5, ..., 10000]  # placeholder for the full list of sensor ids
out_folder = 'output' + os.path.sep

def compute_parallel_dask(pid, out_folder):
    df_data = get_data(pid)
    fname = out_folder + f'{pid}.parquet'
    if not os.path.exists(fname):
        df_computed = pd.DataFrame(columns=['DateTime', 'raw_value', 'algo1_value', 'algo2_value'])
        for indx, row in df_data.iterrows():
            raw_value = row.value
            algo1_val = compute_algo1(row)
            algo2_val = compute_algo2(row)
            df_computed.loc[len(df_computed)] = [pd.to_datetime(indx), raw_value, algo1_val, algo2_val]
        df_computed.to_parquet(fname, index=False)
    else:
        df_computed = pd.read_parquet(fname)
    return dd.from_pandas(df_computed, npartitions=1)

results = []
num_cores = 6

folder_list = [out_folder for pid in id_list]

# Use dask.bag to parallelize the function calls with multiple arguments
b = bag.from_sequence(zip(id_list, folder_list)).map(lambda x: compute_parallel_dask(*x))

# Set up the local Dask cluster and client
cluster = LocalCluster(n_workers=num_cores, scheduler_port=0)
client = Client(cluster)
# Register dask.diagnostics.ProgressBar to display the progress bar
ProgressBar().register()

# Use tqdm to manually display a progress bar
for result in tqdm(b.compute(), desc="Processing"):
    results.append(result)
# Since the computations are now executed, the Parquet files should have been written
print('All Done')

Any help fixing this would be greatly appreciated. Thanks!


Answer 1

Score: 0

See https://docs.dask.org/en/stable/diagnostics-distributed.html#progress-bar:

> The dask.distributed progress bar differs from the ProgressBar used for local diagnostics. The progress function takes a Dask object that is executing in the background

Since you are using the Dask distributed scheduler, you should use the method described at that link.
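Following the linked docs, a minimal sketch of what this looks like. The toy `square` task stands in for the real `compute_parallel_dask` (an assumption for illustration), and `processes=False` keeps the workers in-process so the snippet runs anywhere; the original 6-worker process-based cluster works the same way:

```python
from dask.distributed import Client, LocalCluster, progress
import dask.bag as bag

def square(x):
    return x * x

# Toy stand-in workload; the real code would map compute_parallel_dask
# over (id, folder) pairs instead of square over a range.
b = bag.from_sequence(range(100)).map(square)

cluster = LocalCluster(n_workers=2, processes=False, scheduler_port=0)
client = Client(cluster)

# client.compute() starts execution in the background and returns a future;
# progress() then renders the distributed scheduler's progress bar for it.
future = client.compute(b)
progress(future)

# Block until the work is done and collect the results.
results = future.result()
client.close()
```

In a Jupyter notebook `progress` renders a widget-based bar; in a plain console it prints a text bar.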

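As a side note (not part of the answer above): if a tqdm-style bar is preferred, an alternative sketch is to submit the tasks individually and iterate them with dask.distributed's as_completed, which yields each future as it finishes. The `square` task is again a hypothetical stand-in:

```python
from dask.distributed import Client, LocalCluster, as_completed
from tqdm.auto import tqdm

def square(x):
    return x * x

client = Client(LocalCluster(n_workers=2, processes=False, scheduler_port=0))

# Submit one task per input; each call returns a future immediately.
futures = [client.submit(square, i) for i in range(50)]

# as_completed yields futures as they finish, so tqdm advances per task.
results = []
for fut in tqdm(as_completed(futures), total=len(futures), desc="Processing"):
    results.append(fut.result())
client.close()
```

Note that results arrive in completion order, not submission order; use `client.gather(futures)` instead when order matters.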

huangapple
  • Published on 2023-07-20 19:36:03
  • Please keep this link when reposting: https://go.coder-hub.com/76729436.html