How to use the Python multiprocessing package in Metaflow?
Question
I am trying to use the multiprocessing package in Metaflow, where a fasttext model runs to predict some results. Here is my code:
import pickle
import os
import boto3
import multiprocessing
from functools import partial
from multiprocessing import Manager
import time
from metaflow import batch, conda, FlowSpec, step, conda_base, Flow, Step
from util import pip_install_module

@conda_base(libraries={'scikit-learn': '0.23.1', 'numpy': '1.22.4', 'pandas': '1.5.1', 'fasttext': '0.9.2'})
class BatchInference(FlowSpec):
    pip_install_module("python-dev-tools", "2023.3.24")

    @batch(cpu=10, memory=120000)
    @step
    def start(self):
        import pandas as pd
        import numpy as np
        self.df_input = ['af', 'febrt', 'fefv fd we', 'fe hth dw hytht', ' dfegrtg hg df reg']
        self.next(self.predict)

    @batch(cpu=10, memory=120000)
    @step
    def predict(self):
        import fasttext
        fasttext.FastText.eprint = lambda x: None
        print('model reading started')
        # download the fasttext model from aws s3.
        manager = Manager()
        model_abn = manager.list([fasttext.load_model('fasttext_model.bin')])
        print('model reading finished')
        time_start = time.time()
        pool = multiprocessing.Pool()
        # results = pool.map(self.predict_abn, self.df_input)
        results = pool.map(partial(self.predict_abn, model_abn=model_abn), self.df_input)
        pool.close()
        pool.join()
        time_end = time.time()
        print(f"Time elapsed: {round(time_end - time_start, 2)}s")
        self.next(self.end)

    @step
    def end(self):
        print("Predictions evaluated successfully")

    def predict_abn(self, text, model_abn):
        model = model_abn[0]
        return model.predict(text, k=1)

if __name__ == '__main__':
    BatchInference()
The error message is:
TypeError: cannot pickle 'fasttext_pybind.fasttext' object
I was told this is because the fasttext model cannot be serialised. I also tried other approaches, for example:
self.model_bytes_abn = pickle.dumps(model_abn)
to convert the model to bytes, but that still does not work.
Please tell me what is wrong with the code and how to fix it.
Answer 1
Score: 2
As the error says, the fasttext pybind object can't be pickled:
TypeError: cannot pickle 'fasttext_pybind.fasttext' object
This is a general problem when using pybind bindings: they normally cannot be pickled.
So your model_abn is a list of objects from the pybind library and thus can't be pickled. In general, you can solve this by initializing whatever is not serializable inside the function that multiprocessing calls, so that every process creates its own objects and nothing has to be pickled.
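For illustration, here is a minimal reproduction of the underlying problem, independent of Metaflow and multiprocessing (it assumes a fasttext_model.bin file is available locally):

import pickle
import fasttext

# The loaded model wraps a C++ handle from the pybind extension module...
model = fasttext.load_model('fasttext_model.bin')

# ...so any attempt to serialise it fails:
pickle.dumps(model)  # TypeError: cannot pickle 'fasttext_pybind.fasttext' object

Both Pool.map and Manager().list() move data between processes by pickling it, so each of them trips over this object.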
In your case this is probably not feasible, since the work each multiprocessing call does is just a single call into the model, and reloading the model for every input would be far too expensive.
It is a bit of a design question where to put things, how to separate them, and whether you even want multiprocessing under these circumstances. What you can do while keeping most of the code the same is to use the initializer argument of Pool.
def predict_model(input_data):
    global model
    return model.predict(input_data)

def init_worker():
    global model
    model = ...  # Do whatever you have to to init it

def some_func():
    ...
    pool = Pool(num_worker, initializer=init_worker)
    res = pool.map(predict_model, some_list)
    ...
So when the pool is created, every worker runs the init_worker function and stores its own model in a global variable, which you can then use in the predict_model function executed via map.
No matter what you do, if you want to use the model with multiprocessing, it needs to exist in each process and be initialized by that process, since you can't serialize and distribute it.
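Applied to the flow from the question, a sketch of this pattern could look like the following. It is only a sketch under assumptions: the helper names _init_worker and _predict_one are invented for illustration, and fasttext_model.bin is assumed to already be on the local disk of the batch worker.

import multiprocessing

# One model handle per worker process, stored at module level so that
# nothing attached to the flow instance has to be pickled.
_model = None

def _init_worker(model_path):
    # Runs once in every worker when the pool starts; each process
    # loads its own copy of the model instead of receiving a pickle.
    global _model
    import fasttext
    fasttext.FastText.eprint = lambda x: None
    _model = fasttext.load_model(model_path)

def _predict_one(text):
    # Executed via pool.map; uses the worker-local model.
    return _model.predict(text, k=1)

Inside the predict step, the Manager and partial are then no longer needed:

    @batch(cpu=10, memory=120000)
    @step
    def predict(self):
        with multiprocessing.Pool(
            processes=10,
            initializer=_init_worker,
            initargs=('fasttext_model.bin',),
        ) as pool:
            self.results = pool.map(_predict_one, self.df_input)
        self.next(self.end)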