特征处理器管道的计划执行和基于事件的执行 - Amazon SageMaker

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

特征处理器管道的计划执行和基于事件的执行

可以将 Amazon F SageMaker eature Store 功能处理管道执行配置为根据预先配置的计划自动和异步启动,也可以根据其他 AWS 服务事件的结果自动和异步启动。例如,您可以计划在每个月的第一天执行特征处理管道,或者将两个管道链接在一起,以便在源管道执行完成后自动执行目标管道。

基于计划的执行

功能处理器SDK通过集成 Amazon S EventBridge cheduler 提供定期运行的特性处理器管道。scheduleAPI可以使用ScheduleExpression参数使用、或cron表达式来指定计划 atrate,其表达式与 Amazon 支持的表达式相同 EventBridge。从语义上讲,调度API是一个 upsert 操作,因为它会更新已经存在的计划;否则,它会创建它。有关 EventBridge 表达式和示例的更多信息,请参阅《日程 EventBridge 安排 EventBridge 器用户指南》中的日程安排类型

以下示例使用特征处理器 scheduleAPI,使用atrate、和cron表达式。

from sagemaker.feature_store.feature_processor import schedule pipeline_name='feature-processor-pipeline' event_bridge_schedule_arn = schedule( pipeline_name=pipeline_name, schedule_expression="at(2020-11-30T00:00:00)" ) event_bridge_schedule_arn = schedule( pipeline_name=pipeline_name, schedule_expression="rate(24 hours)" ) event_bridge_schedule_arn = schedule( pipeline_name=pipeline_name, schedule_expression="cron(0 0-23/1 ? * * 2023-2024)" )

中日期和时间输入的默认时区scheduleAPI为。UTC有关 EventBridge 调度程序调度表达式的更多信息,请参阅《 EventBridge 调度器API参考》文档ScheduleExpression中的。

计划的特征处理器管道执行为您的转换函数提供了计划执行时间,可用作基于日期范围的输入的幂等性令牌或固定参考点。要禁用(即暂停)或重新启用计划,请分别使用scheduleAPI带‘DISABLED’‘ENABLED’state参数。

有关特征处理器的信息,请参阅功能处理器SDK数据源

基于事件的执行

可以将特征处理管道配置为在 AWS 事件发生时自动执行。特征处理SDK提供了一个接受源事件列表和目标管道的put_trigger函数。源事件必须是 FeatureProcessorPipelineEvent(用于指定管道和执行状态事件)的实例。

put_trigger函数配置 Amazon EventBridge 规则和目标以路由事件,并允许您指定 EventBridge 事件模式以响应任何 AWS 事件。有关这些概念的信息,请参阅 Amazon EventBridge 规则目标事件模式

可以启用或禁用触发器。 EventBridge 将使用role_arn参数中提供的角色启动目标管道执行put_triggerAPI。如果在 Amazon SageMaker Studio SDK Classic 或笔记本环境中使用,则默认使用执行角色。有关如何获取执行角色的信息,请参阅获取你的执行角色

以下示例将:

  • 使用的 Pip SageMaker eline to_pipelineAPI,它接收您的目标管道名称 (target-pipeline) 和您的转换函数 (transform)。有关您的特征处理器和转换函数的信息,请参阅功能处理器SDK数据源

  • 使用触发器 put_triggerAPI,FeatureProcessorPipelineEvent用于接收事件和您的目标管道名称 (target-pipeline)。

    FeatureProcessorPipelineEvent 定义了源管道 (source-pipeline) 的状态变为 Succeeded 时的触发器。有关特征处理器管道事件函数的信息,请参阅 Feature Store 阅读文档中的 FeatureProcessorPipelineEvent

from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent to_pipeline(pipeline_name="target-pipeline", step=transform) put_trigger( source_pipeline_events=[ FeatureProcessorPipelineEvent( pipeline_name="source-pipeline", status=["Succeeded"] ) ], target_pipeline="target-pipeline" )

有关使用基于事件的触发器为特征处理器管道创建连续执行和自动重试的示例,请参阅使用基于事件的触发器进行连续执行和自动重试

有关使用基于事件的触发器创建连续流式处理 和使用基于事件的触发器自动重试的示例,请参阅流式处理自定义数据源示例