AWS Data Pipeline は、新規顧客には利用できなくなりました。の既存のお客様 AWS Data Pipeline は、通常どおりサービスを引き続き使用できます。詳細はこちら
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
パイプラインのアクティビティによるデータとテーブルのステージング
AWS Data Pipeline では、パイプラインの入出力データをステージングすることで、ShellCommandActivity
や HiveActivity
などの特定のアクティビティの使用が容易になります。
データのステージングを使用して、入力データノードからアクティビティを実行するリソースにデータをコピーできます。同様に、リソースから出力データノードにデータをコピーすることもできます。
Amazon EMR または Amazon EC2 のリソースでステージングされたデータは、アクティビティのシェルコマンドまたは Hive スクリプトで特別な変数を用いることで使用可能です。
テーブルのステージングはデータのステージングと似ていますが、具体的には、ステージングされたデータがデータベーステーブルの形式である点が異なります。
AWS Data Pipeline では、以下のシナリオのステージングがサポートされます。
-
ShellCommandActivity
によるデータのステージング -
Hive およびステージングをサポートするデータノードによるテーブルのステージング
-
Hive およびステージングをサポートしないデータノードによるテーブルのステージング
注記
ステージングは、ShellCommandActivity
などのアクティビティで stage
フィールドが true
に設定されている場合にのみ機能します。詳細については、「ShellCommandActivity」を参照してください。
また、データノードとアクティビティの関係には、次の 4 種類があります。
- リソースでローカルにデータをステージング
-
入力データはリソースのローカルファイルシステムに自動的にコピーされます。出力データはリソースのローカルファイルシステムから出力データノードに自動的にコピーされます。たとえば、
ShellCommandActivity
の入出力で staging = true に設定すると、入力データは INPUTx_STAGING_DIR、出力データは OUTPUTx_STAGING_DIR として使用可能になります (x は入力または出力の番号です)。 - アクティビティの入出力定義のステージング
-
入力データ形式(列名、テーブル名)がアクティビティのリソースに自動的にコピーされます。たとえば、
HiveActivity
で staging = true に設定する場合です。Hive テーブルからテーブル定義をステージングするために、入力S3DataNode
で指定されたデータ形式が使用されます。 - ステージングが有効ではない
-
アクティビティで入出力オブジェクトとそのフィールドは使用できますが、データそのものは使用できません。たとえば、デフォルトが
EmrActivity
であるか、それ以外のアクティビティで staging = false と設定する場合です。この設定では、AWS Data Pipeline の式構文を使用してデータフィールドを参照するためにアクティビティでデータフィールドを使用できますが、これは依存関係が満たされている場合にのみ参照されます。これは、依存関係のチェックとしてのみ機能します。アクティビティを実行するリソースに対する入力からデータをコピーする責任は、アクティビティのコードにあります。 - オブジェクト間の依存関係
-
2 個のオブジェクト間に依存関係があり、結果としてステージングが有効でない場合と同じような状況になります。このため、データノードまたはアクティビティは、別のアクティビティを実行するための前提条件として機能します。
ShellCommandActivity によるデータのステージング
ShellCommandActivity
でデータ入出力として S3DataNode
オブジェクトを使用するシナリオを検討します。AWS Data Pipeline はデータノードを自動的にステージングし、以下の例に示すように、それらのデータノードがローカルファイルフォルダであるかのように環境変数 ${INPUT1_STAGING_DIR}
および ${OUTPUT1_STAGING_DIR}
を使用してシェルコマンドにアクセスできるようにします。INPUT1_STAGING_DIR
という名前の変数の数値部分と、アクティビティが参照するデータノード数に応じた OUTPUT1_STAGING_DIR
増分。
注記
このシナリオは、記述されているように、データ入出力が S3DataNode
オブジェクトの場合にのみ機能します。さらに、出力データのステージングは、出力 S3DataNode
で directoryPath
が設定されている場合にのみ許可されます。
{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...
Hive およびステージングをサポートするデータノードによるテーブルのステージング
HiveActivity
でデータ入出力として S3DataNode
オブジェクトを使用するシナリオを検討します。AWS Data Pipeline はデータノードを自動的にステージングし、以下の例の HiveActivity
に示すように、それらのデータノードが変数 ${input1}
と ${output1}
を使用する Hive テーブルであるかのように Hive スクリプトにアクセスできるようにします。input
という名前の変数の数値部分と、アクティビティが参照するデータノード数に応じた output
増分。
注記
このシナリオは、記述されているように、データ入出力が S3DataNode
オブジェクトまたは MySqlDataNode
オブジェクトの場合にのみ機能します。テーブルのステージングは DynamoDBDataNode
ではサポートされません。
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...
Hive およびステージングをサポートしないデータノードによるテーブルのステージング
HiveActivity
で入力データとして DynamoDBDataNode
、出力として S3DataNode
を使用するシナリオを検討します。DynamoDBDataNode
ではデータのステージングは使用できないため、まず、DynamoDB テーブルを参照するために変数名 #{input.tableName}
を使用して、Hive スクリプト内でテーブルを手動で作成する必要があります。この DynamoDB テーブルが出力となる場合も同様の命名法が適用されますが、変数が #{output.tableName}
である点が異なります。この例の出力 S3DataNode
オブジェクトではステージングを使用できるため、出力データノードを ${output1}
として参照できます。
注記
AWS Data Pipeline では tableName
または directoryPath
にアクセスするために式を使用するため、この例では、テーブル名変数に #(ハッシュ)文字プレフィックスが付きます。AWS Data Pipeline で式の評価がどのように機能するかの詳細については、「式の評価」を参照してください。
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...