AWS Data Pipeline 신규 고객은 더 이상 이용할 수 없습니다. 의 기존 고객 AWS Data Pipeline 정상적으로 서비스를 계속 사용할 수 있습니다. 자세히 알아보기
기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
RedshiftCopyActivity
DynamoDB 또는 Amazon S3에서 Amazon Redshift로 데이터를 복사합니다. 데이터를 기존 테이블로 로드하거나 기존 테이블에 데이터를 쉽게 병합할 수 있습니다.
다음은 RedshiftCopyActivity
를 사용할 사용 사례의 개요입니다.
-
Amazon S3에서 데이터를 AWS Data Pipeline 스테이징하는 데 사용하는 것부터 시작하십시오.
-
RDS아마존과 아마존에서 아마존 Redshift로 데이터를 이동하는
RedshiftCopyActivity
EMR 데 사용합니다.이렇게 하면 Amazon Redshift에 데이터를 로드하여 데이터를 분석할 수 있습니다.
-
Amazon Redshift에 로드한 데이터에 대해 SQL 쿼리를 수행하는 데 사용합니다SqlActivity.
또한 RedshiftCopyActivity
는 매니페스트 파일을 지원하므로 S3DataNode
작업을 할 수 있습니다. 자세한 내용은 S3 DataNode 단원을 참조하십시오.
예
다음은 이 객체 유형의 예제입니다.
형식 변환을 보장하기 위해 이 예제에서는 에서 IGNOREBLANKLINEScommandOptions
특수 변환 파라미터를 사용합니다 EMPTYASNULL. 자세한 내용은 Amazon Redshift 데이터베이스 개발자 가이드의 데이터 변환 파라미터를 참조하십시오.
{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }
다음 예제 파이프라인 정의는 APPEND
삽입 모드를 사용하는 활동을 보여줍니다.
{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }
APPEND
작업은 기본 키 또는 정렬 키에 관계 없이 테이블에 항목을 추가합니다. 예를 들어 다음 테이블이 있는 경우 동일한 ID와 사용자 값으로 레코드를 추가할 수 있습니다.
ID(PK) USER 1 aaa 2 bbb
동일한 ID와 사용자 값으로 레코드를 추가할 수 있습니다.
ID(PK) USER 1 aaa 2 bbb 1 aaa
참고
APPEND
작업이 중단되었다가 재시도되는 경우, 결과 재실행 파이프라인이 처음부터 잠재적으로 추가됩니다. 이로 인해 더 많은 중복이 발생할 수 있으므로 특히 행 수를 계산하는 논리가 있는 경우에는 이 동작을 숙지하고 있어야 합니다.
자습서는 AWS Data Pipeline을(를) 사용하여 데이터를 Amazon Redshift로 복사을 참조하세요.
구문
필수 필드 | 설명 | 슬롯 유형 |
---|---|---|
insertMode |
대상 테이블의 기존 데이터 중 로드할 데이터의 행과 겹치는 데이터를 AWS Data Pipeline 어떻게 처리할지 결정합니다. 유효 값은
|
열거 |
액체 호출 필드 | 설명 | 슬롯 유형 |
---|---|---|
schedule |
이 객체는 예약 간격을 실행할 때 호출됩니다. 이 객체의 종속 실행 순서를 설정하려면 다른 객체로 일정 참조를 지정합니다. 대부분의 경우, 모든 객체가 일정을 상속할 수 있도록 해당 일정 참조를 기본 파이프라인 객체에 두는 것이 좋습니다. 예를 들어 파이프라인의 마스터 일정에 중첩된 일정이 포함된 경우, 일정 참조가 있는 부모 객체를 생성합니다. 선택형 일정 구성 예제에 대한 자세한 내용은 일정을 참조하십시오. |
"schedule":{"ref":"myScheduleId"} 와 같은 참조 객체 |
필수 그룹(다음 중 하나를 제공해야 함) | 설명 | 슬롯 유형 |
---|---|---|
runsOn | 활동 또는 명령을 실행할 전산 리소스입니다. 아마존 EC2 인스턴스 또는 아마존 EMR 클러스터를 예로 들 수 있습니다. | 참조 객체, 예: "runsOn“: {" ref”:” myResourceId “} |
workerGroup | 작업자 그룹입니다. 이것은 작업 라우팅에 사용됩니다. runsOn 값을 제공하고 workerGroup 존재하면 workerGroup 무시됩니다. |
String |
선택 필드 | 설명 | 슬롯 유형 |
---|---|---|
attemptStatus | 원격 활동에서 가장 최근에 보고된 상태입니다. | String |
attemptTimeout | 원격 작업 완료의 제한 시간입니다. 이 필드를 설정하면 설정된 시작 시간 이내에 완료되지 않는 원격 활동을 재시도할 수 있습니다. | 기간 |
commandOptions |
데이터 형식이 입력 또는 출력 데이터 노드와 연결된 경우에는 제공된 파라미터는 무시됩니다. 복사 작업에서는 먼저 또한 Amazon Redshift 클러스터에서 데이터를 언로드하고 Amazon S3 에서 파일을 생성해야 하는 경우, 복사 및 언로드 중에 성능을 개선하려면 |
String |
dependsOn | 실행 가능한 다른 객체의 종속성을 지정합니다. | 참조 객체: "dependsOn":{"ref":"myActivityId"} |
failureAndRerun모드 | 종속 요소에 장애가 있거나 재시도될 때의 소비자 노드 거동을 설명합니다. | 열거 |
입력 | 입력 데이터 노드입니다. Amazon S3, DynamoDB 또는 Amazon Redshift가 데이터 소스가 될 수 있습니다. | 참조 객체: "input":{"ref":"myDataNodeId"} |
lateAfterTimeout | 파이프라인 시작 후 객체가 완료되어야 하는 경과 시간입니다. 스케줄 유형이 ondemand (으)로 설정되지 않은 경우에만 트리거됩니다. |
기간 |
maxActiveInstances | 구성요소의 동시 활성 인스턴스 최대수입니다. 재실행은 활성 인스턴스의 수에 포함되지 않습니다. | Integer |
maximumRetries | 장애 시 재시도 최대 횟수 | Integer |
onFail | 현재 객체 장애 시 실행할 작업입니다. | 참조 객체: "onFail":{"ref":"myActionId"} |
onLateAction | 객체가 아직 예약되지 않았거나 아직 완료되지 않은 경우에 트리거되어야 하는 작업입니다. | 참조 객체: "onLateAction":{"ref":"myActionId"} |
onSuccess | 현재 객체 성공 시 실행할 작업입니다. | 참조 객체: "onSuccess":{"ref":"myActionId"} |
output | 출력 데이터 노드입니다. Amazon S3 또는 Amazon Redshift가 출력 위치가 될 수 있습니다. | 참조 객체: "output":{"ref":"myDataNodeId"} |
parent | 슬롯을 상속할 현재 객체의 부모입니다. | 참조 객체: "parent":{"ref":"myBaseObjectId"} |
pipelineLogUri | 파이프라인 로그를 업로드하기 위한 S3 URI (예: 's3://BucketName/Key/ '). | String |
precondition | 또는 사전 조건을 정의합니다. 모든 사전 조건이 충족될 때까지 데이터 노드는 READY "“로 표시되지 않습니다. | 참조 객체: "precondition":{"ref":"myPreconditionId"} |
대기열 |
Amazon Redshift에서 Amazon Redshift는 동시 연결 수를 15로 제한합니다. 자세한 내용은 Amazon RDS Database 개발자 안내서의 대기열에 쿼리 할당을 참조하십시오. |
String |
reportProgressTimeout |
원격 작업에서 이 필드를 설정하면 지정된 기간 동안 진행 상황을 보고하지 않는 원격 활동은 중단된 것으로 간주하고 재시도할 수 있습니다. |
기간 |
retryDelay | 두 번의 재시도 사이의 제한 시간 간격입니다. | 기간 |
scheduleType |
파이프라인에서 객체에 대한 일정 조정을 지정할 수 있습니다. 값은
|
열거 |
transformSql |
입력 데이터를 변환할 때 사용되는
DynamoDB 또는 Amazon S3의 데이터를 복사하면, AWS Data Pipeline 이 "스테이징"이라고 하는 테이블을 생성하고, 처음에는 여기에 데이터를 로드합니다. 이 테이블의 데이터는 대상 테이블을 업데이트할 때 사용됩니다.
|
String |
실행 시간 필드 | 설명 | 슬롯 유형 |
---|---|---|
@activeInstances | 현재 예약되어 있는 활성 인스턴스 객체의 목록입니다. | 참조 객체: "activeInstances":{"ref":"myRunnableObjectId"} |
@actualEndTime | 이 객체의 실행이 완료된 시간입니다. | DateTime |
@actualStartTime | 이 객체의 실행이 시작된 시간입니다. | DateTime |
cancellationReason | 이 객체가 취소된 cancellationReason 경우 | String |
@cascadeFailedOn | 객체 실패가 발생한 종속 체인에 대한 설명입니다. | 참조 객체: "cascadeFailedOn":{"ref":"myRunnableObjectId"} |
emrStepLog | EMR단계 로그는 EMR 활동 시도 시에만 사용할 수 있습니다. | String |
errorId | 이 개체가 실패한 errorId 경우 | String |
errorMessage | 이 개체가 실패한 errorMessage 경우 | String |
errorStackTrace | 이 객체가 실패한 경우의 오류 스택 트레이스입니다. | String |
@finishedTime | 이 객체의 실행이 완료된 시간입니다. | DateTime |
hadoopJobLog | EMR기반 활동 시도 시 Hadoop 작업 로그를 사용할 수 있습니다. | String |
@healthStatus | 종료 상태에 도달한 마지막 객체 인스턴스의 성공 또는 실패를 반영하는 객체의 상태입니다. | String |
@healthStatusFromInstanceId | 종료 상태에 도달한 마지막 인스턴스 객체의 ID입니다. | String |
@ 시간 healthStatusUpdated | 상태가 마지막으로 업데이트된 시간입니다. | DateTime |
hostname | 작업 시도를 선택한 클라이언트의 호스트 이름입니다. | String |
@lastDeactivatedTime | 이 객체가 마지막으로 비활성화된 시간입니다. | DateTime |
@ latestCompletedRun 시간 | 실행이 완료된 최근 실행 시간입니다. | DateTime |
@latestRunTime | 실행이 예약된 최근 실행 시간입니다. | DateTime |
@nextRunTime | 다음으로 예약된 실행 시간입니다. | DateTime |
reportProgressTime | 원격 활동에서 진행 상황을 보고한 가장 최근 시간입니다. | DateTime |
@scheduledEndTime | 객체의 일정 종료 시간. | DateTime |
@scheduledStartTime | 객체의 일정 시작 시간. | DateTime |
@상태 | 이 객체의 상태입니다. | String |
@version | 객체와 함께 생성된 파이프라인 버전입니다. | String |
@waitingOn | 이 객체가 대기 중인 종속 요소 목록에 대한 설명입니다. | 참조 객체: "waitingOn":{"ref":"myRunnableObjectId"} |
시스템 필드 | 설명 | 슬롯 유형 |
---|---|---|
@오류 | 잘못 형성된 객체를 설명하는 오류. | String |
@pipelineId | 이 객체가 속하는 파이프라인의 ID. | String |
@sphere | 객체의 구입니다. 수명 주기상 객체의 위치를 나타냅니다. 예를 들어 구성 요소 객체가 인스턴스 객체를 트리거하고, 인스턴스 객체는 시도 객체를 실행합니다. | String |