How to execute parallel processing between dataframes?

Question

I have two dictionaries, dict1 and dict2, whose keys are labels and whose values are dataframes.

dict1 = {"A": df1, "B": df2, "C": df3}
dict2 = {"A": df4, "B": df5, "C": df6}

I want to compare every row of df1['Last_Name'] with df4['Last_Name'] and create a new field, df1['Match'], holding the entry with the highest Levenshtein distance. The same goes for df2 with df5 and df3 with df6.

Now I want to run these 3 comparisons in parallel. I tried multiprocessing and concurrent.futures, but somehow it is not working.

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        for key2, df2 in dict2.items():
            futures.append(executor.submit(add_flag, df1, df2))

    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
        dict1[key1] = result

Answer 1

Score: 1

So, a few things here:

  1. concurrent.futures.ThreadPoolExecutor is meant for I/O-bound work; for CPU-bound work like this you want ProcessPoolExecutor.
  2. The three comparisons you want to run in parallel should be paired one-to-one (with zip, or by looking up the same key in dict2), not with a nested loop, which submits all nine combinations.

Also note that add_flag() needs to return a dataframe with a column name that matches some key in dict1, because that is how each result is mapped back to its key.

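import concurrent.futures

# Processes rather than threads, because the comparison is CPU-bound.
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        df2 = dict2[key1]  # counterpart dataframe stored under the same key
        futures.append(executor.submit(add_flag, df1, df2))

    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        # Relies on add_flag() returning a column named after a dict1 key.
        key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
        dict1[key1] = result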
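
The key lookup above only works if add_flag() returns a column named after a dict1 key. A possible alternative, sketched here under assumptions, is to remember which key each future was submitted for. The names run_pairs and count_shared are hypothetical, and plain lists of names stand in for the dataframes so that the sketch has no dependencies.

```python
import concurrent.futures


def run_pairs(left, right, worker,
              executor_cls=concurrent.futures.ProcessPoolExecutor):
    """Run worker(left[k], right[k]) for every key k and return {k: result}."""
    results = {}
    with executor_cls() as executor:
        # Remember which key each future belongs to, so results can be
        # stored under the right key whatever order they finish in.
        future_to_key = {
            executor.submit(worker, left[key], right[key]): key
            for key in left
        }
        for future in concurrent.futures.as_completed(future_to_key):
            results[future_to_key[future]] = future.result()
    return results


def count_shared(names_a, names_b):
    # Hypothetical stand-in for add_flag(): counts names found in both lists.
    return len(set(names_a) & set(names_b))
```

With ProcessPoolExecutor, the worker has to be defined at module top level so it can be pickled, and on platforms that spawn worker processes the call to run_pairs should sit under an `if __name__ == "__main__":` guard.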

Answer 2

Score: 1

Basically, you need to define a function that takes two dataframes as parameters and compares them, then call it using futures, multithreading, or whatever you prefer, like this.
The comparison function (I am using fuzzywuzzy to calculate the Levenshtein distance, but you can do it your own way):

from fuzzywuzzy import fuzz

def add_flag(df1, df2):
    # For each row of df1, find the df2 last name with the highest similarity score.
    for index, row in df1.iterrows():
        best_match = None
        best_score = 0
        for _, row2 in df2.iterrows():
            # fuzz.ratio returns a similarity score from 0 to 100 (higher means closer).
            score = fuzz.ratio(row['Last_Name'], row2['Last_Name'])
            if score > best_score:
                best_match = row2
                best_score = score

        if best_match is not None:
            df1.loc[index, 'Match'] = best_match['Last_Name']

    return df1
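
Note that fuzz.ratio reports a 0 to 100 similarity score, not an edit distance. If an actual Levenshtein distance is wanted, a minimal pure-Python sketch could look like the following. The names levenshtein and closest_name are illustrative and not part of fuzzywuzzy, and plain strings stand in for the Last_Name values.

```python
def levenshtein(a, b):
    """Edit distance between a and b (insertions, deletions, substitutions)."""
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ch_a in enumerate(a, start=1):
        current = [i]  # distance from a[:i] to ""
        for j, ch_b in enumerate(b, start=1):
            cost = 0 if ch_a == ch_b else 1
            current.append(min(
                previous[j] + 1,         # delete ch_a
                current[j - 1] + 1,      # insert ch_b
                previous[j - 1] + cost,  # substitute (or keep) the character
            ))
        previous = current
    return previous[-1]


def closest_name(name, candidates):
    """Return the candidate with the smallest edit distance, or None if empty."""
    return min(candidates, key=lambda c: levenshtein(name, c), default=None)
```

Here the best match is the candidate with the smallest distance, whereas with a similarity score such as fuzz.ratio it is the one with the largest value.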

Then call the function. For example, import concurrent.futures and run the following:

import concurrent.futures

dict1 = {"A": df1, "B": df2, "C": df3}
dict2 = {"A": df4, "B": df5, "C": df6}

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Map each future to its key: as_completed() yields futures in
    # completion order, which may differ from submission order.
    futures = {}
    for key1, df1 in dict1.items():
        df2 = dict2[key1]
        futures[executor.submit(add_flag, df1, df2)] = key1

    for future in concurrent.futures.as_completed(futures):
        dict1[futures[future]] = future.result()

This iterates over the dictionaries and runs the function concurrently for each pair of dataframes.
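
Another way to keep each result attached to its key is Executor.map, which returns results in the order of its inputs, not in completion order. The sketch below assumes both dictionaries share the same keys. compare_all is a hypothetical helper name, and the worker can be any two-argument function such as add_flag.

```python
from concurrent.futures import ThreadPoolExecutor


def compare_all(left, right, worker):
    """Apply worker(left[k], right[k]) for every key k and return {k: result}."""
    keys = list(left)
    with ThreadPoolExecutor() as executor:
        # map() yields results in input order, so zipping them with the
        # keys pairs every result with the key it was computed for.
        results = executor.map(
            worker,
            (left[k] for k in keys),
            (right[k] for k in keys),
        )
        return dict(zip(keys, results))
```

A call such as `dict1 = compare_all(dict1, dict2, add_flag)` would then replace both the submission loop and the result loop.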

huangapple
  • Published on May 25, 2023, 23:03:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76333734.html