

AWS Data Pipeline は新規顧客には利用できなくなりました。の既存のお客様は、通常どおりサービスを AWS Data Pipeline 引き続き使用できます。[詳細はこちら](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# パイプラインのアクティビティによるデータとテーブルのステージング
<a name="dp-concepts-staging"></a>

AWS Data Pipeline では、パイプラインに入出力データをステージングして、 `ShellCommandActivity`や などの特定のアクティビティを簡単に使用できます`HiveActivity`。

データのステージングを使用して、入力データノードからアクティビティを実行するリソースにデータをコピーできます。同様に、リソースから出力データノードにデータをコピーすることもできます。

Amazon EMR または Amazon EC2 のリソースでステージングされたデータは、アクティビティのシェルコマンドまたは Hive スクリプトで特別な変数を用いることで使用可能です。

テーブルのステージングはデータのステージングと似ていますが、具体的には、ステージングされたデータがデータベーステーブルの形式である点が異なります。

AWS Data Pipeline では、次のステージングシナリオがサポートされています。
+ `ShellCommandActivity` によるデータのステージング
+ Hive およびステージングをサポートするデータノードによるテーブルのステージング
+ Hive およびステージングをサポートしないデータノードによるテーブルのステージング

**注記**  
ステージングは、`ShellCommandActivity` などのアクティビティで `stage` フィールドが `true` に設定されている場合にのみ機能します。詳細については、「[ShellCommandActivity](dp-object-shellcommandactivity.md)」を参照してください。

また、データノードとアクティビティの関係には、次の 4 種類があります。

リソースでローカルにデータをステージング  
入力データはリソースのローカルファイルシステムに自動的にコピーされます。出力データはリソースのローカルファイルシステムから出力データノードに自動的にコピーされます。たとえば、`ShellCommandActivity` の入出力で staging = true に設定すると、入力データは INPUT*x*\$1STAGING\$1DIR、出力データは OUTPUT*x*\$1STAGING\$1DIR として使用可能になります (*x* は入力または出力の番号です)。

アクティビティの入出力定義のステージング  
入力データ形式（列名、テーブル名）がアクティビティのリソースに自動的にコピーされます。たとえば、`HiveActivity` で staging = true に設定する場合です。Hive テーブルからテーブル定義をステージングするために、入力 `S3DataNode` で指定されたデータ形式が使用されます。

ステージングが有効ではない  
アクティビティで入出力オブジェクトとそのフィールドは使用できますが、データそのものは使用できません。たとえば、デフォルトが `EmrActivity` であるか、それ以外のアクティビティで staging = false と設定する場合です。この設定では、データフィールドは、アクティビティが AWS Data Pipeline 式構文を使用して参照するために使用できます。これは、依存関係が満たされた場合にのみ発生します。これは、依存関係のチェックとしてのみ機能します。アクティビティを実行するリソースに対する入力からデータをコピーする責任は、アクティビティのコードにあります。

オブジェクト間の依存関係  
2 個のオブジェクト間に依存関係があり、結果としてステージングが有効でない場合と同じような状況になります。このため、データノードまたはアクティビティは、別のアクティビティを実行するための前提条件として機能します。

## ShellCommandActivity によるデータのステージング
<a name="dp-concepts-datastaging"></a>

`S3DataNode` オブジェクト`ShellCommandActivity`をデータ入力および出力として使用するシナリオを考えてみましょう。次の例`${INPUT1_STAGING_DIR}``${OUTPUT1_STAGING_DIR}`に示すように、 はデータノード AWS Data Pipeline を自動的にステージングして、環境変数を使用するローカルファイルフォルダであるかのようにシェルコマンドにアクセスできるようにします。`INPUT1_STAGING_DIR` という名前の変数の数値部分と、アクティビティが参照するデータノード数に応じた `OUTPUT1_STAGING_DIR` 増分。

**注記**  
 このシナリオは、記述されているように、データ入出力が `S3DataNode` オブジェクトの場合にのみ機能します。さらに、出力データのステージングは、出力 `S3DataNode` で `directoryPath` が設定されている場合にのみ許可されます。

```
{
  "id": "AggregateFiles",
  "type": "ShellCommandActivity",
  "stage": "true",
  "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv",
  "input": {
    "ref": "MyInputData"
  },
  "output": {
    "ref": "MyOutputData"
  }
},
{
  "id": "MyInputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items"
  }
},                    
{
  "id": "MyOutputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}"
  }
},
...
```

## Hive およびステージングをサポートするデータノードによるテーブルのステージング
<a name="dp-concepts-tablestaging"></a>

`S3DataNode` オブジェクト`HiveActivity`をデータ入力および出力として を使用するシナリオを考えてみましょう。次の の例`${input1}``${output1}`に示すように、 はデータノード AWS Data Pipeline を自動的にステージングして、変数を使用する Hive テーブルであるかのように Hive スクリプトにアクセスできるようにします`HiveActivity`。`input` という名前の変数の数値部分と、アクティビティが参照するデータノード数に応じた `output` 増分。

**注記**  
 このシナリオは、記述されているように、データ入出力が `S3DataNode` オブジェクトまたは `MySqlDataNode` オブジェクトの場合にのみ機能します。テーブルのステージングは `DynamoDBDataNode` ではサポートされません。

```
{
  "id": "MyHiveActivity",
  "type": "HiveActivity",
  "schedule": {
    "ref": "MySchedule"
  },
  "runsOn": {
    "ref": "MyEmrResource"
  },
  "input": {
    "ref": "MyInputData"
  },
  "output": {
    "ref": "MyOutputData"
  },
  "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};"
},
{
  "id": "MyInputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/input"
  }
},                    
{
  "id": "MyOutputData",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/output"
  }
},
...
```

## Hive およびステージングをサポートしないデータノードによるテーブルのステージング
<a name="dp-concepts-nostaging"></a>

`HiveActivity` で入力データとして `DynamoDBDataNode`、出力として `S3DataNode` を使用するシナリオを検討します。`DynamoDBDataNode` ではデータのステージングは使用できないため、まず、DynamoDB テーブルを参照するために変数名 `#{input.tableName}` を使用して、Hive スクリプト内でテーブルを手動で作成する必要があります。この DynamoDB テーブルが出力となる場合も同様の命名法が適用されますが、変数が `#{output.tableName}` である点が異なります。この例の出力 `S3DataNode` オブジェクトではステージングを使用できるため、出力データノードを `${output1}` として参照できます。

**注記**  
この例では、 は式を使用して `tableName`または にアクセスするため、テーブル名変数には \$1 (ハッシュ) 文字プレフィックス AWS Data Pipeline が付いています`directoryPath`。式評価の仕組みの詳細については AWS Data Pipeline、「」を参照してください[式の評価](dp-pipeline-expressions.md#dp-datatype-functions)。

```
{
  "id": "MyHiveActivity",
  "type": "HiveActivity",
  "schedule": {
    "ref": "MySchedule"
  },
  "runsOn": {
    "ref": "MyEmrResource"
  },
  "input": {
    "ref": "MyDynamoData"
  },
  "output": {
    "ref": "MyS3Data"
  },
  "hiveScript": "-- Map DynamoDB Table
SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com;
SET dynamodb.throughput.read.percent = 0.5;
CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); 
INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;"
},
{
  "id": "MyDynamoData",
  "type": "DynamoDBDataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "tableName": "MyDDBTable"
},                 
{
  "id": "MyS3Data",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "directoryPath": "s3://test-hive/output"
  }
},
...
```