ItemReader (Mapa) - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

ItemReader (Mapa)

El campo ItemReader es un objeto JSON que especifica un conjunto de datos y su ubicación. Un estado Map Distributed usa este conjunto de datos como entrada.

El siguiente ejemplo muestra la sintaxis del ItemReader campo en un flujo de trabajo JSONPathbasado, para un conjunto de datos de un archivo delimitado por texto que se almacena en un bucket de Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv", "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

En el siguiente flujo de trabajo JSONatabasado, observe que Parameters se sustituye por Arguments.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

Contenido del ItemReader campo

El contenido del campo ItemReader varía según el conjunto de datos. Por ejemplo, si el conjunto de datos es una matriz JSON transferida desde un paso anterior del flujo de trabajo, el campo ItemReader se omite. Si el conjunto de datos es un origen de datos de Amazon S3, este campo contiene los siguientes subcampos.

Resource

La acción de integración de la API Amazon S3 que utilizará Step Functions, como arn:aws:states:::s3:getObject

Arguments (JSONata) or Parameters (JSONPath)

Un objeto JSON que especifica el nombre del bucket de Amazon S3 y la clave de objeto en los que se almacena el conjunto de datos.

Si el bucket tiene habilitado el control de versiones, también puede proporcionar la versión del objeto de Amazon S3.

ReaderConfig

Un objeto JSON que especifica los detalles siguientes:

  • InputType

    Acepta uno de los siguientes valores:CSV,,JSON, JSONLPARQUET,MANIFEST.

    Especifica el tipo de fuente de datos de Amazon S3, como un archivo delimitado por texto (CSV), un objeto, un archivo JSON, líneas JSON, un archivo Parquet, un manifiesto de Athena o una lista de inventario de Amazon S3. En Workflow Studio, puede seleccionar un tipo de entrada de la fuente de elementos de S3.

    La mayoría de los tipos de entrada que utilizan la S3GetObject recuperación también admiten ExpectedBucketOwner VersionId campos en sus parámetros. Los archivos de parquet son la única excepción que no es compatibleVersionId.

    Los archivos de entrada admiten los siguientes tipos de compresión externa: GZIP, ZSTD.

    Ejemplos de nombres de archivo: myObject.jsonl.gz y. myObject.csv.zstd

    Nota: Los archivos Parquet son un tipo de archivo binario que se comprime internamente. Se admite la compresión GZIP, ZSTD y Snappy.

  • Transformation

    Opcional. El valor será o o. NONE LOAD_AND_FLATTEN

    Si no se especifica, NONE se asumirá. Si se establece enLOAD_AND_FLATTEN, también debe configurarloInputType.

    Comportamiento predeterminado: el mapa iterará sobre los objetos de metadatos devueltos por las llamadas aS3:ListObjectsV2. Si se establece enLOAD_AND_FLATTEN, el mapa leerá y procesará los objetos de datos reales a los que se hace referencia en la lista de resultados.

  • ManifestType

    Opcional. El valor será o ATHENA_DATA oS3_INVENTORY.

    Nota: Si se establece enS3_INVENTORY, no debe especificarlo también InputType porque se supone que el tipo lo esCSV.

  • CSVDelimiter

    Puede especificar este campo cuando InputType es CSV oMANIFEST.

    Acepta uno de los siguientes valores: COMMA (predeterminado),PIPE,SEMICOLON,SPACE,TAB.

    nota

    Con el CSVDelimiter campo, ItemReader puede procesar archivos que estén delimitados por caracteres distintos de una coma. Las referencias a los «archivos CSV» también incluyen los archivos que utilizan delimitadores alternativos especificados en el campo. CSVDelimiter

  • CSVHeaderLocation

    Puede especificar este campo cuando InputType es CSV oMANIFEST.

    Acepta uno de los siguientes valores para especificar la ubicación del encabezado de la columna:

    • FIRST_ROW – Utilice esta opción si la primera línea del archivo es el encabezado.

    • GIVEN – Utilice esta opción para especificar el encabezado dentro de la definición de la máquina de estado.

      Por ejemplo, si el archivo contiene los siguientes datos.

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      Puedes proporcionar la siguiente matriz JSON como encabezado CSV:

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    Tamaño del encabezado CSV

    Step Functions admite encabezados de hasta 10 KiB para archivos delimitados por texto.

  • MaxItems

    De forma predeterminada, el Map estado se repite en todos los elementos del conjunto de datos especificado. Al configurarloMaxItems, puede limitar la cantidad de elementos de datos que se pasan al Map estado. Por ejemplo, si proporciona un archivo delimitado por texto que contiene 1000 filas y establece un límite de 100, el intérprete solo pasará 100 filas al estado del mapa distribuido. El estado Map procesa los elementos en orden secuencial, a partir de la fila siguiente al encabezado.

    Para los JSONPathflujos de trabajo, puede usar MaxItemsPath una ruta de referencia a un par clave-valor en la entrada de estado que se resuelve en un número entero. Tenga en cuenta que puede especificar una MaxItems o ambasMaxItemsPath, pero no ambas.

    nota

    Puede especificar un límite de hasta 100 000 000 000 tras el cual dejará de leer los Distributed Map elementos.

