How to execute parallel processing between dataframes?
Question
I have two dictionaries, dict1 and dict2, whose keys are strings and whose values are dataframes.
    dict1 = {"A": df1, "B": df2, "C": df3}
    dict2 = {"A": df4, "B": df5, "C": df6}
I want to compare every row of df1['Last_Name'] with df4['Last_Name'] and create a new field df1['Match'] holding the entry with the highest Levenshtein distance. Similarly for df2 with df5 and df3 with df6.
Now I want to run these 3 comparisons in parallel. I tried multiprocessing and concurrent.futures, but somehow it is not working.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for key1, df1 in dict1.items():
            for key2, df2 in dict2.items():
                futures.append(executor.submit(add_flag, df1, df2))
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
            dict1[key1] = result
Answer 1
Score: 1
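So, a few things here:
- concurrent.futures.ThreadPoolExecutor is for I/O-bound work; you want ProcessPoolExecutor for work that is bound to your CPU.
- The comparisons you want to run in parallel pair each dataframe in dict1 with the one under the same key in dict2. That can be done with zip (or a lookup by key), not a nested loop, which submits all nine combinations instead of three.
And a note: add_flag() needs to return a dataframe with a column name that matches some key in dict1, because the code below uses that column name to decide which entry of dict1 to overwrite.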
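    import concurrent.futures
    # With ProcessPoolExecutor, add_flag must be a module-level function so it can be pickled;
    # on platforms that spawn workers, run this under `if __name__ == "__main__":`.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for key1, df1 in dict1.items():
            df2 = dict2[key1]  # pair each dataframe with its same-key partner
            futures.append(executor.submit(add_flag, df1, df2))
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            # Recover the key from a column named after it (see the note above).
            key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
            dict1[key1] = result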
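The zip idea mentioned in the list above can be sketched with executor.map, which returns results in input order, so each result can be zipped straight back to its key without inspecting column names. This is only an illustrative sketch under assumptions: run_pairwise and closest_names are hypothetical helpers, plain lists of names stand in for the Last_Name columns, and difflib from the standard library stands in for a real Levenshtein scorer.

```python
from concurrent.futures import ThreadPoolExecutor
import difflib


def closest_names(names, candidates):
    """Hypothetical worker: for each name, return the most similar candidate."""
    matches = []
    for name in names:
        # difflib's ratio is a similarity measure, used here only as a
        # stand-in for a Levenshtein-based score.
        best = difflib.get_close_matches(name, candidates, n=1, cutoff=0.0)
        matches.append(best[0] if best else None)
    return matches


def run_pairwise(left, right, worker, executor_cls=ThreadPoolExecutor):
    """Apply worker(left[k], right[k]) for every key present in both dicts."""
    keys = [k for k in left if k in right]
    with executor_cls() as executor:
        # map keeps results in the order the inputs were given,
        # so zipping with keys ties each result to the right key.
        results = executor.map(
            worker,
            [left[k] for k in keys],
            [right[k] for k in keys],
        )
        return dict(zip(keys, results))
```

Passing executor_cls=ProcessPoolExecutor should behave the same way, provided the worker is a picklable module-level function and the call sits under an `if __name__ == "__main__":` guard.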
Answer 2
Score: 1
Basically, what you need to do is define a function that takes two dataframes as parameters and compares them, then call that function using futures, multithreading, or whatever you prefer, like this:
Function to compare the dataframes (I am using fuzzywuzzy to calculate a Levenshtein-based score, but you can do it your own way):
    from fuzzywuzzy import fuzz
    def add_flag(df1, df2):
        for index, row in df1.iterrows():
            best_match = None
            best_score = 0
            for _, row2 in df2.iterrows():
                # fuzz.ratio is a similarity score from 0 to 100: higher means closer.
                score = fuzz.ratio(row['Last_Name'], row2['Last_Name'])
                if score > best_score:
                    best_match = row2
                    best_score = score
            # Rows with no candidate scoring above 0 are left without a 'Match' value.
            if best_match is not None:
                df1.loc[index, 'Match'] = best_match['Last_Name']
        return df1
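Since fuzz.ratio returns a similarity score rather than a raw edit distance, it may help to see what the distance itself looks like. The following is a minimal pure-Python sketch of the classic dynamic-programming Levenshtein distance; levenshtein and closest are hypothetical helper names, not part of fuzzywuzzy, and a real workload would likely use an optimized library instead.

```python
def levenshtein(a, b):
    """Count the single-character insertions, deletions, or substitutions
    needed to turn string a into string b."""
    if len(a) < len(b):
        a, b = b, a  # keep the inner row as short as possible
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution (or match)
            ))
        previous = current
    return previous[-1]


def closest(name, candidates):
    """Return the candidate with the smallest edit distance (None if empty)."""
    return min(candidates, key=lambda c: levenshtein(name, c), default=None)
```

Note the direction: with an edit distance, a smaller value means a closer match, whereas a similarity score such as fuzz.ratio is higher for closer matches.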
Then you call the function. For example, you can import concurrent.futures and run the following:
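    import concurrent.futures
    dict1 = {"A": df1, "B": df2, "C": df3}
    dict2 = {"A": df4, "B": df5, "C": df6}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Map each future to its key so results can be stored under the right entry.
        futures = {}
        for key1, left in dict1.items():
            right = dict2[key1]
            futures[executor.submit(add_flag, left, right)] = key1
        # as_completed yields futures in completion order, not submission order,
        # so look up the key per future instead of zipping with dict1.items().
        for future in concurrent.futures.as_completed(futures):
            dict1[futures[future]] = future.result()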
This iterates over the dicts and submits each comparison to the pool, so the three calls run concurrently.