Scheduled and event based executions for Feature Processor pipelines
Amazon SageMaker Feature Store Feature Processing pipeline executions can be configured to start automatically and asynchronously based on a preconfigured schedule or as a result of another AWS service event. For example, you can schedule Feature Processing pipelines to execute on the first of every month or chain two pipelines together so that a target pipeline is executed automatically after a source pipeline execution completes.
Schedule based executions
The Feature Processor SDK provides a schedule API for scheduling Feature Processor pipeline executions. You can provide an at, rate, or cron expression using the ScheduleExpression parameter, with the same expressions supported by Amazon EventBridge. The schedule API is semantically an upsert operation in that it updates the schedule if it already exists; otherwise, it creates it. For more information on the EventBridge expressions and examples, see Schedule types on EventBridge Scheduler in the EventBridge Scheduler User Guide.
The following examples use the Feature Processor schedule API with at, rate, and cron expressions.
from sagemaker.feature_store.feature_processor import schedule

pipeline_name = 'feature-processor-pipeline'

# Execute once at a specific date and time (UTC by default).
event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name,
    schedule_expression="at(2020-11-30T00:00:00)"
)

# Execute every 24 hours.
event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name,
    schedule_expression="rate(24 hours)"
)

# Execute at the start of every hour during 2023-2024.
event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name,
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
The default timezone for date and time inputs in the schedule API is UTC. For more information about EventBridge Scheduler schedule expressions, see ScheduleExpression in the EventBridge Scheduler API Reference documentation.
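Because the schedule API interprets date and time inputs as UTC, it can help to normalize a local datetime before formatting an at() expression. The helper below is a hypothetical sketch (not part of the SDK) that converts a timezone-aware datetime to UTC and renders it in the at(yyyy-mm-ddThh:mm:ss) form used in the examples above.

```python
from datetime import datetime, timezone, timedelta

def to_at_expression(dt: datetime) -> str:
    """Render a timezone-aware datetime as an EventBridge Scheduler
    at() expression, normalizing to UTC first (the schedule API's
    default timezone for date and time inputs)."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid ambiguity")
    utc = dt.astimezone(timezone.utc)
    return utc.strftime("at(%Y-%m-%dT%H:%M:%S)")

# 7:00 PM on Nov 29 in UTC-5 becomes midnight Nov 30 in UTC.
expr = to_at_expression(
    datetime(2020, 11, 29, 19, 0, tzinfo=timezone(timedelta(hours=-5)))
)
print(expr)  # at(2020-11-30T00:00:00)
```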
Scheduled Feature Processor pipeline executions provide your transformation function with the scheduled execution time, to be used as an idempotency token or a fixed reference point for date range–based inputs. To disable (i.e., pause) or re-enable a schedule, use the state parameter of the schedule API with 'DISABLED' or 'ENABLED', respectively.
For information about Feature Processor, see Feature Processor SDK data sources.
Event based executions
A Feature Processing pipeline can be configured to automatically execute when an AWS event occurs. The Feature Processing SDK provides a put_trigger function that accepts a list of event sources, such as FeatureProcessorPipelineEvent objects, and a target pipeline. The put_trigger function configures an Amazon EventBridge rule and target to route events, and allows you to specify an EventBridge event pattern to respond to any AWS event. For information on these concepts, see Amazon EventBridge rules, targets, and event patterns.
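To illustrate what an EventBridge event pattern for a pipeline status change might look like, the dictionary below matches SageMaker pipeline execution status-change events. The detail-type and field names follow EventBridge's published SageMaker event shapes, but treat the exact strings as assumptions to verify against the EventBridge documentation.

```python
import json

# Hypothetical EventBridge event pattern: match events emitted when a
# SageMaker pipeline execution reaches a Succeeded status. The
# detail-type and field names are assumptions to verify against the
# EventBridge documentation for SageMaker events.
event_pattern = {
    "source": ["aws.sagemaker"],
    "detail-type": ["SageMaker Model Building Pipeline Execution Status Change"],
    "detail": {
        "currentPipelineExecutionStatus": ["Succeeded"],
    },
}

# EventBridge APIs accept the pattern as a JSON string.
event_pattern_json = json.dumps(event_pattern)
print(event_pattern_json)
```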
Triggers can be enabled or disabled. EventBridge starts a target pipeline execution using the role provided in the role_arn parameter of the put_trigger API. The execution role is used by default if the SDK is used in an Amazon SageMaker Studio Classic or notebook environment. For information on how to get your execution role, see Get your execution role.
The following example sets up:
- A SageMaker Pipeline using the to_pipeline API, that takes in your target pipeline name (target-pipeline) and your transformation function (transform). For information on your Feature Processor and transform function, see Feature Processor SDK data sources.
- A trigger using the put_trigger API, that takes in the FeatureProcessorPipelineEvent for the event and your target pipeline name (target-pipeline). The FeatureProcessorPipelineEvent defines the trigger for when the status of your source pipeline (source-pipeline) becomes Succeeded. For information on the Feature Processor Pipeline event function, see FeatureProcessorPipelineEvent in the Feature Store Read the Docs.
from sagemaker.feature_store.feature_processor import (
    FeatureProcessorPipelineEvent,
    put_trigger,
    to_pipeline,
)

# Create the target SageMaker Pipeline from the transformation function.
to_pipeline(pipeline_name="target-pipeline", step=transform)

# Trigger the target pipeline when the source pipeline succeeds.
put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
For an example of using event based triggers to create continuous executions and automatic retries for your Feature Processor pipeline, see Continuous executions and automatic retries using event based triggers.
For an example of using event based triggers with a continuous streaming custom data source, see Streaming custom data source examples.