AWS Data Pipeline は、新規顧客には利用できなくなりました。の既存のお客様 AWS Data Pipeline は、通常どおりサービスを引き続き使用できます。詳細はこちら
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
RedshiftCopyActivity
DynamoDB または Amazon S3 から Amazon Redshift にデータをコピーします。新しいテーブルにデータをロードすることも、既存のテーブルにデータを簡単にマージすることもできます。
以下に、RedshiftCopyActivity
を使用するユースケースの概要を示します。
-
まず AWS Data Pipeline 、 を使用して Amazon S3 でデータをステージングします。
-
RedshiftCopyActivity
を使用して、Amazon RDSと Amazon から Amazon Redshift EMRにデータを移動します。これにより、Amazon Redshift にデータをロードして分析を行うことができます。
-
を使用してSqlActivity、Amazon Redshift にロードしたデータに対してSQLクエリを実行します。
さらに、RedshiftCopyActivity
はマニフェストファイルをサポートするため、S3DataNode
を操作できます。詳細については、「S3DataNode」を参照してください。
例
以下は、このオブジェクト型の例です。
形式を確実に変換するために、この例では で EMPTYASNULLとIGNOREBLANKLINES特別な変換パラメータを使用しますcommandOptions
。詳細については、Amazon Redshift データベース開発者ガイドのデータ変換パラメータを参照してください。
{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }
以下のパイプライン定義の例では、APPEND
挿入モードを使用するアクティビティを示しています。
{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }
APPEND
オペレーションは、プライマリキーまたはソートキーにかかわらず、テーブルに項目を追加します。たとえば、以下のテーブルがあるとすると、同じ ID とユーザー値のレコードを追加できます。
ID(PK) USER 1 aaa 2 bbb
以下のように、同じ ID とユーザー値のレコードを追加できます。
ID(PK) USER 1 aaa 2 bbb 1 aaa
注記
APPEND
オペレーションが中断されて再試行される場合、結果として再実行されるパイプラインは、最初から追加される可能性があります。これにより、重複するレコードが追加される可能性があるため、行数をカウントするロジックがある場合は、この動作に注意する必要があります。
チュートリアルについては、「AWS Data Pipeline を使用した Amazon Redshift へのデータのコピー」を参照してください。
構文
必須フィールド | 説明 | スロットタイプ |
---|---|---|
insertMode |
ロードするデータ内の行と重複するターゲットテーブル内の AWS Data Pipeline 既存のデータの処理を決定します。 有効な値は、
|
一覧表 |
オブジェクト呼び出しフィールド | 説明 | スロットタイプ |
---|---|---|
schedule |
このオブジェクトは、スケジュール期間の実行中に呼び出されます。 このオブジェクトの依存関係の実行順序を設定するために、別のオブジェクトへのスケジュール参照を指定します。 ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。たとえば、 パイプラインのマスタースケジュールにネストされたスケジュールがある場合、スケジュール参照がある親オブジェクトを作成します。 オプションのスケジュール設定の例については、「スケジュール」を参照してください。 |
参照オブジェクト ( "schedule":{"ref":"myScheduleId"} など) |
必須のグループ (次のいずれかが必要です) | 説明 | スロットタイプ |
---|---|---|
runsOn | アクティビティまたはコマンドを実行するコンピューティングリソース。例えば、Amazon EC2インスタンスや Amazon EMRクラスターなどです。 | リファレンスオブジェクト、例: "runsOn":{"ref"myResourceId"} |
workerGroup | ワーカーグループ。これはルーティングタスクに使用されます。runsOn 値を指定してworkerGroup 存在する場合、 workerGroup は無視されます。 |
文字列 |
オプションのフィールド | 説明 | スロットタイプ |
---|---|---|
attemptStatus | リモートアクティビティから最も最近報告されたステータス。 | 文字列 |
attemptTimeout | リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 | [Period] (期間) |
commandOptions |
テーブルをロードする際に、 入力データノードまたは出力データノードにデータ形式が関連付けられている場合、指定されたパラメータは無視されます。 コピーオペレーションがまず また、場合によっては、Amazon Redshift クラスターからデータをアンロードし、Amazon S3 でファイルを作成する必要があるときに、 コピーおよびアンロード中のパフォーマンスを向上させるには、 |
文字列 |
dependsOn | 実行可能な別のオブジェクトで依存関係を指定します。 | 参照オブジェクト: "dependsOn":{"ref":"myActivityId"} |
failureAndRerunモード | 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 | 一覧表 |
input | 入力データノード。データソースは、Amazon S3、DynamoDB、または Amazon Redshift を使用できます。 | 参照オブジェクト: "input":{"ref":"myDataNodeId"} |
lateAfterTimeout | オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 |
[Period] (期間) |
maxActiveInstances | コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 | 整数 |
maximumRetries | 失敗時の最大再試行回数 | 整数 |
onFail | 現在のオブジェクトが失敗したときに実行するアクション。 | 参照オブジェクト: "onFail":{"ref":"myActionId"} |
onLateAction | オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 | 参照オブジェクト: "onLateAction":{"ref":"myActionId"} |
onSuccess | 現在のオブジェクトが成功したときに実行するアクション。 | 参照オブジェクト: "onSuccess":{"ref":"myActionId"} |
output | 出力データノード。出力場所は、Amazon S3 または Amazon Redshift を使用できます。 | 参照オブジェクト: "output":{"ref":"myDataNodeId"} |
parent | スロットの継承元となる現在のオブジェクトの親。 | 参照オブジェクト: "parent":{"ref":"myBaseObjectId"} |
pipelineLogUri | パイプラインのログをアップロードするための S3 URI (「s3://BucketName/Key/」など)。 | 文字列 |
precondition | オプションで前提条件を定義します。データノードは、すべての前提条件が満たされREADYるまで「」とマークされません。 | 参照オブジェクト: "precondition":{"ref":"myPreconditionId"} |
キュー |
同時発生した複数アクティビティの割り当てと優先順位付けをキュー内の位置に基づいて行うことができる、Amazon Redshift の Amazon Redshift では、同時接続数が 15 に制限されています。詳細については、「Amazon RDS Database デベロッパーガイド」の「キューへのクエリの割り当て」を参照してください。 |
文字列 |
reportProgressTimeout |
設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 |
[Period] (期間) |
retryDelay | 2 回の再試行の間のタイムアウト期間。 | [Period] (期間) |
scheduleType |
パイプライン内のオブジェクトのスケジュールを指定できます。値は、
|
一覧表 |
transformSql |
入力データの変換に使用される
DynamoDB または Amazon S3 からデータをコピーすると、 AWS Data Pipeline によって「staging」という名前のテーブルが作成され、データがあらかじめロードされます。このテーブルのデータは、ターゲットテーブルの更新に使用されます。
|
文字列 |
実行時フィールド | 説明 | スロットタイプ |
---|---|---|
@activeInstances | 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 | 参照オブジェクト: "activeInstances":{"ref":"myRunnableObjectId"} |
@actualEndTime | このオブジェクトの実行が終了した時刻。 | DateTime |
@actualStartTime | このオブジェクトの実行が開始された時刻。 | DateTime |
cancellationReason | このオブジェクト cancellationReason がキャンセルされた場合の 。 | 文字列 |
@cascadeFailedOn | オブジェクトが失敗した際の依存関係チェーンの説明。 | 参照オブジェクト: "cascadeFailedOn":{"ref":"myRunnableObjectId"} |
emrStepLog | EMR EMRアクティビティの試行でのみ使用できるステップログ | 文字列 |
errorId | このオブジェクトが失敗 errorId した場合の 。 | 文字列 |
errorMessage | このオブジェクトが失敗 errorMessage した場合の 。 | 文字列 |
errorStackTrace | このオブジェクトが失敗した場合は、エラースタックトレース。 | 文字列 |
@finishedTime | このオブジェクトが実行を終了した時刻。 | DateTime |
hadoopJobLog | EMRベースのアクティビティの試行で使用可能な Hadoop ジョブログ。 | 文字列 |
@healthStatus | 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 | 文字列 |
@healthStatusFromInstanceId | 終了状態に達した最後のインスタンスオブジェクトの ID。 | 文字列 |
@healthStatusUpdatedTime | ヘルス状態が最後に更新された時間。 | DateTime |
hostname | タスクの試行を取得したクライアントのホスト名。 | 文字列 |
@lastDeactivatedTime | このオブジェクトが最後に非アクティブ化された時刻。 | DateTime |
@latestCompletedRunTime | 実行が完了した最後の実行の時刻。 | DateTime |
@latestRunTime | 実行がスケジュールされた最後の実行の時刻。 | DateTime |
@nextRunTime | 次回にスケジュールされた実行の時刻。 | DateTime |
reportProgressTime | リモートアクティビティで進捗状況が報告された最新の時刻。 | DateTime |
@scheduledEndTime | オブジェクトの予定された終了時刻。 | DateTime |
@scheduledStartTime | オブジェクトの予定された開始時刻。 | DateTime |
@status | このオブジェクトのステータス。 | 文字列 |
@version | オブジェクトが作成されたパイプラインのバージョン。 | 文字列 |
@waitingOn | このオブジェクトが待機している依存関係のリストの説明。 | 参照オブジェクト: "waitingOn":{"ref":"myRunnableObjectId"} |
システムフィールド | 説明 | スロットタイプ |
---|---|---|
@error | 形式が正しくないオブジェクトを説明するエラー。 | 文字列 |
@pipelineId | このオブジェクトが属するパイプラインの ID。 | 文字列 |
@sphere | オブジェクトの球です。ライフサイクルにおける場所を示します。たとえば、コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 | 文字列 |