How to perform multiprocessing inside an Azure API run function

Question


I am trying to reduce the execution time of an API call by using multiprocessing:

My original requirement is actually about generating and displaying ML explainability using LIME. For simplicity, let's assume I have the data below:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'cust_id': ['id1','id1','id2','id2','id2','id1','id2','id1','id1','id2'],
                   'prod_id': ['p1','p2','p3','p1','p4','p5','p6','p6','p8','p9'],
                   'cost': np.random.randint(100, 1000, 10)})
```

We have another dataframe:

```python
df1 = pd.DataFrame({'cust_id': ['id1','id1','id1','id2','id2','id2'],
                    'prod_id': ['p1','p8','p3','p8','p9','p7']})
```

My API function looks something like this:

```python
import json

def main(data):
    input_data = json.loads(data)["data"]
    customer_id = input_data[0]
    print(customer_id)
    item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
    idx = df.loc[
        (df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
    ].index.values.tolist()
    for i in idx:
        df.loc[i, "cost"] = df.loc[i, "cost"] * 2
    return df
```

The input is in JSON format:

```python
data = '{"data":["id1"]}'
out = main(data)
```
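Run end-to-end, the toy pipeline doubles the matching rows. Here is a self-contained sketch; note that fixed costs are substituted for the random ones purely so the result is reproducible (that substitution is an assumption for illustration, not part of the question):

```python
import json

import pandas as pd

df = pd.DataFrame({'cust_id': ['id1','id1','id2','id2','id2','id1','id2','id1','id1','id2'],
                   'prod_id': ['p1','p2','p3','p1','p4','p5','p6','p6','p8','p9'],
                   'cost': [100] * 10})  # fixed costs instead of np.random.randint
df1 = pd.DataFrame({'cust_id': ['id1','id1','id1','id2','id2','id2'],
                    'prod_id': ['p1','p8','p3','p8','p9','p7']})

def main(data):
    input_data = json.loads(data)["data"]
    customer_id = input_data[0]
    # products of interest for this customer, taken from df1
    item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
    # rows of df for this customer restricted to those products
    idx = df.loc[
        (df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
    ].index.values.tolist()
    for i in idx:
        df.loc[i, "cost"] = df.loc[i, "cost"] * 2
    return df

out = main('{"data":["id1"]}')
# only rows 0 ('id1'/'p1') and 8 ('id1'/'p8') match, so only they are doubled
print(out.loc[[0, 8], "cost"].tolist())  # [200, 200]
```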

My actual code has this in place of the multiplication:

```python
explainer.explain_instance(df.loc[idx], model.predict_proba)
```

In the actual scenario, the for loop runs 24 times, fetching a row and passing it to explain_instance each time.

Could someone please let me know how to apply multiprocessing to the for loop so that the time taken by the 24 iterations comes down as much as possible? My actual instance has 12 CPU cores.

Answer 1

Score: 1


Here is one way to do it with the ProcessPoolExecutor class and the partial function, from the concurrent.futures and functools modules of the Python standard library:

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def helper(row, explainer=None, predict_fn=None):
    """Function that will be applied concurrently."""
    df_ = pd.DataFrame(explainer(row, predict_fn))
    # do things with df_
    # ...
    return df_


def main(data):
    # Same code as before your for-loop
    input_data = json.loads(data)["data"]
    customer_id = input_data[0]
    item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()
    idx = df.loc[
        (df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
    ].index.values.tolist()
    with ProcessPoolExecutor() as executor:
        # Here, you need partial to deal with the additional arguments:
        # explainer and predict_fn
        future = executor.map(
            partial(
                helper,
                explainer=explainer.explain_instance,
                predict_fn=model.predict_proba,
            ),
            [df.loc[i] for i in idx],
        )
        # future is a generator
        return list(future)


if __name__ == "__main__":
    data = '{"data":["id1"]}'
    dfs = main(data)  # list of dataframes
```

You have to execute the module from the top-level code environment (guarding the entry point with if __name__ == "__main__"); otherwise a BrokenProcessPool exception will be raised.

huangapple
  • Posted on 2023-03-09 20:22:15
  • When republishing, please keep the link to this post: https://go.coder-hub.com/75684557.html