

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# EmrActivity
<a name="dp-object-emractivity"></a>

 运行 EMR 集群。

AWS Data Pipeline 使用与 Amazon EMR 不同的步骤格式；例如，在步骤字段的 JAR 名称后 AWS Data Pipeline 使用逗号分隔的参数。`EmrActivity`以下示例显示格式适合 Amazon EMR 的步骤，后跟其 AWS Data Pipeline 等效步骤：

```
s3://amzn-s3-demo-bucket/MyWork.jar arg1 arg2 arg3
```

```
"s3://amzn-s3-demo-bucket/MyWork.jar,arg1,arg2,arg3"
```

## 示例
<a name="emractivity-example"></a>

以下是该对象类型的示例。该示例使用较旧版本的 Amazon EMR。验证该示例是否适合您使用的 Amazon EMR 集群版本。

该对象引用您将在同一管道定义文件中定义的其他三个对象。`MyEmrCluster` 为 `EmrCluster` 对象，`MyS3Input` 和 `MyS3Output` 为 `S3DataNode` 对象。

**注意**  
在此示例中，您可以将 `step` 字段替换为所需集群字符串，它可以是 Pig 脚本、Hadoop 流式集群、您自己的自定义 JAR (包括其参数) 等。

Hadoop 2.x (AMI 3.x)

```
{
  "id" : "MyEmrActivity",
  "type" : "EmrActivity",
  "runsOn" : { "ref" : "MyEmrCluster" },
  "preStepCommand" : "scp remoteFiles localFiles",
  "step" : ["s3://amzn-s3-demo-bucket/myPath/myStep.jar,firstArg,secondArg,-files,s3://amzn-s3-demo-bucket/myPath/myFile.py,-input,s3://myinputbucket/path,-output,s3://myoutputbucket/path,-mapper,myFile.py,-reducer,reducerName","s3://amzn-s3-demo-bucket/myPath/myotherStep.jar,..."],
  "postStepCommand" : "scp localFiles remoteFiles",
  "input" : { "ref" : "MyS3Input" },
  "output" : { "ref" : "MyS3Output" }
}
```

**注意**  
要通过一个步骤将参数传递给应用程序，您需要在脚本路径中指定区域，如以下示例所示：此外，您可能需要对传递的参数进行转义。例如，如果您使用 `script-runner.jar` 运行一个 Shell 脚本，并且需要将参数传递给该脚本，则必须对用于分隔参数的逗号进行转义。以下步骤槽介绍了如何执行此操作：  

```
"step" : "s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,s3://datapipeline/echo.sh,a\\\\,b\\\\,c"
```
此步骤使用 `script-runner.jar` 运行 `echo.sh` Shell 脚本，并将 `a`、`b` 和 `c` 作为单个参数传递给此脚本。由于将从生成的参数中删除第一个转义字符，因此，您可能需要重新转义。例如，如果您将 `File\.gz` 作为参数 (用 JSON 表示)，则可使用 `File\\\\.gz` 对其进行转义。但由于已丢弃第一个转义，因此，您必须使用 `File\\\\\\\\.gz `。

## 语法
<a name="emractivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。您可以明确设置针对该对象的计划以满足该要求，例如，指定 "schedule": \$1"ref": "DefaultSchedule"\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道具有一个计划树 (计划位于主计划中)，您可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如，“日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 在其中运行该作业的 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据的位置。 | 参考对象，例如，“输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如，“onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 参考对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据的位置。 | 参考对象，例如，“输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 参考对象，例如，“父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 Amazon S3 URI，例如 's3: BucketName ///Prefix/ '。 | 字符串 | 
| postStepCommand | 所有步骤完成之后运行的 Shell 脚本。要指定多个脚本 (最多 255 个)，请添加多个 postStepCommand 字段。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如，“前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| preStepCommand | 在任意步骤运行之前运行的 Shell 脚本。要指定多个脚本 (最多 255 个)，请添加多个 preStepCommand 字段。 | 字符串 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| resizeClusterBefore正在跑步 |  在执行此活动前，重新调整集群的大小，以适应指定为输入或输出的 DynamoDB 表。  如果您`EmrActivity`使用`DynamoDBDataNode`作为输入或输出数据节点，并且将设置为`TRUE`，则 AWS Data Pipeline 开始使用`m3.xlarge`实例类型。`resizeClusterBeforeRunning`这将使用 `m3.xlarge` 覆盖您的实例类型，从而可能会增加您的月度成本。   | 布尔值 | 
| resizeClusterMax实例 | 调整大小算法可以请求的最大实例数的限制。 | 整数 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 您可以通过计划类型指定应在间隔开头还是结尾计划管道定义中的对象。值包括：cron、ondemand 和 timeseries。timeseries 计划表示在每个间隔结尾计划实例。cron 计划表示在每个间隔开头计划实例。ondemand 计划让您可以在每次激活时运行一次管道。您不需要克隆或重新创建管道以再次运行它。如果您使用 ondemand 计划，则必须在默认对象中指定它，并且该计划必须是在管道中为对象指定的唯一 scheduleType。要使用 ondemand 管道，请为每个后续运行调用 ActivatePipeline 操作。 | 枚举 | 
| step | 集群要运行的一个或多个步骤。要指定多个步骤 (最多 255 个)，请添加多个步骤字段。请在 JAR 名称后面使用以逗号分隔的参数，例如，“s3://amzn-s3-demo-bucket/MyWork.jar,arg1,arg2,arg3”。 | 字符串 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 参考对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 Amazon EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用于创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1“ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="emractivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [CopyActivity](dp-object-copyactivity.md)
+ [EmrCluster](dp-object-emrcluster.md)