ItemReader (地图) - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

ItemReader (地图)

ItemReader字段是一个JSON对象,用于指定数据集及其位置。分布式 Map 状态使用此数据集作为其输入。

以下示例显示了JSONPath基于工作流程中存储在 Amazon S3 存储桶中的CSV文件中的数据集的ItemReader字段语法。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }

以下示例显示了在JSONata基于的工作流程中,Parameters将其替换为参数

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }
提示

在 Workflow Studio 中,您可以在项目来源字段中指定数据集及其位置。

该 ItemReader 字段的内容

根据您的数据集,ItemReader 字段的内容会有所不同。例如,如果您的数据集是从工作流程的上一步传递的JSON数组,则该ItemReader字段将被省略。如果您的数据集是 Amazon S3 数据来源,则此字段包含以下子字段。

ReaderConfig

一个指定以下详细信息的JSON对象:

  • InputType

    接受以下值之一:CSVJSONMANIFEST

    指定 Amazon S3 数据源的类型,例如CSV文件、对象、JSON文件或 Amazon S3 清单清单。在 Workflow Studio 中,您可以从项目来源字段下的 Amazon S3 项目来源下拉列表中选择一种输入类型。

  • CSVHeaderLocation

    注意

    仅当使用CSV文件作为数据集时,才必须指定此字段。

    接受以下值之一来指定列标题的位置:

    重要

    目前,Step Functi CSV ons 支持最大 10 KiB 的标头。

    • FIRST_ROW – 如果文件的第一行是标题,则使用此选项。

    • GIVEN – 使用此选项在状态机定义中指定标题。例如,如果您的CSV文件包含以下数据。

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      提供以下JSON数组作为CSV标题。

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    提示

    在 Workflow Studio 中,您可以在项目来源字段的其他配置下找到此选项。

  • MaxItems

    限制传递给 Map 状态的数据项数量。例如,假设您提供的CSV文件包含 1000 行,并指定限制为 100。然后,解释器将Map 状态传递 100 行。Map 状态从标题行之后开始,按顺序处理项。

    默认情况下,Map 状态会迭代指定数据集中的所有项。

    注意

    目前,您可以将上限指定为 1 亿。分布式 Map 状态将停止读取超过此限制的项。

    提示

    在 Workflow Studio 中,您可以在项目来源字段的其他配置下找到此选项。

    或者,您可以在分布式 Map 状态输入中指定现有键值对的参考路径。此路径必须解析为正整数。您可以在 MaxItemsPath 子字段中指定参考路径。

    重要

    您可以指定 MaxItemsMaxItemsPath 子字段,但不能同时指定两者。

Resource

Step Functions 必须根据指定的数据集调用的 Amazon S3 API 操作。

Parameters (JSONPath only)

一个JSON对象,用于指定 Amazon S3 存储桶名称和存储数据集的对象密钥。

重要

确保您的 Amazon S3 存储桶 AWS 账户 与 AWS 区域 您的状态机处于相同的位置。

数据集示例

您可以指定下列选项之一作为数据集:

重要

Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集IAM策略的信息,请参阅IAM数据集策略

分布式地图状态可以接受工作流中上一步传递的JSON输入。此输入必须是数组,或者必须包含特定节点内的数组。要选择包含数组的节点,可以使用 ItemsPath (JSONPath仅限地图) 字段。

要处理数组中的单个项,分布式 Map 状态会为每个数组项启动子工作流执行。以下选项卡显示了传递给 Map 状态的输入以及子工作流执行的相应输入的示例。

注意

当您的数据集是上一步中的JSON数组时,Step Functions 会省略该ItemReader字段。

Input passed to the Map state

考虑以下由三个项目JSON组成的数组。

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

分布式 Map 状态将启动三个子工作流执行。每次执行都会接收一个数组项作为输入。以下示例显示了子工作流执行接收的输入。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分布式 Map 状态 可以迭代存储在 Amazon S3 存储桶中的对象。当工作流程执行达到Map状态时,Step Functions 会调用 ListObjectsV2 API 操作,该操作将返回 Amazon S3 对象元数据的数组。在此数组中,每个项目都包含存储在存储桶中的数据的数据,例如ETag密钥