Requisitos de cuenta y región

Los buckets de Amazon S3 deben estar en la misma máquina de estado Cuenta de AWS y al Región de AWS igual que ella.

Tenga en cuenta que, aunque su máquina de estados pueda acceder a los archivos de depósitos distintos Cuentas de AWS que se encuentran en la misma máquina Región de AWS, Step Functions solo admite la publicación de objetos en depósitos de Amazon S3 que estén en la misma Cuenta de AWS máquina de estados o en la Región de AWS misma ubicación.

Procesamiento de conjuntos de datos anidados (actualizado el 11 de septiembre de 2025)

Con el nuevo Transformation parámetro, puede especificar un valor de, LOAD_AND_FLATTEN y el mapa leerá los objetos de datos reales a los que se hace referencia en la lista de resultados de una llamada aS3:ListObjectsV2.

Antes de esta versión, era necesario crear mapas distribuidos anidados para recuperar los metadatos y, a continuación, procesar los datos reales. El primer mapa recorría en iteración los metadatos devueltos por los flujos de trabajo secundarios S3:ListObjectsV2 e invocaría los flujos de trabajo secundarios. Otro mapa dentro de cada máquina de estados secundaria leería los datos reales de los archivos individuales. Con la opción de transformación, puede realizar ambos pasos a la vez.

Imagine que quiere realizar una auditoría diaria de los últimos 24 archivos de registro que su sistema produce cada hora y almacena en Amazon S3. El estado de su mapa distribuido puede enumerar los archivos de registro con los metadatos de cada objeto yS3:ListObjectsV2, a continuación, iterarlos, o ahora puede cargar y analizar los objetos de datos reales almacenados en su bucket de Amazon S3.

El uso de LOAD_AND_FLATTEN esta opción puede aumentar la escalabilidad, reducir el número de ejecuciones de mapas abiertos y procesar varios objetos de forma simultánea. Los trabajos de Athena y Amazon EMR suelen generar resultados que se pueden procesar con la nueva configuración.

A continuación se muestra un ejemplo de los parámetros de una ItemReader definición:

