使用标准和快递工作流程执行选择性检查点操作 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用标准和快递工作流程执行选择性检查点操作

此示例项目演示了如何通过运行执行选择性检查点的模拟电子商务工作流来组合标准和快速工作流。部署此示例项目将创建标准工作流状态机、嵌套的 Express Workflows 状态机、 AWS Lambda 函数、亚马逊简单队列服务 (AmazonSQS) 队列和亚马逊简单通知服务 (AmazonSNS) 主题。

有关快速工作流、嵌套工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:

第 1 步:创建状态机并预置资源

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 在搜索框中键入 Selective checkpointing example,然后从返回的搜索结果中选择选择性检查点示例

  3. 选择下一步以继续。

  4. 选择 “运行演示” 以创建只读和 ready-to-deploy 工作流程,或者选择 “在其上构建” 以创建可编辑的状态机定义,您可以在此基础上构建并稍后部署。

    该示例项目部署了以下资源:

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 AWS Lambda 函数

    • 亚马逊SQS队列

    • 亚马逊的SNS话题

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 AWS Step Functions 标准类型的状态机

    • 一个快速类型的 Step Functions 状态机

    • 相关 AWS Identity and Access Management (IAM) 角色

    下图显示了选择性检查点示例示例项目的工作流图:

    选择性检查点示例示例项目的工作流图。
  5. 选择使用模板继续进行选择。

后续步骤取决于您之前的选择:

  1. 运行演示 — 您可以先查看状态机,然后再使用部署的资源创建只读项目 AWS CloudFormation 给你的 AWS 账户.

    您可以查看状态机定义,准备就绪后,选择部署并运行以部署项目并创建资源。

    部署最多可能需要 10 分钟才能创建资源和权限。您可以使用堆栈 ID 链接来监控进度 AWS CloudFormation.

    部署完成后,您应该会在控制台中看到您的新状态机。

  2. 在此基础上再接再厉 — 您可以查看和编辑工作流程定义。在尝试运行自定义工作流程之前,您可能需要为示例项目中的占位符设置值。

注意

部署到您的账户的服务可能会收取标准费用。

部署示例项目的资源后,执行以下操作。

第 2 步:运行状态机

  1. 状态机页面上,选择您的示例项目。

  2. 在示例项目页面上,选择启动执行

  3. 启动执行对话框中,执行以下操作:

    1. (可选)输入自定义执行名称以覆盖生成的默认值。

      非ASCII姓名和日志

      Step Functions 接受状态机、执行、活动和包含非ASCII字符的标签的名称。由于此类字符不适用于亚马逊 CloudWatch,因此我们建议您仅使用ASCII字符,以便您可以跟踪中的指标 CloudWatch。

    2. (可选)在输入框中,将输入值输入为JSON。如果您正在运行演示,则可以跳过此步骤。

    3. 选择启动执行

    Step Functions 控制台将引导您进入执行详情页面,您可以在图表视图中选择状态以浏览步骤详细信息窗格中的相关信息。

  4. 转到您的CloudWatch 日志日志组并检查日志。日志组的名称将类似于示例-ExpressLogGroup-wJalr XUtnFEMI

父级(标准工作流)的示例状态机代码

此示例项目中的状态机与亚马逊SQS、亚马逊SNS和 Step Functions Express 工作流程集成。

浏览此示例状态机,了解 Step Functions 如何处理来自亚马逊SQS和亚马逊的输入SNS,然后使用嵌套的 Express Workflows 状态机更新后端系统。

有关如何做的更多信息 AWS Step Functions 可以控制其他 AWS 服务,请参阅将服务与 Step Functions 集成

{ "Comment": "An example of combining standard and express workflows to run a mock e-commerce workflow that does selective checkpointing.", "StartAt": "Approve Order Request", "States": { "Approve Order Request": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "MessageTitle": "Order Request received. Pausing workflow to wait for manual approval. ", "TaskToken.$": "$$.Task.Token" } }, "Next": "Notify Order Success", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Notify Order Failure" } ] }, "Notify Order Success": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Order has been approved. Resuming workflow.", "TopicArn": "<SNS_ARN>" }, "Next": "Process Payment" }, "Notify Order Failure": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Order not approved. Order failed.", "TopicArn": "<SNS_ARN>" }, "End": true }, "Process Payment": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "MessageTitle": "Payment sent to third-party for processing. Pausing workflow to wait for response.", "TaskToken.$": "$$.Task.Token" } }, "Next": "Notify Payment Success", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Notify Payment Failure" } ] }, "Notify Payment Success": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Payment processing succeeded. Resuming workflow.", "TopicArn": "<SNS_ARN>" }, "Next": "Workflow to Update Backend Systems" }, "Notify Payment Failure": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Payment processing failed.", "TopicArn": "<SNS_ARN>" }, "End": true }, "Workflow to Update Backend Systems": { "Comment": "Starting an execution of an Express workflow to handle backend updates. Express workflows are fast and cost-effective for steps where checkpointing isn't required.", "Type": "Task", "Resource": "arn:<PARTITION>:states:::states:startExecution.sync", "Parameters": { "StateMachineArn": "<UPDATE_DATABASE_EXPRESS_STATE_MACHINE_ARN>", "Input": { "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" } }, "Next": "Ship the Package" }, "Ship the Package": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Order and payment received, database is updated and the package is ready to ship.", "TopicArn": "<SNS_ARN>" }, "End": true } } }

父状态机的IAM角色示例

这些例子 AWS Identity and Access Management (IAM) 示例项目生成的策略包括执行状态机和相关资源所需的最低权限。我们建议您在IAM策略中仅包含必要的权限。

亚马逊SNS政策:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "sns:Publish" ], "Resource": "arn:aws:sns:us-east-1:123456789012:Checkpoint-SNSTopic-wJalrXUtnFEMI", "Effect": "Allow" } ] }

亚马逊SQS政策:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "sqs:SendMessage" ], "Resource": "arn:aws:sqs:us-east-1:123456789012:Checkpoint-SQSQueue-je7MtGbClwBF", "Effect": "Allow" } ] }

状态执行策略:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "states:StartExecution", "states:DescribeExecution", "states:StopExecution" ], "Resource": "*", "Effect": "Allow" }, { "Action": [ "events:PutTargets", "events:PutRule", "events:DescribeRule" ], "Resource": "arn:aws:events:us-east-1:123456789012:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule", "Effect": "Allow" } ] }

嵌套状态机(快速工作流)的示例状态机代码

此示例项目中的状态机在父状态机调用时更新后端信息。

浏览此示例状态机,了解 Step Functions 如何更新模拟电子商务后端系统的不同组件。

有关如何做的更多信息 AWS Step Functions 可以控制其他 AWS 服务,请参阅将服务与 Step Functions 集成

选择性检查点工作流。
{ "StartAt": "Update Order History", "States": { "Update Order History": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update order history." } }, "Next": "Update Data Warehouse" }, "Update Data Warehouse": { "Type" : "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update data warehouse." } }, "Next": "Update Customer Profile" }, "Update Customer Profile": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update customer profile." } }, "Next": "Update Inventory" }, "Update Inventory": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update inventory." } }, "End": true } } }

子状态机的IAM角色示例

此示例 AWS Identity and Access Management (IAM) 示例项目生成的策略包括执行状态机和相关资源所需的最低权限。我们建议您在IAM策略中仅包含必要的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:Example-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI" ], "Effect": "Allow" } ] }

以下策略可确保有足够的 CloudWatch 日志权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

有关在将 Step Functions 与其他功能一起使用IAM时如何进行配置的信息 AWS 服务,请参阅Step Functions 如何为集成服务生成IAM策略