AWS Data Pipeline は、新規顧客には利用できなくなりました。の既存のお客様 AWS Data Pipeline は、通常どおりサービスを引き続き使用できます。詳細はこちら
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
PigActivity
PigActivity は、 ShellCommandActivity
または を使用する AWS Data Pipeline 必要なく、 で Pig スクリプトをネイティブにサポートしますEmrActivity
。さらに、 はデータステージング PigActivity をサポートします。stage フィールドが true に設定されていると、 AWS Data Pipeline は Pig のスキーマとして入力データをステージングします(ユーザーがコードを追加する必要はありません)。
例
次のパイプライン例は PigActivity
の使い方を示します。パイプライン例では、以下のステップを実行します。
-
MyPigActivity1 は Amazon S3 からデータをロードし、いくつかの列のデータを選択して Amazon S3 にアップロードする Pig スクリプトを実行します。
-
MyPigActivity2 は最初の出力をロードし、数列と 3 行のデータを選択し、2 番目の出力として Amazon S3 にアップロードします。
-
MyPigActivity3 は 2 番目の出力データをロードし、2 行のデータと「fifth」という名前の列のみを Amazon に挿入しますRDS。
-
MyPigActivity4 は Amazon RDSデータをロードし、データの最初の行を選択して Amazon S3 にアップロードします。
{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://
example-bucket
/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path
/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path
", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket
/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket
/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair
", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket
/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance
:3306/example-database
", "selectQuery": "select * from #{table}", "table": "example-table-name
", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path
", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path
", "scriptUri": "s3://example-bucket
/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }
pigTestScript.q
の内容は次のとおりです。
B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;
構文
オブジェクト呼び出しフィールド | 説明 | スロットタイプ |
---|---|---|
schedule | このオブジェクトは、スケジュール期間の実行中に呼び出されます。ユーザーは、このオブジェクトの依存関係の実行順序を設定するには、別のオブジェクトへのスケジュール参照を指定する必要があります。ユーザーは、オブジェクトに明示的にスケジュールを設定することで、この要件を満たすことができます。例えば、「スケジュール」: {"ref": "DefaultSchedule"} を指定します。ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。または、パイプラインにスケジュールのツリー (マスタースケジュール内のスケジュール) がある場合、ユーザーは、スケジュール参照がある親オブジェクトを作成することができます。オプションのスケジュール設定の例については、「https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html」を参照してください。 | リファレンスオブジェクト、例:「schedule」:{「ref"myScheduleId」} |
必須のグループ (次のいずれかが必要です) | 説明 | スロットタイプ |
---|---|---|
script | 実行する Pig スクリプト。 | 文字列 |
scriptUri | 実行する Pig スクリプトの場所 (s3:// などscriptLocation)。 | 文字列 |
必須のグループ (次のいずれかが必要です) | 説明 | スロットタイプ |
---|---|---|
runsOn | EMR これが PigActivity 実行されるクラスター。 | リファレンスオブジェクト、例:runsOn「」:{「ref"myEmrClusterId」} |
workerGroup | ワーカーグループ。これはルーティングタスクに使用されます。値 runsOn を指定して、workerGroup がある場合、workerGroup は無視されます。 |
文字列 |
オプションのフィールド | 説明 | スロットタイプ |
---|---|---|
attemptStatus | リモートアクティビティから最も最近報告されたステータス。 | 文字列 |
attemptTimeout | リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 | [Period] (期間) |
dependsOn | 実行可能な別のオブジェクトで依存関係を指定します。 | リファレンスオブジェクト、例:dependsOn「」:{「ref"myActivityId」} |
failureAndRerunモード | 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 | 一覧表 |
input | 入力データソース。 | リファレンスオブジェクト、例えば「input」:{「ref"myDataNodeId」} |
lateAfterTimeout | オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 |
[Period] (期間) |
maxActiveInstances | コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 | 整数 |
maximumRetries | 失敗時の最大再試行回数。 | 整数 |
onFail | 現在のオブジェクトが失敗したときに実行するアクション。 | リファレンスオブジェクト、例:onFail「」:{「ref"myActionId」} |
onLateAction | オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 | リファレンスオブジェクト、例:onLateAction「」:{「ref"myActionId」} |
onSuccess | 現在のオブジェクトが成功したときに実行するアクション。 | リファレンスオブジェクト、例:onSuccess「」:{「ref"myActionId」} |
output | 出力データソース。 | リファレンスオブジェクト、例:「output」:{「ref"myDataNodeId」} |
parent | スロットの継承元となる現在のオブジェクトの親。 | リファレンスオブジェクト、例:「parent」:{「ref"myBaseObjectId」} |
pipelineLogUri | パイプラインのログをアップロードするための Amazon S3 URI (「s3://BucketName/Key/」など)。 | 文字列 |
postActivityTask設定 | 実行するポストアクティビティ設定スクリプト。これは、Amazon S33 URIのシェルスクリプトの と引数のリストで構成されます。 | リファレンスオブジェクト、例:postActivityTask「Config」:{「ref"myShellScriptConfigId」} |
preActivityTask設定 | 実行するプリアクティビティ設定スクリプト。これは、Amazon S3 URIのシェルスクリプトの と引数のリストで構成されます。 Amazon S3 | リファレンスオブジェクト、例:preActivityTask「Config」:{「ref"myShellScriptConfigId」} |
precondition | オプションで前提条件を定義します。すべての前提条件が満たされREADYるまで、データノードは「」とマークされません。 | リファレンスオブジェクト、例:「precondition」:{「ref"myPreconditionId」} |
reportProgressTimeout | reportProgress へのリモート作業の連続した呼び出しのタイムアウト。設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 |
[Period] (期間) |
resizeClusterBefore実行中 | 入力または出力として指定された DynamoDB データノードに対応するため、このアクティビティを実行する前にクラスターのサイズを変更します。注記アクティビティで を入力データノードまたは出力データノード |
ブール値 |
resizeClusterMaxインスタンス | サイズ変更アルゴリズムによってリクエストできるインスタンスの最大数の制限。 | 整数 |
retryDelay | 2 回の再試行の間のタイムアウト期間。 | [Period] (期間) |
scheduleType | スケジュールタイプによって、パイプライン定義のオブジェクトを、期間の最初にスケジュールするか、最後にスケジュールするかを指定できます。[Time Series Style Scheduling] は、インスタンスが各間隔の最後にスケジュールされることを意味し、[Cron Style Scheduling] は、インスタンスが各間隔の最初にスケジュールされることを意味します。オンデマンドスケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。つまり、パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。オンデマンドスケジュールを使用する場合は、デフォルトオブジェクトで指定する必要があり、パイプライン内のオブジェクトに対して scheduleType 指定されるのは のみである必要があります。オンデマンドパイプラインを使用するには、後続の実行ごとに ActivatePipeline オペレーションを呼び出すだけです。値は、cron、ondemand、および timeseries です。 | 一覧表 |
scriptVariable | Pig スクリプトに渡す引数。スクリプトまたは scriptVariable で を使用できますscriptUri。 | 文字列 |
ステージ | ステージングを有効にするかどうかを決定し、Pig スクリプトが ${INPUT1} や ${OUTPUT1} などのステージングされたデータテーブルにアクセスできるようにします。 | ブール値 |
実行時フィールド | 説明 | スロットタイプ |
---|---|---|
@activeInstances | 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 | リファレンスオブジェクト、例:activeInstances「」:{「ref"myRunnableObjectId」} |
@actualEndTime | このオブジェクトの実行が終了した時刻。 | DateTime |
@actualStartTime | このオブジェクトの実行が開始された時刻。 | DateTime |
cancellationReason | このオブジェクト cancellationReason がキャンセルされた場合の 。 | 文字列 |
@cascadeFailedOn | オブジェクトが失敗した際の依存関係チェーンの説明。 | リファレンスオブジェクト、例:cascadeFailedOn「」:{「ref"myRunnableObjectId」} |
emrStepLog | Amazon EMR ステップログは、EMRアクティビティの試行でのみ使用できます。 | 文字列 |
errorId | このオブジェクトが失敗 errorId した場合の 。 | 文字列 |
errorMessage | このオブジェクトが失敗 errorMessage した場合の 。 | 文字列 |
errorStackTrace | このオブジェクトが失敗した場合は、エラースタックトレース。 | 文字列 |
@finishedTime | このオブジェクトが実行を終了した時刻。 | DateTime |
hadoopJobLog | EMRベースのアクティビティの試行で使用可能な Hadoop ジョブログ。 | 文字列 |
@healthStatus | 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 | 文字列 |
@healthStatusFromInstanceId | 終了状態に達した最後のインスタンスオブジェクトの ID。 | 文字列 |
@healthStatusUpdatedTime | ヘルス状態が最後に更新された時間。 | DateTime |
hostname | タスクの試行を取得したクライアントのホスト名。 | 文字列 |
@lastDeactivatedTime | このオブジェクトが最後に非アクティブ化された時刻。 | DateTime |
@latestCompletedRunTime | 実行が完了した最後の実行の時刻。 | DateTime |
@latestRunTime | 実行がスケジュールされた最後の実行の時刻。 | DateTime |
@nextRunTime | 次回にスケジュールされた実行の時刻。 | DateTime |
reportProgressTime | リモートアクティビティで進捗状況が報告された最新の時刻。 | DateTime |
@scheduledEndTime | オブジェクトの予定された終了時刻。 | DateTime |
@scheduledStartTime | オブジェクトの予定された開始時刻。 | DateTime |
@status | このオブジェクトのステータス。 | 文字列 |
@version | オブジェクトを作成したパイプラインのバージョン。 | 文字列 |
@waitingOn | このオブジェクトが待機している依存関係のリストの説明。 | リファレンスオブジェクト、例:waitingOn「」:{「ref"myRunnableObjectId」} |
システムフィールド | 説明 | スロットタイプ |
---|---|---|
@error | 形式が正しくないオブジェクトを説明するエラー。 | 文字列 |
@pipelineId | このオブジェクトが属するパイプラインの ID。 | 文字列 |
@sphere | オブジェクトの球は、ライフサイクルにおける場所を示します。コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 | 文字列 |