英文:
how to perform multiprocessing inside Azure API run function
问题
我正在尝试通过使用多进程来减少API调用的执行时间:
我的原始需求实际上是使用LIME生成和显示ML可解释性。为简单起见,让我们假设我有以下数据:
import numpy as np
import pandas as pd
df = pd.DataFrame({'cust_id': ['id1','id1','id2','id2','id2','id1','id2','id1','id1','id2'],
'prod_id': ['p1','p2','p3','p1','p4','p5','p6','p6','p8','p9'],
'cost': np.random.randint(100, 1000, 10)})
我们有另一个数据框:
df1 = pd.DataFrame({'cust_id': ['id1','id1','id1','id2','id2','id2'],
'prod_id': ['p1','p8','p3','p8','p9','p7']})
我的API函数看起来像这样:
import json
def main(data):
input_data = json.loads(data)["data"]
customer_id = input_data[0]
print(customer_id)
item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
idx = df.loc[
(df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
].index.values.tolist()
for i in idx:
df.loc[i, "cost"] = df.loc[i, "cost"] * 2
return df
输入数据是以json
格式提供的:
data = '{"data":["id1"]}'
out = main(data)
在实际代码中,这个地方用以下代码替代了乘法:
explainer.explain_instance(df.loc[idx], model.predict_proba)
在实际情况下,for循环将运行24次,获取行并将其放入explain_instance
中。
请问有人可以告诉我如何执行for循环的多进程处理,以尽量减少24次迭代的时间?我的实际实例中有12个CPU核心。
英文:
I am trying to reduce the execution time of an API call by using multiprocessing:
My original requirement is actually on generating and displaying ML explainability using LIME. For simplicity, let's assume I have below data:
import numpy as np
import pandas as pd
df = pd.DataFrame({'cust_id' : ['id1','id1','id2','id2','id2','id1','id2','id1','id1','id2'],
'prod_id' : ['p1','p2','p3','p1','p4','p5','p6','p6','p8','p9'],
'cost' : np.random.randint(100, 1000, 10)})
We have another dataframe:
df1 = pd.DataFrame({'cust_id' : ['id1','id1','id1','id2','id2','id2'],
'prod_id' : ['p1','p8','p3','p8','p9','p7']})
My API function looks something like this:
import json
def main(data):
input_data = json.loads(data)["data"]
customer_id = input_data[0]
print(customer_id)
item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
idx = df.loc[
(df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
].index.values.tolist()
for i in idx:
df.loc[i, "cost"] = df.loc[i, "cost"] * 2
return df
The input is in json
format:
data = '{"data":["id1"]}'
out = main(data)
My actual code consists of this inplace of the multiplication:
explainer.explain_instance(df.loc[idx], model.predict_proba)
In actual scenario, the for loop would run for 24 times, fetching the row and putting it in the explain_instance
.
Could someone please let me know how to perform multiprocessing of the for loop such that the 24 iterations come down as much as possible. I have 12 CPU cores in my actual instance.
答案1
得分: 1
以下是使用Python标准库中的concurrent.futures和functools模块的ProcessPoolExecutor类以及partial函数来执行的一种方法:
from concurrent.futures import ProcessPoolExecutor
from functools import partial
def helper(row, explainer=None, predict_fn=None):
"""并行应用的函数"""
df_ = pd.DataFrame(explainer(row, predict_fn))
# 处理 df_
# ...
return df_
def main(data):
# 在循环之前的相同代码
input_data = json.loads(data)["data"]
customer_id = input_data[0]
item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
idx = df.loc[
(df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
].index.values.tolist()
with ProcessPoolExecutor() as executor:
# 这里需要使用 partial 来处理额外的参数:
# explainer 和 predict_fn
future = executor.map(
partial(
helper,
explainer=explainer.explain_instance,
predict_fn=model.predict_proba,
),
[df.loc[i] for i in idx],
)
# future 是一个生成器
return list(future)
if __name__ == "__main__":
data = '{"data":["id1"]}'
dfs = main(data) # 数据框的列表
你必须在顶层代码环境中执行此模块(使用 if __name__ == "main"),否则会引发BrokenProcessPool异常。
英文:
Here is one way to do it with ProcessPoolExecutor class and partial function from Python standard library's concurrent.futures and functools modules:
from concurrent.futures import ProcessPoolExecutor
from functools import partial
def helper(row, explainer=None, predict_fn=None):
"""Function that will be applied concurrently"""
df_ = pd.DataFrame(explainer(row, predict_fn))
# do things with df_
# ...
return df_
def main(data):
# Same code as before your for-loop
input_data = json.loads(data)["data"]
customer_id = input_data[0]
item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
idx = df.loc[
(df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
].index.values.tolist()
with ProcessPoolExecutor() as executor:
# Here, you need partial to deal with additional arguments:
# explainer, predict_fn
future = executor.map(
partial(
helper,
explainer=explainer.explain_instance,
predict_fn=model.predict_proba,
),
[df.loc[i] for i in idx],
)
# future is a generator
return list(future)
if __name__ == "__main__":
data = '{"data":["id1"]}'
dfs = main(data) # list of dataframes
You have to execute the module in the top-level code environment (using if __name__ == "__main__"), otherwise a BrokenProcessPool exception will be raised.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论