

AWS Data Pipeline 不再提供給新客戶。的現有客戶 AWS Data Pipeline 可以繼續正常使用服務。[進一步了解](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# PigActivity
<a name="dp-object-pigactivity"></a>

PigActivity 在 中為 Pig 指令碼提供原生支援， AWS Data Pipeline 無需使用 `ShellCommandActivity`或 `EmrActivity`。此外，PigActivity 支援資料預備。當預備欄位設為 True 時， AWS Data Pipeline 會將輸入資料做為 Pig 中的結構描述預備，而無須使用者輸入額外的程式碼。

## 範例
<a name="pigactivity-example"></a>

以下範例管道示範如何使用 `PigActivity`。範例管道會執行下列步驟：
+ MyPigActivity1 從 Amazon S3 載入資料，並執行 Pig 指令碼，以選取幾欄資料並將其上傳至 Amazon S3。
+ MyPigActivity2 會載入第一個輸出、選取幾欄和三列資料，並將其上傳到 Amazon S3 做為第二個輸出。
+ MyPigActivity3 會載入第二個輸出資料、將兩列資料插入 Amazon RDS，以及僅將名為 "fifth" 的資料欄插入 Amazon RDS。
+ MyPigActivity4 會載入 Amazon RDS 資料、選取第一列資料，並將其上傳至 Amazon S3。

```
{
  "objects": [
    {
      "id": "MyInputData1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "directoryPath": "s3://amzn-s3-demo-bucket/pigTestInput",
      "name": "MyInputData1",
      "dataFormat": {
        "ref": "MyInputDataType1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyPigActivity4",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData3"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path/",
      "name": "MyPigActivity4",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "type": "PigActivity",
      "dependsOn": {
        "ref": "MyPigActivity3"
      },
      "output": {
        "ref": "MyOutputData4"
      },
      "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;",
      "stage": "true"
    },
    {
      "id": "MyPigActivity3",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData2"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "name": "MyPigActivity3",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;",
      "type": "PigActivity",
      "dependsOn": {
        "ref": "MyPigActivity2"
      },
      "output": {
        "ref": "MyOutputData3"
      },
      "stage": "true"
    },
    {
      "id": "MyOutputData2",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "name": "MyOutputData2",
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput2",
      "dataFormat": {
        "ref": "MyOutputDataType2"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyOutputData1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "name": "MyOutputData1",
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput1",
      "dataFormat": {
        "ref": "MyOutputDataType1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyInputDataType1",
      "name": "MyInputDataType1",
      "column": [
        "First STRING",
        "Second STRING",
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING",
        "Ninth STRING",
        "Tenth STRING"
      ],
      "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)",
      "type": "RegEx"
    },
    {
      "id": "MyEmrResource",
      "region": "us-east-1",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "keyPair": "example-keypair",
      "masterInstanceType": "m1.small",
      "enableDebugging": "true",
      "name": "MyEmrResource",
      "actionOnTaskFailure": "continue",
      "type": "EmrCluster"
    },
    {
      "id": "MyOutputDataType4",
      "name": "MyOutputDataType4",
      "column": "one STRING",
      "type": "CSV"
    },
    {
      "id": "MyOutputData4",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "directoryPath": "s3://amzn-s3-demo-bucket/PigActivityOutput3",
      "name": "MyOutputData4",
      "dataFormat": {
        "ref": "MyOutputDataType4"
      },
      "type": "S3DataNode"
    },
    {
      "id": "MyOutputDataType1",
      "name": "MyOutputDataType1",
      "column": [
        "First STRING",
        "Second STRING",
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING"
      ],
      "columnSeparator": "*",
      "type": "Custom"
    },
    {
      "id": "MyOutputData3",
      "username": "___",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "insertQuery": "insert into #{table} (one) values (?)",
      "name": "MyOutputData3",
      "*password": "___",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "connectionString": "jdbc:mysql://example-database-instance:3306/example-database",
      "selectQuery": "select * from #{table}",
      "table": "example-table-name",
      "type": "MySqlDataNode"
    },
    {
      "id": "MyOutputDataType2",
      "name": "MyOutputDataType2",
      "column": [
        "Third STRING",
        "Fourth STRING",
        "Fifth STRING",
        "Sixth STRING",
        "Seventh STRING",
        "Eighth STRING"
      ],
      "type": "TSV"
    },
    {
      "id": "MyPigActivity2",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyOutputData1"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "name": "MyPigActivity2",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "dependsOn": {
        "ref": "MyPigActivity1"
      },
      "type": "PigActivity",
      "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;",
      "output": {
        "ref": "MyOutputData2"
      },
      "stage": "true"
    },
    {
      "id": "MyEmrResourcePeriod",
      "startDateTime": "2013-05-20T00:00:00",
      "name": "MyEmrResourcePeriod",
      "period": "1 day",
      "type": "Schedule",
      "endDateTime": "2013-05-21T00:00:00"
    },
    {
      "id": "MyPigActivity1",
      "scheduleType": "CRON",
      "schedule": {
        "ref": "MyEmrResourcePeriod"
      },
      "input": {
        "ref": "MyInputData1"
      },
      "pipelineLogUri": "s3://amzn-s3-demo-bucket/path",
      "scriptUri": "s3://amzn-s3-demo-bucket/script/pigTestScipt.q",
      "name": "MyPigActivity1",
      "runsOn": {
        "ref": "MyEmrResource"
      },
      "scriptVariable": [
        "column1=First",
        "column2=Second",
        "three=3"
      ],
      "type": "PigActivity",
      "output": {
        "ref": "MyOutputData1"
      },
      "stage": "true"
    }
  ]
}
```

`pigTestScript.q` 的內容如下所示。

```
B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;
```

## 語法
<a name="pigactivity-syntax"></a>


****  

| 物件呼叫欄位 | Description | 槽類型 | 
| --- | --- | --- | 
| schedule | 在排程間隔的執行期間會呼叫此物件。使用者必須指定另一個物件的排程參考，設定此物件的相依性執行順序。使用者可以明確設定物件的排程以滿足這項需求，例如，指定 "schedule": \$1"ref": "DefaultSchedule"\$1。在大部分的情況下，建議您將排程參考放在預設的管道物件，讓所有物件都繼承該排程。或者，如果管道有排程的樹狀目錄 (主排程內還有排程)，使用者可以建立有排程參考的父物件。如需範例選用排程組態的詳細資訊，請參閱[https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html](https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html)。 | 參考物件，例如 "schedule":\$1"ref":"myScheduleId"\$1 | 

 


****  

| 必要的群組 (下列其中之一為必要) | Description | 槽類型 | 
| --- | --- | --- | 
| script | 要執行的 Pig 指令碼。 | String | 
| scriptUri | 要執行 Pig 指令碼的位置 (例如 s3://scriptLocation)。 | String | 

 


****  

| 必要的群組 (下列其中之一為必要) | Description | 槽類型 | 
| --- | --- | --- | 
| runsOn | 要在其中執行此 PigActivity 的 EMR 叢集。 | 參考物件，例如 "runsOn":\$1"ref":"myEmrClusterId"\$1 | 
| workerGroup | 工作者群組。這是用於路由任務。如果您提供 runsOn 值，且 workerGroup 存在，則會忽略 workerGroup。 | String | 

 


****  

| 選用欄位 | Description | 槽類型 | 
| --- | --- | --- | 
| attemptStatus | 遠端活動最新回報的狀態。 | String | 
| attemptTimeout | 遠端工作完成的逾時。如果設定，則系統可能會重試未在設定開始時間內完成的遠端活動。 | Period | 
| dependsOn | 指定與其他可執行物件的相依性。 | 參考物件，例如 "dependsOn":\$1"ref":"myActivityId"\$1 | 
| failureAndRerunMode | 描述相依性故障或重新執行時的消費者節點行為。 | 列舉 | 
| input | 輸入資料來源。 | 參考物件，例如 "input":\$1"ref":"myDataNodeId"\$1 | 
| lateAfterTimeout | 物件必須完成的管道啟動後經過的時間。只有在排程類型未設定為 時，才會觸發它ondemand。 | Period | 
| maxActiveInstances | 同時作用中的元件執行個體數目上限。重新執行不計入作用中的執行個體數量。 | Integer | 
| maximumRetries | 故障時嘗試重試的次數上限。 | Integer | 
| onFail | 目前物件發生故障時要執行的動作。 | 參考物件，例如 "onFail":\$1"ref":"myActionId"\$1 | 
| onLateAction | 某個物件尚未排程或仍未完成時，應該觸發的動作。 | 參考物件，例如 "onLateAction":\$1"ref":"myActionId"\$1 | 
| onSuccess | 目前物件成功時要執行的動作。 | 參考物件，例如 "onSuccess":\$1"ref":"myActionId"\$1 | 
| output | 輸出資料來源。 | 參考物件，例如 "output":\$1"ref":"myDataNodeId"\$1 | 
| parent | 目前物件的父系，其插槽會被繼承。 | 參考物件，例如 "parent":\$1"ref":"myBaseObjectId"\$1 | 
| pipelineLogUri | 用於上傳管道日誌的 Amazon S3 URI （例如 's3：//BucketName/Key/')。 | String | 
| postActivityTaskConfig | 要執行的活動後組態指令碼。這包含 Amazon S33 中 shell 指令碼的 URI 和引數清單。 | 參考物件，例如 "postActivityTaskConfig":\$1"ref":"myShellScriptConfigId"\$1 | 
| preActivityTaskConfig | 要執行的活動前組態指令碼。這包含 Amazon S3 中的 shell 指令碼 URI 和引數清單。 | 參考物件，例如 "preActivityTaskConfig":\$1"ref":"myShellScriptConfigId"\$1 | 
| precondition | 選擇是否定義先決條件。在所有先決條件滿足前，資料節點不會標示為"READY"。 | 參考物件，例如 "precondition":\$1"ref":"myPreconditionId"\$1 | 
| reportProgressTimeout | 遠端工作連續呼叫 reportProgress 的逾時。如果設定，則不回報指定時段進度的遠端活動，可能會視為已停滯而重試。 | Period | 
| resizeClusterBeforeRunning | 在執行此活動之前調整叢集的大小，以容納指定為輸入或輸出的 DynamoDB 資料節點。 如果您的活動使用 `DynamoDBDataNode`做為輸入或輸出資料節點，而且如果您`resizeClusterBeforeRunning`將 設定為 `TRUE`，則 會使用`m3.xlarge`執行個體類型 AWS Data Pipeline 開始。這會將您選擇的執行個體類型覆寫為 `m3.xlarge`，可能會增加您的每月成本。  | Boolean | 
| resizeClusterMaxInstances | 調整大小演算法可請求的執行個體數目上限。 | Integer | 
| retryDelay | 兩次重試嘗試之間的逾時持續時間。 | Period | 
| scheduleType | 排程類型可讓您指定管道定義的物件應該排程在間隔開頭還是間隔結尾。時間序列樣式排程表示執行個體排程在每個間隔的結尾，而 Cron 樣式排程表示執行個體排程在每個間隔的開頭。隨需排程可讓您在每次啟用時執行一次管道。這表示您不必複製或重新建立管道，然後再執行一次。若您使用隨需排程，則必須在預設物件中指定此排程，且其必須是針對管道中物件指定的唯一 scheduleType。若要使用隨需管道，您只要針對每次後續執行呼叫 ActivatePipeline 操作即可。值為：Cron、ondemand 和 timeseries。 | 列舉 | 
| scriptVariable | 要傳遞給 Pig 指令碼的引數。您可以搭配使用 scriptVariable 和 script 或 scriptUri。 | String | 
| stage | 決定是否啟用接移，並讓您的 Pig 指令碼存取暫存資料的資料表，例如 \$1\$1INPUT1\$1 和 \$1\$1OUTPUT1\$1。 | Boolean | 

 


****  

| 執行時間欄位 | Description | 槽類型 | 
| --- | --- | --- | 
| @activeInstances | 目前已排程的作用中執行個體物件清單。 | 參考物件，例如 "activeInstances":\$1"ref":"myRunnableObjectId"\$1 | 
| @actualEndTime | 此物件執行完成的時間。 | DateTime | 
| @actualStartTime | 此物件執行開始的時間。 | DateTime | 
| cancellationReason | 若此物件已取消，會提供 cancellationReason。 | String | 
| @cascadeFailedOn | 物件失敗所在的相依鏈的描述。 | 參考物件，例如 "cascadeFailedOn":\$1"ref":"myRunnableObjectId"\$1 | 
| emrStepLog | Amazon EMR 步驟日誌僅適用於 EMR 活動嘗試。 | String | 
| errorId | 若此物件失敗，會提供 errorId。 | String | 
| errorMessage | 若此物件失敗，會提供 errorMessage。 | String | 
| errorStackTrace | 如果此物件失敗，則為錯誤堆疊追蹤。 | String | 
| @finishedTime | 此物件完成其執行的時間。 | DateTime | 
| hadoopJobLog | 嘗試 EMR 型活動可用的 Hadoop 任務日誌。 | String | 
| @healthStatus | 反映已達終止狀態之最後一個物件執行個體成功或失敗的物件運作狀態。 | String | 
| @healthStatusFromInstanceId | 已達終止狀態之最後一個執行個體物件的 ID。 | String | 
| @healthStatusUpdatedTime | 上次更新運作狀態的時間。 | DateTime | 
| hostname | 選取任務嘗試之用戶端的主機名稱。 | String | 
| @lastDeactivatedTime | 此物件最後停用的時間。 | DateTime | 
| @latestCompletedRunTime | 執行完成最近一次執行的時間。 | DateTime | 
| @latestRunTime | 執行排程最近一次執行的時間。 | DateTime | 
| @nextRunTime | 下次要排程執行的時間。 | DateTime | 
| reportProgressTime | 遠端活動最近報告進度的時間。 | DateTime | 
| @scheduledEndTime | 物件的排程結束時間。 | DateTime | 
| @scheduledStartTime | 物件的排程開始時間。 | DateTime | 
| @status | 此物件的狀態。 | String | 
| @version | 建立物件時使用的管道版本。 | String | 
| @waitingOn | 此物件等待之相依性清單的描述。 | 參考物件，例如 "waitingOn":\$1"ref":"myRunnableObjectId"\$1 | 

 


****  

| 系統欄位 | Description | 槽類型 | 
| --- | --- | --- | 
| @error | 描述格式錯誤物件的錯誤。 | String | 
| @pipelineId | 此物件所屬管道的 ID。 | String | 
| @sphere | 物件範圍代表其在生命週期中的位置：Component 物件會引發執行 Attempt 物件的 Instance 物件。 | String | 

## 另請參閱
<a name="pigactivity-seealso"></a>
+ [ShellCommandActivity](dp-object-shellcommandactivity.md)
+ [EmrActivity](dp-object-emractivity.md)