{ "QueryLanguage": "JSONata", "States": { ... "Map": { ... "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { // InputType is required if Transformation is LOAD_AND_FLATTEN. "InputType": "CSV | JSON | JSONL | PARQUET", // Transformation is OPTIONAL and defaults to NONE if not present "Transformation": "NONE | LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket1", "Prefix": "{% $states.input.PrefixKey %}" } }, ... } }

Ejemplos de conjuntos de datos

Puede especificar una de las opciones siguientes como conjunto de datos:

nota

Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

Un estado Map Distributed puede aceptar una entrada JSON transferida desde un paso anterior del flujo de trabajo.

La entrada puede ser una matriz JSON, un objeto JSON o una matriz dentro de un nodo de un objeto JSON.

Step Functions iterará directamente sobre los elementos de una matriz o los pares clave-valor de un objeto JSON.

Para seleccionar un nodo específico que contenga una matriz de un objeto JSON, puedes usar la expresión ItemsPath ( JSONPath Solo en el mapa) o usar una JSONata expresión en el Items campo para los estados. JSONata

Para procesar elementos individuales, el estado del mapa distribuido inicia la ejecución de un flujo de trabajo secundario para cada elemento. Las siguientes pestañas muestran ejemplos de la entrada que se transfiere al estado Map y la entrada correspondiente a la ejecución de un flujo de trabajo secundario.

nota

El ItemReader campo no es necesario cuando el conjunto de datos son datos JSON de un paso anterior.

Input passed to the Map state

Considera la siguiente matriz JSON de tres elementos.

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

El estado Map Distributed inicia tres ejecuciones de flujos de trabajo secundarios. Cada ejecución recibe un elemento de matriz como entrada. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Un estado Map Distributed puede iterar sobre los objetos que se almacenan en un bucket de Amazon S3. Cuando la ejecución del flujo de trabajo alcanza el Map estado, Step Functions invoca la acción de la API ListObjectsV2, que devuelve una matriz de metadatos del objeto Amazon S3. En esta matriz, cada elemento contiene datos, como ETaguna clave, de los datos reales almacenados en el depósito.

Para procesar los elementos individuales de la matriz, el estado Map Distributed inicia la ejecución de un flujo de trabajo secundario. Por ejemplo, suponga que su bucket de Amazon S3 contiene 100 imágenes. A continuación, la matriz devuelta tras invocar la acción de la ListObjectsV2 API contiene 100 elementos de metadatos. A continuación, el estado del mapa distribuido inicia 100 ejecuciones de flujos de trabajo secundarios para procesar cada elemento.

Para procesar los objetos de datos directamente, sin flujos de trabajo anidados, puede elegir la opción de transformación LOAD_AND_FLATTEN para procesar los elementos directamente.

nota
  • Step Functions también incluirá un elemento para cada carpeta creada en el bucket de Amazon S3 mediante la consola de Amazon S3. Los elementos de la carpeta dan como resultado el inicio de más ejecuciones de flujos de trabajo secundarios.

    Para evitar crear ejecuciones de flujos de trabajo secundarias adicionales para cada carpeta, le recomendamos que las utilice AWS CLI para crear carpetas. Para obtener más información, consulte Uso de comandos de S3 de alto nivel en la Guía del usuario de AWS Command Line Interface .

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

ItemReader syntax

En este ejemplo, ha organizado los datos, que incluyen imágenes, archivos JSON y objetos, dentro de un prefijo llamado processData en un bucket de Amazon S3 llamado amzn-s3-demo-bucket.

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Prefix": "processData" } }
Input passed to a child workflow execution

El estado del mapa distribuido inicia tantas ejecuciones de flujos de trabajo secundarios como el número de elementos de metadatos presentes en el bucket de Amazon S3. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

Gracias a la compatibilidad mejorada con S3 ListObjects V2 como fuente de entrada en Distributed Map, sus máquinas de estado pueden leer y procesar varios objetos de datos directamente de los buckets de Amazon S3, ¡lo que elimina la necesidad de mapas anidados para procesar los metadatos!

Con LOAD_AND_FLATTEN esta opción, su máquina de estados hará lo siguiente:

  • Lea el contenido real de cada objeto listado en la ListObjectsV2 llamada a Amazon S3.

  • Analice el contenido en función de InputType (CSV, JSON, JSONL, Parquet).

  • Cree elementos a partir del contenido del archivo (filas/registros) en lugar de los metadatos.

Con la opción de transformación, ya no necesitará mapas distribuidos anidados para procesar los metadatos. El uso de la opción LOAD_AND_FLATTEN aumenta la escalabilidad, reduce el número de reproducciones de mapas activos y procesa varios objetos de forma simultánea.

La siguiente configuración muestra la configuración de un: ItemReader

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { "InputType": "JSON", "Transformation": "LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "S3_BUCKET_NAME", "Prefix": "S3_BUCKET_PREFIX" } }
Recomendación de prefijo de bucket

Te recomendamos incluir una barra al final del prefijo. Por ejemplo, si selecciona datos con el prefijo defolder1, su máquina de estados procesará ambos. folder1/myData.csv folder10/myData.csv El uso folder1/ procesará estrictamente una sola carpeta.

Un estado Map Distributed puede aceptar un archivo JSON almacenado en un bucket de Amazon S3 como conjunto de datos. El archivo JSON debe contener una matriz o un objeto JSON.

Cuando la ejecución del flujo de trabajo alcanza el Map estado, Step Functions invoca la acción de la GetObjectAPI para recuperar el archivo JSON especificado.

Si el archivo JSON contiene una estructura de objetos anidados, puedes seleccionar el nodo específico con tu conjunto de datos con una. ItemsPointer Por ejemplo, la siguiente configuración extraería una lista anidada de los productos destacados del inventario.

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON", "ItemsPointer": "/inventory/products/featured" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "nested-data-file.json" } }

A continuación, el estado Map se repite sobre cada elemento de la matriz e inicia la ejecución de un flujo de trabajo secundario para cada elemento. Por ejemplo, si el archivo JSON contiene 1000 elementos de matriz, el estado Map inicia 1000 ejecuciones de flujos de trabajo secundarios.

nota
  • La entrada de ejecución utilizada para iniciar la ejecución de un flujo de trabajo secundario no puede superar los 256 KiB. Sin embargo, Step Functions permite leer un elemento de hasta 8 MB de un archivo delimitado por texto, un archivo JSON o un archivo de líneas JSON si, a continuación, se aplica el ItemSelector campo opcional para reducir el tamaño del elemento.

  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en Amazon S3.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

Para este ejemplo, imagine que tiene un archivo JSON llamado factcheck.json. Ha almacenado este archivo en un prefijo llamado jsonDataset en un bucket de Amazon S3. A continuación se muestra un ejemplo de conjunto de datos JSON.

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

El estado Map Distributed inicia tantas ejecuciones de flujos de trabajo secundarios como el número de elementos de la matriz presentes en el archivo JSON. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Un estado de mapa distribuido puede aceptar un archivo de líneas JSON almacenado en un bucket de Amazon S3 como conjunto de datos.

nota
  • La entrada de ejecución utilizada para iniciar la ejecución de un flujo de trabajo secundario no puede superar los 256 KiB. Sin embargo, Step Functions permite leer un elemento de hasta 8 MB de un archivo delimitado por texto, un archivo JSON o un archivo de líneas JSON si, a continuación, se aplica el ItemSelector campo opcional para reducir el tamaño del elemento.

  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en Amazon S3.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

Para este ejemplo, imagine que tiene un archivo de líneas JSON llamadofactcheck.jsonl. Ha almacenado este archivo en un prefijo llamado jsonlDataset en un bucket de Amazon S3. El siguiente es un ejemplo del contenido del archivo.

{"verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech"} {"verdict": "false", "statement_date": "6/7/2022", "statement_source": "television"} {"verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news"}
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSONL" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonlDataset/factcheck.jsonl" } }
Input to a child workflow execution

El estado del mapa distribuido inicia tantas ejecuciones de flujos de trabajo secundarios como el número de líneas presentes en el archivo JSONL. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
nota

Con el CSVDelimiter campo, ItemReader puede procesar archivos que estén delimitados por caracteres distintos de una coma. Las referencias a los «archivos CSV» también incluyen los archivos que utilizan delimitadores alternativos especificados en el campo. CSVDelimiter

Un estado de mapa distribuido puede aceptar un archivo delimitado por texto que esté almacenado en un bucket de Amazon S3 como conjunto de datos. Si utiliza un archivo delimitado por texto como conjunto de datos, debe especificar un encabezado de columna. Para obtener información sobre cómo especificar un encabezado, consulteContenido del ItemReader campo.

Step Functions analiza los archivos delimitados por texto según las siguientes reglas:

  • El delimitador que separa los campos se especifica en. CSVDelimiter ReaderConfig El delimitador está predeterminado en. COMMA

  • Los retornos de carro son un delimitador que separa los registros.

  • Los campos se tratan como cadenas. Para las conversiones de tipos de datos, utilice la función intrínseca States.StringToJson en ItemSelector (Mapa).

  • No es necesario incluir comillas dobles (" ") para delimitar cadenas. No obstante, las cadenas entre comillas dobles pueden contener comas y retornos de carro sin que funcionen como delimitadores de registro.

  • Para conservar las comillas dobles en una secuencia de conservación, repítalas.

  • Las barras invertidas (\) son otra forma de escapar de los caracteres especiales. Las barras invertidas solo funcionan con otras barras invertidas, comillas dobles y el separador de campos configurado, como una coma o una barra vertical. La barra invertida seguida de cualquier otro carácter se elimina silenciosamente.

  • Puede conservar las barras invertidas repitiéndolas. Por ejemplo:

    path,size C:\\Program Files\\MyApp.exe,6534512
  • Las barras invertidas que escapan a las comillas dobles (\") solo funcionan cuando se incluyen en pares, por lo que recomendamos evitar las comillas dobles repitiéndolas:. ""

  • Si el número de campos de una fila es inferior al número de campos del encabezado, Step Functions proporciona cadenas vacías para los valores que faltan.

  • Si el número de campos de una fila es mayor que el número de campos del encabezado, Step Functions omite los campos adicionales.

Para obtener más información sobre cómo Step Functions analiza un archivo delimitado por texto, consulte. Example of parsing an input CSV file

Cuando la ejecución del flujo de trabajo alcanza el Map estado, Step Functions invoca la acción de la GetObjectAPI para recuperar el archivo especificado. A continuación, el Map estado se repite en cada fila del archivo e inicia la ejecución de un flujo de trabajo secundario para procesar los elementos de cada fila. Por ejemplo, supongamos que proporciona un archivo delimitado por texto que contiene 100 filas como entrada. Entonces, el intérprete pasa cada fila al estado Map. El estado Map procesa los elementos en orden de serie, a partir de la fila siguiente al encabezado.

nota
  • La entrada de ejecución utilizada para iniciar la ejecución de un flujo de trabajo secundario no puede superar los 256 KiB. Sin embargo, Step Functions permite leer un elemento de hasta 8 MB de un archivo delimitado por texto, un archivo JSON o un archivo de líneas JSON si, a continuación, se aplica el ItemSelector campo opcional para reducir el tamaño del elemento.

  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en Amazon S3.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

ItemReader syntax

Por ejemplo, supongamos que tiene un archivo CSV llamado ratings.csv. A continuación, ha almacenado este archivo dentro de un prefijo llamado csvDataset en un bucket de Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW", "CSVDelimiter": "PIPE" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }
Input to a child workflow execution

El estado Map Distributed inicia tantas ejecuciones de flujos de trabajo secundarios como el número de filas presentes en el archivo CSV, excluida la fila del encabezado, si está en el archivo. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

Los archivos Parquet se pueden utilizar como fuente de entrada. Los archivos de Apache Parquet almacenados en Amazon S3 proporcionan un procesamiento de datos en columnas eficiente a escala.

Al utilizar archivos Parquet, se aplican las siguientes condiciones:

  • 256 MB es el tamaño máximo de un grupo de filas y 5 MB es el tamaño máximo de pie de página. Si proporciona archivos de entrada que superan cualquiera de estos límites, la máquina de estados devolverá un error de tiempo de ejecución.

  • El VersionId campo no es compatible conInputType=Parquet.

  • La compresión de datos interna con GZIP, ZSTD y Snappy es compatible de forma nativa. No se necesitan extensiones de nombre de archivo.

A continuación se muestra un ejemplo de configuración de ASL para InputType configurar Parquet:

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "PARQUET" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "my-parquet-data-file-1.parquet" } }
Procesamiento de trabajos a gran escala

Para trabajos a gran escala, Step Functions utilizará muchos lectores de entrada. Los lectores intercalan su procesamiento, lo que puede provocar que algunos lectores se detengan mientras otros avanzan. Se espera un progreso intermitente a gran escala.

Puede usar los archivos de manifiesto de Athena, generados a partir de los resultados de las UNLOAD consultas, para especificar la fuente de los archivos de datos del estado de su mapa. Se configura ManifestType comoATHENA_DATA, y InputType como CSVJSONL, oParquet.

Al ejecutar una UNLOAD consulta, Athena genera un archivo de manifiesto de datos además de los objetos de datos reales. El archivo de manifiesto proporciona una lista CSV estructurada de los archivos de datos. Tanto el manifiesto como los archivos de datos se guardan en la ubicación de resultados de la consulta de Athena en Amazon S3.

UNLOAD (<YOUR_SELECT_QUERY>) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'JSON')

