Tasks for AWS Step Functions
AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components that each perform a discrete function, or task, allowing you to scale and change applications quickly.
A Task state represents a single unit of work performed by a state machine. All work in your state machine is performed by tasks. This module contains a collection of classes that allow you to call various AWS services from your Step Functions state machine.
Be sure to familiarize yourself with the aws-stepfunctions module documentation first.
This module is part of the AWS Cloud Development Kit project.
Paths
Learn more about input and output processing in Step Functions here
Evaluate Expression
Use EvaluateExpression to perform simple operations referencing state paths. The expression referenced in the task is evaluated in a Lambda function (using eval()), so you do not have to write Lambda code for simple operations.
Example: convert a wait time from milliseconds to seconds, interpolate it into a message, and wait:
convert_to_seconds = tasks.EvaluateExpression(self, "Convert to seconds",
expression="$.waitMilliseconds / 1000",
result_path="$.waitSeconds"
)
create_message = tasks.EvaluateExpression(self, "Create message",
# Note: this is a string inside a string.
expression="`Now waiting ${$.waitSeconds} seconds...`",
runtime=lambda_.Runtime.NODEJS_LATEST,
result_path="$.message"
)
publish_message = tasks.SnsPublish(self, "Publish message",
topic=sns.Topic(self, "cool-topic"),
message=sfn.TaskInput.from_json_path_at("$.message"),
result_path="$.sns"
)
wait = sfn.Wait(self, "Wait",
time=sfn.WaitTime.seconds_path("$.waitSeconds")
)
sfn.StateMachine(self, "StateMachine",
definition=convert_to_seconds.next(create_message).next(publish_message).next(wait)
)
EvaluateExpression supports a runtime prop that specifies the Lambda runtime used to evaluate the expression. Currently, only runtimes of the Node.js family are supported.
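To make the data flow of the example above concrete, here is a plain-Python sketch of what the two EvaluateExpression states compute. It does not use the CDK or the Lambda evaluator. The `apply_result_path` helper and the sample input are hypothetical, and the helper only handles top-level paths such as `$.waitSeconds`.

```python
import copy


def apply_result_path(state: dict, result_path: str, value) -> dict:
    """Return a copy of `state` with `value` stored at a top-level path like "$.waitSeconds"."""
    if not result_path.startswith("$."):
        raise ValueError(f"unsupported result path: {result_path}")
    updated = copy.deepcopy(state)
    updated[result_path[2:]] = value
    return updated


# Hypothetical state input for the workflow above.
state = {"waitMilliseconds": 2500}

# "Convert to seconds": $.waitMilliseconds / 1000 is written to $.waitSeconds.
state = apply_result_path(state, "$.waitSeconds", state["waitMilliseconds"] / 1000)

# "Create message": the template string is evaluated against the updated state.
state = apply_result_path(
    state, "$.message", f"Now waiting {state['waitSeconds']} seconds..."
)

print(state)
```

Each step only adds a key to the state, which is why the later SnsPublish and Wait states can read `$.message` and `$.waitSeconds`.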
API Gateway
Step Functions supports API Gateway through the service integration pattern.
HTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints. HTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments. Previous-generation REST APIs currently offer more features. More details can be found here.
Call REST API Endpoint
The CallApiGatewayRestApiEndpoint task calls the REST API endpoint.
import aws_cdk.aws_apigateway as apigateway
rest_api = apigateway.RestApi(self, "MyRestApi")
invoke_task = tasks.CallApiGatewayRestApiEndpoint(self, "Call REST API",
api=rest_api,
stage_name="prod",
method=tasks.HttpMethod.GET
)
Be aware that the header values must be arrays. When passing the Task Token in the headers field with the WAIT_FOR_TASK_TOKEN integration pattern, use JsonPath.array() to wrap the token in an array:
import aws_cdk.aws_apigateway as apigateway
# api: apigateway.RestApi
tasks.CallApiGatewayRestApiEndpoint(self, "Endpoint",
api=api,
stage_name="Stage",
method=tasks.HttpMethod.PUT,
integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
headers=sfn.TaskInput.from_object({
"TaskToken": sfn.JsonPath.array(sfn.JsonPath.task_token)
})
)
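The "values must be arrays" rule is easy to miss, so the plain-Python sketch below shows the shape the headers object is expected to have. The `to_header_arrays` helper and the sample token are hypothetical and not part of the CDK.

```python
def to_header_arrays(headers: dict) -> dict:
    """Wrap every scalar header value in a list; copy values that are already sequences."""
    return {
        name: list(value) if isinstance(value, (list, tuple)) else [value]
        for name, value in headers.items()
    }


headers = to_header_arrays(
    {"TaskToken": "example-token", "Accept": ["application/json"]}
)
print(headers)
```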
Call HTTP API Endpoint
The CallApiGatewayHttpApiEndpoint task calls the HTTP API endpoint.
import aws_cdk.aws_apigatewayv2 as apigatewayv2
http_api = apigatewayv2.HttpApi(self, "MyHttpApi")
invoke_task = tasks.CallApiGatewayHttpApiEndpoint(self, "Call HTTP API",
api_id=http_api.api_id,
api_stack=Stack.of(http_api),
method=tasks.HttpMethod.GET
)
AWS SDK
Step Functions supports calling the API actions of AWS services through the service integration pattern.
You can use Step Functions’ AWS SDK integrations to call any of the over two hundred AWS services directly from your state machine, giving you access to over nine thousand API actions.
# my_bucket: s3.Bucket
get_object = tasks.CallAwsService(self, "GetObject",
service="s3",
action="getObject",
parameters={
"Bucket": my_bucket.bucket_name,
"Key": sfn.JsonPath.string_at("$.key")
},
iam_resources=[my_bucket.arn_for_objects("*")]
)
Use camelCase for actions and PascalCase for parameter names.
The task automatically adds an IAM statement to the state machine role’s policy based on the service and action called. The resources for this statement must be specified in iamResources.
Use the iamAction prop to manually specify the IAM action name in the case where the IAM action name does not match the API service/action name:
list_buckets = tasks.CallAwsService(self, "ListBuckets",
service="s3",
action="listBuckets",
iam_resources=["*"],
iam_action="s3:ListAllMyBuckets"
)
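The relationship between the service, the action, and the iamAction override can be sketched in plain Python. This is only an illustration: that the default IAM action is formed as `service:action` is an assumption made here, and `resolve_iam_action` is a hypothetical helper, not a CDK function.

```python
from typing import Optional


def resolve_iam_action(service: str, action: str, iam_action: Optional[str] = None) -> str:
    """Pick the IAM action name for a service call.

    Assumption: without an override the name is "<service>:<action>".
    An explicit `iam_action` always wins.
    """
    if iam_action is not None:
        return iam_action
    return f"{service}:{action}"


# The API action and the IAM action line up for getObject ...
print(resolve_iam_action("s3", "getObject"))
# ... but not for listBuckets, hence the explicit override.
print(resolve_iam_action("s3", "listBuckets", iam_action="s3:ListAllMyBuckets"))
```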
Use the additionalIamStatements prop to pass additional IAM statements that will be added to the state machine role’s policy. Use it in the case where the call requires more than a single statement to be executed:
detect_labels = tasks.CallAwsService(self, "DetectLabels",
service="rekognition",
action="detectLabels",
iam_resources=["*"],
additional_iam_statements=[
iam.PolicyStatement(
actions=["s3:getObject"],
resources=["arn:aws:s3:::amzn-s3-demo-bucket/*"]
)
]
)
Cross-region AWS API call
You can call an AWS API in a different region from your state machine by using the CallAwsServiceCrossRegion construct. In addition to the properties of the CallAwsService construct, you must set the region property to call the API.
# my_bucket: s3.Bucket
get_object = tasks.CallAwsServiceCrossRegion(self, "GetObject",
region="ap-northeast-1",
service="s3",
action="getObject",
parameters={
"Bucket": my_bucket.bucket_name,
"Key": sfn.JsonPath.string_at("$.key")
},
iam_resources=[my_bucket.arn_for_objects("*")]
)
Other properties such as additionalIamStatements can be used in the same way as in the CallAwsService task.
Athena
Step Functions supports Athena through the service integration pattern.
StartQueryExecution
The StartQueryExecution API runs the SQL query statement.
start_query_execution_job = tasks.AthenaStartQueryExecution(self, "Start Athena Query",
query_string=sfn.JsonPath.string_at("$.queryString"),
query_execution_context=tasks.QueryExecutionContext(
database_name="mydatabase"
),
result_configuration=tasks.ResultConfiguration(
encryption_configuration=tasks.EncryptionConfiguration(
encryption_option=tasks.EncryptionOption.S3_MANAGED
),
output_location=s3.Location(
bucket_name="amzn-s3-demo-bucket",
object_key="folder"
)
),
execution_parameters=["param1", "param2"]
)
You can reuse the query results by setting the resultReuseConfigurationMaxAge property.
start_query_execution_job = tasks.AthenaStartQueryExecution(self, "Start Athena Query",
query_string=sfn.JsonPath.string_at("$.queryString"),
query_execution_context=tasks.QueryExecutionContext(
database_name="mydatabase"
),
result_configuration=tasks.ResultConfiguration(
encryption_configuration=tasks.EncryptionConfiguration(
encryption_option=tasks.EncryptionOption.S3_MANAGED
),
output_location=s3.Location(
bucket_name="query-results-bucket",
object_key="folder"
)
),
execution_parameters=["param1", "param2"],
result_reuse_configuration_max_age=Duration.minutes(100)
)
GetQueryExecution
The GetQueryExecution API gets information about a single execution of a query.
get_query_execution_job = tasks.AthenaGetQueryExecution(self, "Get Query Execution",
query_execution_id=sfn.JsonPath.string_at("$.QueryExecutionId")
)
GetQueryResults
The GetQueryResults API streams the results of a single query execution specified by QueryExecutionId from S3.
get_query_results_job = tasks.AthenaGetQueryResults(self, "Get Query Results",
query_execution_id=sfn.JsonPath.string_at("$.QueryExecutionId")
)
StopQueryExecution
The StopQueryExecution API stops a query execution.
stop_query_execution_job = tasks.AthenaStopQueryExecution(self, "Stop Query Execution",
query_execution_id=sfn.JsonPath.string_at("$.QueryExecutionId")
)
Batch
Step Functions supports Batch through the service integration pattern.
SubmitJob
The SubmitJob API submits an AWS Batch job from a job definition.
import aws_cdk.aws_batch as batch
# batch_job_definition: batch.EcsJobDefinition
# batch_queue: batch.JobQueue
task = tasks.BatchSubmitJob(self, "Submit Job",
job_definition_arn=batch_job_definition.job_definition_arn,
job_name="MyJob",
job_queue_arn=batch_queue.job_queue_arn
)
Bedrock
Step Functions supports Bedrock through the service integration pattern.
InvokeModel
The InvokeModel API invokes the specified Bedrock model to run inference using the input provided. The format of the input body and the response body depends on the model selected.
import aws_cdk.aws_bedrock as bedrock
model = bedrock.FoundationModel.from_foundation_model_id(self, "Model", bedrock.FoundationModelIdentifier.AMAZON_TITAN_TEXT_G1_EXPRESS_V1)
task = tasks.BedrockInvokeModel(self, "Prompt Model",
model=model,
body=sfn.TaskInput.from_object({
"input_text": "Generate a list of five first names.",
"text_generation_config": {
"max_token_count": 100,
"temperature": 1
}
}),
result_selector={
"names": sfn.JsonPath.string_at("$.Body.results[0].outputText")
}
)
Using Input Path for S3 URI
Provide S3 URI as an input or output path to invoke a model
To specify the S3 URI as a JSON path for your input or output fields, use the s3InputUri and s3OutputUri props under BedrockInvokeModelProps and set the feature flag @aws-cdk/aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask to true.
If this flag is not enabled, the code populates the S3Uri using the InputPath and OutputPath fields, which is not recommended.
import aws_cdk.aws_bedrock as bedrock
model = bedrock.FoundationModel.from_foundation_model_id(self, "Model", bedrock.FoundationModelIdentifier.AMAZON_TITAN_TEXT_G1_EXPRESS_V1)
task = tasks.BedrockInvokeModel(self, "Prompt Model",
model=model,
input=tasks.BedrockInvokeModelInputProps(s3_input_uri=sfn.JsonPath.string_at("$.prompt")),
output=tasks.BedrockInvokeModelOutputProps(s3_output_uri=sfn.JsonPath.string_at("$.prompt"))
)
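CDK feature flags are typically set in the `context` section of `cdk.json`. As a minimal sketch, the snippet below builds that fragment in Python for the flag named above. The surrounding file layout is an assumption; merge the key into your existing context rather than overwriting the file.

```python
import json

# Feature flag name taken from the text above.
FLAG = "@aws-cdk/aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask"

# Fragment to merge into the "context" object of cdk.json.
cdk_json_fragment = {"context": {FLAG: True}}

rendered = json.dumps(cdk_json_fragment, indent=2)
print(rendered)
```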
Using Input Path
Provide S3 URI as an input or output path to invoke a model
Currently, the input and output paths provided in BedrockInvokeModelProps are rendered as the S3Uri fields in the task definition of the state machine.
To change this behavior, set @aws-cdk/aws-stepfunctions-tasks:useNewS3UriParametersForBedrockInvokeModelTask to true.
If this feature flag is enabled, the S3Uri fields are generated from other props (s3InputUri and s3OutputUri), and the given inputPath and outputPath are rendered as-is in the JSON task definition.
If the feature flag is set to false, the S3Uri is populated from the InputPath and OutputPath fields, which is not recommended.
import aws_cdk.aws_bedrock as bedrock
model = bedrock.FoundationModel.from_foundation_model_id(self, "Model", bedrock.FoundationModelIdentifier.AMAZON_TITAN_TEXT_G1_EXPRESS_V1)
task = tasks.BedrockInvokeModel(self, "Prompt Model",
model=model,
input_path=sfn.JsonPath.string_at("$.prompt"),
output_path=sfn.JsonPath.string_at("$.prompt")
)
You can apply a guardrail to the invocation by setting guardrail.
import aws_cdk.aws_bedrock as bedrock
model = bedrock.FoundationModel.from_foundation_model_id(self, "Model", bedrock.FoundationModelIdentifier.AMAZON_TITAN_TEXT_G1_EXPRESS_V1)
task = tasks.BedrockInvokeModel(self, "Prompt Model with guardrail",
model=model,
body=sfn.TaskInput.from_object({
"input_text": "Generate a list of five first names.",
"text_generation_config": {
"max_token_count": 100,
"temperature": 1
}
}),
guardrail=tasks.Guardrail.enable("guardrailId", 1),
result_selector={
"names": sfn.JsonPath.string_at("$.Body.results[0].outputText")
}
)
CodeBuild
Step Functions supports CodeBuild through the service integration pattern.
StartBuild
StartBuild starts a CodeBuild Project by Project Name.
import aws_cdk.aws_codebuild as codebuild
codebuild_project = codebuild.Project(self, "Project",
project_name="MyTestProject",
build_spec=codebuild.BuildSpec.from_object({
"version": "0.2",
"phases": {
"build": {
"commands": ["echo \"Hello, CodeBuild!\""
]
}
}
})
)
task = tasks.CodeBuildStartBuild(self, "Task",
project=codebuild_project,
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
environment_variables_override={
"ZONE": codebuild.BuildEnvironmentVariable(
type=codebuild.BuildEnvironmentVariableType.PLAINTEXT,
value=sfn.JsonPath.string_at("$.envVariables.zone")
)
}
)
StartBuildBatch
StartBuildBatch starts a batch CodeBuild for a project by project name. It is necessary to enable the batch build feature in the CodeBuild project.
import aws_cdk.aws_codebuild as codebuild
project = codebuild.Project(self, "Project",
project_name="MyTestProject",
build_spec=codebuild.BuildSpec.from_object_to_yaml({
"version": 0.2,
"batch": {
"build-list": [{
"identifier": "id",
"buildspec": "version: 0.2\nphases:\n build:\n commands:\n - echo \"Hello, from small!\""
}
]
}
})
)
project.enable_batch_builds()
task = tasks.CodeBuildStartBuildBatch(self, "buildBatchTask",
project=project,
integration_pattern=sfn.IntegrationPattern.REQUEST_RESPONSE,
environment_variables_override={
"test": codebuild.BuildEnvironmentVariable(
type=codebuild.BuildEnvironmentVariableType.PLAINTEXT,
value="testValue"
)
}
)
Note: enableBatchBuilds() will do nothing for imported projects.
If you are using an imported project, you must ensure that the project is already configured for batch builds.
DynamoDB
You can call DynamoDB APIs from a Task state.
Read more about calling DynamoDB APIs here
GetItem
The GetItem operation returns a set of attributes for the item with the given primary key.
# my_table: dynamodb.Table
tasks.DynamoGetItem(self, "Get Item",
key={"message_id": tasks.DynamoAttributeValue.from_string("message-007")},
table=my_table
)
PutItem
The PutItem operation creates a new item, or replaces an old item with a new item.
# my_table: dynamodb.Table
tasks.DynamoPutItem(self, "PutItem",
item={
"MessageId": tasks.DynamoAttributeValue.from_string("message-007"),
"Text": tasks.DynamoAttributeValue.from_string(sfn.JsonPath.string_at("$.bar")),
"TotalCount": tasks.DynamoAttributeValue.from_number(10)
},
table=my_table
)
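For orientation, DynamoDB represents each attribute as a small object keyed by its type, for example `S` for strings and `N` for numbers (numbers travel as strings). The plain-Python sketch below builds that shape for part of the item above. The helper names are hypothetical, and that DynamoAttributeValue renders exactly this structure is an assumption.

```python
def attr_string(value: str) -> dict:
    """DynamoDB string attribute: {"S": "<text>"}."""
    return {"S": value}


def attr_number(value) -> dict:
    """DynamoDB number attribute: numbers are sent as strings, {"N": "<digits>"}."""
    return {"N": str(value)}


item = {
    "MessageId": attr_string("message-007"),
    "TotalCount": attr_number(10),
}
print(item)
```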
DeleteItem
The DeleteItem operation deletes a single item in a table by primary key.
# my_table: dynamodb.Table
tasks.DynamoDeleteItem(self, "DeleteItem",
key={"MessageId": tasks.DynamoAttributeValue.from_string("message-007")},
table=my_table,
result_path=sfn.JsonPath.DISCARD
)
UpdateItem
The UpdateItem operation edits an existing item’s attributes, or adds a new item to the table if it does not already exist.
# my_table: dynamodb.Table
tasks.DynamoUpdateItem(self, "UpdateItem",
key={
"MessageId": tasks.DynamoAttributeValue.from_string("message-007")
},
table=my_table,
expression_attribute_values={
":val": tasks.DynamoAttributeValue.number_from_string(sfn.JsonPath.string_at("$.Item.TotalCount.N")),
":rand": tasks.DynamoAttributeValue.from_number(20)
},
update_expression="SET TotalCount = :val + :rand"
)
ECS
Step Functions supports ECS/Fargate through the service integration pattern.
RunTask
RunTask starts a new task using the specified task definition.
EC2
The EC2 launch type allows you to run your containerized applications on a cluster of Amazon EC2 instances that you manage.
When a task that uses the EC2 launch type is launched, Amazon ECS must determine where to place the task based on the requirements specified in the task definition, such as CPU and memory. Similarly, when you scale down the task count, Amazon ECS must determine which tasks to terminate. You can apply task placement strategies and constraints to customize how Amazon ECS places and terminates tasks. Learn more about task placement
The latest ACTIVE revision of the passed task definition is used for running the task.
The following example runs a job from a task definition on EC2:
vpc = ec2.Vpc.from_lookup(self, "Vpc",
is_default=True
)
cluster = ecs.Cluster(self, "Ec2Cluster", vpc=vpc)
cluster.add_capacity("DefaultAutoScalingGroup",
instance_type=ec2.InstanceType("t2.micro"),
vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC)
)
task_definition = ecs.TaskDefinition(self, "TD",
compatibility=ecs.Compatibility.EC2
)
task_definition.add_container("TheContainer",
image=ecs.ContainerImage.from_registry("foo/bar"),
memory_limit_mi_b=256
)
run_task = tasks.EcsRunTask(self, "Run",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=task_definition,
launch_target=tasks.EcsEc2LaunchTarget(
placement_strategies=[
ecs.PlacementStrategy.spread_across_instances(),
ecs.PlacementStrategy.packed_by_cpu(),
ecs.PlacementStrategy.randomly()
],
placement_constraints=[
ecs.PlacementConstraint.member_of("blieptuut")
]
),
propagated_tag_source=ecs.PropagatedTagSource.TASK_DEFINITION
)
Fargate
AWS Fargate is a serverless compute engine for containers that works with Amazon Elastic Container Service (ECS). Fargate makes it easy for you to focus on building your applications. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design. Learn more about Fargate
The Fargate launch type allows you to run your containerized applications without the need to provision and manage the backend infrastructure. Just register your task definition and Fargate launches the container for you. The latest ACTIVE revision of the passed task definition is used for running the task. Learn more about Fargate Versioning
The following example runs a job from a task definition on Fargate:
vpc = ec2.Vpc.from_lookup(self, "Vpc",
is_default=True
)
cluster = ecs.Cluster(self, "FargateCluster", vpc=vpc)
task_definition = ecs.TaskDefinition(self, "TD",
memory_mi_b="512",
cpu="256",
compatibility=ecs.Compatibility.FARGATE
)
container_definition = task_definition.add_container("TheContainer",
image=ecs.ContainerImage.from_registry("foo/bar"),
memory_limit_mi_b=256
)
run_task = tasks.EcsRunTask(self, "RunFargate",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=task_definition,
assign_public_ip=True,
container_overrides=[tasks.ContainerOverride(
container_definition=container_definition,
environment=[tasks.TaskEnvironmentVariable(name="SOME_KEY", value=sfn.JsonPath.string_at("$.SomeKey"))]
)],
launch_target=tasks.EcsFargateLaunchTarget(),
propagated_tag_source=ecs.PropagatedTagSource.TASK_DEFINITION
)
Override CPU and Memory Parameter
By setting the property cpu or memoryMiB, you can override the Fargate or EC2 task instance size at runtime.
see: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskOverride.html
vpc = ec2.Vpc.from_lookup(self, "Vpc",
is_default=True
)
cluster = ecs.Cluster(self, "ECSCluster", vpc=vpc)
task_definition = ecs.TaskDefinition(self, "TD",
compatibility=ecs.Compatibility.FARGATE,
cpu="256",
memory_mi_b="512"
)
task_definition.add_container("TheContainer",
image=ecs.ContainerImage.from_registry("foo/bar")
)
run_task = tasks.EcsRunTask(self, "Run",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=task_definition,
launch_target=tasks.EcsFargateLaunchTarget(),
cpu="1024",
memory_mi_b="1048"
)
ECS enable Exec
By setting the property enableExecuteCommand to true, you can enable the ECS Exec feature for the task for either Fargate or EC2 launch types.
vpc = ec2.Vpc.from_lookup(self, "Vpc",
is_default=True
)
cluster = ecs.Cluster(self, "ECSCluster", vpc=vpc)
task_definition = ecs.TaskDefinition(self, "TD",
compatibility=ecs.Compatibility.EC2
)
task_definition.add_container("TheContainer",
image=ecs.ContainerImage.from_registry("foo/bar"),
memory_limit_mi_b=256
)
run_task = tasks.EcsRunTask(self, "Run",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=task_definition,
launch_target=tasks.EcsEc2LaunchTarget(),
enable_execute_command=True
)
EMR
Step Functions supports Amazon EMR through the service integration pattern. The service integration APIs correspond to Amazon EMR APIs but differ in the parameters that are used.
Read more about the differences when using these service integrations.
Create Cluster
Creates and starts running a cluster (job flow).
Corresponds to the runJobFlow API in EMR.
cluster_role = iam.Role(self, "ClusterRole",
assumed_by=iam.ServicePrincipal("ec2.amazonaws.com")
)
service_role = iam.Role(self, "ServiceRole",
assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com")
)
auto_scaling_role = iam.Role(self, "AutoScalingRole",
assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com")
)
auto_scaling_role.assume_role_policy.add_statements(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
principals=[
iam.ServicePrincipal("application-autoscaling.amazonaws.com")
],
actions=["sts:AssumeRole"]
))
tasks.EmrCreateCluster(self, "Create Cluster",
instances=tasks.EmrCreateCluster.InstancesConfigProperty(),
cluster_role=cluster_role,
name=sfn.TaskInput.from_json_path_at("$.ClusterName").value,
service_role=service_role,
auto_scaling_role=auto_scaling_role
)
You can use the launch specification for On-Demand and Spot instances in the fleet.
tasks.EmrCreateCluster(self, "OnDemandSpecification",
instances=tasks.EmrCreateCluster.InstancesConfigProperty(
instance_fleets=[tasks.EmrCreateCluster.InstanceFleetConfigProperty(
instance_fleet_type=tasks.EmrCreateCluster.InstanceRoleType.MASTER,
launch_specifications=tasks.EmrCreateCluster.InstanceFleetProvisioningSpecificationsProperty(
on_demand_specification=tasks.EmrCreateCluster.OnDemandProvisioningSpecificationProperty(
allocation_strategy=tasks.EmrCreateCluster.OnDemandAllocationStrategy.LOWEST_PRICE
)
)
)]
),
name="OnDemandCluster",
integration_pattern=sfn.IntegrationPattern.RUN_JOB
)
tasks.EmrCreateCluster(self, "SpotSpecification",
instances=tasks.EmrCreateCluster.InstancesConfigProperty(
instance_fleets=[tasks.EmrCreateCluster.InstanceFleetConfigProperty(
instance_fleet_type=tasks.EmrCreateCluster.InstanceRoleType.MASTER,
launch_specifications=tasks.EmrCreateCluster.InstanceFleetProvisioningSpecificationsProperty(
spot_specification=tasks.EmrCreateCluster.SpotProvisioningSpecificationProperty(
allocation_strategy=tasks.EmrCreateCluster.SpotAllocationStrategy.CAPACITY_OPTIMIZED,
timeout_action=tasks.EmrCreateCluster.SpotTimeoutAction.TERMINATE_CLUSTER,
timeout=Duration.minutes(5)
)
)
)]
),
name="SpotCluster",
integration_pattern=sfn.IntegrationPattern.RUN_JOB
)
If you want to run multiple steps in parallel, you can specify the stepConcurrencyLevel property. The concurrency range is between 1 and 256 inclusive, where the default concurrency of 1 means no step concurrency is allowed. stepConcurrencyLevel requires the EMR release label to be 5.28.0 or above.
tasks.EmrCreateCluster(self, "Create Cluster",
instances=tasks.EmrCreateCluster.InstancesConfigProperty(),
name=sfn.TaskInput.from_json_path_at("$.ClusterName").value,
step_concurrency_level=10
)
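If the concurrency level comes from configuration, it can be checked against the documented range before it reaches the construct. The following is a small plain-Python sketch; the helper name and the choice to raise exceptions are assumptions, not CDK behavior.

```python
MIN_STEP_CONCURRENCY = 1
MAX_STEP_CONCURRENCY = 256


def validate_step_concurrency_level(level: int) -> int:
    """Return `level` unchanged if it lies in the documented 1..256 range."""
    if isinstance(level, bool) or not isinstance(level, int):
        raise TypeError("step concurrency level must be an integer")
    if not MIN_STEP_CONCURRENCY <= level <= MAX_STEP_CONCURRENCY:
        raise ValueError(
            f"step concurrency level must be between {MIN_STEP_CONCURRENCY} "
            f"and {MAX_STEP_CONCURRENCY}, got {level}"
        )
    return level


print(validate_step_concurrency_level(10))
```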
If you want to use an auto-termination policy, you can specify the autoTerminationPolicyIdleTimeout property. It sets the amount of idle time after which the cluster automatically terminates. You can specify a minimum of 60 seconds and a maximum of 604800 seconds (seven days).
tasks.EmrCreateCluster(self, "Create Cluster",
instances=tasks.EmrCreateCluster.InstancesConfigProperty(),
name="ClusterName",
auto_termination_policy_idle_timeout=Duration.seconds(100)
)
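The idle-timeout bounds can be expressed with the standard library's `timedelta`, which makes the "seven days" upper limit explicit. This is a sketch only; `validate_idle_timeout` is a hypothetical helper and not part of the CDK.

```python
from datetime import timedelta

MIN_IDLE = timedelta(seconds=60)
MAX_IDLE = timedelta(days=7)  # equals 604800 seconds


def validate_idle_timeout(timeout: timedelta) -> int:
    """Return the timeout in whole seconds if it lies in the documented range."""
    if not MIN_IDLE <= timeout <= MAX_IDLE:
        raise ValueError(
            "idle timeout must be between 60 and 604800 seconds, "
            f"got {int(timeout.total_seconds())}"
        )
    return int(timeout.total_seconds())


print(validate_idle_timeout(timedelta(seconds=100)))
```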
Termination Protection
Locks a cluster (job flow) so the EC2 instances in the cluster cannot be terminated by user intervention, an API call, or a job-flow error.
Corresponds to the setTerminationProtection API in EMR.
tasks.EmrSetClusterTerminationProtection(self, "Task",
cluster_id="ClusterId",
termination_protected=False
)
Terminate Cluster
Shuts down a cluster (job flow).
Corresponds to the terminateJobFlows API in EMR.
tasks.EmrTerminateCluster(self, "Task",
cluster_id="ClusterId"
)
Add Step
Adds a new step to a running cluster.
Corresponds to the addJobFlowSteps API in EMR.
tasks.EmrAddStep(self, "Task",
cluster_id="ClusterId",
name="StepName",
jar="Jar",
action_on_failure=tasks.ActionOnFailure.CONTINUE
)
To specify a custom runtime role, use the executionRoleArn property.
Note: The EMR cluster must be created with a security configuration and the runtime role must have a specific trust policy. See this blog post for more details.
import json
import aws_cdk.aws_emr as emr
cfn_security_configuration = emr.CfnSecurityConfiguration(self, "EmrSecurityConfiguration",
name="AddStepRuntimeRoleSecConfig",
security_configuration=json.loads("""
{
"AuthorizationConfiguration": {
"IAMConfiguration": {
"EnableApplicationScopedIAMRole": true,
"ApplicationScopedIAMRoleConfiguration":
{
"PropagateSourceIdentity": true
}
},
"LakeFormationConfiguration": {
"AuthorizedSessionTagValue": "Amazon EMR"
}
}
}""")
)
task = tasks.EmrCreateCluster(self, "Create Cluster",
instances=tasks.EmrCreateCluster.InstancesConfigProperty(),
name=sfn.TaskInput.from_json_path_at("$.ClusterName").value,
security_configuration=cfn_security_configuration.name
)
execution_role = iam.Role(self, "Role",
assumed_by=iam.ArnPrincipal(task.cluster_role.role_arn)
)
execution_role.assume_role_policy.add_statements(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
principals=[task.cluster_role],
actions=["sts:SetSourceIdentity"]
),
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
principals=[task.cluster_role],
actions=["sts:TagSession"],
conditions={
"StringEquals": {
"aws:RequestTag/LakeFormationAuthorizedCaller": "Amazon EMR"
}
}
))
tasks.EmrAddStep(self, "Task",
cluster_id="ClusterId",
execution_role_arn=execution_role.role_arn,
name="StepName",
jar="Jar",
action_on_failure=tasks.ActionOnFailure.CONTINUE
)
Cancel Step
Cancels a pending step in a running cluster.
Corresponds to the cancelSteps API in EMR.
tasks.EmrCancelStep(self, "Task",
cluster_id="ClusterId",
step_id="StepId"
)
Modify Instance Fleet
Modifies the target On-Demand and target Spot capacities for the instance fleet with the specified InstanceFleetName.
Corresponds to the modifyInstanceFleet API in EMR.
tasks.EmrModifyInstanceFleetByName(self, "Task",
cluster_id="ClusterId",
instance_fleet_name="InstanceFleetName",
target_on_demand_capacity=2,
target_spot_capacity=0
)
Modify Instance Group
Modifies the number of nodes and configuration settings of an instance group.
Corresponds to the modifyInstanceGroups API in EMR.
tasks.EmrModifyInstanceGroupByName(self, "Task",
cluster_id="ClusterId",
instance_group_name=sfn.JsonPath.string_at("$.InstanceGroupName"),
instance_group=tasks.EmrModifyInstanceGroupByName.InstanceGroupModifyConfigProperty(
instance_count=1
)
)
EMR on EKS
Step Functions supports Amazon EMR on EKS through the service integration pattern. The service integration APIs correspond to Amazon EMR on EKS APIs, but differ in the parameters that are used.
Read more about the differences when using these service integrations.
Setting up the EKS cluster is required.
Create Virtual Cluster
The CreateVirtualCluster API creates a single virtual cluster that’s mapped to a single Kubernetes namespace.
The EKS cluster containing the Kubernetes namespace where the virtual cluster will be mapped can be passed in from the task input.
tasks.EmrContainersCreateVirtualCluster(self, "Create a Virtual Cluster",
eks_cluster=tasks.EksClusterInput.from_task_input(sfn.TaskInput.from_text("clusterId"))
)
The EKS cluster can also be passed in directly.
import aws_cdk.aws_eks as eks
# eks_cluster: eks.Cluster
tasks.EmrContainersCreateVirtualCluster(self, "Create a Virtual Cluster",
eks_cluster=tasks.EksClusterInput.from_cluster(eks_cluster)
)
By default, the Kubernetes namespace that a virtual cluster maps to is “default”, but a specific namespace within an EKS cluster can be selected.
tasks.EmrContainersCreateVirtualCluster(self, "Create a Virtual Cluster",
eks_cluster=tasks.EksClusterInput.from_task_input(sfn.TaskInput.from_text("clusterId")),
eks_namespace="specified-namespace"
)
Delete Virtual Cluster
The DeleteVirtualCluster API deletes a virtual cluster.
tasks.EmrContainersDeleteVirtualCluster(self, "Delete a Virtual Cluster",
virtual_cluster_id=sfn.TaskInput.from_json_path_at("$.virtualCluster")
)
Start Job Run
The StartJobRun API starts a job run. A job is a unit of work that you submit to Amazon EMR on EKS for execution. The work performed by the job can be defined by a Spark jar, PySpark script, or SparkSQL query. A job run is an execution of the job on the virtual cluster.
Required setup:
If not done already, follow the steps to set up EMR on EKS and create an EKS Cluster.
Enable Cluster access
Enable IAM Role access
The following actions must be performed if the virtual cluster ID is supplied from the task input. Otherwise, if it is supplied statically in the state machine definition, these actions are done automatically:
Create an IAM role
Update the Role Trust Policy of the Job Execution Role.
The job can be configured with spark submit parameters:
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
)
)
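Long `--conf` strings like the one above are easier to maintain when generated from a mapping. The following is a plain-Python sketch; `build_spark_submit_parameters` is a hypothetical helper, and it assumes keys and values contain no spaces that would need quoting.

```python
def build_spark_submit_parameters(conf: dict) -> str:
    """Render a mapping of Spark settings as space-separated "--conf key=value" pairs."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


params = build_spark_submit_parameters(
    {
        "spark.executor.instances": 2,
        "spark.executor.memory": "2G",
        "spark.executor.cores": 2,
        "spark.driver.cores": 1,
    }
)
print(params)
```

The resulting string can then be passed as spark_submit_parameters.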
Configuring the job can also be done via application configuration:
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_name="EMR-Containers-Job",
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py")
)
),
application_config=[tasks.ApplicationConfiguration(
classification=tasks.Classification.SPARK_DEFAULTS,
properties={
"spark.executor.instances": "1",
"spark.executor.memory": "512M"
}
)]
)
Job monitoring can be enabled by setting monitoring.logging to true. This automatically generates an S3 bucket and CloudWatch logs.
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
),
monitoring=tasks.Monitoring(
logging=True
)
)
Alternatively, you can provide monitoring for jobs with existing log groups and log buckets.
import aws_cdk.aws_logs as logs
log_group = logs.LogGroup(self, "Log Group")
log_bucket = s3.Bucket(self, "S3 Bucket")
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
),
monitoring=tasks.Monitoring(
log_group=log_group,
log_bucket=log_bucket
)
)
Users can provide their own existing Job Execution Role.
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_task_input(sfn.TaskInput.from_json_path_at("$.VirtualClusterId")),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_name="EMR-Containers-Job",
execution_role=iam.Role.from_role_arn(self, "Job-Execution-Role", "arn:aws:iam::xxxxxxxxxxxx:role/JobExecutionRole"),
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
)
)
EKS
Step Functions supports Amazon EKS through the service integration pattern. The service integration APIs correspond to Amazon EKS APIs.
Read more about the differences when using these service integrations.
Call
Read and write Kubernetes resource objects via a Kubernetes API endpoint.
Corresponds to the call
API in Step Functions Connector.
The following code snippet includes a Task state that uses eks:call to list the pods.
import aws_cdk.aws_eks as eks
my_eks_cluster = eks.Cluster(self, "my sample cluster",
version=eks.KubernetesVersion.V1_18,
cluster_name="myEksCluster"
)
tasks.EksCall(self, "Call a EKS Endpoint",
cluster=my_eks_cluster,
http_method=tasks.HttpMethods.GET,
http_path="/api/v1/namespaces/default/pods"
)
EventBridge
Step Functions supports Amazon EventBridge through the service integration pattern. The service integration APIs correspond to Amazon EventBridge APIs.
Read more about the differences when using these service integrations.
Put Events
Send events to an EventBridge bus.
Corresponds to the put-events
API in Step Functions Connector.
The following code snippet includes a Task state that uses events:putevents to send an event to the default bus.
import aws_cdk.aws_events as events
my_event_bus = events.EventBus(self, "EventBus",
event_bus_name="MyEventBus1"
)
tasks.EventBridgePutEvents(self, "Send an event to EventBridge",
entries=[tasks.EventBridgePutEventsEntry(
detail=sfn.TaskInput.from_object({
"Message": "Hello from Step Functions!"
}),
event_bus=my_event_bus,
detail_type="MessageFromStepFunctions",
source="step.functions"
)]
)
EventBridge Scheduler
You can call EventBridge Scheduler APIs from a Task
state.
Read more about calling Scheduler APIs here
Create Schedule
The CreateSchedule API creates a new schedule.
Here is an example of how to create a schedule that sends a message to an SQS queue every 5 minutes:
import datetime
import aws_cdk.aws_scheduler as scheduler
import aws_cdk.aws_kms as kms
# key: kms.Key
# schedule_group: scheduler.CfnScheduleGroup
# target_queue: sqs.Queue
# dead_letter_queue: sqs.Queue
scheduler_role = iam.Role(self, "SchedulerRole",
assumed_by=iam.ServicePrincipal("scheduler.amazonaws.com")
)
# To send the message to the queue
# This policy changes depending on the type of target.
scheduler_role.add_to_principal_policy(iam.PolicyStatement(
actions=["sqs:SendMessage"],
resources=[target_queue.queue_arn]
))
# The schedule is active for one hour, starting now (UTC).
now = datetime.datetime.now(datetime.timezone.utc)
create_schedule_task1 = tasks.EventBridgeSchedulerCreateScheduleTask(self, "createSchedule",
schedule_name="TestSchedule",
action_after_completion=tasks.ActionAfterCompletion.NONE,
client_token="testToken",
description="TestDescription",
start_date=now,
end_date=now + datetime.timedelta(hours=1),
flexible_time_window=Duration.minutes(5),
group_name=schedule_group.ref,
kms_key=key,
schedule=tasks.Schedule.rate(Duration.minutes(5)),
timezone="UTC",
enabled=True,
target=tasks.EventBridgeSchedulerTarget(
arn=target_queue.queue_arn,
role=scheduler_role,
retry_policy=tasks.RetryPolicy(
maximum_retry_attempts=2,
maximum_event_age=Duration.minutes(5)
),
dead_letter_queue=dead_letter_queue
)
)
Glue
Step Functions supports AWS Glue through the service integration pattern.
StartJobRun
You can call the StartJobRun
API from a Task
state.
tasks.GlueStartJobRun(self, "Task",
glue_job_name="my-glue-job",
arguments=sfn.TaskInput.from_object({
"key": "value"
}),
task_timeout=sfn.Timeout.duration(Duration.minutes(30)),
notify_delay_after=Duration.minutes(5)
)
You can configure workers by setting the workerTypeV2 and numberOfWorkers properties.
workerType is deprecated and no longer recommended. Use workerTypeV2 instead: it is an enum-like class that accepts either pre-defined worker types or dynamic values.
tasks.GlueStartJobRun(self, "Task",
glue_job_name="my-glue-job",
worker_configuration=tasks.WorkerConfigurationProperty(
worker_type_v2=tasks.WorkerTypeV2.G_1X, # Worker type
number_of_workers=2
)
)
To set the worker type or the number of workers dynamically from the state machine input, pass JSON path values, using workerTypeV2 for the worker type:
tasks.GlueStartJobRun(self, "Glue Job Task",
glue_job_name="my-glue-job",
worker_configuration=tasks.WorkerConfigurationProperty(
worker_type_v2=tasks.WorkerTypeV2.of(sfn.JsonPath.string_at("$.glue_jobs_configs.executor_type")),
number_of_workers=sfn.JsonPath.number_at("$.glue_jobs_configs.max_number_workers")
)
)
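For the JSON paths above to resolve, the execution input has to contain matching fields. As a sketch, an input document like the following would work. The concrete values (G.1X, 2 workers) are illustrative assumptions; only the field names come from the snippet above.

```python
import json

# Hypothetical execution input. The keys mirror the JSON paths
# $.glue_jobs_configs.executor_type and $.glue_jobs_configs.max_number_workers.
execution_input = {
    "glue_jobs_configs": {
        "executor_type": "G.1X",     # a Glue worker type name (assumed value)
        "max_number_workers": 2,     # a JSON number, not a string
    }
}

# This JSON string is what you would pass as the execution input.
execution_input_json = json.dumps(execution_input)
print(execution_input_json)
```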
You can choose the execution class by setting the executionClass
property.
tasks.GlueStartJobRun(self, "Task",
glue_job_name="my-glue-job",
execution_class=tasks.ExecutionClass.FLEX
)
StartCrawlerRun
You can call the StartCrawler
API from a Task
state through AWS SDK service integrations.
import aws_cdk.aws_glue as glue
# my_crawler: glue.CfnCrawler
# You can get the crawler name from `crawler.ref`
tasks.GlueStartCrawlerRun(self, "Task1",
crawler_name=my_crawler.ref
)
# Of course, you can also specify the crawler name directly.
tasks.GlueStartCrawlerRun(self, "Task2",
crawler_name="my-crawler-job"
)
Glue DataBrew
Step Functions supports AWS Glue DataBrew through the service integration pattern.
Start Job Run
You can call the StartJobRun
API from a Task
state.
tasks.GlueDataBrewStartJobRun(self, "Task",
name="databrew-job"
)
Invoke HTTP API
Step Functions supports calling third-party APIs with credentials managed by Amazon EventBridge Connections.
The following snippet creates a new API destination connection, and uses it to make a POST request to the specified URL. The endpoint response is available at the $.ResponseBody
path.
import aws_cdk.aws_events as events
connection = events.Connection(self, "Connection",
authorization=events.Authorization.basic("username", SecretValue.unsafe_plain_text("password"))
)
tasks.HttpInvoke(self, "Invoke HTTP API",
api_root="https://api.example.com",
api_endpoint=sfn.TaskInput.from_text("path/to/resource"),
body=sfn.TaskInput.from_object({"foo": "bar"}),
connection=connection,
headers=sfn.TaskInput.from_object({"Content-Type": "application/json"}),
method=sfn.TaskInput.from_text("POST"),
query_string_parameters=sfn.TaskInput.from_object({"id": "123"}),
url_encoding_format=tasks.URLEncodingFormat.BRACKETS
)
Lambda
Step Functions supports AWS Lambda through the service integration pattern.
Invoke
Invoke a Lambda function.
You can specify the input to your Lambda function through the payload attribute.
By default, Step Functions invokes the Lambda function with the state input (JSON path ‘$’) as the input.
The following snippet relies on that default: the function receives the entire state input, the $ path, as its payload.
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke with state input",
lambda_function=fn
)
When a function is invoked, the Lambda service sends these response elements back.
⚠️ The response from the Lambda function is in an attribute called Payload.
The following snippet shows two invocations. The first sends an empty object as the payload. The second references the $.Payload path in its state input, which holds the output of a Lambda function executed before it.
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke with empty object as payload",
lambda_function=fn,
payload=sfn.TaskInput.from_object({})
)
# use the output of fn as input
tasks.LambdaInvoke(self, "Invoke with payload field in the state input",
lambda_function=fn,
payload=sfn.TaskInput.from_json_path_at("$.Payload")
)
The following snippet invokes a Lambda and sets the task output to only include the Lambda function response.
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke and set function response as task output",
lambda_function=fn,
output_path="$.Payload"
)
If you want to combine the input and the Lambda function response, you can use the payloadResponseOnly property and specify the resultPath. This puts the Lambda function ARN directly in the “Resource” string, but it conflicts with the integrationPattern, invocationType, clientContext, and qualifier properties.
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke and combine function response with task input",
lambda_function=fn,
payload_response_only=True,
result_path="$.fn"
)
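To make the difference between outputPath and resultPath concrete, the sketch below mimics both behaviours in plain Python on example data. It is an illustration, not how Step Functions is implemented: the helper names select_path and apply_result_path are hypothetical, the sample documents are made up, and only simple dotted paths such as $.Payload are handled.

```python
import copy


def _keys(path):
    """Split a simple dotted path like "$.a.b" into ["a", "b"]."""
    if path == "$":
        return []
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    return path[2:].split(".")


def select_path(document, path):
    """Mimic OutputPath: keep only the part of the document found at `path`."""
    node = document
    for key in _keys(path):
        node = node[key]
    return node


def apply_result_path(state_input, result, result_path):
    """Mimic ResultPath: copy the input and place the result at `result_path`."""
    keys = _keys(result_path)
    if not keys:
        return result
    output = copy.deepcopy(state_input)
    node = output
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = result
    return output


state_input = {"orderId": "1234"}
function_response = {"status": "shipped"}

# Default integration: the response is wrapped, so output_path="$.Payload" unwraps it.
wrapped_result = {"StatusCode": 200, "Payload": function_response}
only_response = select_path(wrapped_result, "$.Payload")

# payload_response_only=True with result_path="$.fn": input plus the raw response.
combined = apply_result_path(state_input, function_response, "$.fn")

print(only_response)
print(combined)
```

With output_path the task input is discarded, while result_path keeps it and adds the response under the fn key.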
You can have Step Functions pause a task and wait for an external process to return a task token. Read more about the callback pattern.
To use the callback pattern, set integrationPattern to WAIT_FOR_TASK_TOKEN and include the task token in the payload. Call the Step Functions SendTaskSuccess or SendTaskFailure APIs with the token to indicate that the task has completed and the state machine should resume execution.
The following snippet invokes a Lambda with the task token as part of the input to the Lambda.
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke with callback",
lambda_function=fn,
integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload=sfn.TaskInput.from_object({
"token": sfn.JsonPath.task_token,
"input": sfn.JsonPath.string_at("$.someField")
})
)
⚠️ The task will pause until it receives that task token back with a SendTaskSuccess or SendTaskFailure call. Learn more about Callback with the Task Token.
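As a rough sketch of the receiving side, a handler could read the token from the payload shown above and report back with it. The function make_handler and the RecordingClient stand-in are hypothetical names introduced here. The client is only assumed to accept the same keyword arguments as the boto3 Step Functions client methods send_task_success and send_task_failure, which is what you would pass in a real function.

```python
import json


def make_handler(sfn_client):
    """Build a Lambda handler that returns the task token to Step Functions."""

    def handler(event, context=None):
        token = event["token"]  # set via sfn.JsonPath.task_token in the payload
        try:
            # Placeholder for the real work on the "input" field.
            result = {"processed": event["input"].upper()}
        except Exception as exc:
            # Report the failure so the state machine does not wait forever.
            sfn_client.send_task_failure(
                taskToken=token, error=type(exc).__name__, cause=str(exc)
            )
            return None
        sfn_client.send_task_success(taskToken=token, output=json.dumps(result))
        return result

    return handler


class RecordingClient:
    """Stand-in for a Step Functions client that records the calls it receives."""

    def __init__(self):
        self.calls = []

    def send_task_success(self, taskToken, output):
        self.calls.append(("success", taskToken, output))

    def send_task_failure(self, taskToken, error, cause):
        self.calls.append(("failure", taskToken, error, cause))


client = RecordingClient()
handler = make_handler(client)
handler({"token": "example-token", "input": "hello"})
handler({"token": "other-token", "input": None})  # the work fails on purpose
print(client.calls)
```

Injecting the client keeps the handler testable without AWS access. In many workflows the token is instead forwarded to an external system, which calls the same APIs later.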
AWS Lambda can occasionally experience transient service errors. In this case, invoking Lambda results in a 500 error, such as ClientExecutionTimeoutException, ServiceException, AWSLambdaException, or SdkClientException.
As a best practice, the LambdaInvoke task will retry on those errors with an interval of 2 seconds, a back-off rate of 2, and a maximum of 6 attempts. Set the retryOnServiceExceptions prop to false to disable this behavior.
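To get a feel for what these defaults mean in wall-clock time, the following sketch computes the wait before each retry. It assumes the usual Step Functions retry rule, where the first retry waits for the interval and each later wait is multiplied by the back-off rate, with no jitter and no maximum delay. The function name retry_delays is hypothetical.

```python
def retry_delays(interval_seconds=2, backoff_rate=2.0, max_attempts=6):
    """Return the wait in seconds before each retry attempt."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


delays = retry_delays()
total_wait = sum(delays)
print(delays)      # [2.0, 4.0, 8.0, 16.0, 32.0, 64.0]
print(total_wait)  # 126.0
```

Under these assumptions a persistently failing invocation spends a little over two minutes waiting between retries before the task fails.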
MediaConvert
Step Functions supports AWS MediaConvert through the Optimized integration pattern.
CreateJob
The CreateJob API creates a new transcoding job. For information about jobs and job settings, see the User Guide at http://docs.aws.amazon.com/mediaconvert/latest/ug/what-is.html
You can call the CreateJob API from a Task state. Optionally, you can specify the integrationPattern.
Make sure you set the required fields, Role and Settings, and refer to CreateJobRequest for all other optional parameters.
tasks.MediaConvertCreateJob(self, "CreateJob",
create_job_request={
"Role": "arn:aws:iam::123456789012:role/MediaConvertRole",
"Settings": {
"OutputGroups": [{
"Outputs": [{
"ContainerSettings": {
"Container": "MP4"
},
"VideoDescription": {
"CodecSettings": {
"Codec": "H_264",
"H264Settings": {
"MaxBitrate": 1000,
"RateControlMode": "QVBR",
"SceneChangeDetect": "TRANSITION_DETECTION"
}
}
},
"AudioDescriptions": [{
"CodecSettings": {
"Codec": "AAC",
"AacSettings": {
"Bitrate": 96000,
"CodingMode": "CODING_MODE_2_0",
"SampleRate": 48000
}
}
}
]
}
],
"OutputGroupSettings": {
"Type": "FILE_GROUP_SETTINGS",
"FileGroupSettings": {
"Destination": "s3://EXAMPLE-DESTINATION-BUCKET/"
}
}
}
],
"Inputs": [{
"AudioSelectors": {
"Audio Selector 1": {
"DefaultSelection": "DEFAULT"
}
},
"FileInput": "s3://EXAMPLE-SOURCE-BUCKET/EXAMPLE-SOURCE_FILE"
}
]
}
},
integration_pattern=sfn.IntegrationPattern.RUN_JOB
)
SageMaker
Step Functions supports Amazon SageMaker through the service integration pattern.
If your training job or model uses resources from AWS Marketplace, network isolation is required. To enable it, set the enableNetworkIsolation property to true for SageMakerCreateModel or SageMakerCreateTrainingJob.
To set environment variables for the Docker container, use the environment property.
Create Training Job
You can call the CreateTrainingJob
API from a Task
state.
tasks.SageMakerCreateTrainingJob(self, "TrainSagemaker",
training_job_name=sfn.JsonPath.string_at("$.JobName"),
algorithm_specification=tasks.AlgorithmSpecification(
algorithm_name="BlazingText",
training_input_mode=tasks.InputMode.FILE
),
input_data_config=[tasks.Channel(
channel_name="train",
data_source=tasks.DataSource(
s3_data_source=tasks.S3DataSource(
s3_data_type=tasks.S3DataType.S3_PREFIX,
s3_location=tasks.S3Location.from_json_expression("$.S3Bucket")
)
)
)],
output_data_config=tasks.OutputDataConfig(
s3_output_location=tasks.S3Location.from_bucket(s3.Bucket.from_bucket_name(self, "Bucket", "amzn-s3-demo-bucket"), "myoutputpath")
),
resource_config=tasks.ResourceConfig(
instance_count=1,
instance_type=ec2.InstanceType(sfn.JsonPath.string_at("$.InstanceType")),
volume_size=Size.gibibytes(50)
), # optional: default is 1 instance of EC2 `M4.XLarge` with `10GB` volume
stopping_condition=tasks.StoppingCondition(
max_runtime=Duration.hours(2)
)
)
You can specify TrainingInputMode via the trainingInputMode property.
To download the data from Amazon Simple Storage Service (Amazon S3) to the provisioned ML storage volume, and mount the directory to a Docker volume, choose InputMode.FILE if an algorithm supports it.
To stream data directly from Amazon S3 to the container, choose InputMode.PIPE if an algorithm supports it.
To stream data directly from Amazon S3 to the container with no code changes and to provide file system access to the data, choose InputMode.FAST_FILE if an algorithm supports it.
Create Transform Job
You can call the CreateTransformJob
API from a Task
state.
tasks.SageMakerCreateTransformJob(self, "Batch Inference",
transform_job_name="MyTransformJob",
model_name="MyModelName",
model_client_options=tasks.ModelClientOptions(
invocations_max_retries=3, # default is 0
invocations_timeout=Duration.minutes(5)
),
transform_input=tasks.TransformInput(
transform_data_source=tasks.TransformDataSource(
s3_data_source=tasks.TransformS3DataSource(
s3_uri="s3://inputbucket/train",
s3_data_type=tasks.S3DataType.S3_PREFIX
)
)
),
transform_output=tasks.TransformOutput(
s3_output_path="s3://outputbucket/TransformJobOutputPath"
),
transform_resources=tasks.TransformResources(
instance_count=1,
instance_type=ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLARGE)
)
)
Create Endpoint
You can call the CreateEndpoint
API from a Task
state.
tasks.SageMakerCreateEndpoint(self, "SagemakerEndpoint",
endpoint_name=sfn.JsonPath.string_at("$.EndpointName"),
endpoint_config_name=sfn.JsonPath.string_at("$.EndpointConfigName")
)
Create Endpoint Config
You can call the CreateEndpointConfig
API from a Task
state.
tasks.SageMakerCreateEndpointConfig(self, "SagemakerEndpointConfig",
endpoint_config_name="MyEndpointConfig",
production_variants=[tasks.ProductionVariant(
initial_instance_count=2,
instance_type=ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.XLARGE),
model_name="MyModel",
variant_name="awesome-variant"
)]
)
Create Model
You can call the CreateModel
API from a Task
state.
tasks.SageMakerCreateModel(self, "Sagemaker",
model_name="MyModel",
primary_container=tasks.ContainerDefinition(
image=tasks.DockerImage.from_json_expression(sfn.JsonPath.string_at("$.Model.imageName")),
mode=tasks.Mode.SINGLE_MODEL,
model_s3_location=tasks.S3Location.from_json_expression("$.TrainingJob.ModelArtifacts.S3ModelArtifacts")
)
)
Update Endpoint
You can call the UpdateEndpoint
API from a Task
state.
tasks.SageMakerUpdateEndpoint(self, "SagemakerEndpoint",
endpoint_name=sfn.JsonPath.string_at("$.Endpoint.Name"),
endpoint_config_name=sfn.JsonPath.string_at("$.Endpoint.EndpointConfig")
)
SNS
Step Functions supports Amazon SNS through the service integration pattern.
Publish
You can call the Publish
API from a Task
state to publish to an SNS topic.
topic = sns.Topic(self, "Topic")
# Use a field from the execution data as message.
task1 = tasks.SnsPublish(self, "Publish1",
topic=topic,
integration_pattern=sfn.IntegrationPattern.REQUEST_RESPONSE,
message=sfn.TaskInput.from_json_path_at("$.state.message"),
message_attributes={
"place": tasks.MessageAttribute(
value=sfn.JsonPath.string_at("$.place")
),
"pic": tasks.MessageAttribute(
# BINARY must be explicitly set
data_type=tasks.MessageAttributeDataType.BINARY,
value=sfn.JsonPath.string_at("$.pic")
),
"people": tasks.MessageAttribute(
value=4
),
"handles": tasks.MessageAttribute(
value=["@kslater", "@jjf", None, "@mfanning"]
)
}
)
# Combine a field from the execution data with
# a literal object.
task2 = tasks.SnsPublish(self, "Publish2",
topic=topic,
message=sfn.TaskInput.from_object({
"field1": "somedata",
"field2": sfn.JsonPath.string_at("$.field2")
})
)
Step Functions
Step Functions supports AWS Step Functions through the service integration pattern.
Start Execution
You can manage AWS Step Functions executions.
AWS Step Functions supports its own StartExecution API as a service integration.
# Define a state machine with one Pass state
child = sfn.StateMachine(self, "ChildStateMachine",
definition=sfn.Chain.start(sfn.Pass(self, "PassState"))
)
# Include the state machine in a Task state with callback pattern
task = tasks.StepFunctionsStartExecution(self, "ChildTask",
state_machine=child,
integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
input=sfn.TaskInput.from_object({
"token": sfn.JsonPath.task_token,
"foo": "bar"
}),
name="MyExecutionName"
)
# Define a second state machine with the Task state above
sfn.StateMachine(self, "ParentStateMachine",
definition=task
)
You can use the Associate Workflow Executions feature via the associateWithParent property. This allows the Step Functions UI to link child executions from parent executions, making it easier to trace execution flow across state machines.
# child: sfn.StateMachine
task = tasks.StepFunctionsStartExecution(self, "ChildTask",
state_machine=child,
associate_with_parent=True
)
This will add the payload AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$: $$.Execution.Id to the input property for you, which will pass the execution ID from the context object to the execution input. It requires input to be an object or not be set at all.
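The effect on the input can be pictured with a small plain-Python sketch. This is an illustration of the described behaviour, not the CDK's own code, and the helper name associate_with_parent is hypothetical.

```python
PARENT_KEY = "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$"


def associate_with_parent(task_input=None):
    """Return the input object with the parent execution ID reference added."""
    if task_input is None:
        task_input = {}
    if not isinstance(task_input, dict):
        # Mirrors the documented requirement: input is an object or unset.
        raise TypeError("input must be an object (dict) or not be set at all")
    merged = dict(task_input)
    # The ".$" suffix marks the value as a path; "$$" refers to the context object.
    merged[PARENT_KEY] = "$$.Execution.Id"
    return merged


with_input = associate_with_parent({"foo": "bar"})
without_input = associate_with_parent()
print(with_input)
print(without_input)
```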
Invoke Activity
You can invoke a Step Functions Activity, which lets you have a task in your state machine where the work is performed by a worker hosted on Amazon EC2, Amazon ECS, AWS Lambda, or anywhere else. Activities are a way to associate code running somewhere (known as an activity worker) with a specific task in a state machine.
When Step Functions reaches an activity task state, the workflow waits for an activity worker to poll for a task. An activity worker polls Step Functions by using GetActivityTask, and sending the ARN for the related activity.
After the activity worker completes its work, it can provide a report of its success or failure by using SendTaskSuccess or SendTaskFailure. These two calls use the taskToken provided by GetActivityTask to associate the result with that task.
The following example creates an activity and creates a task that invokes the activity.
submit_job_activity = sfn.Activity(self, "SubmitJob")
tasks.StepFunctionsInvokeActivity(self, "Submit Job",
activity=submit_job_activity
)
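The worker side of this exchange could look roughly like the sketch below. It shows a single polling step with an injected client, so it runs without AWS access. The names poll_once and FakeClient, the ARN, and the work function are all hypothetical. The client is only assumed to accept the same keyword arguments as the boto3 Step Functions client methods get_activity_task, send_task_success, and send_task_failure.

```python
import json


def poll_once(sfn_client, activity_arn, worker_name, do_work):
    """Poll for one activity task, run it, and report the outcome.

    Returns True if a task was handled, False if the poll returned no work.
    """
    task = sfn_client.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        return False  # the long poll ended without a task
    try:
        result = do_work(json.loads(task["input"]))
    except Exception as exc:
        sfn_client.send_task_failure(
            taskToken=token, error=type(exc).__name__, cause=str(exc)
        )
    else:
        sfn_client.send_task_success(taskToken=token, output=json.dumps(result))
    return True


class FakeClient:
    """Stand-in client that hands out queued tasks and records reports."""

    def __init__(self, tasks):
        self.tasks = list(tasks)
        self.calls = []

    def get_activity_task(self, activityArn, workerName):
        return self.tasks.pop(0) if self.tasks else {}

    def send_task_success(self, taskToken, output):
        self.calls.append(("success", taskToken, output))

    def send_task_failure(self, taskToken, error, cause):
        self.calls.append(("failure", taskToken, error, cause))


client = FakeClient([{"taskToken": "tok-1", "input": '{"jobId": 7}'}])
arn = "arn:aws:states:us-east-1:123456789012:activity:SubmitJob"  # placeholder ARN
handled_first = poll_once(client, arn, "worker-1", lambda data: {"submitted": data["jobId"]})
handled_second = poll_once(client, arn, "worker-1", lambda data: data)
print(handled_first, handled_second, client.calls)
```

A real worker would call poll_once in a loop and pass a real Step Functions client.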
Use the Parameters field to create a collection of key-value pairs that are passed as input. The values of each can either be static values that you include in your state machine definition, or selected from either the input or the context object with a path.
submit_job_activity = sfn.Activity(self, "SubmitJob")
tasks.StepFunctionsInvokeActivity(self, "Submit Job",
activity=submit_job_activity,
parameters={
"comment": "Selecting what I care about.",
"MyDetails": {
"size": sfn.JsonPath.string_at("$.product.details.size"),
"exists": sfn.JsonPath.string_at("$.product.availability"),
"StaticValue": "foo"
}
}
)
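To illustrate how static values and path-selected values end up in the activity input, the sketch below resolves a parameters template against a sample state input. It assumes the template form used in the state machine definition, where keys with a ".$" suffix hold paths. The helper names lookup and resolve_parameters and the sample input are hypothetical, and only simple dotted paths into the state input are supported, not context object paths.

```python
def lookup(document, path):
    """Resolve a simple dotted path such as "$.product.details.size"."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    node = document
    for key in path[2:].split("."):
        node = node[key]
    return node


def resolve_parameters(template, state_input):
    """Replace every "key.$": path entry with the value found in the input."""
    resolved = {}
    for key, value in template.items():
        if key.endswith(".$"):
            resolved[key[:-2]] = lookup(state_input, value)
        elif isinstance(value, dict):
            resolved[key] = resolve_parameters(value, state_input)
        else:
            resolved[key] = value  # static value, passed through unchanged
    return resolved


parameters = {
    "comment": "Selecting what I care about.",
    "MyDetails": {
        "size.$": "$.product.details.size",
        "exists.$": "$.product.availability",
        "StaticValue": "foo",
    },
}
state_input = {"product": {"details": {"size": "small"}, "availability": "in stock"}}

activity_input = resolve_parameters(parameters, state_input)
print(activity_input)
```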
SQS
Step Functions supports Amazon SQS through the service integration pattern.
Send Message
You can call the SendMessage API from a Task state to send a message to an SQS queue.
queue = sqs.Queue(self, "Queue")
# Use a field from the execution data as message.
task1 = tasks.SqsSendMessage(self, "Send1",
queue=queue,
message_body=sfn.TaskInput.from_json_path_at("$.message")
)
# Combine a field from the execution data with
# a literal object.
task2 = tasks.SqsSendMessage(self, "Send2",
queue=queue,
message_body=sfn.TaskInput.from_object({
"field1": "somedata",
"field2": sfn.JsonPath.string_at("$.field2")
})
)