

AWS Data Pipeline 不再提供給新客戶。的現有客戶 AWS Data Pipeline 可以繼續正常使用服務。[進一步了解](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用管道活動預備資料和資料表
<a name="dp-concepts-staging"></a>

AWS Data Pipeline 可以在管道中暫存輸入和輸出資料，以便更輕鬆地使用某些活動，例如 `ShellCommandActivity`和 `HiveActivity`。

資料預備可讓您將資料從輸入資料節點複製到執行活動的資源，並且以相似的方式，從資源複製到輸出資料節點。

使用活動 shell 命令或 Hive 指令碼中的特殊變數，即可取得 Amazon EMR 或 Amazon EC2 資源上的暫存資料。

資料表預備與資料預備相似，其不同處在於其預備的資料會特別採取資料庫資料表的形式。

AWS Data Pipeline 支援下列預備案例：
+ 使用 `ShellCommandActivity` 進行資料預備
+ 使用 Hive 及支援預備的資料節點進行資料表預備
+ 使用 Hive 及不支援預備的資料節點進行資料表預備

**注意**  
預備只有在活動 (例如 `ShellCommandActivity`) 上的 `stage` 欄位設為 `true` 時才能運作。如需詳細資訊，請參閱[ShellCommandActivity](dp-object-shellcommandactivity.md)。

此外，資料節點和活動可以透過四種方式相關：

在資源上於本機預備資料  
輸入資料會自動複製到資源的本機檔案系統。輸出資料會自動從資源的本機檔案系統複製到輸出資料節點。例如，當您設定 `ShellCommandActivity` 輸入和輸出，並設定 staging = true 時，輸入資料可透過 INPUT*x*\$1STAGING\$1DIR 取得，輸出資料則可透過 OUTPUT*x*\$1STAGING\$1DIR 取得，其中 *x* 是輸入和輸出的數字。

活動的預備輸入及輸出定義  
輸入資料格式 (資料行名稱和資料表名稱) 會自動複製到活動的資源。例如，當您設定 `HiveActivity`，並設定 staging = true 時。輸入 `S3DataNode` 上指定的資料格式會用來從 Hive 資料表預備資料表定義。

未啟用預備  
活動可以取得輸入和輸出物件及其欄位，但無法取得資料本身。例如，根據預設的 `EmrActivity`，或是當您以 staging = false 設定其他活動時。在此組態中，資料欄位可供活動使用 AWS Data Pipeline 表達式語法進行參考，只有在滿足相依性時才會發生這種情況。其用途僅只是檢查依存項目。活動中的程式碼會負責將資料從輸入複製到執行活動的資源。

物件之間的依存項目關係  
兩個物件之間存在一種依存關係，這會在未啟用預備時導致類似的情況。這會使資料節點或活動做為執行另一個活動的先決條件。

## 使用 ShellCommandActivity 進行資料預備
<a name="dp-concepts-datastaging"></a>

考慮使用 `ShellCommandActivity` `S3DataNode` 物件做為資料輸入和輸出的案例。 AWS Data Pipeline 會自動將資料節點分段，使其可存取 shell 命令，就像是使用環境變數`${INPUT1_STAGING_DIR}`的本機檔案資料夾`${OUTPUT1_STAGING_DIR}`一樣，如下列範例所示。名為 `INPUT1_STAGING_DIR` 和 `OUTPUT1_STAGING_DIR` 變數的數字部分，會根據您活動參考的資料節點數累加。

**注意**  
 此案例只有在您的資料輸入和輸出為 `S3DataNode` 物件時，才會以說明的方式運作。此外，只有在輸出 `S3DataNode` 物件上有設定 `directoryPath` 時，才允許輸出資料預備。

```
{
  "id": "AggregateFiles",
  "type": "ShellCommandActivity",
  "stage": "true",
  "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv",
  "input": {
    "ref": "MyInputData"
  },
  "output": {
    "ref": "MyOutputData"
  }
},
{
  "id": "MyInputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items"
  }
},                    
{
  "id": "MyOutputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}"
  }
},
...
```

## 使用 Hive 及支援預備的資料節點進行資料表預備
<a name="dp-concepts-tablestaging"></a>

考慮使用 `HiveActivity` 搭配 `S3DataNode` 物件做為資料輸入和輸出的案例。 AWS Data Pipeline 會自動分階段資料節點，使其可存取 Hive 指令碼，就像是使用 變數`${input1}`的 Hive 資料表`${output1}`，如下列 範例所示`HiveActivity`。名為 `input` 和 `output` 變數的數字部分，會根據您活動參考的資料節點數累加。

**注意**  
 此案例只有在您的資料輸入和輸出為 `S3DataNode` 或 `MySqlDataNode` 物件時，才會以說明的方式運作。`DynamoDBDataNode` 不支援資料表預備。

```
{
  "id": "MyHiveActivity",
  "type": "HiveActivity",
  "schedule": {
    "ref": "MySchedule"
  },
  "runsOn": {
    "ref": "MyEmrResource"
  },
  "input": {
    "ref": "MyInputData"
  },
  "output": {
    "ref": "MyOutputData"
  },
  "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};"
},
{
  "id": "MyInputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/input"
  }
},                    
{
  "id": "MyOutputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/output"
  }
},
...
```

## 使用 Hive 及不支援預備的資料節點進行資料表預備
<a name="dp-concepts-nostaging"></a>

考慮搭配 `DynamoDBDataNode` 做為資料輸入，`S3DataNode` 物件做為輸出，使用 `HiveActivity` 的案例。沒有資料預備可供 使用`DynamoDBDataNode`，因此您必須先使用變數名稱來`#{input.tableName}`參考 DynamoDB 資料表，在 hive 指令碼中手動建立資料表。如果 DynamoDB 資料表是輸出，則適用類似的命名法，除非您使用變數 `#{output.tableName}`。預備可供此範例中的輸出 `S3DataNode` 物件使用，因此您可以以 `${output1}` 參考輸出資料節點。

**注意**  
在此範例中，資料表名稱變數具有 \$1 （雜湊） 字元字首，因為 AWS Data Pipeline 使用表達式來存取 `tableName`或 `directoryPath`。如需表達式評估如何在 中運作的詳細資訊 AWS Data Pipeline，請參閱 [表達式評估](dp-pipeline-expressions.md#dp-datatype-functions)。

```
{
  "id": "MyHiveActivity",
  "type": "HiveActivity",
  "schedule": {
    "ref": "MySchedule"
  },
  "runsOn": {
    "ref": "MyEmrResource"
  },
  "input": {
    "ref": "MyDynamoData"
  },
  "output": {
    "ref": "MyS3Data"
  },
  "hiveScript": "-- Map DynamoDB Table
SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com;
SET dynamodb.throughput.read.percent = 0.5;
CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); 
INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;"
},
{
  "id": "MyDynamoData",
  "type": "DynamoDBDataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "tableName": "MyDDBTable"
},                 
{
  "id": "MyS3Data",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/output"
  }
},
...
```