要处理数组中的各个项目,分布式 Map 状态 会启动一个子工作流执行。例如,假设您的 Amazon S3 存储桶包含 100 张图片。然后,调用ListObjectsV2API操作后返回的数组包含 100 个项目。然后,分布式 Map 状态 将启动 100 个子工作流执行,以处理每个数组项。

注意
  • 目前,Step Functions 还会为您使用 Amazon S3 控制台在特定 Amazon S3 存储桶中创建的每个文件夹都提供一个项目。这会导致由分布式 Map 状态 启动额外的子工作流执行。为避免为该文件夹创建额外的子工作流程执行,我们建议您使用 AWS CLI 来创建文件夹。有关更多信息,请参阅《AWS Command Line Interface 用户指南》中的高级别 Amazon S3 命令

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集IAM策略的信息,请参阅IAM数据集策略

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

ItemReader syntax

在此示例中,您已将数据(包括图像、JSON文件和对象)组织在名为的 Amazon S3 存储桶processData中命名的前缀中amzn-s3-demo-bucket

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Prefix": "processData" } }
Input passed to a child workflow execution

分布式 Map 状态 启动的子工作流执行与 Amazon S3 存储桶中存在的项目数量一样。以下示例显示了子工作流执行接收的输入。

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

分布式地图状态可以接受存储在 Amazon S3 存储桶中的JSON文件作为数据集。该JSON文件必须包含一个数组。

当工作流程执行达到Map状态时,Step Functions 会调用该GetObjectAPI操作来获取指定JSON文件。然后,Map 状态会迭代数组中的每个项目,并开始对每个项目执行子工作流。例如,如果您的JSON文件包含 1000 个数组项目,则该Map状态将启动 1000 个子工作流程执行。

注意
  • 用于启动子工作流程执行的执行输入不能超过 256 KiB。但是,如果您随后应用可选ItemSelector字段来减小项目的大小,Step Functions 则支持从CSV或JSON文件中读取最大 8 MB 的项目。

  • 目前,Step Functions 支持 Amazon S3 清单报告中单个文件的最大大小为 10 GB。但是,如果每个文件都小于 10 GB,Step Functions 能够处理大小可以超过 10 GB。

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集IAM策略的信息,请参阅IAM数据集策略

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

在这个例子中,假设你有一个名为的JSON文件factcheck.json。您已将此文件存储在 Amazon S3 存储桶中的名为 jsonDataset 的前缀中。以下是JSON数据集的示例。

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

分布式地图状态启动的子工作流程执行次数与JSON文件中存在的数组项目数一样多。以下示例显示了子工作流执行接收的输入。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分布式地图状态可以接受存储在 Amazon S3 存储桶中的CSV文件作为数据集。如果您使用CSV文件作为数据集,则需要指定CSV列标题。有关如何指定CSV标题的信息,请参见该 ItemReader 字段的内容

Step Functions 根据以下规则解析CSV文件:

  • 逗号(,)是用于分隔字段的分隔符。

  • 换行符是分隔记录的分隔符。

  • 字段被视为字符串。对于数据类型转换,使用 ItemSelector (地图) 中的 States.StringToJson 内置函数。

  • 不需要使用双引号(" ")将字符串括起来。但是,用双引号括起来的字符串可以包含逗号和换行符,但不用作记录分隔符。

  • 可以通过重复双引号来保留双引号。

  • 如果一行中的字段数少于标题中的字段数,Step Functions 会为缺失的值提供空字符串

  • 如果一行中的字段数多于标题中的字段数,Step Functions 会跳过多余的字段。

有关 Step Functions 如何解析CSV文件的更多信息,请参阅Example of parsing an input CSV file

当工作流程执行达到Map状态时,Step Functions 会调用该GetObjectAPI操作来获取指定CSV文件。然后,该Map状态会遍历CSV文件中的每一行,并启动子工作流程执行以处理每行中的项目。例如,假设您提供了一个包含 100 行的CSV文件作为输入。然后,解释器将每一行传递给 Map 状态。Map 状态从标题行之后开始,按顺序处理项目。

