

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 活动
<a name="dp-object-activities"></a>

以下是 AWS Data Pipeline 活动对象：

**Topics**
+ [CopyActivity](dp-object-copyactivity.md)
+ [EmrActivity](dp-object-emractivity.md)
+ [HadoopActivity](dp-object-hadoopactivity.md)
+ [HiveActivity](dp-object-hiveactivity.md)
+ [HiveCopyActivity](dp-object-hivecopyactivity.md)
+ [PigActivity](dp-object-pigactivity.md)
+ [RedshiftCopyActivity](dp-object-redshiftcopyactivity.md)
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [SqlActivity](dp-object-sqlactivity.md)

# CopyActivity
<a name="dp-object-copyactivity"></a>

将数据从一个位置复制到另一个位置。 `CopyActivity`支持[S3 DataNode](dp-object-s3datanode.md)和[SqlDataNode](dp-object-sqldatanode.md)作为输入和输出，通常执行复制操作 record-by-record。不过，当满足以下条件时，`CopyActivity` 可提供高性能的 Amazon S3 到 Amazon S3 复制：
+ 输入和输出均为 S3 DataNodes
+ 对于输入和输出，`dataFormat` 字段是相同的

如果您提供压缩的数据文件作为输入，并且不使用 S3 数据节点上的 `compression` 字段指示这一点，则 `CopyActivity` 可能会失败。在此情况下，`CopyActivity` 无法正确检测记录结束字符，并且操作会失败。此外，还`CopyActivity`支持从一个目录复制到另一个目录以及将文件复制到一个目录，但是在 record-by-record将目录复制到文件时会发生复制。最后，`CopyActivity` 不支持复制多部分 Amazon S3 文件。

`CopyActivity` 对其 CSV 支持有特定的限制。当您使用 S3 DataNode 作为输入时`CopyActivity`，只能对 Amazon S3 的输入和输出字段使用 CSV 数据文件格式的 Unix/Linux 变体。该 Unix/Linux 变体需要以下内容：
+ 分隔符必须是“,”(逗号) 字符。
+ 记录未用引号引起来。
+ 默认转义字符是 ASCII 值 92 (反斜杠)。
+ 记录结束标识符是 ASCII 值 10 (或“\$1n”)。

基于 Windows 的系统通常使用不同的 end-of-record字符序列：回车符和换行在一起（ASCII 值 13 和 ASCII 值 10）。您必须使用其他机制来适应此差异 (例如，使用预先复制脚本修改输入数据) 以确保 `CopyActivity` 能够正确检测到记录结尾；否则，`CopyActivity` 将反复失败。

在使用 `CopyActivity` 从 PostgreSQL RDS 对象导出到 TSV 数据格式时，默认 NULL 字符为 \$1n。

## 示例
<a name="copyactivity-example"></a>

以下是该对象类型的示例。该对象引用您将在同一管道定义文件中定义的其他三个对象。`CopyPeriod` 为 `Schedule` 对象，`InputData` 和 `OutputData` 为数据节点对象。

```
{
  "id" : "S3ToS3Copy",
  "type" : "CopyActivity",
  "schedule" : { "ref" : "CopyPeriod" },
  "input" : { "ref" : "InputData" },
  "output" : { "ref" : "OutputData" },
  "runsOn" : { "ref" : "MyEc2Resource" }
}
```

## 语法
<a name="copyactivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道有一个计划树 (计划位于主计划中)，用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行活动或命令的计算资源。例如，Amazon EC2 实例或 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myResourceId “\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup。 | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据源。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据源。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象 | 字符串 | 

## 另请参阅
<a name="copyactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [EmrActivity](dp-object-emractivity.md)
+ [使用将 MySQL 数据导出到亚马逊 S3 AWS Data Pipeline](dp-copydata-mysql.md)

# EmrActivity
<a name="dp-object-emractivity"></a>

 运行 EMR 集群。

AWS Data Pipeline 使用与 Amazon EMR 不同的步骤格式；例如，在步骤字段的 JAR 名称后 AWS Data Pipeline 使用逗号分隔的参数。`EmrActivity`以下示例显示格式适合 Amazon EMR 的步骤，后跟其 AWS Data Pipeline 等效步骤：

```
s3://amzn-s3-demo-bucket/MyWork.jar arg1 arg2 arg3
```

```
"s3://amzn-s3-demo-bucket/MyWork.jar,arg1,arg2,arg3"
```

## 示例
<a name="emractivity-example"></a>

以下是该对象类型的示例。该示例使用较旧版本的 Amazon EMR。验证该示例是否适合您使用的 Amazon EMR 集群版本。

该对象引用您将在同一管道定义文件中定义的其他三个对象。`MyEmrCluster` 为 `EmrCluster` 对象，`MyS3Input` 和 `MyS3Output` 为 `S3DataNode` 对象。

**注意**  
在此示例中，您可以将 `step` 字段替换为所需集群字符串，它可以是 Pig 脚本、Hadoop 流式集群、您自己的自定义 JAR (包括其参数) 等。

Hadoop 2.x (AMI 3.x)

```
{
  "id" : "MyEmrActivity",
  "type" : "EmrActivity",
  "runsOn" : { "ref" : "MyEmrCluster" },
  "preStepCommand" : "scp remoteFiles localFiles",
  "step" : ["s3://amzn-s3-demo-bucket/myPath/myStep.jar,firstArg,secondArg,-files,s3://amzn-s3-demo-bucket/myPath/myFile.py,-input,s3://myinputbucket/path,-output,s3://myoutputbucket/path,-mapper,myFile.py,-reducer,reducerName","s3://amzn-s3-demo-bucket/myPath/myotherStep.jar,..."],
  "postStepCommand" : "scp localFiles remoteFiles",
  "input" : { "ref" : "MyS3Input" },
  "output" : { "ref" : "MyS3Output" }
}
```

**注意**  
要通过一个步骤将参数传递给应用程序，您需要在脚本路径中指定区域，如以下示例所示：此外，您可能需要对传递的参数进行转义。例如，如果您使用 `script-runner.jar` 运行一个 Shell 脚本，并且需要将参数传递给该脚本，则必须对用于分隔参数的逗号进行转义。以下步骤槽介绍了如何执行此操作：  