Resumen conceptual del proceso:

  1. Seleccione sus datos de una tabla mediante una UNLOAD consulta en Athena.

  2. Athena generará un archivo de manifiesto (CSV) y los objetos de datos en Amazon S3.

  3. Configure Step Functions para leer el archivo de manifiesto y procesar la entrada.

La función puede procesar los formatos de salida CSV, JSONL y Parquet de Athena. Todos los objetos a los que se hace referencia en un único archivo de manifiesto deben tener el mismo formato. InputType Tenga en cuenta que los objetos CSV exportados mediante una UNLOAD consulta no incluyen el encabezado en la primera línea. Comprueba CSVHeaderLocation si necesitas proporcionar los encabezados de las columnas.

El contexto del mapa también incluirá una opción $states.context.Map.Item.Source para que pueda personalizar el procesamiento en función de la fuente de los datos.

El siguiente es un ejemplo de configuración de una ItemReader configuración para usar un manifiesto de Athena:

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "ManifestType": "ATHENA_DATA", "InputType": "CSV | JSONL | PARQUET" }, "Arguments": { "Bucket": "<S3_BUCKET_NAME>", "Key": "<S3_KEY_PREFIX><QUERY_ID>-manifest.csv" } }
Uso del patrón de manifiesto de Athena en Workflow Studio

