PigActivity - AWS Data Pipeline

AWS Data Pipeline 신규 고객은 더 이상 이용할 수 없습니다. 의 기존 고객 AWS Data Pipeline 정상적으로 서비스를 계속 사용할 수 있습니다. 자세히 알아보기

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

PigActivity

PigActivity ShellCommandActivity또는 를 사용할 AWS Data Pipeline 필요 없이 Pig 스크립트에 대한 기본 지원을 제공합니다EmrActivity. 또한 데이터 스테이징도 PigActivity 지원합니다. 스테이지 필드가 true로 설정되면 AWS Data Pipeline 에서는 사용자의 추가 코드 없이 입력 데이터가 Pig의 스키마로 스테이징됩니다.

다음 예제 파이프라인은 PigActivity를 사용하는 방법을 보여줍니다. 이 예제 파이프라인은 다음 단계를 수행합니다.

  • MyPigActivity1은 Amazon S3에서 데이터를 로드하고 몇 개의 데이터 열을 선택하여 Amazon S3에 업로드하는 Pig 스크립트를 실행합니다.

  • MyPigActivity2는 첫 번째 출력을 로드하고 몇 개의 열과 세 개의 데이터 행을 선택하여 Amazon S3에 두 번째 출력으로 업로드합니다.

  • MyPigActivity3은 두 번째 출력 데이터를 로드하고, 두 행의 데이터를 삽입하고, 이름이 “다섯 번째”인 열만 Amazon에 RDS 삽입합니다.

  • MyPigActivity4는 Amazon RDS 데이터를 로드하고 데이터의 첫 번째 행을 선택하여 Amazon S3에 업로드합니다.