```
"step" : "s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,s3://datapipeline/echo.sh,a\\\\,b\\\\,c"
```
此步骤使用 `script-runner.jar` 运行 `echo.sh` Shell 脚本，并将 `a`、`b` 和 `c` 作为单个参数传递给此脚本。由于将从生成的参数中删除第一个转义字符，因此，您可能需要重新转义。例如，如果您将 `File\.gz` 作为参数 (用 JSON 表示)，则可使用 `File\\\\.gz` 对其进行转义。但由于已丢弃第一个转义，因此，您必须使用 `File\\\\\\\\.gz `。

## 语法
<a name="emractivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。您可以明确设置针对该对象的计划以满足该要求，例如，指定 "schedule": \$1"ref": "DefaultSchedule"\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道具有一个计划树 (计划位于主计划中)，您可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如，“日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 在其中运行该作业的 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据的位置。 | 参考对象，例如，“输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如，“onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 参考对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据的位置。 | 参考对象，例如，“输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 参考对象，例如，“父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 Amazon S3 URI，例如 's3: BucketName ///Prefix/ '。 | 字符串 | 
| postStepCommand | 所有步骤完成之后运行的 Shell 脚本。要指定多个脚本 (最多 255 个)，请添加多个 postStepCommand 字段。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如，“前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| preStepCommand | 在任意步骤运行之前运行的 Shell 脚本。要指定多个脚本 (最多 255 个)，请添加多个 preStepCommand 字段。 | 字符串 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| resizeClusterBefore正在跑步 |  在执行此活动前，重新调整集群的大小，以适应指定为输入或输出的 DynamoDB 表。  如果您`EmrActivity`使用`DynamoDBDataNode`作为输入或输出数据节点，并且将设置为`TRUE`，则 AWS Data Pipeline 开始使用`m3.xlarge`实例类型。`resizeClusterBeforeRunning`这将使用 `m3.xlarge` 覆盖您的实例类型，从而可能会增加您的月度成本。   | 布尔值 | 
| resizeClusterMax实例 | 调整大小算法可以请求的最大实例数的限制。 | 整数 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 您可以通过计划类型指定应在间隔开头还是结尾计划管道定义中的对象。值包括：cron、ondemand 和 timeseries。timeseries 计划表示在每个间隔结尾计划实例。cron 计划表示在每个间隔开头计划实例。ondemand 计划让您可以在每次激活时运行一次管道。您不需要克隆或重新创建管道以再次运行它。如果您使用 ondemand 计划，则必须在默认对象中指定它，并且该计划必须是在管道中为对象指定的唯一 scheduleType。要使用 ondemand 管道，请为每个后续运行调用 ActivatePipeline 操作。 | 枚举 | 
| step | 集群要运行的一个或多个步骤。要指定多个步骤 (最多 255 个)，请添加多个步骤字段。请在 JAR 名称后面使用以逗号分隔的参数，例如，“s3://amzn-s3-demo-bucket/MyWork.jar,arg1,arg2,arg3”。 | 字符串 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 参考对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 Amazon EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用于创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1“ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="emractivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [CopyActivity](dp-object-copyactivity.md)
+ [EmrCluster](dp-object-emrcluster.md)

# HadoopActivity
<a name="dp-object-hadoopactivity"></a>

 在集群上运行 MapReduce 作业。集群可以是管理的 EMR 集群， AWS Data Pipeline 也可以是其他资源（如果您使用）。 TaskRunner HadoopActivity 当你想并行运行工作时使用。这允许您使用 YARN 框架的调度资源或 Hadoop 1 中的 MapReduce 资源协商器。如果您想使用 Amazon EMR 步骤操作来按顺序运行工作，您仍可使用 [EmrActivity](dp-object-emractivity.md)。

## 示例
<a name="hadoopactivity-example"></a>

**HadoopActivity 使用由管理的 EMR 集群 AWS Data Pipeline**  
以下 HadoopActivity 对象使用 EmrCluster 资源来运行程序：

```
 {
   "name": "MyHadoopActivity",
   "schedule": {"ref": "ResourcePeriod"},
   "runsOn": {"ref": “MyEmrCluster”},
   "type": "HadoopActivity",
   "preActivityTaskConfig":{"ref":"preTaskScriptConfig”},   
   "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
   "argument": [
     "-files",
     “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“,
     "-mapper",
     "wordSplitter.py",
     "-reducer",
     "aggregate",
     "-input",
     "s3://elasticmapreduce/samples/wordcount/input/",
     "-output",
     “s3://amzn-s3-demo-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}"
   ],
   "maximumRetries": "0",
   "postActivityTaskConfig":{"ref":"postTaskScriptConfig”},
   "hadoopQueue" : “high”
 }
```

以下是相应的*MyEmrCluster*，它在 YARN 中为基于 Hadoop FairScheduler 2 的和队列配置了和队列： AMIs

```
{
  "id" : "MyEmrCluster",
  "type" : "EmrCluster",
   "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING",
  “amiVersion” : “3.7.0”,
  "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”]
}
```

这是 EmrCluster 你在 Hadoop 1 FairScheduler 中用来配置的：

```
{
      "id": "MyEmrCluster",
      "type": "EmrCluster",    
      "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING",
      "amiVersion": "2.4.8",
      "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name"
          }
```

以下是基 CapacityScheduler 于 Hadoop 2 的 EmrCluster 配置： AMIs

```
{
      "id": "MyEmrCluster",
      "type": "EmrCluster",
      "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING",
      "amiVersion": "3.7.0",
      "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60"
    }
```

**HadoopActivity 使用现有 EMR 集群**  
在此示例中，您使用工作组和在现有 TaskRunner EMR 集群上运行程序。以下管道定义用 HadoopActivity 于：
+ 仅在*myWorkerGroup*资源上运行 MapReduce 程序。有关工作线程组的更多信息，请参阅[使用任务运行程序在现有资源上执行工作](dp-how-task-runner-user-managed.md)。
+ 运行 preActivityTask Config and Con postActivityTask fig

