如何在Metaflow中使用Python包multiprocessing?

huangapple go评论78阅读模式
英文:

How to use python package multiprocessing in metaflow?

问题

I am trying to run multiprocessing package in metaflow, in which fasttext model is running to predict some results. Here is my code:

我正在尝试在 Metaflow 中运行 multiprocessing 包,其中包含运行 fasttext 模型来预测一些结果的代码:

import pickle
import os
import boto3
import multiprocessing
from functools import partial
from multiprocessing import Manager
import time

from metaflow import batch, conda, FlowSpec, step, conda_base, Flow, Step
from util import pip_install_module

@conda_base(libraries={'scikit-learn': '0.23.1', 'numpy': '1.22.4', 'pandas': '1.5.1', 'fasttext': '0.9.2'})
class BatchInference(FlowSpec):
pip_install_module("python-dev-tools", "2023.3.24")

@batch(cpu=10, memory=120000)
@step
def start(self):
    import pandas as pd
    import numpy as np

    self.df_input = ['af', 'febrt', 'fefv fd we', 'fe hth dw hytht', ' dfegrtg hg df reg']

    self.next(self.predict)

@batch(cpu=10, memory=120000)
@step
def predict(self):
    import fasttext
    fasttext.FastText.eprint = lambda x: None

    print('model reading started')

    # download the fasttext model from aws s3.

    manager = Manager()
    model_abn = manager.list([fasttext.load_model('fasttext_model.bin')])

    print('model reading finished')

    time_start = time.time()

    pool = multiprocessing.Pool()
    # results = pool.map(self.predict_abn, self.df_input)
    results = pool.map(partial(self.predict_abn, model_abn=model_abn), self.df_input)

    pool.close()
    pool.join()

    time_end = time.time()
    print(f"Time elapsed: {round(time_end - time_start, 2)}s")

    self.next(self.end)

@step
def end(self):
    print("Predictions evaluated successfully")

def predict_abn(self, text, model_abn):
    model = model_abn[0]
    return model.predict(text, k=1)

if name == 'main':
BatchInference()

The error message is:

错误消息是:

TypeError: cannot pickle 'fasttext_pybind.fasttext' object

我被告知这是因为 fasttext 模型无法序列化。我还尝试了其他方法,例如:

self.model_bytes_abn = pickle.dumps(model_abn)

来将模型转换为字节类型。但仍然不起作用。

Plz tell me what is wrong about the code and how to fix it?

请告诉我代码中的问题在哪里以及如何修复它?

英文:

I am trying to run multiprocessing package in metaflow, in which fasttext model is running to predict some results. Here is my code:

import pickle
import os
import boto3
import multiprocessing
from functools import partial
from multiprocessing import Manager
import time
import pickle


from metaflow import batch, conda, FlowSpec, step, conda_base, Flow, Step
from util import pip_install_module
 

@conda_base(libraries={'scikit-learn': '0.23.1', 'numpy': '1.22.4', 'pandas': '1.5.1', 'fasttext': '0.9.2'}) 
class BatchInference(FlowSpec):
    pip_install_module("python-dev-tools", "2023.3.24")

    @batch(cpu=10, memory=120000)
    @step
    def start(self):
        import pandas as pd
        import numpy as np

        self.df_input = ['af', 'febrt' ,'fefv fd we' ,'fe hth dw hytht' ,' dfegrtg hg df reg']

        self.next(self.predict)



    @batch(cpu=10, memory=120000)
    @step
    def predict(self):
        import fasttext
        fasttext.FastText.eprint = lambda x: None

        print('model reading started')
        
        #download the fasttext model from aws s3.

        manager = Manager()
        model_abn = manager.list([fasttext.load_model('fasttext_model.bin')])

        
        print('model reading finished')

    
        time_start = time.time()

        pool = multiprocessing.Pool()
        #results = pool.map(self.predict_abn, self.df_input)
        results = pool.map(partial(self.predict_abn, model_abn=model_abn), self.df_input)

        pool.close()
        pool.join()

        time_end = time.time()
        print(f"Time elapsed: {round(time_end - time_start, 2)}s")

        self.next(self.end)


    @step
    def end(self):
        print("Predictions evaluated successfully")


    def predict_abn(self,text, model_abn):
        model = model_abn[0]
        return model.predict(text,k=1)


if __name__ == '__main__':
    BatchInference()

The error message is:

TypeError: cannot pickle 'fasttext_pybind.fasttext' object

I was told this is because fasttext model cannot be serialised. And I also try other message, for example:

self.model_bytes_abn = pickle.dumps(model_abn)

to transfer the model to bytes type. But still does not work.

Plz tell me what is wrong about the code and how to fix it?

答案1

得分: 2

由于错误提示中提到,fasttext的pybind无法被pickle。

TypeError: cannot pickle 'fasttext_pybind.fasttext' object

这是使用pybindings时常见的问题,通常无法被pickle。

因此,你的model_abn是一些来自pybind库的对象的列表,因此无法被pickle。通常情况下,你可以通过在被多进程调用的函数中初始化所有不能序列化的内容来解决这个问题。这样每个进程都会创建自己的对象,无需进行pickle。

在你的情况下,这可能不可行,因为多进程执行的操作只是调用模型。

关于在哪里放置代码、如何分离代码,甚至是否希望在这种情况下使用多进程,这是一个设计问题。你可以在Pool的initializer参数中保持大部分代码不变。

def predict_model(input_data):
    global model
    return model.predict(input_data)

def init_worker():
    global model
    model = ... # 进行初始化操作

def some_func():
    ...
    pool = Pool(num_worker, initializer=init_worker)
    res = pool.map(predict_model, some_list)
    ...

因此,当创建池时,每个工作进程都会运行init_worker函数,并将其自己的模型存储为全局变量。你可以在通过map执行的predict_model函数中使用这个模型。

无论你采取什么措施,如果要在多进程中使用它,你都需要确保模型存在于每个进程中,并由该进程初始化,因为你无法将其序列化和分发。

英文:

As your error says the pybind of fasttext can't be pickled

TypeError: cannot pickle 'fasttext_pybind.fasttext' object

This is a general problem when using pybindings they are normally not able to be pickled.

So your model_abn is a list of some objects from the pybind lib and thus can't be pickled. In general you can solve this by initalizing whatever you need which is not serializable in the function that is called by the multiprocessing. So that every process creates their own objects and nothing has to be pickled.

In your case this is probably not feasible since the thing that is done by multiprocessing is just a simple call executing the model.

It is a bit of a design question where to put things, how to separate, if you even want multiprocessing under these circumstances. What you can do and is keep most of the code the same is use the initializer argument of the Pool.

def predict_model(input_data):
    global model
    return model.predict(input_data)


def init_worker():
    global model
    model = ... # Do whatever you have to to init it


def some_func():
    ...
    pool = Pool(num_worker, initializer=init_worker)
    res = pool.map(predict_model, some_list)
    ...

So you when the pool is created every worker runs the init_worker function and has its own model stored as a global variable. Which you can use in the predict_model function you want to execute via map.

No matter what you do if you want to use it with multiprocessing you somehow need to have the model exist in each process and be initialized by the process, since you can't serialize it and distribute it.

huangapple
  • 本文由 发表于 2023年6月29日 09:50:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/76577612.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定