

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。[了解详情](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# PigActivity
<a name="dp-object-pigactivity"></a>

PigActivity 为中的 Pig 脚本提供原生支持， AWS Data Pipeline 无需使用`ShellCommandActivity`或`EmrActivity`。此外，还 PigActivity 支持数据暂存。在将 stage 字段设置为 true 时， AWS Data Pipeline 会在 Pig 中将输入数据暂存为架构，而无需用户编写其他代码。

## 示例
<a name="pigactivity-example"></a>

以下示例管道说明如何使用 `PigActivity`。该示例管道执行以下步骤：
+ MyPigActivity1 从 Amazon S3 加载数据并运行一个 Pig 脚本，该脚本选择几列数据并将其上传到 Amazon S3。
+ MyPigActivity2 加载第一个输出，选择几列三行数据，然后将其作为第二个输出上传到 Amazon S3。
+ MyPigActivity3 加载第二个输出数据，在 Amazon RDS 中插入两行数据，仅插入名为 “第五行” 的列。
+ MyPigActivity4 加载亚马逊 RDS 数据，选择第一行数据，然后将其上传到 Amazon S3。

```
{
  "objects": [
    {
      "id": "MyInputData1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "directoryPath": "s3://amzn-s3-demo-bucket/pigTestInput",
      "name": "MyInputData1",
      "dataFormat": {
        "ref": "MyInputDataType1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyPigActivity4",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData3"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path/",
      "name": "MyPigActivity4",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "type": "PigActivity",
      "dependsOn": {
        "ref": "MyPigActivity3"
      },
      "output": {
        "ref": "MyOutputData4"
      },
      "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;",
      "stage": "true"
    },
    {
      "id": "MyPigActivity3",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData2"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "name": "MyPigActivity3",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;",
      "type": "PigActivity",
      "dependsOn": {
        "ref": "MyPigActivity2"
      },
      "output": {
        "ref": "MyOutputData3"
      },
      "stage": "true"
    },
    {
      "id": "MyOutputData2",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "name": "MyOutputData2",
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput2",
      "dataFormat": {
        "ref": "MyOutputDataType2"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyOutputData1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "name": "MyOutputData1",
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput1",
      "dataFormat": {
        "ref": "MyOutputDataType1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyInputDataType1",
      "name": "MyInputDataType1",
      "column": [
        "First STRING",
        "Second STRING",
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING",
        "Ninth STRING",
        "Tenth STRING"
      ],
      "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)",
      "type": "RegEx"
    },
    {
      "id": "MyEmrResource",
      "region": "us-east-1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "keyPair": "example-keypair",
      "masterInstanceType": "m1.small",
      "enableDebugging": "true",
      "name": "MyEmrResource",
      "actionOnTaskFailure": "continue",
      "type": "EmrCluster"
    },
    {
      "id": "MyOutputDataType4",
      "name": "MyOutputDataType4",
      "column": "one STRING",
      "type": "CSV"
    },
    {
      "id": "MyOutputData4",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput3",
      "name": "MyOutputData4",
      "dataFormat": {
        "ref": "MyOutputDataType4"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyOutputDataType1",
      "name": "MyOutputDataType1",
      "column": [
        "First STRING",
        "Second STRING",
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING"
      ],
      "columnSeparator": "*",
      "type": "Custom"
    },
    {
      "id": "MyOutputData3",
      "username": "___",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "insertQuery": "insert into #{table} (one) values (?)",
      "name": "MyOutputData3",
      "*password": "___",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "connectionString": "jdbc:mysql://example-database-instance:3306/example-database",
      "selectQuery": "select * from #{table}",
      "table": "example-table-name",
      "type": "MySqlDataNode"
    },
    {
      "id": "MyOutputDataType2",
      "name": "MyOutputDataType2",
      "column": [
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING"
      ],
      "type": "TSV"
    },
    {
      "id": "MyPigActivity2",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData1"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "name": "MyPigActivity2",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "dependsOn": {
        "ref": "MyPigActivity1"
      },
      "type": "PigActivity",
      "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;",
      "output": {
        "ref": "MyOutputData2"
      },
      "stage": "true"
    },
    {
      "id": "MyEmrResourcePeriod",
      "startDateTime": "2013-05-20T00:00:00",
      "name": "MyEmrResourcePeriod",
      "period": "1 day",
      "type": "Schedule",
      "endDateTime": "2013-05-21T00:00:00"
    },
    {
      "id": "MyPigActivity1",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyInputData1"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "scriptUri": "s3://amzn-s3-demo-bucket/script/pigTestScipt.q",
      "name": "MyPigActivity1",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "scriptVariable": [
        "column1=First",
        "column2=Second",
        "three=3"
      ],
      "type": "PigActivity",
      "output": {
        "ref": "MyOutputData1"
      },
      "stage": "true"
    }
  ]
}
```

`pigTestScript.q` 的内容如下所示。

```
B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;
```

## 语法
<a name="pigactivity-syntax"></a>


****  

| 对象调用字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| 计划 | 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用，以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求，例如，指定 “schedule”: \$1"ref”: "DefaultSchedule“\$1。在大多数情况下，最好将计划引用放在默认管道对象上，以便所有对象继承该计划。或者，如果管道有一个计划树 (计划位于主计划中)，用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息，请参阅 [https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 参考对象，例如，“日程安排”：\$1“ref”:” myScheduleId “\$1 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| script | 要运行的 Pig 脚本。 | 字符串 | 
| scriptUri | 要运行的 Pig 脚本的位置 (例如，s3://scriptLocation)。 | 字符串 | 

 


****  

| 所需的组 (下列选项之一是必需的) | 说明 | 槽位类型 | 
| --- | --- | --- | 
| runsOn | 运行它的 EMR 集群。 PigActivity  | 参考对象，例如 “runson”：\$1“ref”:” myEmrCluster Id "\$1 | 
| workerGroup | 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup，则将忽略 workerGroup | 字符串 | 

 


****  

| 可选字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| attemptStatus | 来自远程活动的最近报告的状态。 | 字符串 | 
| attemptTimeout | 远程工作完成的超时时间。如果设置此字段，则可能会重试未在设定的开始时间内完成的远程活动。 | 周期 | 
| dependsOn | 指定与另一个可运行对象的依赖关系。 | 参考对象，例如 “dependSon”：\$1“ref”:” myActivityId “\$1 | 
| failureAndRerun模式 | 描述依赖项失败或重新运行时的使用者节点行为。 | 枚举 | 
| input | 输入数据源。 | 参考对象，例如，“输入”：\$1"ref”:” myDataNode Id "\$1 | 
| lateAfterTimeout | 管道启动后经过的时间，在此时间内，对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 | 周期 | 
| maxActiveInstances | 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 | 整数 | 
| maximumRetries | 失败后的最大重试次数。 | 整数 | 
| onFail | 当前对象失败时要运行的操作。 | 参考对象，例如，“onFail”：\$1“ref”:” myActionId “\$1 | 
| onLateAction | 在尚未计划对象或对象仍未完成的情况下将触发的操作。 | 参考对象，例如 onLateAction ““: \$1" ref”:” myActionId “\$1 | 
| onSuccess | 当前对象成功时要运行的操作。 | 参考对象，例如 “onSuccess”：\$1“ref”:” myActionId “\$1 | 
| output | 输出数据源。 | 参考对象，例如，“输出”：\$1"ref”:” myDataNode Id "\$1 | 
| parent | 槽将继承自的当前对象的父级。 | 参考对象，例如，“父对象”：\$1"ref”:” myBaseObject Id "\$1 | 
| pipelineLogUri | 用于上传管道日志的 Amazon S3 URI（例如 's3: BucketName ///Key/ '）。 | 字符串 | 
| postActivityTaskConfig | 要运行的活动后配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如，“postActivityTaskConfig”: \$1“ref”:” myShellScript ConfigId “\$1 | 
| preActivityTaskConfig | 要运行的活动前配置脚本。这由 Amazon S3 中 Shell 脚本的 URI 和参数列表组成。 | 参考对象，例如，“preActivityTaskConfig”: \$1“ref”:” myShellScript ConfigId “\$1 | 
| precondition | (可选) 定义先决条件。在满足所有先决条件之前，数据节点不会标记为“READY”。 | 参考对象，例如，“前提条件”：\$1“ref”:” myPreconditionId “\$1 | 
| reportProgressTimeout | 远程工作对 reportProgress 的连续调用的超时时间。如果设置此字段，则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 | 周期 | 
| resizeClusterBefore正在跑步 | 在执行此活动前，重新调整集群的大小，以适应指定为输入或输出的 DynamoDB 数据节点。 如果您的活动使用`DynamoDBDataNode`作为输入或输出数据节点，并且将设置`resizeClusterBeforeRunning`为`TRUE`，则 AWS Data Pipeline 开始使用`m3.xlarge`实例类型。这将使用 `m3.xlarge` 覆盖您的实例类型，从而可能会增加您的月度成本。  | 布尔值 | 
| resizeClusterMax实例 | 调整大小算法可以请求的最大实例数的限制。 | 整数 | 
| retryDelay | 两次重试之间的超时时间。 | 周期 | 
| scheduleType | 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例，而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着，您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划，则必须在默认对象中指定它，并且必须是在管道中为对象指定的唯一 scheduleType。要使用按需管道，您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括：cron、ondemand 和 timeseries。 | 枚举 | 
| scriptVariable | 要传递到 Pig 脚本的参数。您可以将 scriptVariable 与 script 或 scriptUri 一起使用。 | 字符串 | 
| stage | 确定是否启用暂存功能并允许您的 Pig 脚本访问分阶段数据表，例如 \$1 \$1INPUT1\$1 和 \$1 \$1\$1。OUTPUT1 | 布尔值 | 

 


****  

| 运行时字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @activeInstances | 当前计划的有效实例对象的列表。 | 参考对象，例如，“ActiveInstances”：\$1"ref”:” myRunnableObject Id "\$1 | 
| @actualEndTime | 该对象的执行完成时间。 | DateTime | 
| @actualStartTime | 该对象的执行开始时间。 | DateTime | 
| cancellationReason | 该对象被取消时显示的 cancellationReason。 | 字符串 | 
| @cascadeFailedOn | 对象在其上失败的依赖项链的描述。 | 参考对象，例如 cascadeFailedOn ““: \$1" ref”:” myRunnableObject Id "\$1 | 
| emrStepLog | 仅在尝试 EMR 活动时可用的 Amazon EMR 步骤日志。 | 字符串 | 
| errorId | 该对象失败时显示的 errorId。 | 字符串 | 
| errorMessage | 该对象失败时显示的 errorMessage。 | 字符串 | 
| errorStackTrace | 该对象失败时显示的错误堆栈跟踪。 | 字符串 | 
| @finishedTime | 该对象完成其执行的时间。 | DateTime | 
| hadoopJobLog | 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 | 字符串 | 
| @healthStatus | 对象的运行状况，反映进入终止状态的上个对象实例成功还是失败。 | 字符串 | 
| @healthStatusFromInstanceId | 进入终止状态的上个实例对象的 ID。 | 字符串 | 
| @ T healthStatusUpdated ime | 上次更新运行状况的时间。 | DateTime | 
| hostname | 已执行任务尝试的客户端的主机名。 | 字符串 | 
| @lastDeactivatedTime | 上次停用该对象的时间。 | DateTime | 
| @ T latestCompletedRun ime | 已完成执行的最新运行的时间。 | DateTime | 
| @latestRunTime | 已计划执行的最新运行的时间。 | DateTime | 
| @nextRunTime | 计划下次运行的时间。 | DateTime | 
| reportProgressTime | 远程活动报告进度的最近时间。 | DateTime | 
| @scheduledEndTime | 对象的计划结束时间。 | DateTime | 
| @scheduledStartTime | 对象的计划开始时间。 | DateTime | 
| @status | 该对象的状态。 | 字符串 | 
| @version | 用于创建对象的管道版本。 | 字符串 | 
| @waitingOn | 该对象在其上处于等待状态的依赖项列表的描述。 | 参考对象，例如 “waitingOn”：\$1“ref”:” myRunnableObject Id "\$1 | 

 


****  

| 系统字段 | 说明 | 槽位类型 | 
| --- | --- | --- | 
| @error | 用于描述格式不正确的对象的错误消息。 | 字符串 | 
| @pipelineId | 该对象所属的管道的 ID。 | 字符串 | 
| @sphere | 对象的范围指明对象在生命周期中的位置：组件对象产生实例对象，后者执行尝试对象。 | 字符串 | 

## 另请参阅
<a name="pigactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [EmrActivity](dp-object-emractivity.md)