

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# HadoopActivity
<a name="dp-object-hadoopactivity"></a>

 在集群上运行 MapReduce 作业。集群可以是管理的 EMR 集群， AWS Data Pipeline 也可以是其他资源（如果您使用）。 TaskRunner HadoopActivity 当你想并行运行工作时使用。这允许您使用 YARN 框架的调度资源或 Hadoop 1 中的 MapReduce 资源协商器。如果您想使用 Amazon EMR 步骤操作来按顺序运行工作，您仍可使用 [EmrActivity](dp-object-emractivity.md)。

## 示例
<a name="hadoopactivity-example"></a>

**HadoopActivity 使用由管理的 EMR 集群 AWS Data Pipeline**  
以下 HadoopActivity 对象使用 EmrCluster 资源来运行程序：

```
 {
   "name": "MyHadoopActivity",
   "schedule": {"ref": "ResourcePeriod"},
   "runsOn": {"ref": “MyEmrCluster”},
   "type": "HadoopActivity",
   "preActivityTaskConfig":{"ref":"preTaskScriptConfig”},   
   "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
   "argument": [
     "-files",
     “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“,
     "-mapper",
     "wordSplitter.py",
     "-reducer",
     "aggregate",
     "-input",
     "s3://elasticmapreduce/samples/wordcount/input/",
     "-output",
     “s3://amzn-s3-demo-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}"
   ],
   "maximumRetries": "0",
   "postActivityTaskConfig":{"ref":"postTaskScriptConfig”},
   "hadoopQueue" : “high”
 }
```

以下是相应的*MyEmrCluster*，它在 YARN 中为基于 Hadoop FairScheduler 2 的和队列配置了和队列： AMIs

```
{
  "id" : "MyEmrCluster",
  "type" : "EmrCluster",
   "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING",
  “amiVersion” : “3.7.0”,
  "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”]
}
```

这是 EmrCluster 你在 Hadoop 1 FairScheduler 中用来配置的：

```
{
      "id": "MyEmrCluster",
      "type": "EmrCluster",    
      "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING",
      "amiVersion": "2.4.8",
      "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name"
          }
```

以下是基 CapacityScheduler 于 Hadoop 2 的 EmrCluster 配置： AMIs

```
{
      "id": "MyEmrCluster",
      "type": "EmrCluster",
      "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING",
      "amiVersion": "3.7.0",
      "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60"
    }
```

**HadoopActivity 使用现有 EMR 集群**  
在此示例中，您使用工作组和在现有 TaskRunner EMR 集群上运行程序。以下管道定义用 HadoopActivity 于：
+ 仅在*myWorkerGroup*资源上运行 MapReduce 程序。有关工作线程组的更多信息，请参阅[使用任务运行程序在现有资源上执行工作](dp-how-task-runner-user-managed.md)。
+ 运行 preActivityTask Config and Con postActivityTask fig

```
{
  "objects": [
    {
      "argument": [
        "-files",
        "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
        "-mapper",
        "wordSplitter.py",
        "-reducer",
        "aggregate",
        "-input",
        "s3://elasticmapreduce/samples/wordcount/input/",
        "-output",
        "s3://amzn-s3-demo-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}"
      ],
      "id": "MyHadoopActivity",
      "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
      "name": "MyHadoopActivity",
      "type": "HadoopActivity"
    },
    {
      "id": "SchedulePeriod",
      "startDateTime": "start_datetime",
      "name": "SchedulePeriod",
      "period": "1 day",
      "type": "Schedule",
      "endDateTime": "end_datetime"
    },
    {
      "id": "ShellScriptConfig",
      "scriptUri": "s3://amzn-s3-demo-bucket/scripts/preTaskScript.sh",
      "name": "preTaskScriptConfig",
      "scriptArgument": [
        "test",
        "argument"
      ],
      "type": "ShellScriptConfig"
    },
    {
      "id": "ShellScriptConfig",
      "scriptUri": "s3://amzn-s3-demo-bucket/scripts/postTaskScript.sh",
      "name": "postTaskScriptConfig",
      "scriptArgument": [
        "test",
        "argument"
      ],
      "type": "ShellScriptConfig"
    },
    {
      "id": "Default",
      "scheduleType": "cron",
      "schedule": {
        "ref": "SchedulePeriod"
      },
      "name": "Default",
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415",
      "maximumRetries": "0",    
      "workerGroup": "myWorkerGroup",
      "preActivityTaskConfig": {
        "ref": "preTaskScriptConfig"
      },
      "postActivityTaskConfig": {
        "ref": "postTaskScriptConfig"
      }    
    }
  ] 
}
```

## 语法
<a name="hadoopactivity-syntax"></a>


****  

| 必填字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| jarUri | JAR 在 Amazon S3 中的位置或要运行的集群的本地文件系统 HadoopActivity。 | 字符串 | 

 


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道有一个计划树 (计划位于主计划中)，用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行此作业的 EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup。 | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| argument | 要传递给 JAR 的参数。 | 字符串 | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| hadoopQueue | 将在其上提交活动的 Hadoop 计划程序队列名。 | 字符串 | 
| input | 输入数据的位置。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| mainClass | 你正在执行的 JAR 的主类 HadoopActivity。 | 字符串 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据的位置。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| postActivityTaskConfig | 要运行的活动后配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “postActivityTaskConfig”：\$1“ref”:” myShellScript ConfigId “\$1 | 
| preActivityTaskConfig | 要运行的活动前配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “preActivityTaskConfig”：\$1“ref”:” myShellScript ConfigId “\$1 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="hadoopactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [CopyActivity](dp-object-copyactivity.md)
+ [EmrCluster](dp-object-emrcluster.md)