Copie intermédiaire des données et des tables avec les activités de pipeline - AWS Data Pipeline

AWS Data Pipeline n'est plus disponible pour les nouveaux clients. Les clients existants de AWS Data Pipeline peuvent continuer à utiliser le service normalement. En savoir plus

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Copie intermédiaire des données et des tables avec les activités de pipeline

AWS Data Pipeline peut effectuer une copie intermédiaire des données entrantes et sortantes dans vos pipelines pour faciliter l'utilisation de certaines activités, telles que ShellCommandActivity et HiveActivity.

Une copie intermédiaire vous permet de copier des données du nœud d'entrée vers la ressource exécutant l'activité et, de la même manière, de la ressource vers le nœud de sortie.

Les données intermédiaires sur la ressource Amazon EMR ou Amazon EC2 sont disponibles à l'aide de variables spéciales dans les commandes shell ou les scripts Hive de l'activité.

La copie intermédiaire de tables est similaire au déploiement de données, à la différence près que les données ayant fait l'objet d'une copie intermédiaire prennent la forme de tables de base de données, tout spécialement.

AWS Data Pipeline prend en charge les scénarios de copie intermédiaire suivants :

  • Copie intermédiaire de données avec ShellCommandActivity

  • Copie intermédiaire de tables avec Hive et nœuds de données pris en charge par la copie intermédiaire

  • Copie intermédiaire de tables avec Hive et nœuds de données non pris en charge par la copie intermédiaire

Note

La copie intermédiaire fonctionne uniquement lorsque le champ stage a la valeur true sur une activité, telle que ShellCommandActivity. Pour plus d'informations, veuillez consulter ShellCommandActivity.

En outre, les nœuds de données et les activités peuvent avoir des liens de quatre manières :

Copie intermédiaire de données localement sur une ressource

Les données d'entrée sont automatiquement copiées dans le système de fichiers local des ressources. Les données de sortie sont automatiquement copiées du système de fichiers local des ressources vers le nœud de données de sortie. Par exemple, lorsque vous configurez des entrées et des sorties ShellCommandActivity avec la copie intermédiaire = true, les données d'entrée sont disponibles en tant qu'INPUTx_STAGING_DIR et les données de sortie sont disponibles en tant qu'OUTPUTx_STAGING_DIR, où x correspond au nombre d'entrée ou de sortie.

Copie intermédiaire des définitions d'entrée et de sortie pour une activité

Le format des données d'entrée (noms des colonnes et noms des tables) est automatiquement copié dans la ressource de l'activité. Par exemple, lorsque vous configurez HiveActivity avec la copie intermédiaire = true. Le format de données spécifié sur l'entrée S3DataNode est utilisé pour effectuer une copie intermédiaire de la définition de table à partir de la table Hive.

Copie intermédiaire non activée

Les objets d'entrée et de sortie et leurs champs sont disponibles pour l'activité, mais les données elles-mêmes ne le sont pas. Par exemple, EmrActivity par défaut ou lorsque vous configurez d'autres activités avec copie intermédiaire = false. Dans cette configuration, les champs de données sont disponibles pour l'activité afin d'y faire référence à l'aide de la syntaxe des expressions AWS Data Pipeline, et ceci se produit uniquement lorsque la dépendance est satisfaite. Ceci sert à vérifier la dépendance uniquement. Le code de l'activité est responsable de la copie des données de l'entrée vers la ressource exécutant l'activité.

Relation de dépendance entre les objets

Il existe une relation de dépendance entre deux objets, ce qui donne lieu à une situation similaire à celle lorsque la copie intermédiaire n'est pas activée. Ceci entraîne un nœud de données ou une activité à agir en tant que condition préalable pour l'exécution d'une autre activité.

Stationnage des données avec ShellCommandActivity

Prenons l'exemple d'un scénario qui utilise une ShellCommandActivity avec des objets S3DataNode en tant qu'entrée et sortie de données. AWS Data Pipeline effectue automatiquement une copie intermédiaire des nœuds de données pour les rendre accessibles à la commande shell comme s'ils étaient des dossiers de fichiers locaux en utilisant les variables d'environnement ${INPUT1_STAGING_DIR} et ${OUTPUT1_STAGING_DIR}, comme indiqué dans l'exemple suivant. La partie numérique des variables nommées INPUT1_STAGING_DIR et OUTPUT1_STAGING_DIR s'incrémente en fonction du nombre de nœuds de données référencés par votre activité.

Note

Ce scénario fonctionne uniquement comme indiqué si vos entrées et sorties de données sont des objets S3DataNode. De plus, la copie intermédiaire des données de sortie est autorisée uniquement lorsque le directoryPath est défini sur l'objet S3DataNode de sortie.

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

Copie intermédiaire de tables avec Hive et nœuds de données pris en charge par la copie intermédiaire

Prenons l'exemple d'un scénario qui utilise une HiveActivity avec des objets S3DataNode en tant qu'entrée et sortie de données. AWS Data Pipeline effectue automatiquement une copie intermédiaire des nœuds de données pour les rendre accessibles au script Hive comme s'ils étaient des tables Hive en utilisant les variables ${input1} et ${output1}, comme indiqué dans l'exemple suivant pour HiveActivity. La partie numérique des variables nommées input et output s'incrémente en fonction du nombre de nœuds de données référencés par votre activité.

Note

Ce scénario fonctionne uniquement comme indiqué si vos entrées et sorties de données sont des objets S3DataNode ou MySqlDataNode. La copie intermédiaire de tables n'est pas prise en charge pour DynamoDBDataNode.

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

Copie intermédiaire de tables avec Hive et nœuds de données non pris en charge par la copie intermédiaire

Prenons l'exemple d'un scénario qui utilise une HiveActivity avec DynamoDBDataNode en tant qu'entrée de données et un objet S3DataNode en tant que sortie. Aucun transfert de données n'est disponible. Vous devez donc d'abord créer la table manuellement dans votre script Hive, en utilisant le nom de la variable #{input.tableName} pour faire référence à la table DynamoDB. DynamoDBDataNode Une nomenclature similaire s'applique si la table DynamoDB est la sortie, sauf que vous utilisez une variable. #{output.tableName} La copie intermédiaire est disponible pour l'objet S3DataNode de sortie dans cet exemple, vous pouvez donc faire référence au nœud de données de sortie en tant que ${output1}.

Note

Dans cet exemple, la variable du nom de la table comporte un préfixe # (hachage), car AWS Data Pipeline utilise des expressions pour accéder au tableName ou au directoryPath. Pour plus d'informations sur le fonctionnement de l'évaluation des expressions dans AWS Data Pipeline, consultez Evaluation d'expression.

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...