```
{
  "objects": [
    {
      "argument": [
        "-files",
        "s3://elasticmapreduce/samples/wordcount/wordSplitter.py",
        "-mapper",
        "wordSplitter.py",
        "-reducer",
        "aggregate",
        "-input",
        "s3://elasticmapreduce/samples/wordcount/input/",
        "-output",
        "s3://amzn-s3-demo-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}"
      ],
      "id": "MyHadoopActivity",
      "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
      "name": "MyHadoopActivity",
      "type": "HadoopActivity"
    },
    {
      "id": "SchedulePeriod",
      "startDateTime": "start_datetime",
      "name": "SchedulePeriod",
      "period": "1 day",
      "type": "Schedule",
      "endDateTime": "end_datetime"
    },
    {
      "id": "ShellScriptConfig",
      "scriptUri": "s3://amzn-s3-demo-bucket/scripts/preTaskScript.sh",
      "name": "preTaskScriptConfig",
      "scriptArgument": [
        "test",
        "argument"
      ],
      "type": "ShellScriptConfig"
    },
    {
      "id": "ShellScriptConfig",
      "scriptUri": "s3://amzn-s3-demo-bucket/scripts/postTaskScript.sh",
      "name": "postTaskScriptConfig",
      "scriptArgument": [
        "test",
        "argument"
      ],
      "type": "ShellScriptConfig"
    },
    {
      "id": "Default",
      "scheduleType": "cron",
      "schedule": {
        "ref": "SchedulePeriod"
      },
      "name": "Default",
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415",
      "maximumRetries": "0",    
      "workerGroup": "myWorkerGroup",
      "preActivityTaskConfig": {
        "ref": "preTaskScriptConfig"
      },
      "postActivityTaskConfig": {
        "ref": "postTaskScriptConfig"
      }    
    }
  ] 
}
```

## 语法
<a name="hadoopactivity-syntax"></a>


****  

| 必填字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| jarUri | JAR 在 Amazon S3 中的位置或要运行的集群的本地文件系统 HadoopActivity。 | 字符串 | 

 


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道有一个计划树 (计划位于主计划中)，用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行此作业的 EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup。 | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| argument | 要传递给 JAR 的参数。 | 字符串 | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| hadoopQueue | 将在其上提交活动的 Hadoop 计划程序队列名。 | 字符串 | 
| input | 输入数据的位置。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| mainClass | 你正在执行的 JAR 的主类 HadoopActivity。 | 字符串 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据的位置。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| postActivityTaskConfig | 要运行的活动后配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “postActivityTaskConfig”：\$1“ref”:” myShellScript ConfigId “\$1 | 
| preActivityTaskConfig | 要运行的活动前配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “preActivityTaskConfig”：\$1“ref”:” myShellScript ConfigId “\$1 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="hadoopactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [CopyActivity](dp-object-copyactivity.md)
+ [EmrCluster](dp-object-emrcluster.md)

# HiveActivity
<a name="dp-object-hiveactivity"></a>

在 EMR 集群上运行 Hive 查询。`HiveActivity` 使您能够更轻松地设置 Amazon EMR 活动，并且将自动基于来自 Amazon S3 或 Amazon RDS 的输入数据创建 Hive 表。您只需要指定要在源数据上运行的 HiveQL 即可。 AWS Data Pipeline 根据`HiveActivity`对象中的输入字段自动创建带有`${input1}``${input2}`、等的 Hive 表。

对于 Amazon S3 输入，`dataFormat` 字段用于创建 Hive 列名。

对于 MySQL（Amazon RDS）输入，SQL 查询的列名用于创建 Hive 列名。

