如何在Sagemaker处理作业中并行执行for循环?

huangapple go评论64阅读模式
英文:

How to parallel for loop in Sagemaker Processing job

问题

我正在Sagemaker处理作业上运行Python代码,具体来说是SKLearnProcessor。代码运行一个包含200次循环的for循环(每次迭代都是独立的),每次需要20分钟。

例如:script.py

for i in list:
   run_function(i)

我从笔记本启动作业:

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1", role=role,
    instance_type="ml.m5.4xlarge", instance_count=1,
    sagemaker_session=Session()
)

out_path = 's3://' + os.path.join(bucket, prefix, 'outpath')

sklearn_processor.run(
    code="script.py",
    outputs=[
        ProcessingOutput(output_name="load_training_data",
                         source='/opt/ml/processing/output}',
                         destination=out_path),
    ],
    arguments=["--some-args", "args"]
)

我想并行运行这段代码,并使Sagemaker处理作业以最佳容量运行尽可能多的并发作业。如何实现这一目标?

英文:

I'm running a python code on Sagemaker Processing job, specifically SKLearnProcessor. The code run a for-loop for 200 times (each iteration is independent), each time takes 20 minutes.
for example: script.py

for i in list:
   run_function(i)

I'm kicking off the job from a notebook:

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1", role=role,
    instance_type="ml.m5.4xlarge", instance_count=1,
    sagemaker_session = Session()
)

out_path = 's3://' + os.path.join(bucket, prefix,'outpath')

sklearn_processor.run(
    code="script.py",
    outputs=[
        ProcessingOutput(output_name="load_training_data",
                         source = f'/opt/ml/processing/output}',
                         destination = out_path),
    ],
    arguments=["--some-args", "args"]
)

I want to parallel this code and make the Sagemaker processing job use it best capacity to run as many concurrent jobs as possible.
How can I do that

答案1

得分: 2

有基本上有3种路径可供选择,具体取决于上下文。

并行执行函数

这个解决方案与SageMaker无关。 它适用于任何Python脚本,无论生态系统如何,只要您有必要的资源来并行执行任务。

根据您的软件需求,您需要确定是并行多线程还是多进程。这个问题可能会澄清在这方面的一些疑虑:Multiprocessing vs. Threading Python

以下是如何并行执行的简单示例:

from multiprocessing import Pool
import os

POOL_SIZE = os.cpu_count()

your_list = [...]

def run_function(i):
    # ...
    return your_result


if __name__ == '__main__':
    with Pool(POOL_SIZE) as pool:
        print(pool.map(run_function, your_list))

将输入数据分成多个实例

这个解决方案取决于数据的数量和大小。如果它们彼此完全独立且具有相当大的大小,那么将数据分成多个实例可能是有意义的。这样,执行速度将更快,还可能根据选择的初始较大实例减少成本。

在您的情况下,明显应该设置instance_count参数,如文档所述:

instance_count(int或PipelineVariable) - 要运行Processing作业的实例数量。默认为1。

这应该与ProcessingInput的分割结合使用。

附言:如果数据可以在脚本执行之前检索,那么这种方法是有意义的。如果数据是在内部生成的,生成逻辑必须更改为支持多实例。

综合方法

毫无疑问,可以将前两种方法结合起来,即创建一个脚本,将函数在列表上并行执行并拥有多个并行实例。

使用示例可能是处理多个CSV文件。如果有100个CSV文件,我们可以决定实例化5个实例,以每个实例传递20个文件。在每个实例中,可以决定并行读取和/或处理CSV文件和/或相关函数中的行。

要采用这种方法,必须仔细监控是否真正为系统带来改进,而不是浪费资源。

英文:

There are basically 3 paths you can take, depending on the context.

Parallelising function execution

This solution has nothing to do with SageMaker. It is applicable to any python script, regardless of the ecosystem, as long as you have the necessary resources to parallelise a task.

Based on the needs of your software, you have to work out whether to parallelise multi-thread or multi-process. This question may clarify some doubts in this regard: Multiprocessing vs. Threading Python

Here is a simple example on how to parallelise:

from multiprocessing import Pool
import os

POOL_SIZE = os.cpu_count()

your_list = [...]

def run_function(i):
    # ...
    return your_result


if __name__ == '__main__':
    with Pool(POOL_SIZE) as pool:
        print(pool.map(run_function, your_list))

Splitting input data into multiple instances

This solution is dependent on the quantity and size of the data. If they are completely independent of each other and have a considerable size, it may make sense to split the data over several instances. This way, execution will be faster and there may also be a reduction in costs based on the instances chosen over the initial larger instance.

It is clear in your case it is the instance_count parameter to set, as the documentation says:

> instance_count (int or PipelineVariable) - The number of instances to
> run the Processing job with. Defaults to 1.

This should be combined with the ProcessingInput split.

P.S.: This approach makes sense to use if the data can be retrieved before the script is executed. If the data is generated internally, the generation logic must be changed so that it is multi-instance.

Combined approach

One can undoubtedly combine the two previous approaches, i.e. create a script that parallelises the execution of a function on a list and have several parallel instances.

An example of use could be to process a number of csvs. If there are 100 csvs, we may decide to instantiate 5 instances so as to pass 20 files per instance. And in each instance decide to parallelise the reading and/or processing of the csvs and/or rows in the relevant functions.

To pursue such an approach, one must monitor well whether one is really bringing improvement to the system rather than wasting resources.

huangapple
  • 本文由 发表于 2023年2月6日 08:15:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/75356400.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定