RedshiftCopyActivity - AWS Data Pipeline

AWS Data Pipeline 不再提供給新客戶。現有客戶 AWS Data Pipeline 可繼續正常使用此服務。進一步了解

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

RedshiftCopyActivity

將資料從 DynamoDB 或 Amazon S3 複製到 Amazon Redshift。您可以將資料載入新的資料表,或是輕鬆地將資料併入現有資料表。

以下是使用 RedshiftCopyActivity 的使用案例概觀:

  1. 首先使 AWS Data Pipeline 用在 Amazon S3 中暫存您的資料。

  2. RedshiftCopyActivity於將數據從 Amazon RDS 和 Amazon 移動EMR到 Amazon Redshift。

    這可讓您將資料載入 Amazon Redshift,並在此進行分析。

  3. SqlActivity於對已載入 Amazon Redshift 的資料執行SQL查詢。

此外,RedshiftCopyActivity 可讓您使用 S3DataNode,因為它支援資訊清單檔案。如需詳細資訊,請參閱S3 DataNode

範例

以下為此物件類型的範例。

為了確保格式轉換,此範例在中使用EMPTYASNULLIGNOREBLANKLINES特殊轉換參數commandOptions。如需詳細資訊,請參閱 Amazon Redshift 資料庫開發人員指南中的資料轉換參數

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

以下範例管道定義會顯示使用 APPEND 插入模式的活動:

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

APPEND 操作會將項目新增到資料表,無論其主索引鍵或排序索引鍵為何。例如,若您有以下資料表,您可以使用相同的 ID 和使用者值附加記錄。

ID(PK) USER 1 aaa 2 bbb

您可以使用相同的 ID 和使用者值附加記錄:

ID(PK) USER 1 aaa 2 bbb 1 aaa
注意

APPEND 操作遭到插斷並進行重試,其導致的重新執行管道可能會從開頭附加。這可能會造成進一步的重複,因此建議您留意此行為,特別是在您擁有任何計算資料列數量的邏輯時。

如需教學,請參閱使用將數據複製到亞馬遜紅移 AWS Data Pipeline

語法

必要欄位 描述 槽類型
insertMode

決定如 AWS Data Pipeline 何處理目標資料表中與要載入之資料列重疊的預先存在資料。

有效值為:KEEP_EXISTINGOVERWRITE_EXISTINGTRUNCATEAPPEND

KEEP_EXISTING 會將新列新增至資料表,並保持任何現有列不變。

KEEP_EXISTING OVERWRITE_EXISTING 使用主索引鍵、排序索引鍵和分發索引鍵,以識別出哪些傳入的列與現有列匹配。請參閱 Amazon Redshift 資料庫開發人員指南中的更新和插入新資料

TRUNCATE 會刪除目的地資料表中的所有資料,再寫入新資料。

APPEND 會將所有記錄新增至 Redshift 資料表結尾。 APPEND 不需要主索引鍵、分發索引鍵或排序索引鍵,因此可能會附加可能重複的項目。

列舉

物件呼叫欄位 描述 槽類型
schedule

在排程間隔的執行期間會呼叫此物件。

指定其他物件的排程參考,以設定此物件的相依性執行順序。

在大部分的情況下,建議將排程參考放在預設的管道物件,讓所有物件都繼承該排程。例如,您可以指定 "schedule": {"ref": "DefaultSchedule"} 在物件上明確設定排程。

如果管道中的主排程包含巢狀排程,請建立具有排程參考的父物件。

如需範例選用排程組態的詳細資訊,請參閱排程

參考物件,例如: "schedule":{"ref":"myScheduleId"}