**注意**  
此活动使用 Hive [CSV Serde](https://cwiki.apache.org/confluence/display/Hive/CSV+Serde)。

## 示例
<a name="hiveactivity-example"></a>

以下是该对象类型的示例。该对象引用您在同一管道定义文件中定义的三个其他对象。`MySchedule` 为 `Schedule` 对象，`MyS3Input` 和 `MyS3Output` 为数据节点对象。

```
{
  "name" : "ProcessLogData",
  "id" : "MyHiveActivity",
  "type" : "HiveActivity",
  "schedule" : { "ref": "MySchedule" },
  "hiveScript" : "INSERT OVERWRITE TABLE ${output1} select host,user,time,request,status,size from ${input1};",
  "input" : { "ref": "MyS3Input" },
  "output" : { "ref": "MyS3Output" },
  "runsOn" : { "ref": "MyEmrCluster" }
}
```

## 语法
<a name="hiveactivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。您可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道具有一个计划树 (计划位于主计划中)，您可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| hiveScript | 要运行的 Hive 脚本。 | 字符串 | 
| scriptUri | 要运行 Hive 脚本的位置 (例如，s3://scriptLocation)。 | 字符串 | 

 


****  

| 所需的组 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 在其中运行该 HiveActivity 的 EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 
| input | 输入数据源。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| output | 输出数据源。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| hadoopQueue | 将在其上提交作业的 Hadoop 计划程序队列名。 | 字符串 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| postActivityTaskConfig | 要运行的活动后配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “postActivityTaskConfig”: \$1“ref”:” myShellScript ConfigId “\$1 | 
| preActivityTaskConfig | 要运行的活动前配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “preActivityTaskConfig”: \$1“ref”:” myShellScript ConfigId “\$1 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| resizeClusterBefore正在跑步 | 在执行此活动前，重新调整集群的大小，以适应指定为输入或输出的 DynamoDB 数据节点。 如果您的活动使用`DynamoDBDataNode`作为输入或输出数据节点，并且将设置`resizeClusterBeforeRunning`为`TRUE`，则 AWS Data Pipeline 开始使用`m3.xlarge`实例类型。这将使用 `m3.xlarge` 覆盖您的实例类型，从而可能会增加您的月度成本。  | 布尔值 | 
| resizeClusterMax实例 | 调整大小算法可以请求的最大实例数的限制。 | 整数 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 
| scriptVariable | 指定在运行脚本时要传递给 Hive 的 Amazon EMR 的脚本变量。例如，以下示例脚本变量将 SAMPLE 和 FILTER\$1DATE 变量传递到 Hive：SAMPLE=s3://elasticmapreduce/samples/hive-ads 和 FILTER\$1DATE=\$1\$1format(@scheduledStartTime,'YYYY-MM-dd')\$1%。此字段接受多个值，可与 script 和 scriptUri 字段结合使用。此外，scriptVariable 将起作用，不管 stage 设置为 true 还是 false。在使用 AWS Data Pipeline 表达式和函数将动态值发送到 Hive 时，此字段特别有用。 | 字符串 | 
| stage | 确定是在运行脚本之前还是之后启用暂存。不可对 Hive 11 使用，因此，请使用 Amazon EMR AMI 版本 3.2.0 或更高版本。 | 布尔值 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 Amazon EMR 步骤日志。 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="hiveactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [EmrActivity](dp-object-emractivity.md)

# HiveCopyActivity
<a name="dp-object-hivecopyactivity"></a>

在 EMR 集群上运行 Hive 查询。`HiveCopyActivity` 可让您更轻松地在 DynamoDB 表之间复制数据。`HiveCopyActivity` 接受 HiveQL 语句以在列和行级别筛选来自 DynamoDB 的输入数据。

## 示例
<a name="hivecopyactivity-example"></a>

以下示例说明如何使用 `HiveCopyActivity` 和 `DynamoDBExportDataFormat` 将数据从一个 `DynamoDBDataNode` 复制到另一个 DynamoDBDataNode，并基于时间戳筛选数据。

```
{
  "objects": [
    {
      "id" : "DataFormat.1",
      "name" : "DataFormat.1",
      "type" : "DynamoDBExportDataFormat",
      "column" : "timeStamp BIGINT"
    },
    {
      "id" : "DataFormat.2",
      "name" : "DataFormat.2",
      "type" : "DynamoDBExportDataFormat"
    },
    {
      "id" : "DynamoDBDataNode.1",
      "name" : "DynamoDBDataNode.1",
      "type" : "DynamoDBDataNode",
      "tableName" : "item_mapped_table_restore_temp",
      "schedule" : { "ref" : "ResourcePeriod" },
      "dataFormat" : { "ref" : "DataFormat.1" }
    },
    {
      "id" : "DynamoDBDataNode.2",
      "name" : "DynamoDBDataNode.2",
      "type" : "DynamoDBDataNode",
      "tableName" : "restore_table",
      "region" : "us_west_1",
      "schedule" : { "ref" : "ResourcePeriod" },
      "dataFormat" : { "ref" : "DataFormat.2" }
    },
    {
      "id" : "EmrCluster.1",
      "name" : "EmrCluster.1",
      "type" : "EmrCluster",
      "schedule" : { "ref" : "ResourcePeriod" },
      "masterInstanceType" : "m1.xlarge",
      "coreInstanceCount" : "4"
    },
    {
      "id" : "HiveTransform.1",
      "name" : "Hive Copy Transform.1",
      "type" : "HiveCopyActivity",
      "input" : { "ref" : "DynamoDBDataNode.1" },
      "output" : { "ref" : "DynamoDBDataNode.2" },
      "schedule" :{ "ref" : "ResourcePeriod" },
      "runsOn" : { "ref" : "EmrCluster.1" },
      "filterSql" : "`timeStamp` > unix_timestamp(\"#{@scheduledStartTime}\", \"yyyy-MM-dd'T'HH:mm:ss\")"
    },
    {
      "id" : "ResourcePeriod",
      "name" : "ResourcePeriod",
      "type" : "Schedule",
      "period" : "1 Hour",
      "startDateTime" : "2013-06-04T00:00:00",
      "endDateTime" : "2013-06-04T01:00:00"
    }
  ]
}
```

## 语法
<a name="hivecopyactivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道有一个计划树 (计划位于主计划中)，用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 指定在其上运行的集群。 | 参考对象，例如 “runson”：\$1“ref”:” myResourceId “\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| filterSql | Hive SQL 语句片段，用于筛选要复制的 DynamoDB 或 Amazon S3 数据的子集。过滤器应仅包含谓词，不能以WHERE子句开头，因为它是自动 AWS Data Pipeline 添加的。 | 字符串 | 
| input | 输入数据源。这必须是 S3DataNode 或 DynamoDBDataNode。如果您使用 DynamoDBNode，请指定 DynamoDBExportDataFormat。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据源。如果输入是 S3DataNode，则这必须是 DynamoDBDataNode。否则，这可以是 S3DataNode 或 DynamoDBDataNode。如果您使用 DynamoDBNode，请指定 DynamoDBExportDataFormat。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 Amazon S3 URI，例如  's3://BucketName/Key/'。 | 字符串 | 
| postActivityTaskConfig | 要运行的活动后配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “postActivityTaskConfig”：\$1“ref”:” myShellScript ConfigId “\$1 | 
| preActivityTaskConfig | 要运行的活动前配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如 “preActivityTaskConfig”：\$1“ref”:” myShellScript ConfigId “\$1 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| resizeClusterBefore正在跑步 | 在执行此活动前，重新调整集群的大小，以适应指定为输入或输出的 DynamoDB 数据节点。 如果您的活动使用`DynamoDBDataNode`作为输入或输出数据节点，并且将设置`resizeClusterBeforeRunning`为`TRUE`，则 AWS Data Pipeline 开始使用`m3.xlarge`实例类型。这将使用 `m3.xlarge` 覆盖您的实例类型，从而可能会增加您的月度成本。  | 布尔值 | 
| resizeClusterMax实例 | 调整大小算法可以请求的最大实例数的限制 | 整数 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 Amazon EMR 步骤日志。 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="hivecopyactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [EmrActivity](dp-object-emractivity.md)

# PigActivity
<a name="dp-object-pigactivity"></a>

PigActivity 为中的 Pig 脚本提供原生支持， AWS Data Pipeline 无需使用`ShellCommandActivity`或`EmrActivity`。此外，还 PigActivity 支持数据暂存。在将 stage 字段设置为 true 时， AWS Data Pipeline 会在 Pig 中将输入数据暂存为架构，而无需用户编写其他代码。

## 示例
<a name="pigactivity-example"></a>

以下示例管道说明如何使用 `PigActivity`。该示例管道执行以下步骤：
+ MyPigActivity1 从 Amazon S3 加载数据并运行一个 Pig 脚本，该脚本选择几列数据并将其上传到 Amazon S3。
+ MyPigActivity2 加载第一个输出，选择几列三行数据，然后将其作为第二个输出上传到 Amazon S3。
+ MyPigActivity3 加载第二个输出数据，在 Amazon RDS 中插入两行数据，仅插入名为 “第五行” 的列。
+ MyPigActivity4 加载亚马逊 RDS 数据，选择第一行数据，然后将其上传到 Amazon S3。

```
{
  "objects": [
    {
      "id": "MyInputData1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "directoryPath": "s3://amzn-s3-demo-bucket/pigTestInput",
      "name": "MyInputData1",
      "dataFormat": {
        "ref": "MyInputDataType1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyPigActivity4",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData3"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path/",
      "name": "MyPigActivity4",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "type": "PigActivity",
      "dependsOn": {
        "ref": "MyPigActivity3"
      },
      "output": {
        "ref": "MyOutputData4"
      },
      "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;",
      "stage": "true"
    },
    {
      "id": "MyPigActivity3",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData2"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "name": "MyPigActivity3",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;",
      "type": "PigActivity",
      "dependsOn": {
        "ref": "MyPigActivity2"
      },
      "output": {
        "ref": "MyOutputData3"
      },
      "stage": "true"
    },
    {
      "id": "MyOutputData2",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "name": "MyOutputData2",
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput2",
      "dataFormat": {
        "ref": "MyOutputDataType2"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyOutputData1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "name": "MyOutputData1",
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput1",
      "dataFormat": {
        "ref": "MyOutputDataType1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyInputDataType1",
      "name": "MyInputDataType1",
      "column": [
        "First STRING",
        "Second STRING",
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING",
        "Ninth STRING",
        "Tenth STRING"
      ],
      "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)",
      "type": "RegEx"
    },
    {
      "id": "MyEmrResource",
      "region": "us-east-1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "keyPair": "example-keypair",
      "masterInstanceType": "m1.small",
      "enableDebugging": "true",
      "name": "MyEmrResource",
      "actionOnTaskFailure": "continue",
      "type": "EmrCluster"
    },
    {
      "id": "MyOutputDataType4",
      "name": "MyOutputDataType4",
      "column": "one STRING",
      "type": "CSV"
    },
    {
      "id": "MyOutputData4",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput3",
      "name": "MyOutputData4",
      "dataFormat": {
        "ref": "MyOutputDataType4"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyOutputDataType1",
      "name": "MyOutputDataType1",
      "column": [
        "First STRING",
        "Second STRING",
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING"
      ],
      "columnSeparator": "*",
      "type": "Custom"
    },
    {
      "id": "MyOutputData3",
      "username": "___",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "insertQuery": "insert into #{table} (one) values (?)",
      "name": "MyOutputData3",
      "*password": "___",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "connectionString": "jdbc:mysql://example-database-instance:3306/example-database",
      "selectQuery": "select * from #{table}",
      "table": "example-table-name",
      "type": "MySqlDataNode"
    },
    {
      "id": "MyOutputDataType2",
      "name": "MyOutputDataType2",
      "column": [
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING"
      ],
      "type": "TSV"
    },
    {
      "id": "MyPigActivity2",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData1"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "name": "MyPigActivity2",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "dependsOn": {
        "ref": "MyPigActivity1"
      },
      "type": "PigActivity",
      "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;",
      "output": {
        "ref": "MyOutputData2"
      },
      "stage": "true"
    },
    {
      "id": "MyEmrResourcePeriod",
      "startDateTime": "2013-05-20T00:00:00",
      "name": "MyEmrResourcePeriod",
      "period": "1 day",
      "type": "Schedule",
      "endDateTime": "2013-05-21T00:00:00"
    },
    {
      "id": "MyPigActivity1",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyInputData1"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "scriptUri": "s3://amzn-s3-demo-bucket/script/pigTestScipt.q",
      "name": "MyPigActivity1",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "scriptVariable": [
        "column1=First",
        "column2=Second",
        "three=3"
      ],
      "type": "PigActivity",
      "output": {
        "ref": "MyOutputData1"
      },
      "stage": "true"
    }
  ]
}
```

`pigTestScript.q` 的内容如下所示。

```
B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;
```

## 语法
<a name="pigactivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道有一个计划树 (计划位于主计划中)，用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如，“日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| script | 要运行的 Pig 脚本。 | 字符串 | 
| scriptUri | 要运行的 Pig 脚本的位置 (例如，s3://scriptLocation)。 | 字符串 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行它的 EMR 集群。 PigActivity  | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据源。 | 参考对象，例如，“输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如，“onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 参考对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据源。 | 参考对象，例如，“输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 参考对象，例如，“父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 Amazon S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| postActivityTaskConfig | 要运行的活动后配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如，“postActivityTaskConfig”: \$1“ref”:” myShellScript ConfigId “\$1 | 
| preActivityTaskConfig | 要运行的活动前配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如，“preActivityTaskConfig”: \$1“ref”:” myShellScript ConfigId “\$1 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如，“前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| resizeClusterBefore正在跑步 | 在执行此活动前，重新调整集群的大小，以适应指定为输入或输出的 DynamoDB 数据节点。 如果您的活动使用`DynamoDBDataNode`作为输入或输出数据节点，并且将设置`resizeClusterBeforeRunning`为`TRUE`，则 AWS Data Pipeline 开始使用`m3.xlarge`实例类型。这将使用 `m3.xlarge` 覆盖您的实例类型，从而可能会增加您的月度成本。  | 布尔值 | 
| resizeClusterMax实例 | 调整大小算法可以请求的最大实例数的限制。 | 整数 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 
| scriptVariable | 要传递到 Pig 脚本的参数。您可以将 scriptVariable 与 script 或 scriptUri 一起使用。 | 字符串 | 
| stage | 确定是否启用暂存功能并允许您的 Pig 脚本访问分阶段数据表，例如 \$1 \$1INPUT1\$1 和 \$1 \$1\$1。OUTPUT1 | 布尔值 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如，“ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 参考对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 Amazon EMR 步骤日志。 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用于创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1“ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="pigactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [EmrActivity](dp-object-emractivity.md)

# RedshiftCopyActivity
<a name="dp-object-redshiftcopyactivity"></a>

将数据从 DynamoDB 或 Amazon S3 复制到 Amazon Redshift。您可以将数据加载到新表中，或轻松地将数据并入现有表中。

下面概括了使用 `RedshiftCopyActivity` 的使用案例：

1. 首先使用 AWS Data Pipeline 在 Amazon S3 中暂存您的数据。

1. 使用 `RedshiftCopyActivity` 将数据从 Amazon RDS 和 Amazon EMR 移动到 Amazon Redshift。

   这可让您将数据加载到 Amazon Redshift 中，您可以在此处对数据进行分析。

1. 使用 [SqlActivity](dp-object-sqlactivity.md) 可以对已加载到 Amazon Redshift 中的数据执行 SQL 查询。

 此外，借助 `RedshiftCopyActivity`，您可以使用 `S3DataNode`，因为它支持清单文件。有关更多信息，请参阅 [S3 DataNode](dp-object-s3datanode.md)。

## 示例
<a name="redshiftcopyactivity-example"></a>

以下是该对象类型的示例。

为了确保格式转换，本示例在 `commandOptions` 中使用 [EMPTYASNULL](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-emptyasnull) 和 [IGNOREBLANKLINES](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-ignoreblanklines) 特殊转换参数。有关信息，请参阅 *Amazon Redshift 数据库开发人员指南*中的[数据转换参数](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html)。

```
{
  "id" : "S3ToRedshiftCopyActivity",
  "type" : "RedshiftCopyActivity",
  "input" : { "ref": "MyS3DataNode" },
  "output" : { "ref": "MyRedshiftDataNode" },
  "insertMode" : "KEEP_EXISTING",
  "schedule" : { "ref": "Hour" },
  "runsOn" : { "ref": "MyEc2Resource" },
  "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"]
}
```

以下示例管道定义说明了一个使用 `APPEND` 插入模式的活动：

```
{
  "objects": [
    {
      "id": "CSVId1",
      "name": "DefaultCSV1",
      "type": "CSV"
    },
    {
      "id": "RedshiftDatabaseId1",
      "databaseName": "dbname",
      "username": "user",
      "name": "DefaultRedshiftDatabase1",
      "*password": "password",
      "type": "RedshiftDatabase",
      "clusterId": "redshiftclusterId"
    },
    {
      "id": "Default",
      "scheduleType": "timeseries",
      "failureAndRerunMode": "CASCADE",
      "name": "Default",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "id": "RedshiftDataNodeId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "tableName": "orders",
      "name": "DefaultRedshiftDataNode1",
      "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));",
      "type": "RedshiftDataNode",
      "database": {
        "ref": "RedshiftDatabaseId1"
      }
    },
    {
      "id": "Ec2ResourceId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "securityGroups": "MySecurityGroup",
      "name": "DefaultEc2Resource1",
      "role": "DataPipelineDefaultRole",
      "logUri": "s3://myLogs",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "type": "Ec2Resource"
    },
    {
      "id": "ScheduleId1",
      "startDateTime": "yyyy-mm-ddT00:00:00",
      "name": "DefaultSchedule1",
      "type": "Schedule",
      "period": "period",
      "endDateTime": "yyyy-mm-ddT00:00:00"
    },
    {
      "id": "S3DataNodeId1",
      "schedule": {
        "ref": "ScheduleId1"
      },
      "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv",
      "name": "DefaultS3DataNode1",
      "dataFormat": {
        "ref": "CSVId1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "RedshiftCopyActivityId1",
      "input": {
        "ref": "S3DataNodeId1"
      },
      "schedule": {
        "ref": "ScheduleId1"
      },
      "insertMode": "APPEND",
      "name": "DefaultRedshiftCopyActivity1",
      "runsOn": {
        "ref": "Ec2ResourceId1"
      },
      "type": "RedshiftCopyActivity",
      "output": {
        "ref": "RedshiftDataNodeId1"
      }
    }
  ]
}
```

`APPEND` 操作向表中添加项，无论主键或排序键如何。例如，如果您有以下表，则可追加具有相同的 ID 和用户值的记录。

```
ID(PK)     USER
1          aaa
2          bbb
```

您可以追加具有相同的 ID 和用户值的记录：

```
ID(PK)     USER
1          aaa
2          bbb
1          aaa
```

**注意**  
如果 `APPEND` 操作中断并重试，生成的重新运行管道可能会从开始位置追加。这可能会导致进一步复制，因此，您应了解此行为，尤其是当您有任何计算行数的逻辑时。

有关教程，请参阅 [使用将数据复制到亚马逊 Redshift AWS Data Pipeline](dp-copydata-redshift.md)。

## 语法
<a name="redshiftcopyactivity-syntax"></a>


****  

| 必填字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| insertMode |   确定 AWS Data Pipeline 如何处理目标表中与要加载的数据中的行重叠的预先存在的数据。 有效值包括：`KEEP_EXISTING`、`OVERWRITE_EXISTING`、`TRUNCATE` 和 `APPEND`。 `KEEP_EXISTING` 添加新行到表中，同时保留任何现有的行不变。 `KEEP_EXISTING` 和 ` OVERWRITE_EXISTING` 使用主键、排序键和分配键来识别哪些传入行与现有行匹配。请参阅 Amazon Redshift *数据库开发人员指南*中的[更新和插入新数据](https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html)。 `TRUNCATE` 先删除目标表中的所有数据，然后写入新数据。  `APPEND` 会将所有记录添加到 Redshift 表的结尾。`APPEND` 不需要主键、分配键或排序键，因此会附加可能存在重复的项。  | 枚举 | 

 


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 |  该对象在计划间隔的执行中调用。 指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。 在大多数情况下，我们建议将计划引用放在默认管道对象上，以便所有对象继承该计划。例如，您可以通过指定 `"schedule": {"ref": "DefaultSchedule"}`，明确地针对该对象设置计划。 如果您的管道中的主计划包含嵌套计划，则可以创建具有计划引用的父对象。 有关示例可选计划配置的更多信息，请参阅[计划](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。  | 引用对象，例如： "schedule":\$1"ref":"myScheduleId"\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行活动或命令的计算资源。例如，Amazon EC2 实例或 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myResourceId “\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup。 | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| commandOptions |  获取在 `COPY` 操作期间传递到 Amazon Redshift 数据节点的参数。有关更多信息，请参阅 Amazon Redshift *数据库开发人员指南*中的 [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html)。 在加载表时，`COPY` 会尝试将字符串隐式转换为目标列的数据类型。如果您收到错误或有其他转换需求，则除了自动发生的默认数据转换之外，您还可以指定其他转换参数。有关信息，请参阅 Amazon Redshift *数据库开发人员指南*中的[数据转换参数](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html)。 如果数据格式与输入或输出数据节点关联，则忽略提供的参数。 由于复制操作首先使用 `COPY` 将数据插入暂存表，然后使用 `INSERT` 命令将数据从暂存表复制到目标表中，一些 `COPY` 参数不适用，例如 `COPY` 命令启用表上自动压缩的功能。如果需要压缩，请向 `CREATE TABLE` 语句添加列编码详细信息。 此外，在某些需要从 Amazon Redshift 集群卸载数据和在 Amazon S3 中创建文件的情况下，`RedshiftCopyActivity` 依赖 Amazon Redshift 的 `UNLOAD` 操作。 为提高复制和卸载过程中的性能，请从 `UNLOAD` 命令指定 `PARALLEL OFF` 参数。有关更多信息，请参阅 Amazon Redshift *数据库开发人员指南*中的 [UNLOAD](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html)。  | 字符串 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 引用对象："dependsOn":\$1"ref":"myActivityId"\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据节点。数据源可以是 Amazon S3、DynamoDB 或 Amazon Redshift。 | 引用对象： "input":\$1"ref":"myDataNodeId"\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 引用对象："onFail":\$1"ref":"myActionId"\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 引用对象： "onLateAction":\$1"ref":"myActionId"\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 引用对象： "onSuccess":\$1"ref":"myActionId"\$1 | 
| output | 输出数据节点。输出位置可以是 Amazon S3 或 Amazon Redshift。 | 引用对象： "output":\$1"ref":"myDataNodeId"\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象："parent":\$1"ref":"myBaseObjectId"\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 引用对象："precondition":\$1"ref":"myPreconditionId"\$1 | 
| 队列 |  对应于 Amazon Redshift 中的 `query_group ` 设置，这允许您根据放置在队列中的位置分配并优先处理并发活动。 Amazon Redshift 将同时连接的数量限制为 15。有关更多信息，请参阅 Amazon RDS *数据库开发人员指南*中的[向队列分配查询](https://docs.aws.amazon.com/AmazonRDS/latest/DeveloperGuide/cm-c-executing-queries.html)。  | 字符串 | 
| reportProgressTimeout |  远程工作对 `reportProgress` 的连续调用的超时时间。 如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。  | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType |  允许您指定是否计划管道中的对象。值包括：`cron`、`ondemand` 和 `timeseries`。 `timeseries` 计划表示在每个间隔结束时计划实例。 `Cron` 计划表示在每个间隔开始时计划实例。 `ondemand` 计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。  要使用 `ondemand` 管道，请为每个后续运行调用 `ActivatePipeline` 操作。 如果您使用 `ondemand` 计划，您必须在默认对象中指定它，并且该计划必须是在管道中为对象指定的唯一 `scheduleType`。  | 枚举 | 
| transformSql |  用于转换输入数据的 `SQL SELECT` 表达式。 在名为 `staging` 的表上运行 `transformSql` 表达式。 当您从 DynamoDB 或 Amazon S3 复制数据时， AWS Data Pipeline 会创建一个名为“staging”的表，并且最初在该表中加载数据。此表中的数据用于更新目标表。 `transformSql` 的输出架构必须与最终目标表的架构匹配。 如果您指定了 `transformSql` 选项，则会从指定的 SQL 语句创建第二个暂存表。然后，来自这第二个暂存表的数据更新到最终的目标表中。  | 字符串 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 引用对象："activeInstances":\$1"ref":"myRunnableObjectId"\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象： "cascadeFailedOn":\$1"ref":"myRunnableObjectId"\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 引用对象： "waitingOn":\$1"ref":"myRunnableObjectId"\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的球体。表示对象在生命周期中的位置。例如，组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

# ShellCommandActivity
<a name="dp-object-shellcommandactivity"></a>

 运行命令或脚本。您可以使用 `ShellCommandActivity` 运行时间序列或类似 Cron 的计划任务。

当 `stage` 字段设置为 true 并与 `S3DataNode` 结合使用时，`ShellCommandActivity` 支持暂存数据概念，这意味着，您可以将数据从 Amazon S3 移至暂存位置（如 Amazon EC2）或您的本地环境，使用脚本和 `ShellCommandActivity` 对数据执行工作，并将数据移回 Amazon S3。

在这种情况下，当 Shell 命令连接到输入 `S3DataNode` 时，Shell 脚本使用 `${INPUT1_STAGING_DIR}`、`${INPUT2_STAGING_DIR}` 及其他字段 (请参阅 `ShellCommandActivity` 输入字段) 直接操作数据。

同样，shell 命令输出可暂存到一个输出目录中，以便通过 `${OUTPUT1_STAGING_DIR}`、 `${OUTPUT2_STAGING_DIR}` 等引用的方式自动推送到 Amazon S3。

这些表达式可作为命令行参数传递到 shell 命令，以供您在数据转换逻辑中使用。

`ShellCommandActivity` 返回 Linux 样式的错误代码和字符串。如果 `ShellCommandActivity` 生成错误，则返回的 `error` 为非零值。

## 示例
<a name="shellcommandactivity-example"></a>

以下是该对象类型的示例。

```
{
  "id" : "CreateDirectory",
  "type" : "ShellCommandActivity",
  "command" : "mkdir new-directory"
}
```

## 语法
<a name="shellcommandactivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 |  该对象在 `schedule` 间隔的执行中调用。 要设置该对象的依赖项执行顺序，请指定对另一个对象的 `schedule` 引用。 为满足此要求，可在对象上明确设置 `schedule`，例如，指定 `"schedule": {"ref": "DefaultSchedule"}`。 在大多数情况下，最好将 `schedule` 引用放在默认管道对象上，以便所有对象继承该计划。如果管道包含计划树 (计划位于主计划中)，可以创建具有计划引用的父对象。 为了分散负载， AWS Data Pipeline 可以稍微提前创建物理对象，但要按计划运行它们。 有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。  | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 命令 | 要运行的命令。使用 \$1 引用位置参数，使用 scriptArgument 指定命令的参数。此值与任何关联参数必须在从中运行任务运行程序的环境中起作用。 | 字符串 | 
| scriptUri | 要下载并作为 shell 命令运行的文件的 Amazon S3 URI 路径。仅指定一个 scriptUri 或 command 字段。scriptUri 不能使用参数，请使用 command。 | 字符串 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行活动或命令的计算资源，例如 Amazon EC2 实例或 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myResourceId “\$1 | 
| workerGroup | 用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在指定开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据的位置。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象未完成的情况下将触发的操作。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据的位置。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 Amazon S3 URI，例如 's3://BucketName/Key/'。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程活动对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType |  允许您指定应在间隔的开头还是结尾计划您管道定义中的对象。 值为 `cron`、`ondemand` 和 `timeseries`。 如果设置为 `timeseries`，则在每个间隔结束时计划实例。 如果设置为 `Cron`，则在每个间隔开始时计划实例。 如果设置为 `ondemand`，您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果使用 `ondemand` 计划，则在默认对象中指定它，并且该计划必须是在管道中为对象指定的唯一 `scheduleType`。要使用 `ondemand` 管道，请为每个后续运行调用 `ActivatePipeline` 操作。  | 枚举 | 
| scriptArgument | 传递到命令所指定命令的 JSON 格式的字符串数组。例如，如果命令是 echo \$11 \$12，请将 scriptArgument 指定为 "param1", "param2"。对于多个自变量和参数，按如下方式传递 scriptArgument： "scriptArgument":"arg1","scriptArgument":"param1","scriptArgument":"arg2","scriptArgument":"param2"。scriptArgument 只能与 command 一起使用；将其与 scriptUri 一起使用会导致错误。 | 字符串 | 
| stage | 确定是否启用了暂存并且 Shell 命令有权访问暂存数据变量，例如 \$1\$1INPUT1\$1STAGING\$1DIR\$1 和  \$1\$1OUTPUT1\$1STAGING\$1DIR\$1。 | 布尔值 | 
| stderr | 接收来自命令的重定向系统错误消息的 路径。如果您使用 runsOn 字段，则由于运行活动的资源的短期性质，该字段必须为 Amazon S3 路径。不过，如果指定 workerGroup 字段，则允许使用本地文件路径。 | 字符串 | 
| stdout | 接收来自命令的重定向输出的 Amazon S3 路径。如果您使用 runsOn 字段，则由于运行活动的资源的短期性质，该字段必须为 Amazon S3 路径。不过，如果指定 workerGroup 字段，则允许使用本地文件路径。 | 字符串 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 导致对象失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 Amazon EMR 活动时可用的 Amazon EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 Amazon EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 对象的状态。 | 字符串 | 
| @version | 用于创建对象的 AWS Data Pipeline 版本。 | 字符串 | 
| @waitingOn | 该对象等待的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象在生命周期中的位置。组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="shellcommandactivity-seealso"></a>
+ [CopyActivity](dp-object-copyactivity.md)
+ [EmrActivity](dp-object-emractivity.md)

# SqlActivity
<a name="dp-object-sqlactivity"></a>

在数据库上运行 SQL 查询 (脚本)。

## 示例
<a name="sqlactivity-example"></a>

以下是该对象类型的示例。

```
{
  "id" : "MySqlActivity",
  "type" : "SqlActivity",
  "database" : { "ref": "MyDatabaseID" },
  "script" : "SQLQuery" | "scriptUri" : s3://scriptBucket/query.sql,
  "schedule" : { "ref": "MyScheduleID" },
}
```

## 语法
<a name="sqlactivity-syntax"></a>


****  

| 必填字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| database | 要在其上运行提供的 SQL 脚本的数据库。 | 参考对象，例如 “数据库”：\$1"ref”:” myDatabaseId “\$1 | 

 


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 |  该对象在计划间隔的执行中调用。您必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。您可以明确地针对该对象设置计划，例如通过指定 `"schedule": {"ref": "DefaultSchedule"}`。 在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。 如果管道有一个嵌套在主计划中的计划树，则可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。  | 参考对象，例如 “日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| script | 要运行的 SQL 脚本。您必须指定 script 或 scriptUri。在将脚本存储在 Amazon S3 中时，脚本不会计算为表达式。在将脚本存储在 Amazon S3 中时，为 scriptArgument 指定多个值会很有用。 | 字符串 | 
| scriptUri | 一个 URI，指定要在此活动中执行的 SQL 脚本的位置。 | 字符串 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行活动或命令的计算资源。例如，Amazon EC2 实例或 Amazon EMR 集群。 | 参考对象，例如 “runson”：\$1“ref”:” myResourceId “\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据的位置。 | 参考对象，例如 “输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 自管道的计划开始时间后的时段，对象运行必须在该时段内开始。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如 “onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 如果在管道预定启动后的时间段内，某个对象尚未计划或仍未完成，则应触发的操作（由 “lateAfterTimeout” 指定）。 | 引用对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据的位置。这仅适用于从脚本内部进行引用（例如\$1\$1output.tablename\$1），以及通过在输出数据节点中设置 “createTableSql” 来创建输出表。SQL 查询的输出不会写入输出数据节点。 | 参考对象，例如 “输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 引用对象，例如 “父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如 “前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| 队列 | [仅 Amazon Redshift] 对应于 Amazon Redshift 中的 query\$1group 设置，允许您根据在队列中的放置位置来分配并发活动以及确定它们的优先级。Amazon Redshift 将同时连接的数量限制为 15。有关更多信息，请参阅《Amazon Redshift 数据库开发人员指南》中的[向队列分配查询](https://docs.aws.amazon.com/redshift/latest/dg/cm-c-executing-queries.html)。 | 字符串 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType |  计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。值包括：`cron`、`ondemand` 和 `timeseries`。  `timeseries` 计划表示在每个间隔结束时计划实例。 `cron` 计划表示在每个间隔开始时计划实例。 `ondemand` 计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用 `ondemand` 计划，则必须在默认对象中指定它，并且该计划必须是在管道中为对象指定的唯一 `scheduleType`。要使用 `ondemand` 管道，请为每个后续运行调用 `ActivatePipeline` 操作。  | 枚举 | 
| scriptArgument | 脚本的变量列表。或者，您可以直接在脚本字段中放置表达式。在将脚本存储在 Amazon S3 中时，scriptArgument 的多个值会很有用。示例：\$1 \$1format (@ scheduledStartTime、“YY-MM-DD HH: MM: SS”\$1\$1 n \$1 \$1format (plusPeriod (@，“1 天”) scheduledStartTime、“HH: MM: SS”\$1 YY-MM-DD  | 字符串 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如 “ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 引用对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 EMR 步骤日志 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用来创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1"ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 