How to perform multiprocessing inside an Azure API run function

Question

I am trying to reduce the execution time of an API call by using multiprocessing:

My actual requirement is to generate and display ML explainability using LIME. For simplicity, let's assume I have the data below:

import numpy as np
import pandas as pd

df = pd.DataFrame({'cust_id': ['id1','id1','id2','id2','id2','id1','id2','id1','id1','id2'],
                   'prod_id': ['p1','p2','p3','p1','p4','p5','p6','p6','p8','p9'],
                   'cost': np.random.randint(100, 1000, 10)})

We have another dataframe:

df1 = pd.DataFrame({'cust_id': ['id1','id1','id1','id2','id2','id2'],
                    'prod_id': ['p1','p8','p3','p8','p9','p7']})

My API function looks something like this:

import json

def main(data):
    input_data = json.loads(data)["data"]
    customer_id = input_data[0]
    print(customer_id)
    item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()

    idx = df.loc[
        (df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
    ].index.values.tolist()

    for i in idx:
        df.loc[i, "cost"] = df.loc[i, "cost"] * 2

    return df

The input is in JSON format:

data = '{"data":["id1"]}'

out = main(data)

In my actual code, the multiplication is replaced by this call:

explainer.explain_instance(df.loc[idx], model.predict_proba)

In the actual scenario, the for loop runs 24 times, fetching one row per iteration and passing it to explain_instance.

Could someone please tell me how to run the for loop with multiprocessing so that the time for the 24 iterations is reduced as much as possible? My actual instance has 12 CPU cores.
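As a rough back-of-the-envelope estimate of what 12 cores can buy for 24 iterations, here is a minimal sketch. It assumes every explain_instance call takes about the same time and ignores process startup and pickling overhead; the 5-second per-call figure and the name ideal_wall_time are made-up placeholders, not measurements or anything from the original code.

```python
import math


def ideal_wall_time(n_tasks, n_workers, seconds_per_task):
    """Best-case wall time when equal-length tasks run in waves of n_workers."""
    waves = math.ceil(n_tasks / n_workers)  # 24 tasks on 12 workers -> 2 waves
    return waves * seconds_per_task


SECONDS_PER_CALL = 5.0  # hypothetical duration of one explain_instance call

serial = 24 * SECONDS_PER_CALL                        # plain for-loop
parallel = ideal_wall_time(24, 12, SECONDS_PER_CALL)  # 12 worker processes

if __name__ == "__main__":
    print(f"serial: {serial}s, ideal parallel: {parallel}s")
```

In practice the gain is smaller than this ideal, because each worker process has to be started and the explainer, the model and every row have to be pickled and sent to it.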

Answer 1

Score: 1

Here is one way to do it, using the ProcessPoolExecutor class from concurrent.futures and the partial function from functools, both in the Python standard library:

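import json
from concurrent.futures import ProcessPoolExecutor
from functools import partial

import pandas as pd

# df, df1, explainer and model are assumed to be defined as in the question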


def helper(row, explainer=None, predict_fn=None):
    """Function that will be applied concurrently"""
    df_ = pd.DataFrame(explainer(row, predict_fn))
    # do things with df_
    # ...
    return df_


def main(data):
    # Same code as before your for-loop
    input_data = json.loads(data)["data"]
    customer_id = input_data[0]
    item_list = df1.loc[df1["cust_id"] == customer_id, "prod_id"].tolist()

    idx = df.loc[
        (df["cust_id"] == customer_id) & (df["prod_id"].isin(item_list))
    ].index.values.tolist()

    with ProcessPoolExecutor() as executor:
        # Here, you need partial to deal with additional arguments:
        # explainer, predict_fn
        future = executor.map(
            partial(
                helper,
                explainer=explainer.explain_instance,
                predict_fn=model.predict_proba,
            ),
            [df.loc[i] for i in idx],
        )

    # despite its name, future is a generator of results, not a Future object
    return list(future)


if __name__ == "__main__":
    data = '{"data":["id1"]}'
    dfs = main(data)  # list of dataframes

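You have to run the module from the top-level code environment (guarded by if __name__ == "__main__"). Without the guard, platforms that start worker processes with the spawn method (Windows and macOS by default) re-import the module in every worker, and a BrokenProcessPool exception is raised.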
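To try the pattern without LIME or a trained model, here is a minimal, standard-library-only sketch. It is not the code from the question: ROWS, WANTED and slow_step are hypothetical stand-ins (plain dicts instead of DataFrame rows, and the cost doubling from the question instead of explainer.explain_instance), and the executor_cls parameter is an added assumption so the pool type can be swapped.

```python
import json
from concurrent.futures import ProcessPoolExecutor
from functools import partial

# Hypothetical stand-in data: plain dicts instead of DataFrame rows.
ROWS = [
    {"cust_id": "id1", "prod_id": "p1", "cost": 100},
    {"cust_id": "id1", "prod_id": "p2", "cost": 200},
    {"cust_id": "id2", "prod_id": "p3", "cost": 300},
    {"cust_id": "id1", "prod_id": "p8", "cost": 400},
]
WANTED = {"id1": {"p1", "p8", "p3"}, "id2": {"p8", "p9", "p7"}}


def slow_step(row, factor=1):
    """Stand-in for explain_instance; defined at module level so it can be pickled."""
    return {**row, "cost": row["cost"] * factor}


def main(data, executor_cls=ProcessPoolExecutor, max_workers=None):
    customer_id = json.loads(data)["data"][0]
    selected = [
        row for row in ROWS
        if row["cust_id"] == customer_id and row["prod_id"] in WANTED[customer_id]
    ]
    with executor_cls(max_workers=max_workers) as executor:
        # map() yields results in input order; list() collects them all
        return list(executor.map(partial(slow_step, factor=2), selected))


if __name__ == "__main__":
    # max_workers=12 mirrors the 12 cores mentioned in the question
    print(main('{"data":["id1"]}', max_workers=12))
```

Leaving max_workers as None lets ProcessPoolExecutor choose a default based on the machine's CPU count, so passing 12 explicitly is only needed to cap or pin the number of workers.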

huangapple
  • Posted on March 9, 2023, 20:22:15
  • Please keep this link when reposting: https://go.coder-hub.com/75684557.html