{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance:3306/example-database", "selectQuery": "select * from #{table}", "table": "example-table-name", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path", "scriptUri": "s3://example-bucket/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }

pigTestScript.q의 내용은 다음과 같습니다.

B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;

구문

액체 호출 필드 설명 슬롯 유형
schedule 이 객체는 예약 간격을 실행할 때 호출됩니다. 이 객체의 종속 실행 순서를 설정하려면 사용자가 다른 객체로 일정 참조를 지정해야 합니다. 사용자는 객체에 일정을 명시적으로 설정하여 이 요구 사항을 충족할 수 있습니다 (예: “schedule”: {"ref”: ""} 지정). DefaultSchedule 대부분의 경우에서는 모든 객체가 상속할 수 있도록 일정 참조를 기본 파이프라인 객체에 두는 것이 좋습니다. 또는 파이프라인에 일정 트리가 있는 경우(마스터 일정 안의 일정) 사용자가 일정 참조가 있는 부모 객체를 생성할 수 있습니다. 선택형 일정 구성 예제에 대한 자세한 내용은 https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html 단원을 참조하십시오. 참조 개체 (예: “스케줄”: {"ref”:” “} myScheduleId

필수 그룹(다음 중 하나를 제공해야 함) 설명 슬롯 유형
스크립트 실행할 Pig 스크립트입니다. String
scriptUri 실행할 Pig 스크립트의 위치 (예: s3://scriptLocation) String

필수 그룹(다음 중 하나를 제공해야 함) 설명 슬롯 유형
runsOn EMR이 스크립트가 PigActivity 실행되는 클러스터. 참조 객체 (예: "runsOn“: {" ref”:” myEmrCluster Id "}
workerGroup 작업자 그룹입니다. 이것은 작업 라우팅에 사용됩니다. workerGroup이 있을 때 runsOn 값을 제공하면 workerGroup이 무시됩니다. String

선택 필드 설명 슬롯 유형
attemptStatus 원격 활동에서 가장 최근에 보고한 상태입니다. String
attemptTimeout 원격 작업 완료의 제한 시간입니다. 이 필드를 설정하면 설정된 시작 시간 이내에 완료되지 않는 원격 활동을 재시도할 수 있습니다. 기간
dependsOn 실행 가능한 다른 객체의 종속성을 지정합니다. 참조 객체 (예: "dependsOn“: {" ref”:” myActivityId “}
failureAndRerun모드 종속 요소에 장애가 있거나 재시도될 때의 소비자 노드 거동을 설명합니다. 열거
입력 입력 데이터 소스입니다. 참조 객체 (예: “입력”: {"ref”:” myDataNode Id "}
lateAfterTimeout 파이프라인 시작 후 객체가 완료되어야 하는 경과 시간입니다. 스케줄 유형이 ondemand(으)로 설정되지 않은 경우에만 트리거됩니다. 기간
maxActiveInstances 구성요소의 동시 활성 인스턴스 최대수입니다. 재실행은 활성 인스턴스의 수에 포함되지 않습니다. Integer
maximumRetries 장애 시 재시도 최대 횟수. Integer
onFail 현재 객체 장애 시 실행할 작업입니다. 참조 객체 (예: "onFail“: {" ref”:” myActionId “}
onLateAction 객체가 아직 예약되지 않았거나 아직 완료되지 않은 경우에 트리거되어야 하는 작업입니다. 참조 객체 (예: "onLateAction“: {" ref”:” myActionId “}
onSuccess 현재 객체 성공 시 실행할 작업입니다. 참조 객체 (예: "onSuccess“: {" ref”:” myActionId “}
output 출력 데이터 소스입니다. 참조 객체 (예: “출력”: {"ref”:” myDataNode Id "}
parent 슬롯을 상속할 현재 객체의 부모입니다. 참조 객체 (예: “부모”: {"ref”:” myBaseObject Id "}
pipelineLogUri 파이프라인에 대한 로그를 업로드하기 위한 Amazon S3 URI (예: 's3://BucketName/Key/ ') String
postActivityTaskConfig 실행할 사후 활동 구성 스크립트입니다. 이는 Amazon URI S33의 셸 스크립트와 인수 목록으로 구성됩니다. 참조 객체 (예: "postActivityTaskConfig”: {"ref”:” “} myShellScript ConfigId
preActivityTaskConfig 실행할 사전 활동 구성 스크립트입니다. 이는 Amazon S3의 셸 스크립트와 인수 목록으로 구성됩니다. URI 참조 객체 (예: "preActivityTaskConfig”: {"ref”:” “} myShellScript ConfigId
precondition 또는 사전 조건을 정의합니다. 모든 사전 조건이 충족될 때까지 데이터 노드는 READY "“로 표시되지 않습니다. 참조 객체 (예: “전제 조건”: {"ref”:” “} myPreconditionId
reportProgressTimeout 원격 작업에서 reportProgress를 연속으로 호출하는 제한 시간입니다. 이 필드를 설정하면 지정된 기간 동안 진행 상황을 보고하지 않는 원격 활동은 중단된 것으로 간주하고 재시도할 수 있습니다. 기간
resizeClusterBefore실행 중 이 활동을 수행하기 전에 입력 또는 출력으로 지정된 DynamoDB 데이터 노드가 포함되도록 클러스터 크기를 조정합니다.
참고

활동에서 a를 DynamoDBDataNode 입력 또는 출력 데이터 노드로 사용하고 를 resizeClusterBeforeRunningTRUE 설정하면 m3.xlarge 인스턴스 유형을 사용하기 AWS Data Pipeline 시작합니다. 그러면 해당 인스턴스 유형 선택을 m3.xlarge가 덮어써서 월 요금이 증가할 수 있습니다.

resizeClusterMax인스턴스 크기 조정 알고리즘으로 요청할 수 있는 인스턴스의 최대 수에 대한 제한입니다. Integer
retryDelay 두 번의 재시도 사이의 제한 시간 간격입니다. 기간
scheduleType 일정 유형을 사용하여 파이프라인 정의에 있는 객체의 일정을 간격 시작 시점으로 또는 종료 시점으로 지정할 수 있습니다. 시계열 스타일 일정 조정은 각 간격이 종료될 때 인스턴스 일정이 지정되고 Cron 스타일 일정 조정은 각 간격이 시작될 때 인스턴스 일정이 지정됩니다. 온디맨드 일정을 사용하면 파이프라인을 활성화될 때 한 번씩 실행할 수 있습니다. 이 경우 다시 실행하기 위해 파이프라인을 복제하거나 다시 생성할 필요가 없습니다. 온디맨드 일정을 사용하는 경우 기본 개체에 지정해야 하며 파이프라인의 개체에만 scheduleType 지정되어야 합니다. 온디맨드 파이프라인을 사용하려면 이후 실행할 때마다 ActivatePipeline 작업을 호출하면 됩니다. 값은 cron, ondemand 및 timeseries입니다. 열거
scriptVariable Pig 스크립트에 전달할 인수입니다. 스크립트 또는 scriptVariable 와 함께 사용할 수 있습니다. scriptUri String
stage 스테이징 활성화 여부를 결정하고 Pig 스크립트가 $ {} 및 $ {INPUT1} 와 같은 스테이징 데이터 테이블에 액세스할 수 있도록 합니다. OUTPUT1

실행 시간 필드 설명 슬롯 유형
@activeInstances 현재 예약되어 있는 활성 인스턴스 객체의 목록입니다. 참조 객체 (예: "activeInstances“: {" ref”:” Id "} myRunnableObject
@actualEndTime 이 객체의 실행이 완료된 시간입니다. DateTime
@actualStartTime 이 객체의 실행이 시작된 시간입니다. DateTime
cancellationReason 이 개체가 취소된 cancellationReason 경우 String
@cascadeFailedOn 객체 실패가 발생한 종속 체인에 대한 설명입니다. 참조 객체 (예: "cascadeFailedOn“: {" ref”:” myRunnableObject Id "}
emrStepLog Amazon EMR 단계 로그는 EMR 활동 시도에서만 사용할 수 있습니다. String
errorId 이 객체가 실패한 errorId 경우 String
errorMessage 이 개체가 실패한 errorMessage 경우 String
errorStackTrace 이 객체가 실패한 경우의 오류 스택 트레이스입니다. String
@finishedTime 이 객체의 실행이 완료된 시간입니다. DateTime
hadoopJobLog EMR기반 활동 시도 시 Hadoop 작업 로그를 사용할 수 있습니다. String
@healthStatus 종료 상태에 도달한 마지막 객체 인스턴스의 성공 또는 실패를 반영하는 객체의 상태입니다. String
@healthStatusFromInstanceId 종료 상태에 도달한 마지막 인스턴스 객체의 ID입니다. String
@ 시간 healthStatusUpdated 상태가 마지막으로 업데이트된 시간입니다. DateTime
hostname 작업 시도를 선택한 클라이언트의 호스트 이름입니다. String
@lastDeactivatedTime 이 객체가 마지막으로 비활성화된 시간입니다. DateTime
@ latestCompletedRun 시간 실행이 완료된 최근 실행 시간입니다. DateTime
@latestRunTime 실행이 예약된 최근 실행 시간입니다. DateTime
@nextRunTime 다음으로 예약된 실행 시간입니다. DateTime
reportProgressTime 원격 활동에서 진행 상황을 보고한 가장 최근 시간입니다. DateTime
@scheduledEndTime 객체의 일정 종료 시간. DateTime
@scheduledStartTime 객체의 일정 시작 시간. DateTime
@상태 이 객체의 상태입니다. String
@version 객체와 함께 생성된 파이프라인 버전입니다. String
@waitingOn 이 객체가 대기 중인 종속 요소 목록에 대한 설명입니다. 참조 객체 (예: "waitingOn“: {" ref”:” myRunnableObject Id "}

시스템 필드 설명 슬롯 유형
@오류 잘못 형성된 객체를 설명하는 오류. String
@pipelineId 이 객체가 속하는 파이프라인의 ID. String
@sphere 객체의 타원 무늬는 수명 주기 내 위치를 나타냅니다. Component Objects는 Attempt Objects를 실행하는 Instance Objects를 야기합니다. String

참고