调用远程函数 - Amazon SageMaker

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

调用远程函数

要在 @remote 装饰器中调用函数,请使用以下任一方法:

如果您使用 @remote 装饰器方法调用函数,则训练作业将等待函数完成后再开始新任务。但是,如果您使用 RemoteExecutorAPI,则可以并行运行多个作业。以下部分说明了这两种调用函数的方式。

使用 @remote 装饰器调用函数

你可以使用 @remote 装饰器来注释一个函数。 SageMaker 会将装饰器内部的代码转换为 SageMaker 训练作业。之后,训练作业将在装饰器内部调用该函数并等待作业完成。以下代码示例演示如何导入所需的库、启动 SageMaker实例以及如何使用 @remote 装饰器对矩阵乘法进行注释。

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

装饰器的定义如下所示。

def remote( *, **kwarg): ...

当你调用装饰过的函数时, SageMaker Python 会将错误引发的所有异常SDK加载到本地内存中。在以下代码示例中,对 divide 函数的首次调用成功完成,并将结果加载到本地内存中。在第二次调用 divide 函数时,代码返回一个错误,并将该错误加载到本地内存中。

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
注意

装饰函数作为远程作业运行。如果线程中断,则底层作业将不会停止。

如何更改局部变量的值

装饰器函数在远程计算机上运行。在装饰函数中更改非局部变量或输入参数将不会更改本地值。

在以下代码示例中,列表和字典已附加到装饰器函数中。调用装饰器函数时,此情况不会改变。

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

要更改在装饰器函数内部声明的局部变量的值,请从函数返回该变量。以下代码示例说明,在从函数返回局部变量时,该变量的值会发生变化。

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

数据序列化和反序列化

当您调用远程函数时, SageMaker 会在输入和输出阶段自动序列化您的函数参数。使用 c loud pickle 对函数参数和返回值进行序列化。 SageMaker 支持序列化以下 Python 对象和函数。

  • 内置 Python 对象,包括字典、列表、浮点数、整数、字符串、布尔值和元组

  • Numpy 数组

  • Pandas Dataframes

  • Scikit-learn 数据集和估算器

  • PyTorch 模型

  • TensorFlow 模型

  • 的助推器等级 XGBoost

可使用以下各项,但有一些限制。

  • Dask DataFrames

  • XGBoostDmatrix 类

  • TensorFlow 数据集和子类

  • PyTorch 模型

以下部分包含使用前面的 Python 类的最佳实践,但在远程函数中存在一些限制,以及有关序列化数据的 SageMaker 存储位置以及如何管理对这些数据的访问权限的信息。

有关能够有限地支持远程数据序列化的 Python 类的最佳实践

您可以使用此部分中列出的 Python 类,但有一些限制。后续部分将讨论有关如何使用以下 Python 类的最佳实践。

  • Dask DataFrames

  • 这XGBoostDMatric堂课

  • TensorFlow 数据集和子类

  • PyTorch 模型

Dask 是一个用于 Python 中的并行计算的开源库。此部分说明了以下内容。

  • 如何 DataFrame 将 Dask 传递给你的远程函数

  • 如何将汇总统计数据从 Dask DataFrame 转换为 Pandas DataFrame

如何 DataFrame 将 Dask 传递给你的远程函数

Dask DataFrames 通常用于处理大型数据集,因为它们可以容纳需要比可用内存更多的数据集。这是因为 Dask DataFrame 不会将您的本地数据加载到内存中。如果您将 Dask DataFrame 作为函数参数传递给远程函数,Dask 可能会传递对本地磁盘或云存储中数据的引用,而不是数据本身。以下代码显示了在远程函数中传递一个 Dask DataFrame 的示例,该函数将在空 DataFrame函数上运行。

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

只有当你使用时,Dask 才会将 Dask 中的数据加载 DataFrame 到内存中。 DataFrame 如果要在远程函数中使用 Dask DataFrame ,请提供数据的路径。之后,Dask 将直接从您在代码运行时指定的数据路径中读取数据集。

以下代码示例显示了如何在远程函数clean中使用 Dask DataFrame。在代码示例中,raw_data_path传递给 clean 而不是 Dask DataFrame。在代码运行时,直接从 raw_data_path 中指定的 Amazon S3 存储桶的位置读取数据集。然后,该persist函数将数据集保存在内存中以方便后续random_split函数,并使用 Dask DataFrame API 函数将数据集写回 S3 存储桶中的输出数据路径。

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
如何将汇总统计数据从 Dask DataFrame 转换为 Pandas DataFrame

DataFrame 通过调用以下示例代码所示compute的方法, DataFrame 可以将来自 Dask 的汇总统计数据转换为 Pandas。在示例中,S3 存储桶包含一个无法放入内存或 Pandas 数据框的大型 Dask DataFrame 。在以下示例中,远程函数扫描数据集,并将 DataFrame包含输出统计信息的 Dask 返回describe到 Pandas DataFrame。

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrix是用于加载数据的内部数据结构。XGBoost不能为了在计算会话之间轻松移动而对DMatrix对象进行封存。直接传递DMatrix实例将失败,并显示为SerializationError