Un escenario común para el procesamiento de datos es aplicar un mapa a los datos procedentes de una consulta de Athena UNLOAD. El mapa invoca una función Lambda para procesar cada elemento descrito en el manifiesto de Athena. Step Functions Workflow Studio proporciona un patrón listo para usar que combina todos estos componentes en un bloque que puede arrastrar al lienzo de su máquina de estados.

Un estado Map Distributed puede aceptar un archivo de manifiesto de inventario de Amazon S3 almacenado en un bucket de Amazon S3 como conjunto de datos.

Cuando la ejecución del flujo de trabajo alcanza el Map estado, Step Functions invoca la acción de la GetObjectAPI para recuperar el archivo de manifiesto de inventario de Amazon S3 especificado.

De forma predeterminada, el Map estado recorre en iteración los objetos del inventario para devolver una matriz de metadatos de objetos de inventario de Amazon S3.

Si especifica que ManifestType es S3_INVESTORY, no se InputType puede especificar.

nota
  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en un informe de inventario de Amazon S3 tras la descompresión. Sin embargo, Step Functions puede procesar más de 10 GB si cada archivo individual tiene menos de 10 GB.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

A continuación se muestra un ejemplo de un archivo de inventario de ejemplo en formato CSV: Este archivo incluye los objetos denominados csvDataset yimageDataset, que se almacenan en un bucket de Amazon S3 que lleva ese nombre amzn-s3-demo-source-bucket.

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
importante