注意
  • 用于启动子工作流程执行的执行输入不能超过 256 KiB。但是,如果您随后应用可选ItemSelector字段来减小项目的大小,Step Functions 则支持从CSV或JSON文件中读取最大 8 MB 的项目。

  • 目前,Step Functions 支持 Amazon S3 清单报告中单个文件的最大大小为 10 GB。但是,如果每个文件都小于 10 GB,Step Functions 能够处理大小可以超过 10 GB。

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集IAM策略的信息,请参阅IAM数据集策略

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

ItemReader syntax

例如,假设你有一个名为的CSV文件ratings.csv。然后,您已将此文件存储在 Amazon S3 存储桶中名为 csvDataset 的前缀中。

{ "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } } }
Input to a child workflow execution

Di stributed Map 状态启动的子工作流程执行次数与CSV文件中存在的行数一样多,不包括标题行(如果在文件中)。以下示例显示了子工作流执行接收的输入。

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

分布式 Map 状态 可以接受存储在 Amazon S3 存储桶中的 Amazon S3 清单文件作为数据集。

当工作流程执行达到Map状态时,Step Functions 会调用GetObjectAPI操作来获取指定的 Amazon S3 清单清单文件。然后,Map 状态会迭代清单中的对象,以返回 Amazon S3 清单对象元数据数组。

注意
  • 目前,Step Functions 支持 Amazon S3 清单报告中单个文件的最大大小为 10 GB。但是,如果每个文件都小于 10 GB,Step Functions 能够处理大小可以超过 10 GB。

  • Step Functions 需要适当的权限,才能访问您使用的Amazon S3 数据集。有关数据集IAM策略的信息,请参阅IAM数据集策略

以下是CSV格式库存文件的示例。此文件包含名为 csvDatasetimageDataset 的对象,其中存储在名为 amzn-s3-demo-source-bucket 的 Amazon S3 存储桶中。

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
重要

目前,Step Functions 不支持将用户定义的 Amazon S3 清单报告作为数据集。您还必须确保 Amazon S3 库存报告的输出格式为CSV。有关 Amazon S3 清单及其设置方法的更多信息,请参阅《Amazon S3 用户指南》中的 Amazon S3 清单

以下清单清单文件示例显示了清单对象元数据的CSV标题。

{ "sourceBucket" : "amzn-s3-demo-source-bucket", "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

以下选项卡显示了该数据集的 ItemReader 字段语法和传递给子工作流执行的输入的示例。

ItemReader syntax
{ "ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "destination-prefix/amzn-s3-demo-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json" } } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "amzn-s3-demo-source-bucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

根据您在配置 Amazon S3 清单报告时选择的字段,您的 manifest.json 文件内容可能与所示示例有所不同。

IAM数据集策略

当您使用 Step Functions 控制台创建工作流程时,Step Functions 可以根据工作流程定义中的资源自动生成IAM策略。这些策略包括允许状态机角色调用分布式地图状态StartExecutionAPI操作所需的最低权限。这些策略还包括 Step Functions 访问 AWS 资源(例如 Amazon S3 存储桶和对象以及 Lambda 函数)所需的最低权限。我们强烈建议您在IAM策略中仅包含必要的权限。例如,如果您的工作流包含分布式模式下的 Map 状态,则将策略范围缩小到包含您的数据集的特定 Amazon S3 存储桶和文件夹。

重要

如果您在分布式地图状态输入中指定 Amazon S3 存储桶和对象或前缀,以及指向现有键值对的参考路径,请务必更新工作流程的IAM策略。将策略范围缩小到运行时该路径解析到的存储桶和对象名称。

以下IAM策略示例授予使用 ListObjectsV2GetObjectAPI操作访问您的 Amazon S3 数据集所需的最低权限。

例 IAM将 Amazon S3 对象作为数据集的策略

以下示例显示了一个IAM策略,该策略授予访问名为的 Amazon S3 存储桶processImages中组织的对象的最低权限amzn-s3-demo-bucket

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } } } ] }
例 IAM将CSV文件作为数据集的策略

以下示例显示了授予访问名为CSV的文件的最低权限的IAM策略ratings.csv

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv" ] } ] }
例 IAM将 Amazon S3 清单作为数据集的策略

以下示例显示了授予访问 Amazon S3 库存报告的最低权限的IAM策略。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::destination-prefix/amzn-s3-demo-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::destination-prefix/amzn-s3-demo-bucket/config-ID/data/*" ] } ] }