如何将数据对象传递给远程函数并使用它进行训练 XGBoost

要将 Pandas DataFrame 转换为DMatrix实例并在远程函数中使用它进行训练,请将其直接传递给远程函数,如以下代码示例所示。

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow 数据集和子类是训练期间 TensorFlow 用来加载数据的内部对象。 TensorFlow 不能为了在计算会话之间轻松移动而对数据集和子类进行封存。直接传递 Tensorflow 数据集或子类将失败,并显示 SerializationError。使用 Tensorflow I/O APIs 从存储中加载数据,如以下代码示例所示。

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch 模型是可序列化的,可以在本地环境和远程函数之间传递。如果您的本地环境和远程环境具有不同的设备类型,例如(GPUs和CPUs),则无法将经过训练的模型返回到本地环境。例如,如果以下代码是在本地环境中开发的,GPUs但没有但在带的实例中运行GPUs,则直接返回经过训练的模型将导致DeserializationError

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

要将在一个GPU环境中训练的模型返回到仅包含CPU功能的模型,请APIs直接使用 PyTorch 模型 I/O,如下面的代码示例所示。

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

您的序列化数据 SageMaker 存储在哪里

当您调用远程函数时, SageMaker 会在输入和输出阶段自动序列化您的函数参数和返回值。此序列化数据存储在 S3 存储桶的根目录下。您可以在配置文件中指定根目录 <s3_root_uri>。这将自动为您生成参数 job_name

在根目录下 SageMaker 创建一个<job_name>文件夹,其中包含您当前的工作目录、序列化函数、序列化函数的参数、结果以及调用序列化函数时出现的任何异常。

<job_name> 下,目录 workdir 包含当前工作目录的压缩存档。压缩存档包括工作目录中的所有 Python 文件和 requirements.txt 文件,该文件指定运行 Remote 函数所需的任何依赖项。

以下是您在配置文件中指定的 S3 存储桶下的文件夹结构示例。

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

您在 S3 存储桶中指定的根目录不适用于长期存储。序列化数据与序列化期间使用的 Python 版本和机器学习 (ML) 框架版本紧密相关。如果升级 Python 版本或机器学习框架,则可能无法使用序列化数据。可以改为执行以下操作。

  • 以与 Python 版本和机器学习框架无关的格式存储模型和模型构件。

  • 如果您升级 Python 或机器学习框架,请访问长期存储中的模型结果。

重要

要在指定时长后删除序列化数据,请在 S3 存储桶上设置生命周期配置

注意

使用 Python p ickle 模块序列化的文件可能不如其他数据格式(包括 Parque CSV t 和. JSON 请小心加载来自未知来源的经过 pickle 处理的文件。

有关 Remote 函数的配置文件中应包含的内容的更多信息,请参阅配置文件

对序列化数据的访问权限

管理员可以为序列化数据提供设置,包括其位置和配置文件中的任何加密设置。默认情况下,序列化数据使用 AWS Key Management Service (AWS KMS) 密钥加密。管理员也可以使用存储桶策略限制对配置文件中指定的根目录的访问权限。可以跨项目和作业共享并使用配置文件。有关更多信息,请参阅配置文件

使用调RemoteExecutorAPI用函数

您可以使用RemoteExecutorAPI来调用函数。 SageMaker Python SDK 会将RemoteExecutor调用中的代码转换为 SageMaker训练作业。之后,训练作业将以异步操作的形式调用该函数并返回 future。如果使用 RemoteExecutorAPI,则可以并行运行多个训练作业。有关 Python 中的 future 的更多信息,请参阅 Futures

以下代码示例演示如何导入所需的库、定义函数、启动 SageMaker 实例,以及API如何使用提交并行运行2作业的请求。

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

RemoteExecutor 类是 concurrent.futures.Executor 库的实现。

以下代码示例说明如何定义一个函数并使用 RemoteExecutorAPI 调用该函数。在此示例中,RemoteExecutor 将提交所有 4 作业,但仅并行提交 2。最后两个作业将重用集群,且开销最小。

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

max_parallel_job 参数仅用作速率限制机制,而不会优化计算资源分配。在上一个代码示例中,在提交任何作业之前,RemoteExecutor 不会为两个并行作业预留计算资源。有关 @remote 装饰器的 max_parallel_job 或其他参数的更多信息,请参阅 Remote 函数类和方法规范

未来的课堂 RemoteExecutor API

Future 类是一个公共类,它表示异步调用训练作业时的返回函数。Future 类实现了 concurrent.futures.Future 类。此类可用于对底层作业进行操作并将数据加载到内存中。