AWS Data Pipeline não está mais disponível para novos clientes. Clientes existentes da AWS Data Pipeline pode continuar usando o serviço normalmente. Saiba mais
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Preparar dados e tabelas com atividades de pipeline
O AWS Data Pipeline pode preparar dados de entrada e saída nos pipelines para facilitar o uso de determinadas atividades, como ShellCommandActivity
e HiveActivity
.
A preparação de dados permite a você copiar os dados do nó de dados de entrada para o recurso que executa a atividade e, de maneira semelhante, do recurso para o nó de dados de saída.
Os dados preparados no recurso Amazon EMR ou Amazon EC2 são disponibilizados usando variáveis especiais nos comandos shell da atividade ou nos scripts do Hive.
A preparação da tabela é semelhante à preparação dos dados, exceto pelos dados preparados assumirem a forma de tabelas de banco de dados, mais especificamente.
O AWS Data Pipeline oferece suporte aos seguintes cenários de preparação:
-
Preparação de dados com
ShellCommandActivity
-
Preparação da tabela com Hive e nós de dados compatíveis com preparação
-
Preparação da tabela com Hive e nós de dados incompatíveis com preparação
nota
A preparação funciona somente quando o campo stage
é definido como true
em uma atividade, como ShellCommandActivity
. Para obter mais informações, consulte ShellCommandActivity.
Além disso, os nós de dados e as atividades podem estar relacionados de quatro maneiras:
- Preparar dados localmente em um recurso
-
Os dados de entrada são copiados automaticamente para o sistema de arquivos local. Os dados de saída são copiados automaticamente do sistema de arquivos local para o nó de dados de saída. Por exemplo, quando você configura entradas e saídas
ShellCommandActivity
com staging = true, os dados de entrada são disponibilizados como INPUTx_STAGING_DIR e os dados de saída são disponibilizados como OUTPUTx_STAGING_DIR, em que x é o número de entrada ou saída. - Preparar definições de entrada e saída para uma atividade
-
O formato de dados de entrada (nomes de coluna e de tabela) é copiado automaticamente para o recurso da atividade. Por exemplo, quando você configura
HiveActivity
com staging = true. O formato de dados especificado na entradaS3DataNode
é usado para preparar a definição da tabela Hive. - Preparação não ativada
-
Os objetos de entrada e saída e os campos estão disponíveis para a atividade, mas os dados não. Por exemplo,
EmrActivity
por padrão, ou quando você configura outras atividades com staging = false. Nessa configuração, os campos de dados estão disponíveis para a atividade fazer uma referência a eles usando a sintaxe da expressão do AWS Data Pipeline, e isso ocorre somente quando a dependência é atendida. Isso funciona somente como verificação de dependência. O código na atividade é responsável por copiar os dados da entrada para o recurso que executa a atividade. - Relação de dependência entre objetos
-
Existe uma relação de dependência entre dois objetos, o que resulta em uma situação semelhante a quando a preparação não está ativada. Isso faz um nó de dados ou uma atividade funcionar como uma precondição para a execução de outra atividade.
Preparação de dados com ShellCommandActivity
Considere um cenário que use um ShellCommandActivity
com objetos S3DataNode
como entrada e saída de dados. O AWS Data Pipeline prepara automaticamente os nós de dados para torná-los acessíveis ao comando shell como se fossem pastas de arquivos locais usando as variáveis de ambiente ${INPUT1_STAGING_DIR}
e ${OUTPUT1_STAGING_DIR}
, conforme mostrado no exemplo a seguir. A parte numérica das variáveis chamadas INPUT1_STAGING_DIR
e OUTPUT1_STAGING_DIR
é incrementada dependendo do número de nós de dados e das referências de atividade.
nota
Esse cenário funcionará somente conforme descrito se as entradas e as saídas de dados forem objetos S3DataNode
. Além disso, a preparação de dados de saída é permitida somente quando directoryPath
é definido no objeto S3DataNode
de saída.
{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...
Preparação da tabela com Hive e nós de dados compatíveis com preparação
Considere um cenário que use um HiveActivity
com objetos S3DataNode
como entrada e saída de dados. O AWS Data Pipeline prepara automaticamente os nós de dados para torná-los acessíveis ao script do Hive como se fossem tabelas do Hive usando as variáveis de ambiente ${input1}
e ${output1}
, conforme mostrado no exemplo a seguir para HiveActivity
. A parte numérica das variáveis chamadas input
e output
é incrementada dependendo do número de nós de dados e das referências de atividade.
nota
Esse cenário funcionará somente conforme descrito se as entradas e as saídas de dados forem objetos S3DataNode
ou MySqlDataNode
. A preparação de tabelas não é compatível com DynamoDBDataNode
.
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...
Preparação da tabela com Hive e nós de dados incompatíveis com preparação
Considere um cenário que use um HiveActivity
com DynamoDBDataNode
como entrada de dados e um objeto S3DataNode
como a saída. Nenhuma preparação de dados está disponível para DynamoDBDataNode
. Por isso, você deve primeiro criar manualmente a tabela dentro do script do Hive usando o nome da variável #{input.tableName}
para referenciar a tabela do DynamoDB. Uma nomenclatura semelhante se aplicará se a tabela do DynamoDB for a saída, exceto se você usar a variável #{output.tableName}
. A preparação está disponível para o objeto S3DataNode
de saída neste exemplo. Por isso, você pode se referir ao nó de dados de saída como ${output1}
.
nota
Neste exemplo, a variável do nome da tabela tem o prefixo de caractere # (hash), pois o AWS Data Pipeline usa expressões para acessar o tableName
ou directoryPath
. Para obter mais informações sobre como a avaliação da expressão funciona no AWS Data Pipeline, consulte Avaliação de expressões.
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...