Step Functions no admite un informe de inventario de Amazon S3 definido por el usuario como conjunto de datos.

El formato de salida del informe de inventario de Amazon S3 debe ser CSV.

Para obtener más información sobre los inventarios de Amazon S3 y cómo configurarlos, consulte Amazon S3 Inventory.

El siguiente ejemplo de un archivo de manifiesto de inventario de Amazon S3 muestra los encabezados CSV de los metadatos del objeto de inventario.

{ "sourceBucket" : "amzn-s3-demo-source-bucket", "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

ItemReader syntax
"ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "destination-prefix/amzn-s3-demo-bucket/config-id/YYYY-MM-DDTHH-MMZ/manifest.json" } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "amzn-s3-demo-source-bucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

En función de los campos que haya seleccionado al configurar el informe de inventario de Amazon S3, el contenido del manifest.json archivo puede diferir del del ejemplo.

Recomendaciones de políticas de IAM para conjuntos de datos

Al crear flujos de trabajo con la consola de Step Functions, Step Functions puede generar automáticamente políticas de IAM basadas en los recursos de la definición de flujo de trabajo. Las políticas generadas incluyen los privilegios mínimos necesarios para permitir que la función de máquina de estado invoque la acción de la StartExecution API para los AWS recursos de estado y acceso del mapa distribuido, como los buckets y objetos de Amazon S3 y las funciones de Lambda.

Recomendamos incluir solo los permisos necesarios en sus políticas de IAM. Por ejemplo, si tu flujo de trabajo incluye un Map estado en modo distribuido, aplica tus políticas al bucket y la carpeta específicos de Amazon S3 que contienen tus datos.

importante

Si especifica un bucket y un objeto de Amazon S3, o un prefijo, con una ruta de referencia a un par clave-valor existente en la entrada del estado Map Distributed, no olvide actualizar las políticas de IAM para el flujo de trabajo. Limite las políticas hasta los nombres de objeto y bucket a los que se dirige la ruta en tiempo de ejecución.

Los siguientes ejemplos muestran técnicas para conceder los privilegios mínimos necesarios para acceder a sus conjuntos de datos de Amazon S3 mediante la ListObjectsV2 y las acciones de la GetObjectAPI.

ejemplo condición de usar un objeto Amazon S3 como conjunto de datos

La siguiente condición otorga el mínimo de privilegios para acceder a los objetos de una processImages carpeta de un bucket de Amazon S3.

"Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } }
ejemplo usar un archivo CSV como conjunto de datos

En el siguiente ejemplo, se muestran las acciones necesarias para acceder a un archivo CSV denominadoratings.csv.

"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv" ]
ejemplo usar un inventario de Amazon S3 como conjunto de datos

A continuación, se muestran ejemplos de recursos para un manifiesto de inventario y archivos de datos de Amazon S3.

"Resource": [ "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/data/*" ]
ejemplo usar ListObjects V2 para restringirlo a un prefijo de carpeta

Al usar ListObjectsV2, se generarán dos políticas. Una es necesaria para poder enumerar el contenido del depósito (ListBucket) y otra política permitirá recuperar los objetos del depósito (GetObject).

A continuación, se muestran ejemplos de acciones, recursos y una condición:

"Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "/path/to/your/json/" ] } }
"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/path/to/your/json/*" ]

Tenga en cuenta que no se GetObject limitará al ámbito y utilizará un comodín (*) para el objeto.