ItemReader (Map) - AWS Step Functions

The ItemReader field is a JSON object, which specifies a dataset and its location. A Distributed Map state uses this dataset as its input.

The following example shows the syntax of the ItemReader field in a JSONPath-based workflow, for a dataset in a CSV file that's stored in an Amazon S3 bucket.

"ItemReader": {
  "ReaderConfig": {
    "InputType": "CSV",
    "CSVHeaderLocation": "FIRST_ROW"
  },
  "Resource": "arn:aws:states:::s3:getObject",
  "Parameters": {
    "Bucket": "amzn-s3-demo-bucket",
    "Key": "csvDataset/ratings.csv"
  }
}

The following example shows that in JSONata-based workflows, Parameters is replaced with Arguments.

"ItemReader": {
  "ReaderConfig": {
    "InputType": "CSV",
    "CSVHeaderLocation": "FIRST_ROW"
  },
  "Resource": "arn:aws:states:::s3:getObject",
  "Arguments": {
    "Bucket": "amzn-s3-demo-bucket",
    "Key": "csvDataset/ratings.csv"
  }
}

Tip

In Workflow Studio, you specify the dataset and its location in the Item source field.
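
The difference between the two query languages can be sketched in a few lines of Python. The helper build_item_reader below is hypothetical and not part of any AWS SDK. It only assembles the same JSON shown in the two examples above, and its query_language argument is an assumption of this sketch.

```python
import json


def build_item_reader(bucket, key, query_language="JSONPath"):
    """Assemble an ItemReader object for a CSV file in Amazon S3.

    Hypothetical helper: it only mirrors the two examples above.
    """
    if query_language not in ("JSONPath", "JSONata"):
        raise ValueError(f"unknown query language: {query_language}")
    # JSONPath-based workflows use Parameters; JSONata-based ones use Arguments.
    location_field = "Parameters" if query_language == "JSONPath" else "Arguments"
    return {
        "ReaderConfig": {"InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW"},
        "Resource": "arn:aws:states:::s3:getObject",
        location_field: {"Bucket": bucket, "Key": key},
    }


if __name__ == "__main__":
    reader = build_item_reader(
        "amzn-s3-demo-bucket", "csvDataset/ratings.csv", "JSONata"
    )
    print(json.dumps({"ItemReader": reader}, indent=2))
```

Keeping the field name behind a single switch makes it harder to mix Parameters into a JSONata-based definition by accident.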

Contents of the ItemReader field

Depending on your dataset, the contents of the ItemReader field vary. For example, if your dataset is a JSON array passed from a previous step in the workflow, the ItemReader field is omitted. If your dataset is an Amazon S3 data source, this field contains the following sub-fields.

ReaderConfig

A JSON object that specifies the following details:

  • InputType

    Specifies the type of Amazon S3 data source, such as CSV file, object, JSON file, or an Amazon S3 inventory list. In Workflow Studio, you can select an input type from the Amazon S3 item source dropdown list under the Item source field.

  • CSVHeaderLocation

    Note

    Specify this field only if you use a CSV file as a dataset.

    Important

    Currently, Step Functions supports CSV headers of up to 10 KiB.

    Accepts one of the following values to specify the location of the column header:

    • FIRST_ROW – Use this option if the first line of the file is the header.

    • GIVEN – Use this option to specify the header within the state machine definition. For example, if your CSV file contains the following data.

      1,307,3.5,1256677221
      1,481,3.5,1256677456
      1,1091,1.5,1256677471
      ...

      Provide the following JSON array as a CSV header.

      "ItemReader": {
        "ReaderConfig": {
          "InputType": "CSV",
          "CSVHeaderLocation": "GIVEN",
          "CSVHeaders": [
            "userId",
            "movieId",
            "rating",
            "timestamp"
          ]
        }
      }

    Tip

    In Workflow Studio, you can find this option under Additional configuration in the Item source field.

  • MaxItems

    Limits the number of data items passed to the Map state. For example, suppose that you provide a CSV file that contains 1000 rows and specify a limit of 100. Then, the interpreter passes only 100 rows to the Map state. The Map state processes items in sequential order, starting after the header row.

    By default, the Map state iterates over all the items in the specified dataset.

    Note

    Currently, you can specify a limit of up to 100,000,000. The Distributed Map state stops reading items beyond this limit.

    Tip

    In Workflow Studio, you can find this option under Additional configuration in the Item source field.

    Alternatively, you can specify a reference path to an existing key-value pair in your Distributed Map state input. This path must resolve to a positive integer. You specify the reference path in the MaxItemsPath sub-field.

    Important

    You can specify either the MaxItems or the MaxItemsPath sub-field, but not both.
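
As a rough illustration of how the ReaderConfig sub-fields described above interact, the following Python sketch applies CSVHeaderLocation and MaxItems to CSV text. It is not the Step Functions implementation: the function name read_csv_items is hypothetical, the CSV text is passed in directly instead of being fetched from Amazon S3, and resolving MaxItemsPath against the state input is left out.

```python
import csv
import io


def read_csv_items(text, reader_config):
    """Approximate how ReaderConfig shapes the items taken from CSV text.

    Illustration only; this is not the Step Functions implementation.
    """
    if "MaxItems" in reader_config and "MaxItemsPath" in reader_config:
        raise ValueError("specify either MaxItems or MaxItemsPath, not both")
    max_items = reader_config.get("MaxItems")  # None means "read everything"

    rows = csv.reader(io.StringIO(text, newline=""))
    location = reader_config["CSVHeaderLocation"]
    if location == "FIRST_ROW":
        headers = next(rows)  # the first line of the file is the header
    elif location == "GIVEN":
        headers = reader_config["CSVHeaders"]  # header comes from the definition
    else:
        raise ValueError(f"unsupported CSVHeaderLocation: {location}")

    items = []
    for row in rows:
        if max_items is not None and len(items) >= max_items:
            break  # stop reading once the limit is reached
        items.append(dict(zip(headers, row)))
    return items


if __name__ == "__main__":
    data = "1,307,3.5,1256677221\n1,481,3.5,1256677456\n1,1091,1.5,1256677471\n"
    config = {
        "InputType": "CSV",
        "CSVHeaderLocation": "GIVEN",
        "CSVHeaders": ["userId", "movieId", "rating", "timestamp"],
        "MaxItems": 2,
    }
    for item in read_csv_items(data, config):
        print(item)
```

With GIVEN, every line of the file is treated as data. With FIRST_ROW, the first line is consumed as the header before any item is counted toward MaxItems.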

Resource

The Amazon S3 API action that Step Functions invokes. The action depends on the specified dataset.

Parameters (JSONPath only)

A JSON object that specifies the Amazon S3 bucket name and object key that the dataset is stored in.

Important

Make sure that your Amazon S3 buckets are under the same AWS account and AWS Region as your state machine.

Examples of datasets

You can specify one of the following options as your dataset:

Important

Step Functions needs appropriate permissions to access the Amazon S3 datasets that you use. For information about IAM policies for the datasets, see IAM policies for datasets.

JSON array from a previous step

A Distributed Map state can accept a JSON input passed from a previous step in the workflow. This input must either be an array, or must contain an array within a specific node. To select a node that contains the array, you can use the ItemsPath (Map, JSONPath only) field.

To process individual items in the array, the Distributed Map state starts a child workflow execution for each array item. The following examples show the input passed to the Map state and the corresponding input to a child workflow execution.

Note

Step Functions omits the ItemReader field when your dataset is a JSON array from a previous step.

Input passed to the Map state

Consider the following JSON array of three items.

"facts": [
  {
    "verdict": "true",
    "statement_date": "6/11/2008",
    "statement_source": "speech"
  },
  {
    "verdict": "false",
    "statement_date": "6/7/2022",
    "statement_source": "television"
  },
  {
    "verdict": "mostly-true",
    "statement_date": "5/18/2016",
    "statement_source": "news"
  }
]

Input passed to a child workflow execution

The Distributed Map state starts three child workflow executions. Each execution receives an array item as input. The following example shows the input received by a child workflow execution.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
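
The fan-out described above can be sketched as follows. The function child_inputs and its items_key argument are hypothetical: items_key stands in for an ItemsPath such as $.facts, but this sketch only looks up a top-level key and does not evaluate JSONPath expressions.

```python
def child_inputs(state_input, items_key=None):
    """Return one child workflow input per array item.

    items_key is a simplified stand-in for ItemsPath (assumption of this
    sketch); pass None when the state input is itself the array.
    """
    items = state_input if items_key is None else state_input[items_key]
    if not isinstance(items, list):
        raise TypeError("the selected input must be a JSON array")
    return list(items)


if __name__ == "__main__":
    state_input = {
        "facts": [
            {"verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech"},
            {"verdict": "false", "statement_date": "6/7/2022", "statement_source": "television"},
            {"verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news"},
        ]
    }
    for child_input in child_inputs(state_input, "facts"):
        print(child_input)
```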

Amazon S3 objects

A Distributed Map state can iterate over the objects that are stored in an Amazon S3 bucket. When the workflow execution reaches the Map state, Step Functions invokes the ListObjectsV2 API action, which returns an array of the Amazon S3 object metadata. In this array, each item contains data, such as ETag and Key, for the data stored in the bucket.

To process individual items in the array, the Distributed Map state starts a child workflow execution for each item. For example, suppose that your Amazon S3 bucket contains 100 images. The array returned by the ListObjectsV2 API action then contains 100 items, and the Distributed Map state starts 100 child workflow executions, one for each array item.

Note
  • Currently, Step Functions also includes an item for each folder you create in a specific Amazon S3 bucket using the Amazon S3 console. This results in an extra child workflow execution started by the Distributed Map state. To avoid creating an extra child workflow execution for the folder, we recommend that you use the AWS CLI to create folders. For more information, see High-level Amazon S3 commands in the AWS Command Line Interface User Guide.

  • Step Functions needs appropriate permissions to access the Amazon S3 datasets that you use. For information about IAM policies for the datasets, see IAM policies for datasets.

The following examples show the ItemReader field syntax and the input passed to a child workflow execution for this dataset.

ItemReader syntax

In this example, you've organized your data, which includes images, JSON files, and objects, within a prefix named processData in an Amazon S3 bucket named amzn-s3-demo-bucket.

"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
    "Bucket": "amzn-s3-demo-bucket",
    "Prefix": "processData"
  }
}

Input passed to a child workflow execution

The Distributed Map state starts one child workflow execution for each metadata item returned for the specified bucket and prefix. The following example shows the input received by a child workflow execution.

{
  "Etag": "\"05704fbdccb224cb01c59005bebbad28\"",
  "Key": "processData/images/n02085620_1073.jpg",
  "LastModified": 1668699881,
  "Size": 34910,
  "StorageClass": "STANDARD"
}
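
The note above mentions that folders created in the Amazon S3 console produce an extra item. One way to recognize such items, sketched below, relies on the assumption that a console-created folder appears as a zero-byte object whose key ends with a slash. The function names are hypothetical, and where such a check would run in your workflow is a design decision this sketch doesn't address.

```python
def is_folder_placeholder(item):
    """Guess whether an S3 object metadata item is a console-created folder.

    Assumption: such folders are zero-byte objects whose key ends in "/".
    """
    return item["Key"].endswith("/") and item.get("Size", 0) == 0


def split_items(items):
    """Separate regular objects from folder placeholders."""
    objects = [item for item in items if not is_folder_placeholder(item)]
    folders = [item for item in items if is_folder_placeholder(item)]
    return objects, folders


if __name__ == "__main__":
    listing = [
        {"Key": "processData/images/", "Size": 0},
        {"Key": "processData/images/n02085620_1073.jpg", "Size": 34910},
    ]
    objects, folders = split_items(listing)
    print(len(objects), "object(s),", len(folders), "folder placeholder(s)")
```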

JSON file in an Amazon S3 bucket

A Distributed Map state can accept a JSON file that's stored in an Amazon S3 bucket as a dataset. The JSON file must contain an array.

When the workflow execution reaches the Map state, Step Functions invokes the GetObject API action to fetch the specified JSON file. The Map state then iterates over each item in the array and starts a child workflow execution for each item. For example, if your JSON file contains 1000 array items, the Map state starts 1000 child workflow executions.

Note
  • The execution input used to start a child workflow execution can't exceed 256 KiB. However, Step Functions supports reading an item of up to 8 MB from a CSV or JSON file if you then apply the optional ItemSelector field to reduce the item's size.

  • Currently, Step Functions supports 10 GB as the maximum size of an individual file in an Amazon S3 inventory report. However, Step Functions can process more than 10 GB if each individual file is under 10 GB.

  • Step Functions needs appropriate permissions to access the Amazon S3 datasets that you use. For information about IAM policies for the datasets, see IAM policies for datasets.

The following examples show the ItemReader field syntax and the input passed to a child workflow execution for this dataset.

For this example, imagine you have a JSON file named factcheck.json. You've stored this file within a prefix named jsonDataset in an Amazon S3 bucket. The following is an example of the JSON dataset.

[
  {
    "verdict": "true",
    "statement_date": "6/11/2008",
    "statement_source": "speech"
  },
  {
    "verdict": "false",
    "statement_date": "6/7/2022",
    "statement_source": "television"
  },
  {
    "verdict": "mostly-true",
    "statement_date": "5/18/2016",
    "statement_source": "news"
  },
  ...
]

ItemReader syntax

"ItemReader": {
  "Resource": "arn:aws:states:::s3:getObject",
  "ReaderConfig": {
    "InputType": "JSON"
  },
  "Parameters": {
    "Bucket": "amzn-s3-demo-bucket",
    "Key": "jsonDataset/factcheck.json"
  }
}

Input to a child workflow execution

The Distributed Map state starts as many child workflow executions as the number of array items present in the JSON file. The following example shows the input received by a child workflow execution.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
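
The 256 KiB limit on child execution input noted earlier can be checked before you rely on an item being passed through unchanged. The sketch below is only an approximation: it assumes that size is measured as the UTF-8 byte length of the compact JSON encoding, which may differ from how Step Functions measures it. The select_fields helper is a hypothetical stand-in that loosely imitates trimming an item with ItemSelector.

```python
import json

MAX_CHILD_INPUT_BYTES = 256 * 1024  # 256 KiB limit on child execution input


def encoded_size(item):
    """Size of the item as compact UTF-8 JSON (an assumed way of measuring)."""
    return len(json.dumps(item, separators=(",", ":")).encode("utf-8"))


def fits_child_input(item):
    """Return True if the item is within the child execution input limit."""
    return encoded_size(item) <= MAX_CHILD_INPUT_BYTES


def select_fields(item, fields):
    """Keep only the named fields, loosely imitating what ItemSelector can do."""
    return {name: item[name] for name in fields if name in item}


if __name__ == "__main__":
    large_item = {"verdict": "true", "transcript": "x" * 300_000}
    print(fits_child_input(large_item))
    print(fits_child_input(select_fields(large_item, ["verdict"])))
```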

CSV file in an Amazon S3 bucket

A Distributed Map state can accept a CSV file that's stored in an Amazon S3 bucket as a dataset. If you use a CSV file as your dataset, you need to specify a CSV column header. For information about how to specify a CSV header, see Contents of the ItemReader field.

Step Functions parses CSV files based on the following rules:

  • Commas (,) are a delimiter that separates fields.

  • Newlines are a delimiter that separates records.

  • Fields are treated as strings. For data type conversions, use the States.StringToJson intrinsic function in ItemSelector (Map).

  • Double quotation marks (" ") are not required to enclose strings. However, strings that are enclosed by double quotation marks can contain commas and newlines without acting as record delimiters.

  • You can preserve double quotes by repeating them.

  • If the number of fields in a row is less than the number of fields in the header, Step Functions provides empty strings for the missing values.

  • If the number of fields in a row is more than the number of fields in the header, Step Functions skips the additional fields.

For more information about how Step Functions parses a CSV file, see Example of parsing an input CSV file.
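
The parsing rules listed above can be approximated with Python's standard csv module, which follows similar conventions for quoted fields and doubled quotation marks. This is a sketch for building intuition, not the parser that Step Functions uses, and edge cases may differ. The function name parse_csv and the sample data are made up for the example.

```python
import csv
import io


def parse_csv(text):
    """Parse CSV text into one dict per record, using the first row as header."""
    rows = csv.reader(io.StringIO(text, newline=""))
    headers = next(rows)
    items = []
    for row in rows:
        # Drop fields beyond the header length, then pad short rows with "".
        padded = row[: len(headers)] + [""] * (len(headers) - len(row))
        items.append(dict(zip(headers, padded)))
    return items


if __name__ == "__main__":
    sample = (
        "title,year,note\n"
        '"Hello, World",2008,"say ""hi"""\n'
        "Short,1999\n"
        "Long,2001,x,extra\n"
    )
    for item in parse_csv(sample):
        print(item)
```

Every value stays a string, including year, which matches the rule that fields are treated as strings until you convert them.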

When the workflow execution reaches the Map state, Step Functions invokes the GetObject API action to fetch the specified CSV file. The Map state then iterates over each row in the CSV file and starts a child workflow execution to process the items in each row. For example, suppose that you provide a CSV file that contains 100 rows as input. Then, the interpreter passes each row to the Map state. The Map state processes items in serial order, starting after the header row.

Note
  • The execution input used to start a child workflow execution can't exceed 256 KiB. However, Step Functions supports reading an item of up to 8 MB from a CSV or JSON file if you then apply the optional ItemSelector field to reduce the item's size.

  • Currently, Step Functions supports 10 GB as the maximum size of an individual file in an Amazon S3 inventory report. However, Step Functions can process more than 10 GB if each individual file is under 10 GB.

  • Step Functions needs appropriate permissions to access the Amazon S3 datasets that you use. For information about IAM policies for the datasets, see IAM policies for datasets.

The following examples show the ItemReader field syntax and the input passed to a child workflow execution for this dataset.

ItemReader syntax

For this example, suppose that you have a CSV file named ratings.csv, stored within a prefix named csvDataset in an Amazon S3 bucket.

{
  "ItemReader": {
    "ReaderConfig": {
      "InputType": "CSV",
      "CSVHeaderLocation": "FIRST_ROW"
    },
    "Resource": "arn:aws:states:::s3:getObject",
    "Parameters": {
      "Bucket": "amzn-s3-demo-bucket",
      "Key": "csvDataset/ratings.csv"
    }
  }
}

Input to a child workflow execution

The Distributed Map state starts as many child workflow executions as the number of rows present in the CSV file, excluding the header row if the file has one. The following example shows the input received by a child workflow execution.

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

Amazon S3 inventory

A Distributed Map state can accept an Amazon S3 inventory manifest file that's stored in an Amazon S3 bucket as a dataset.

When the workflow execution reaches the Map state, Step Functions invokes the GetObject API action to fetch the specified Amazon S3 inventory manifest file. The Map state then iterates over the objects in the inventory to return an array of Amazon S3 inventory object metadata.

Note
  • Currently, Step Functions supports 10 GB as the maximum size of an individual file in an Amazon S3 inventory report. However, Step Functions can process more than 10 GB if each individual file is under 10 GB.

  • Step Functions needs appropriate permissions to access the Amazon S3 datasets that you use. For information about IAM policies for the datasets, see IAM policies for datasets.

The following is an example of an inventory file in CSV format. This file includes the objects named csvDataset and imageDataset, which are stored in an Amazon S3 bucket that's named amzn-s3-demo-source-bucket.

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z"
"amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z"
"amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z"
"amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z"
...

Important

Currently, Step Functions doesn't support a user-defined Amazon S3 inventory report as a dataset. You must also make sure that the output format of your Amazon S3 inventory report is CSV. For more information about Amazon S3 inventories and how to set them up, see Amazon S3 Inventory in the Amazon S3 User Guide.

The following example of an inventory manifest file shows the CSV headers for the inventory object metadata.

{
  "sourceBucket" : "amzn-s3-demo-source-bucket",
  "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory",
  "version" : "2016-11-30",
  "creationTimestamp" : "1668560400000",
  "fileFormat" : "CSV",
  "fileSchema" : "Bucket, Key, Size, LastModifiedDate",
  "files" : [
    {
      "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz",
      "size" : 7300,
      "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20"
    }
  ]
}

The following examples show the ItemReader field syntax and the input passed to a child workflow execution for this dataset.

ItemReader syntax
{
  "ItemReader": {
    "ReaderConfig": {
      "InputType": "MANIFEST"
    },
    "Resource": "arn:aws:states:::s3:getObject",
    "Parameters": {
      "Bucket": "amzn-s3-demo-destination-bucket",
      "Key": "destination-prefix/amzn-s3-demo-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json"
    }
  }
}

Input to a child workflow execution

{
  "LastModifiedDate": "2022-11-16T00:29:32.000Z",
  "Bucket": "amzn-s3-demo-source-bucket",
  "Size": "3399671",
  "Key": "csvDataset/titles.csv"
}

Depending on the fields you selected while configuring the Amazon S3 inventory report, the contents of your manifest.json file may vary from the example shown.
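
To make the relationship between the manifest and the child workflow input concrete, the following sketch pairs the column names in fileSchema with the rows of an inventory file. It is an illustration, not the Step Functions implementation. The function name inventory_items is hypothetical, and the inventory data is passed in as already decompressed text, whereas the manifest above lists a .csv.gz file.

```python
import csv
import io


def inventory_items(file_schema, inventory_csv):
    """Pair the manifest's fileSchema column names with each inventory row."""
    headers = [name.strip() for name in file_schema.split(",")]
    rows = csv.reader(io.StringIO(inventory_csv, newline=""))
    return [dict(zip(headers, row)) for row in rows]


if __name__ == "__main__":
    schema = "Bucket, Key, Size, LastModifiedDate"
    data = (
        '"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z"\n'
        '"amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671",'
        '"2022-11-16T00:29:32.000Z"\n'
    )
    for item in inventory_items(schema, data):
        print(item)
```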

IAM policies for datasets

When you create workflows with the Step Functions console, Step Functions can automatically generate IAM policies based on the resources in your workflow definition. These policies include the least privileges necessary to allow the state machine role to invoke the StartExecution API action for the Distributed Map state. These policies also include the least privileges necessary for Step Functions to access AWS resources, such as Amazon S3 buckets and objects and Lambda functions. We highly recommend that you include only those permissions that are necessary in your IAM policies. For example, if your workflow includes a Map state in Distributed mode, scope your policies down to the specific Amazon S3 bucket and folder that contains your dataset.

Important

If you specify an Amazon S3 bucket and object, or prefix, with a reference path to an existing key-value pair in your Distributed Map state input, make sure that you update the IAM policies for your workflow. Scope the policies down to the bucket and object names the path resolves to at runtime.

The following IAM policy examples grant the least privileges required to access your Amazon S3 datasets using the ListObjectsV2 and GetObject API actions.

Example IAM policy for Amazon S3 object as dataset

The following example shows an IAM policy that grants the least privileges to access the objects organized within processImages in an Amazon S3 bucket named amzn-s3-demo-bucket.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket"
      ],
      "Condition": {
        "StringLike": {
          "s3:prefix": [
            "processImages"
          ]
        }
      }
    }
  ]
}

Example IAM policy for a CSV file as dataset

The following example shows an IAM policy that grants least privileges to access a CSV file named ratings.csv.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv"
      ]
    }
  ]
}

Example IAM policy for an Amazon S3 inventory as dataset

The following example shows an IAM policy that grants least privileges to access an Amazon S3 inventory report.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-destination-bucket/destination-prefix/amzn-s3-demo-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json",
        "arn:aws:s3:::amzn-s3-demo-destination-bucket/destination-prefix/amzn-s3-demo-bucket/config-ID/data/*"
      ]
    }
  ]
}
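
If you generate policies programmatically, a small helper can keep the resource list scoped to exactly the objects a workflow reads. The following sketch builds a GetObject policy of the same shape as the examples above. The function name get_object_policy is hypothetical, and the sketch assumes the standard Amazon S3 object ARN form of arn:aws:s3::: followed by the bucket name, a slash, and the object key.

```python
import json


def get_object_policy(bucket, keys):
    """Build an IAM policy document allowing s3:GetObject on specific keys."""
    resources = [f"arn:aws:s3:::{bucket}/{key}" for key in keys]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": resources,
            }
        ],
    }


if __name__ == "__main__":
    policy = get_object_policy("amzn-s3-demo-bucket", ["csvDataset/ratings.csv"])
    print(json.dumps(policy, indent=2))
```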