必要的群組 (下列其中之一為必要) 描述 槽類型
runsOn 執行活動或命令的可運算資源。例如,Amazon EC2 實例或 Amazon EMR 集群。 引用對象,例如 runsOn「「:{" ref」:」myResourceId「}
workerGroup 工作者群組。這是用於路由任務。如果您提供一個runsOn值並且workerGroup存在,workerGroup則會忽略。 字串

選用欄位 描述 槽類型
attemptStatus 遠端活動最新回報的狀態。 字串
attemptTimeout 遠端工作完成的逾時。如果設定,則系統可能會重試未在設定開始時間內完成的遠端活動。 期間
commandOptions

COPY作業期間將參數傳遞至 Amazon Redshift 資料節點。如需參數的相關資訊,請參閱 Amazon Redshift 資料庫開發人員指南COPY中的。

COPY 載入資料表時,會嘗試隱含地將來源資料中的字串轉換為目標欄的資料類型。除了自動進行的預設資料轉換之外,如果您收到錯誤或有其他轉換需求,您也可以指定其他轉換參數。如需詳細資訊,請參閱 Amazon Redshift 資料庫開發人員指南中的資料轉換參數

如果資料格式與輸入或輸出資料節點相關聯,則會忽略提供的參數。

由於複製操作會先使用 COPY 將資料插入臨時資料表中,然後使用 INSERT 命令將資料從臨時資料表複製到目的地資料表,因此某些 COPY 參數會不適用,例如 COPY 命令啟用資料表自動壓縮的功能。如果壓縮為必要,請將欄編碼詳細資訊新增至 CREATE TABLE 陳述式。

此外,在某些情況下,當它需要從 Amazon Redshift 叢集卸載資料並在 Amazon S3 中建立檔案時,需要RedshiftCopyActivity仰賴 Amazon Redshift 的UNLOAD操作。

若要在複製和卸載期間改善效能,請從 UNLOAD 命令指定 PARALLEL OFF 參數。如需參數的相關資訊,請參閱 Amazon Redshift 資料庫開發人員指南UNLOAD中的。

字串
dependsOn 指定與另一個可執行物件的相依性。 參考物件:"dependsOn":{"ref":"myActivityId"}
failureAndRerun模式 描述相依性故障或重新執行時的消費者節點行為 列舉
input 輸入資料節點。資料來源可以是 Amazon S3、DynamoDB 或 Amazon Redshift。 參考物件: "input":{"ref":"myDataNodeId"}
lateAfterTimeout 管線開始後,物件必須在其中完成的經過時間。僅當明細表類型未設定為時,才會觸發此選項ondemand 期間
maxActiveInstances 同時作用中的元件執行個體數目上限。重新執行不計入作用中的執行個體數量。 Integer
maximumRetries 故障時嘗試重試的次數上限 Integer
onFail 目前物件發生故障時要執行的動作。 參考物件:"onFail":{"ref":"myActionId"}
onLateAction 某個物件尚未排程或仍未完成時,應該觸發的動作。 參考物件: "onLateAction":{"ref":"myActionId"}
onSuccess 目前物件成功時要執行的動作。 參考物件: "onSuccess":{"ref":"myActionId"}
output 輸出資料節點。輸出位置可以是 Amazon S3 或 Amazon Redshift。 參考物件: "output":{"ref":"myDataNodeId"}
parent 目前物件的父系,其插槽會被繼承。 參考物件:"parent":{"ref":"myBaseObjectId"}
pipelineLogUri 用於上傳管道日誌的 S3URI(例如 's3://BucketName/鍵/')。 字串
precondition 選擇是否定義先決條件。在符合所有先決條件之前,資料節點不會標記 READY ""。 參考物件:"precondition":{"ref":"myPreconditionId"}
佇列

對應於 Amazon Redshift 中的query_group 設定,可讓您根據並行活動在佇列中的位置指派並排定其優先順序。

Amazon Redshift 會將同時連線數限制在 15。如需詳細資訊,請參閱 Amazon 資RDS料庫開發人員指南中的將查詢指派給佇列

字串
reportProgressTimeout

遠端工作連續呼叫 reportProgress 的逾時。

如果設定,則不回報指定時段進度的遠端活動,可能會視為已停滯而重試。

期間
retryDelay 兩次重試嘗試之間的逾時持續時間。 期間
scheduleType

允許您指定管道中物件的排程。值為:cronondemandtimeseries

timeseries 排程表示執行個體會排程在每個間隔的結尾。

Cron 排程表示執行個體會排程在每個間隔的開頭。

ondemand 排程可讓您在每次啟用時執行一次管道。這表示您不必複製或重新建立管道,然後再執行一次。

若要使用 ondemand 管道,請針對每次後續執行呼叫 ActivatePipeline 操作。

若您使用 ondemand 排程,則必須在預設物件中指定此排程,且其必須是針對管道中物件指定的唯一 scheduleType

列舉
transformSql

用於轉換輸入資料的 SQL SELECT 表達式。

在資料表執行名為 stagingtransformSql 表達式。

當您從 DynamoDB 或 Amazon S3 複製資料時, AWS Data Pipeline 會建立一個名為「暫存」的表格,然後在該表格中載入資料。此資料表中的資料用於更新目標資料表。

transformSql 的輸出結構描述,必須與最終目標表格的結構描述相符。

如果您指定選transformSql項,則會從指定的陳述式建立第二個臨時資料SQL表。然後,第二個臨時資料表中的資料會更新於最終目標資料表。

字串

執行時間欄位 描述 槽類型
@activeInstances 目前已排程的作用中執行個體物件清單。 參考物件:"activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime 此物件執行完成的時間。 DateTime
@actualStartTime 此物件執行開始的時間。 DateTime
cancellationReason cancellationReason 如果此物件已取消。 字串
@cascadeFailedOn 物件失敗所在的相依鏈的描述。 參考物件: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog EMR步驟記錄僅適用於EMR活動嘗試 字串
errorId errorId 如果此對象失敗。 字串
errorMessage errorMessage 如果此對象失敗。 字串
errorStackTrace 如果此物件失敗,則為錯誤堆疊追蹤。 字串
@finishedTime 此物件完成其執行的時間。 DateTime
hadoopJobLog Hadoop 工作日誌可用於EMR基於活動的嘗試。 字串
@healthStatus 反映已達終止狀態之最後一個物件執行個體成功或失敗的物件運作狀態。 字串
@healthStatusFromInstanceId 已達終止狀態之最後一個執行個體物件的 ID。 字串
@ healthStatusUpdated 時間 上次更新運作狀態的時間。 DateTime
hostname 選取任務嘗試之用戶端的主機名稱。 字串
@lastDeactivatedTime 此物件最後停用的時間。 DateTime
@ latestCompletedRun 時間 執行完成最近一次執行的時間。 DateTime
@latestRunTime 執行排程最近一次執行的時間。 DateTime
@nextRunTime 下次要排程執行的時間。 DateTime
reportProgressTime 遠端活動最近報告進度的時間。 DateTime
@scheduledEndTime 物件的排程結束時間。 DateTime
@scheduledStartTime 物件的排程開始時間。 DateTime
@status 此物件的狀態。 字串
@version 建立物件使用的管道版本。 字串
@waitingOn 此物件等待之相依性清單的描述。 參考物件: "waitingOn":{"ref":"myRunnableObjectId"}

系統欄位 描述 槽類型
@error 描述格式錯誤物件的錯誤。 字串
@pipelineId 此物件所屬管道的 ID。 字串
@sphere 物件的球體。代表其在生命週期中的位置。例如,元件物件會引發執行個體物件,該物件會執行嘗試物件。 字串