AWS Data Pipeline não está mais disponível para novos clientes. Clientes existentes da AWS Data Pipeline pode continuar usando o serviço normalmente. Saiba mais
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
HadoopActivity
Executa um MapReduce trabalho em um cluster. O cluster pode ser um EMR cluster gerenciado por AWS Data Pipeline ou outro recurso, se você usar TaskRunner. Use HadoopActivity quando quiser executar o trabalho em paralelo. Isso permite que você use os recursos de agendamento da YARN estrutura ou do negociador de MapReduce recursos no Hadoop 1. Se você quiser executar o trabalho sequencialmente usando a ação Amazon EMR Step, você ainda pode usar. EmrActivity
Exemplos
HadoopActivity usando um EMR cluster gerenciado pelo AWS Data Pipeline
O HadoopActivity objeto a seguir usa um EmrCluster recurso para executar um programa:
{ "name": "MyHadoopActivity", "schedule": {"ref": "ResourcePeriod"}, "runsOn": {"ref": “MyEmrCluster”}, "type": "HadoopActivity", "preActivityTaskConfig":{"ref":"preTaskScriptConfig”}, "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "argument": [ "-files", “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“, "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", “s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "maximumRetries": "0", "postActivityTaskConfig":{"ref":"postTaskScriptConfig”}, "hadoopQueue" : “high” }
Aqui está o correspondente MyEmrCluster
, que configura as filas FairScheduler e YARN para o Hadoop 2 baseado em: AMIs
{ "id" : "MyEmrCluster", "type" : "EmrCluster", "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING", “amiVersion” : “3.7.0”, "bootstrapAction" : ["s3://
Region
.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”] }
Isso é o EmrCluster que você usa para configurar FairScheduler no Hadoop 1:
{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING", "amiVersion": "2.4.8", "bootstrapAction": "s3://
Region
.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name" }
As EmrCluster configurações a seguir são baseadas em CapacityScheduler Hadoop 2: AMIs
{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING", "amiVersion": "3.7.0", "bootstrapAction": "s3://
Region
.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60" }
HadoopActivity usando um EMR cluster existente
Neste exemplo, você usa grupos de trabalho e a TaskRunner para executar um programa em um cluster existente. EMR A seguinte definição de pipeline é usada HadoopActivity para:
-
Execute um MapReduce programa somente em
myWorkerGroup
recursos. Para obter mais informações sobre grupos de operadores, consulte Executar trabalho em recursos existentes usando o Task Runner. -
Execute um preActivityTask Config e Config postActivityTask
{ "objects": [ { "argument": [ "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py", "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", "s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "id": "MyHadoopActivity", "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "name": "MyHadoopActivity", "type": "HadoopActivity" }, { "id": "SchedulePeriod", "startDateTime": "start_datetime", "name": "SchedulePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "end_datetime" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/preTaskScript.sh", "name": "preTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/postTaskScript.sh", "name": "postTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "Default", "scheduleType": "cron", "schedule": { "ref": "SchedulePeriod" }, "name": "Default", "pipelineLogUri": "s3://test-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415", "maximumRetries": "0", "workerGroup": "myWorkerGroup", "preActivityTaskConfig": { "ref": "preTaskScriptConfig" }, "postActivityTaskConfig": { "ref": "postTaskScriptConfig" } } ] }
Sintaxe
Campos obrigatórios | Descrição | Tipo de slot |
---|---|---|
jarUri | Localização de um JAR no Amazon S3 ou no sistema de arquivos local do cluster com o qual executar. HadoopActivity | String |
Campos de invocação de objetos | Descrição | Tipo de slot |
---|---|---|
schedule | Esse objeto é invocado durante a execução de um intervalo de programação. Os usuários precisam especificar uma referência de programação para outro objeto de modo a definir a ordem de execução de dependência desse objeto. Os usuários podem satisfazer esse requisito definindo explicitamente uma programação no objeto, por exemplo, especificando “agenda”: {"ref”: "DefaultSchedule“}. Na maioria dos casos, é melhor colocar a referência de programação no objeto de pipeline padrão para que todos os objetos herdem essa programação. Como alternativa, se o pipeline tiver uma árvore de programações (outras programações dentro de uma programação principal), os usuários poderão criar um objeto principal que tenha uma referência de programação. Para obter mais informações sobre o exemplo de configurações opcionais de programação, consulte https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html. | Objeto de referência, por exemplo, “agenda”: {"ref”:” myScheduleId “} |
Grupo obrigatório (um dos seguintes é obrigatório) | Descrição | Tipo de slot |
---|---|---|
runsOn | EMRCluster no qual esse trabalho será executado. | Objeto de referência, por exemplo, "runsOn“: {" ref”:” myEmrCluster Id "} |
workerGroup | O grupo de operadores. Isso é usado para tarefas de roteamento. Se você fornecer um runsOn valor e workerGroup existir, ele será workerGroup ignorado. | String |
Campos opcionais | Descrição | Tipo de slot |
---|---|---|
argument | Argumentos a serem passados para JAR o. | String |
attemptStatus | Status mais recente da atividade remota. | String |
attemptTimeout | Tempo limite para conclusão do trabalho remoto. Se configurada, uma atividade remota não concluída dentro do prazo definido poderá ser executada novamente. | Período |
dependsOn | Especifique a dependência em outro objeto executável. | Objeto de referência, por exemplo, "dependsOn“: {" ref”:” myActivityId “} |
failureAndRerunModo | Descreve o comportamento do nó do consumidor quando as dependências apresentam falhas ou são executadas novamente. | Enumeração |
hadoopQueue | O nome da fila do programador do Hadoop em que a atividade será enviada. | String |
input | Local dos dados de entrada. | Objeto de referência, por exemplo, “input”: {"ref”:” myDataNode Id "} |
lateAfterTimeout | O tempo decorrido após o início do pipeline no qual o objeto deve ser concluído. Ele é acionado somente quando o tipo de programação não está definido como ondemand . |
Período |
mainClass | A classe principal da com HadoopActivity a JAR qual você está executando. | String |
maxActiveInstances | O número máximo de instâncias ativas simultâneas de um componente. Novas execuções não contam para o número de instâncias ativas. | Inteiro |
maximumRetries | Quantidade máxima de novas tentativas com falha. | Inteiro |
onFail | Uma ação a ser executada quando há falha no objeto atual. | Objeto de referência, por exemplo, "onFail“: {" ref”:” myActionId “} |
onLateAction | Ações que devem ser acionadas se um objeto ainda não foi agendado ou não foi concluído. | Objeto de referência, por exemplo, "onLateAction“: {" ref”:” myActionId “} |
onSuccess | Uma ação a ser executada quando o objeto atual é executado com êxito. | Objeto de referência, por exemplo, "onSuccess“: {" ref”:” myActionId “} |
output | Local dos dados de saída. | Objeto de referência, por exemplo, “output”: {"ref”:” myDataNode Id "} |
parent | Pai do objeto atual a partir do qual os slots serão herdados. | Objeto de referência, por exemplo, “parent”: {"ref”:” myBaseObject Id "} |
pipelineLogUri | O S3 URI (como 's3://BucketName/Key/ ') para carregar registros para o pipeline. | String |
postActivityTaskConfig | Script de configuração pós-atividade a ser executado. Isso consiste em um script URI de shell no Amazon S3 e uma lista de argumentos. | Objeto de referência, por exemplo, "postActivityTaskConfig”: {"ref”:” myShellScript ConfigId “} |
preActivityTaskConfig | Script de configuração pré-atividade a ser executado. Isso consiste em um script URI de shell no Amazon S3 e uma lista de argumentos. | Objeto de referência, por exemplo, "preActivityTaskConfig”: {"ref”:” myShellScript ConfigId “} |
precondition | Se desejar, você pode definir uma precondição. Um nó de dados não é marcado como "READY" até que todas as condições prévias tenham sido atendidas. | Objeto de referência, por exemplo, “pré-condição”: {"ref”:” myPreconditionId “} |
reportProgressTimeout | Tempo limite para trabalho remoto: chamadas sucessivas para. reportProgress Se definidas, as atividades remotas sem progresso para o período especificado podem ser consideradas como interrompidas e executadas novamente. | Período |
retryDelay | A duração do tempo limite entre duas novas tentativas. | Período |
scheduleType | O tipo de programação permite que você especifique se os objetos na sua definição de pipeline devem ser programados no início ou no final do intervalo. Programação com estilo de séries temporais significa que as instâncias são programadas no final de cada intervalo, e Programação com estilo Cron significa que as instâncias são programadas no início de cada intervalo. Uma programação sob demanda permite que você execute um pipeline uma vez por ativação. Isso significa que você não precisa clonar nem recriar o pipeline para executá-lo novamente. Se você usar uma programação sob demanda, ela deverá ser especificada no objeto padrão e deverá ser a única scheduleType especificada para objetos no pipeline. Para usar pipelines sob demanda, basta chamar a ActivatePipeline operação para cada execução subsequente. Os valores são: cron, ondemand e timeseries. | Enumeração |
Campos de tempo de execução | Descrição | Tipo de slot |
---|---|---|
@activeInstances | Lista dos objetos da instância ativa agendados no momento. | Objeto de referência, por exemplo, "activeInstances“: {" ref”:” myRunnableObject Id "} |
@actualEndTime | Hora em que a execução deste objeto foi concluída. | DateTime |
@actualStartTime | Hora em que a execução deste objeto foi iniciada. | DateTime |
cancellationReason | cancellationReason Se esse objeto foi cancelado. | String |
@cascadeFailedOn | Descrição da cadeia de dependência na qual o objeto apresentou falha. | Objeto de referência, por exemplo, "cascadeFailedOn“: {" ref”:” myRunnableObject Id "} |
emrStepLog | EMRregistros de etapas disponíveis somente em tentativas de EMR atividade | String |
errorId | errorId Se esse objeto falhar. | String |
errorMessage | errorMessage Se esse objeto falhar. | String |
errorStackTrace | O rastreamento de pilha com erro se esse objeto apresentou falha. | String |
@finishedTime | A hora em que esse objeto terminou a execução. | DateTime |
hadoopJobLog | Registros de tarefas do Hadoop disponíveis em tentativas de atividades EMR baseadas. | String |
@healthStatus | O status de integridade do objeto que indica se houve sucesso ou falha na última instância concluída do objeto. | String |
@healthStatusFromInstanceId | ID do último objeto da instância concluído. | String |
@ healthStatusUpdated Hora | Hora em que o status de integridade foi atualizado pela última vez. | DateTime |
hostname | O nome do host do cliente que capturou a tentativa da tarefa. | String |
@lastDeactivatedTime | A hora em que esse objeto foi desativado pela última vez. | DateTime |
@ latestCompletedRun Hora | Hora da última execução concluída. | DateTime |
@latestRunTime | Hora da última execução programada. | DateTime |
@nextRunTime | Hora da próxima execução a ser programada. | DateTime |
reportProgressTime | A última vez que a atividade remota relatou progresso. | DateTime |
@scheduledEndTime | Horário de término da programação para o objeto. | DateTime |
@scheduledStartTime | Horário de início da programação para o objeto. | DateTime |
@status | O status deste objeto. | String |
@version | A versão do pipeline com que o objeto foi criado. | String |
@waitingOn | Descrição da lista de dependências em que este objeto está aguardando. | Objeto de referência, por exemplo, "waitingOn“: {" ref”:” myRunnableObject Id "} |
Campos do sistema | Descrição | Tipo de slot |
---|---|---|
@error | Erro ao descrever o objeto malformado. | String |
@pipelineId | ID do pipeline ao qual este objeto pertence. | String |
@sphere | A esfera de um objeto denota seu lugar no ciclo de vida: os objetos componentes dão origem aos objetos de instância que executam os objetos de tentativa. | String |