ResultWriter (地图) - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

ResultWriter (地图)

管理状态和转换数据

ResultWriter字段是一个 JSON 对象,它为分布式地图状态启动的子工作流程执行的输出结果提供选项。如果您选择导出结果,则可以为输出结果指定不同的格式选项以及用于存储输出结果的 Amazon S3 位置。默认情况下,Step Functions 不会导出这些结果。

该 ResultWriter 字段的内容

ResultWriter字段包含以下子字段。字段的选择决定了输出的格式以及是否将其导出到 Amazon S3。

ResultWriter

一个 JSON 对象,用于指定以下详细信息:

  • Resource

    Step Functions 为导出执行结果而调用的亚马逊 S3 API 操作。

  • Parameters

    一个 JSON 对象,用于指定存储执行输出的 Amazon S3 存储桶名称和前缀。

  • WriterConfig

    此字段允许您配置以下选项。

    • Transformation

      • NONE-返回子工作流程执行的输出以及工作流程元数据不变。将子工作流程执行结果导出到 Amazon S3 时WriterConfig为默认值,未指定。

      • COMPACT-返回子工作流程执行的输出。未指定ResultWriter时的默认。

      • FLATTEN-返回子工作流程执行的输出。如果子工作流程执行返回数组,则此选项会在将结果返回到状态输出或将结果写入 Amazon S3 对象之前对数组进行扁平化。

        注意

        如果子工作流程执行失败,Step Functions 会将其执行结果保持不变。结果等同于设置TransformationNONE

    • OutputType

      • JSON-将结果格式化为 JSON 数组。

      • JSONL-将结果格式化为 JSON 行。

必填字段组合

ResultWriter字段不能为空。必须指定其中一组子字段。

  • WriterConfig-用于预览格式化后的输出,而不将结果保存到 Amazon S3。

  • ResourceParameters-将结果保存到 Amazon S3 中,无需额外格式化。

  • 所有三个字段:WriterConfigResourceParameters-用于格式化输出并将其保存到 Amazon S3。

示例配置和转换输出

以下主题演示了不同转换选项可能的配置设置ResultWriter和处理结果的示例。

以下示例演示了使用三个字段的可能组合的配置:WriterConfigResourcesParameters

WriterConfig

此示例配置了状态输出在预览中的显示方式,并在WriterConfig字段中指定了输出格式和转换。本来Resource可以提供 Amazon S3 存储桶规格的 and Parameters 字段表示状态输出资源。结果将传递到下一个状态。

"ResultWriter": { "WriterConfig": { "Transformation": "FLATTEN", "OutputType": "JSON" } }
仅限资源参数

此示例将状态输出导出到指定的 Amazon S3 存储桶,而不进行不存在的WriterConfig字段本应指定的额外格式和转换。

"ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" }
所有三个字段:WriterConfig、“资源” 和 “参数”

此示例根据WriterConfig字段中的规格格式化状态输出。它还会根据ResourceParameters字段中的规格将其导出到 Amazon S3 存储桶。

"ResultWriter": { "WriterConfig": { "Transformation": "FLATTEN", "OutputType": "JSON" }, "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" } }

在这些示例中,假设每个子工作流程执行都会返回一个输出,该输出是一个对象数组。

[ { "customer_id": "145538", "order_id": "100000" }, { "customer_id": "898037", "order_id": "100001" } ]

这些示例演示了不同Transformation值的格式化输出,其中 o OutputType f JSON

转型无

这是使用NONE转换时处理结果的示例。输出保持不变,并且包含工作流程元数据。

