Datos y tablas transitorios con actividades de canalización - AWS Data Pipeline

AWS Data Pipeline ya no está disponible para nuevos clientes. Los clientes actuales de AWS Data Pipeline pueden seguir utilizando el servicio con normalidad. Más información

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Datos y tablas transitorios con actividades de canalización

AWS Data Pipeline puede utilizar datos transitorios de entrada y salida en las canalizaciones para facilitar el uso de determinadas actividades, tales como ShellCommandActivity y HiveActivity.

El uso transitorio de datos le permite copiar datos del nodo de datos de entrada en el recurso donde se ejecuta la actividad y, de manera similar, desde el recurso en el nodo de datos de salida.

Los datos transitorios del recurso de Amazon EMR o Amazon EC2 están disponibles mediante el uso de variables especiales en los comandos de shell o scripts de Hive de la actividad.

El uso de tablas transitorias es similar al de datos transitorios, excepto en que los datos almacenados adoptan la forma de tablas almacenadas, específicamente.

AWS Data Pipeline admite los escenarios siguientes:

  • Uso transitorio de datos con ShellCommandActivity

  • Tablas transitorias con nodos de datos compatibles con el uso de datos transitorios y Hive

  • Tablas transitorias con nodos de datos no compatibles con el uso de datos transitorios y Hive

nota

Funciones solo transitorias cuando el campo stage esté establecido en true en una actividad, como ShellCommandActivity. Para obtener más información, consulte ShellCommandActivity.

Además, las actividades y los nodos de datos se pueden relacionar de cuatro maneras:

Almacenamiento local de los datos en un recurso

Los datos de entrada se copian automáticamente en el sistema de archivos local del recurso. Los datos de salida se copian automáticamente del sistema de archivos local del recurso en el nodo de datos de salida. Por ejemplo, cuando se configuran entradas y salidas ShellCommandActivity con staging = true, los datos de entrada están disponibles como INPUTx_STAGING_DIR y los datos de salida están disponibles como OUTPUT_STAGING_DIR, donde x es el número de entrada o salida.

Almacenamiento transitorio de definiciones de entrada y salida para una actividad

El formato de datos de entrada (nombres de columna y nombres de tabla) se copia automáticamente en el recurso de la actividad. Por ejemplo, cuando se configura HiveActivity con staging = true. El formato de datos especificado en la entrada S3DataNode se utiliza para el uso transitorio de la definición de tabla de la tabla Hive.

Uso transitorio no habilitado

Los objetos de entrada y salida y sus campos están disponibles para la actividad, pero no los propios datos. Por ejemplo, EmrActivity de forma predeterminada o al configurar otras actividades con staging = false. En esta configuración, los campos de datos están disponibles para que la actividad pueda hacer referencia a ellos mediante la sintaxis de expresión de AWS Data Pipeline, pero esto solo ocurre cuando se satisface la dependencia. Esto sirve solamente como comprobación de dependencias. El código de la actividad es responsable de copiar los datos de la entrada en el recurso que ejecuta la actividad.

Relación de dependencia entre objetos

Existe una relación de dependencia entre dos objetos, que produce como resultado una situación similar a cuando no se ha habilitado el uso transitorio. Esto provoca que un nodo de datos o una actividad actúen como una condición previa para la ejecución de otra actividad.

Uso transitorio de datos con ShellCommandActivity

Considere un escenario en que se utilice ShellCommandActivity con objetos S3DataNode como entrada y salida de datos. AWS Data Pipeline almacena automáticamente los nodos de datos de manera transitoria para ponerlos a disposición del comando de shell como si fueran carpetas de archivos locales mediante las variables de entorno ${INPUT1_STAGING_DIR} y ${OUTPUT1_STAGING_DIR}, tal y como se muestra en el siguiente ejemplo. La parte numérica de las variables denominadas INPUT1_STAGING_DIR y OUTPUT1_STAGING_DIR se incrementa en función del número de nodos de datos a los que hace referencia la actividad.

nota

Esta situación solo funciona como se describe si las entradas y salidas de datos son objetos S3DataNode. Además, el uso transitorio de datos de salida solamente se permite cuando directoryPath se establece en el objeto S3DataNode de salida.

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

Tablas transitorias con nodos de datos compatibles con el uso de datos transitorios y Hive

Considere un escenario en que se utilice HiveActivity con objetos S3DataNode como entrada y salida de datos. AWS Data Pipeline almacena automáticamente los nodos de datos de manera transitoria para ponerlos a disposición del script de Hive como si fueran tablas de Hive mediante las variables de ${input1} y ${output1}, tal y como se muestra en el siguiente ejemplo para HiveActivity. La parte numérica de las variables denominadas input y output se incrementa en función del número de nodos de datos a los que hace referencia la actividad.

nota

Esta situación solo funciona como se describe si las entradas y salidas de datos son objetos S3DataNode o MySqlDataNode. El uso transitorio de tablas no se admite para DynamoDBDataNode.

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

Tablas transitorias con nodos de datos no compatibles con el uso de datos transitorios y Hive

Considere un escenario en que se utilice HiveActivity con objetos DynamoDBDataNode como entrada y un objeto S3DataNode como salida de datos. No está disponible ningún uso transitorio de datos para DynamoDBDataNode, por lo que primero debe crear manualmente la tabla dentro del script de Hive utilizando el nombre de variable #{input.tableName} para hacer referencia a la tabla de DynamoDB. Se aplica una nomenclatura similar si la tabla de DynamoDB es la salida, excepto si se usa la variable #{output.tableName}. El uso transitorio está disponible para el objeto S3DataNode en este ejemplo, así que se puede hacer referencia al nodo de datos de salida como ${output1}.

nota

En este ejemplo, la variable del nombre de la tabla tiene como prefijo el carácter # (hash) porque AWS Data Pipeline utiliza expresiones para obtener acceso a tableName o directoryPath. Para obtener más información acerca de cómo funciona la evaluación de expresiones en AWS Data Pipeline, consulte Evaluación de expresiones.

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...