[ { "ExecutionArn": "arn:aws:states:region:account-id:execution:orderProcessing/getOrders:da4e9fc7-abab-3b27-9a77-a277e463b709", "Input": ..., "InputDetails": { "Included": true }, "Name": "da4e9fc7-abab-3b27-9a77-a277e463b709", "Output": "[{\"customer_id\":\"145538\",\"order_id\":\"100000\"},{\"customer_id\":\"898037\",\"order_id\":\"100001\"}]", "OutputDetails": { "Included": true }, "RedriveCount": 0, "RedriveStatus": "NOT_REDRIVABLE", "RedriveStatusReason": "Execution is SUCCEEDED and cannot be redriven", "StartDate": "2025-02-04T01:49:50.099Z", "StateMachineArn": "arn:aws:states:region:account-id:stateMachine:orderProcessing/getOrders", "Status": "SUCCEEDED", "StopDate": "2025-02-04T01:49:50.163Z" }, ... { "ExecutionArn": "arn:aws:states:region:account-id:execution:orderProcessing/getOrders:f43a56f7-d21e-3fe9-a40c-9b9b8d0adf5a", "Input": ..., "InputDetails": { "Included": true }, "Name": "f43a56f7-d21e-3fe9-a40c-9b9b8d0adf5a", "Output": "[{\"customer_id\":\"169881\",\"order_id\":\"100005\"},{\"customer_id\":\"797471\",\"order_id\":\"100006\"}]", "OutputDetails": { "Included": true }, "RedriveCount": 0, "RedriveStatus": "NOT_REDRIVABLE", "RedriveStatusReason": "Execution is SUCCEEDED and cannot be redriven", "StartDate": "2025-02-04T01:49:50.135Z", "StateMachineArn": "arn:aws:states:region:account-id:stateMachine:orderProcessing/getOrders", "Status": "SUCCEEDED", "StopDate": "2025-02-04T01:49:50.227Z" } ]
转型紧凑型

这是使用COMPACT转换时处理结果的示例。请注意,它是子工作流程执行与原始数组结构的组合输出。

[ [ { "customer_id": "145538", "order_id": "100000" }, { "customer_id": "898037", "order_id": "100001" } ], ..., [ { "customer_id": "169881", "order_id": "100005" }, { "customer_id": "797471", "order_id": "100006" } ] ]
转型趋于平缓

这是使用FLATTEN转换时处理结果的示例。请注意,它是将子工作流程执行数组拼合为一个数组的组合输出。

[ { "customer_id": "145538", "order_id": "100000" }, { "customer_id": "898037", "order_id": "100001" }, ... { "customer_id": "169881", "order_id": "100005" }, { "customer_id": "797471", "order_id": "100006" } ]

导出到 Amazon S3

重要

确保您用于导出 Map Run 结果的 Amazon S3 存储桶 AWS 账户 与 AWS 区域 您的状态机相同。否则,您的状态机执行将因 States.ResultWriterFailed 错误而失败。

如果您的输出负载大小超过 256 KiB,则将结果导出到 Amazon S3 存储桶会很有帮助。Step Functions 整合了所有子工作流执行数据,例如执行输入和输出、ARN 和执行状态。然后,它将状态相同的执行导出到指定 Amazon S3 位置的相应文件中。

以下示例使用JSONPath显示了Parameters用于导出子工作流程执行结果的ResultWriter字段的语法。在此示例中,您将结果存储在名为 amzn-s3-demo-destination-bucket 存储桶种,该存储桶位于名为 csvProcessJobs 的前缀中。

{ "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" } } }

对于JSONata州,Parameters将替换为Arguments

{ "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Arguments": { "Bucket": "amzn-s3-demo-destination-bucket", "Prefix": "csvProcessJobs" } } }
提示

在 Workflow Studio 中,您可以通过选择将 Map 状态结果导出到 Amazon S3 来导出子工作流执行结果。然后,提供您要将结果导出到其中的 Amazon S3 存储桶的名称和前缀。

Step Functions 需要适当的权限才能访问要导出结果的存储桶和文件夹。有关所需 IAM 策略的信息,请参阅 适用的 IAM 政策 ResultWriter

如果要导出子工作流执行结果,分布式 Map 状态执行将按以下格式返回 Map Run ARN 以及有关 Amazon S3 导出位置的数据:

{ "MapRunArn": "arn:aws:states:us-east-2:account-id:mapRun:csvProcess/Map:ad9b5f27-090b-3ac6-9beb-243cd77144a7", "ResultWriterDetails": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "csvProcessJobs/ad9b5f27-090b-3ac6-9beb-243cd77144a7/manifest.json" } }

Step Functions 将具有相同状态的执行结果导出到各自的文件中。例如,如果您的子工作流执行结果为 500 个成功和 200 个失败,则 Step Functions 将在指定的 Amazon S3 位置为成功和失败结果创建两个文件。在此示例中,成功结果文件包含 500 个成功结果,而失败结果文件包含 200 个失败结果。

对于给定的执行尝试,Step Functions 会根据您的执行输出在指定的 Amazon S3 位置创建以下文件:

  • manifest.json – 包含 Map Run 元数据,例如导出位置、Map Run ARN 以及有关结果文件的信息。

    如果您redriven了 Map Run,manifest.json 文件会包含 Map Run 的所有尝试中所有成功子工作流执行的引用。但是,此文件包含对特定redrive的失败和待执行的引用。

  • SUCCEEDED_n.json – 包含所有成功执行子工作流的合并数据。n 表示文件的索引号。索引号从 0 开始。例如 SUCCEEDED_1.json

  • FAILED_n.json – 包含所有失败、超时和中止的子工作流执行的合并数据。使用此文件从失败的执行中恢复。n 表示文件的索引。索引号从 0 开始。例如 FAILED_1.json

  • PENDING_n.json – 包含由于 Map Run 失败或中止而未启动的所有子工作流执行的合并数据。n 表示文件的索引。索引号从 0 开始。例如 PENDING_1.json

Step Functions 支持最大为 5 GB 的单个结果文件。如果文件大小超过 5 GB,Step Functions 会创建另一个文件来写入超出部分的执行结果,并在文件名后面附加一个索引号。例如,如果 SUCCEEDED_0.json 文件的大小超过 5 GB,Step Functions 会创建一个 SUCCEEDED_1.json 文件来记录超出部分结果。

如果您未指定导出子工作流执行结果,则状态机执行将返回子工作流执行结果数组,如以下示例所示:

[ { "statusCode": 200, "inputReceived": { "show_id": "s1", "release_year": "2020", "rating": "PG-13", "type": "Movie" } }, { "statusCode": 200, "inputReceived": { "show_id": "s2", "release_year": "2021", "rating": "TV-MA", "type": "TV Show" } }, ... ]
注意

如果返回的输出大小超过 256 KiB,则状态机执行失败并返回错误。States.DataLimitExceeded

适用的 IAM 政策 ResultWriter

当您使用 Step Functions 控制台创建工作流时,Step Functions 可以根据工作流定义中的资源自动生成 IAM 策略。生成的策略包括允许状态机角色调用分布式地图状态StartExecution API 操作和访问 AWS 资源(例如 Amazon S3 存储桶和对象以及 Lambda 函数)所需的最低权限。

我们建议您的 IAM 策略中仅包含必要的权限。例如,如果您的工作流程包含分布式模式的Map状态,则将策略范围缩小到包含您的数据的特定的 Amazon S3 存储桶和文件夹。

重要

如果您在分布式 Map 状态 输入中指定了 Amazon S3 存储桶和对象或前缀,并将参考路径指向现有键值对,请务必更新工作流程的 IAM 策略。将策略范围缩小到运行时该路径解析到的存储桶和对象名称。

以下 IAM 策略示例授予使用 PutObject API 操作将子工作流程执行结果写入 Amazon S3 存储桶csvJobs中名为的文件夹所需的最低权限。

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-destination-bucket/csvJobs/*" ] } ] }

如果您正在向其写入子工作流程执行结果的 Amazon S3 存储桶使用AWS Key Management Service(AWS KMS)密钥进行加密,则必须在 IAM 策略中包含必要的 AWS KMS 权限。有关更多信息,请参阅 AWS KMS key 加密的 Amazon S3 存储桶的 IAM 权限