기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Step Functions와 Amazon Redshift를 사용하여ETL/ELT워크플로를 실행합니다. API
이 샘플 프로젝트는 Step Functions와 Amazon Redshift 데이터를 사용하여 Amazon Redshift API 데이터 웨어하우스로 데이터를 로드하는ELT/워크플로를 실행하는 ETL 방법을 보여줍니다.
이 프로젝트에서 Step Functions는 다음을 사용합니다. AWS Lambda 함수와 Amazon Redshift Data를 사용하여 필요한 데이터베이스 객체를 생성하고 예제 데이터 세트를 생성한 다음 차원 테이블을 로드한 다음 팩트 테이블을 로드하는 두 개의 작업을 병렬로 실행합니다. API 두 차원 로드 작업 모두 성공적으로 종료되면 Step Functions는 팩트 테이블에 대한 로드 작업을 실행하고 검증 작업을 실행한 다음 Amazon Redshift 클러스터를 일시 중지합니다.
참고
Amazon S3와 같은 다른 소스에서 데이터를 수신하도록 ETL 로직을 수정할 수 있습니다. Amazon S3는 COPY명령을 사용하여 Amazon S3에서 Amazon Redshift 테이블로 데이터를 복사할 수 있습니다.
Amazon Redshift 및 Step Functions 서비스 통합에 대한 자세한 내용은 다음을 참조하세요.
참고
이 샘플 프로젝트를 사용할 때 요금이 발생할 수 있습니다.
새 제품인 경우 AWS 사용자에게는 프리 티어가 제공됩니다. 이 계층에서 특정 사용 수준 미만의 서비스는 무료입니다. 에 대한 자세한 내용은 AWS 비용 및 프리 티어는 다음을 참조하십시오. AWS Step Functions 가격
1단계: 상태 시스템 만들기
-
Step Functions 콘솔
을 열고 상태 시스템 생성을 선택합니다. -
검색 상자에 입력한 다음 ETL 채용정보를 선택합니다.
ETL job in Amazon Redshift
Amazon Redshift반환된 검색 결과에서 -
다음을 선택하여 계속 진행합니다.
-
데모 실행을 선택하여 읽기 전용 및 ready-to-deploy 워크플로를 생성하거나 Build on it를 선택하여 빌드하고 나중에 배포할 수 있는 편집 가능한 상태 머신 정의를 생성합니다.
이 샘플 프로젝트는 다음 리소스를 배포합니다.
-
원래 요청 ping에 대한 Amazon Redshift cluster
-
2 Lambda 함수
-
원래 요청 ping에 대한 Amazon Redshift 스키마
-
5 Amazon Redshift 테이블
-
원래 요청 ping에 대한 AWS Step Functions 상태 시스템
-
관련 AWS Identity and Access Management (IAM) 역할.
에서 ETL 작업에 대한 워크플로 그래프를 탐색할 수 있습니다.Amazon RedshiftStep Functions 콘솔에서
-
-
템플릿 사용을 선택하여 계속 선택합니다.
다음 단계는 이전 선택에 따라 달라집니다.
-
데모 실행 — 에서 배포한 리소스로 읽기 전용 프로젝트를 만들기 전에 상태 머신을 검토할 수 있습니다. AWS CloudFormation 다음 주소로 AWS 계정.
상태 머신 정의를 볼 수 있으며 준비가 되면 Deploy and run을 선택하여 프로젝트를 배포하고 리소스를 생성할 수 있습니다.
배포하는 데 리소스 및 권한을 생성하는 데 최대 10분이 걸릴 수 있습니다. Stack ID 링크를 사용하여 진행 상황을 모니터링할 수 있습니다. AWS CloudFormation.
배포가 완료되면 콘솔에서 새 상태 머신을 확인할 수 있습니다.
-
기반 구축 — 워크플로 정의를 검토하고 편집할 수 있습니다. 사용자 지정 워크플로를 실행하기 전에 샘플 프로젝트에서 자리 표시자의 값을 설정해야 할 수도 있습니다.
참고
계정에 배포된 서비스에는 표준 요금이 적용될 수 있습니다.
2단계: 상태 시스템 실행
-
상태 시스템 페이지에서 샘플 프로젝트를 선택합니다.
-
샘플 프로젝트 페이지에서 실행 시작을 선택합니다.
-
실행 시작 대화 상자에서 다음을 수행합니다.
-
(선택 사항) 생성된 기본값을 재정의하려면 사용자 지정 실행 이름을 입력합니다.
ASCII비이름 및 로깅
Step Functions는 ASCII -자가 아닌 문자를 포함하는 상태 머신, 실행, 액티비티 및 레이블의 이름을 허용합니다. CloudWatchAmazon에서는 이러한 문자를 사용할 수 없으므로 지표를 추적할 수 있도록 ASCII 문자만 사용하는 것이 좋습니다 CloudWatch.
-
(선택 사항) 입력 상자에 입력 값을 로 입력합니다JSON. 데모를 실행 중인 경우 이 단계를 건너뛸 수 있습니다.
-
실행 시작을 선택합니다.
Step Functions 콘솔은 그래프 보기에서 상태를 선택하여 단계 세부 정보 창에서 관련 정보를 탐색할 수 있는 실행 세부 정보 페이지로 안내합니다.
-
예제 상태 시스템 코드
이 예제 프로젝트의 상태 머신은 다음과 통합됩니다. AWS Lambda ETL로직을 해당 리소스에 InputPath 직접 전달하고 Amazon Redshift Data를 사용하여 비동기적으로 실행합니다. API
다음 스테이트 머신 정의를 살펴보고 Step Functions가 어떻게 제어되는지 살펴보세요. AWS Lambda 그리고 아마존 Redshift 데이터. API
Step Functions가 다른 사람을 제어하는 방법에 대한 자세한 내용은 AWS 서비스는 을 참조하십시오Step Functions와 서비스 통합.
{
"Comment": "An ETL workflow for loading dimension and fact tables",
"StartAt": "InitializeCheckCluster",
"States": {
"InitializeCheckCluster": {
"Type": "Pass",
"Next": "GetStateOfCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterAvailable",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterAvailable": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializeBuildDB"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "InitializeResumeCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "ClusterWait"
}
]
},
"ClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckCluster"
},
"InitializeResumeCluster": {
"Type": "Pass",
"Next": "ResumeCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "resume"
}
}
},
"ResumeCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "ClusterWait",
"InputPath": "$",
"ResultPath": "$"
},
"InitializeBuildDB": {
"Type": "Pass",
"Next": "BuildDB",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "build_database",
"sql_statement": [
"create schema if not exists {0} authorization {1};",
"create table if not exists {0}.customer",
"(c_customer_sk int4 not null encode az64",
",c_customer_id char(16) not null encode zstd",
",c_current_addr_sk int4 encode az64",
",c_first_name char(20) encode zstd",
",c_last_name char(30) encode zstd",
",primary key (c_customer_sk)",
") distkey(c_customer_sk);",
"--",
"create table if not exists {0}.customer_address",
"(ca_address_sk int4 not null encode az64",
",ca_address_id char(16) not null encode zstd",
",ca_state char(2) encode zstd",
",ca_zip char(10) encode zstd",
",ca_country varchar(20) encode zstd",
",primary key (ca_address_sk)",
") distkey(ca_address_sk);",
"--",
"create table if not exists {0}.date_dim",
"(d_date_sk integer not null encode az64",
",d_date_id char(16) not null encode zstd",
",d_date date encode az64",
",d_day_name char(9) encode zstd",
",primary key (d_date_sk)",
") diststyle all;",
"--",
"create table if not exists {0}.item",
"(i_item_sk int4 not null encode az64",
",i_item_id char(16) not null encode zstd",
",i_rec_start_date date encode az64",
",i_rec_end_date date encode az64",
",i_current_price numeric(7,2) encode az64",
",i_category char(50) encode zstd",
",i_product_name char(50) encode zstd",
",primary key (i_item_sk)",
") distkey(i_item_sk) sortkey(i_category);",
"--",
"create table if not exists {0}.store_sales",
"(ss_sold_date_sk int4",
",ss_item_sk int4 not null encode az64",
",ss_customer_sk int4 encode az64",
",ss_addr_sk int4 encode az64",
",ss_store_sk int4 encode az64",
",ss_ticket_number int8 not null encode az64",
",ss_quantity int4 encode az64",
",ss_net_paid numeric(7,2) encode az64",
",ss_net_profit numeric(7,2) encode az64",
",primary key (ss_item_sk, ss_ticket_number)",
") distkey(ss_item_sk) sortkey(ss_sold_date_sk);"
]
}
}
},
"BuildDB": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBuildDBStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetBuildDBStatus": {
"Type": "Task",
"Next": "CheckBuildDBStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBuildDBStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailBuildDB"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "InitializeBaselineData"
}
],
"Default": "BuildDBWait"
},
"BuildDBWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "GetBuildDBStatus"
},
"FailBuildDB": {
"Type": "Fail",
"Cause": "Database Build Failed",
"Error": "Error"
},
"InitializeBaselineData": {
"Type": "Pass",
"Next": "LoadBaselineData",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_baseline_data",
"sql_statement": [
"begin transaction;",
"truncate table {0}.customer;",
"insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)",
"values",
"(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),",
"(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),",
"(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),",
"(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');",
"analyze {0}.customer;",
"--",
"truncate table {0}.customer_address;",
"insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),",
"(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),",
"(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');",
"analyze {0}.customer_address;",
"--",
"truncate table {0}.item;",
"insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),",
"(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),",
"(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),",
"(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),",
"(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),",
"(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');",
"--",
"analyze {0}.item;",
"truncate table {0}.date_dim;",
"insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)",
"values",
"(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),",
"(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),",
"(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),",
"(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),",
"(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),",
"(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),",
"(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),",
"(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),",
"(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');",
"--",
"analyze {0}.date_dim;",
"-- commit and End transaction",
"commit;",
"end transaction;"
]
}
}
},
"LoadBaselineData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBaselineData",
"InputPath": "$",
"ResultPath": "$"
},
"GetBaselineData": {
"Type": "Task",
"Next": "CheckBaselineData",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBaselineData": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailLoadBaselineData"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "ParallelizeDimensionLoadJob"
}
],
"Default": "BaselineDataWait"
},
"BaselineDataWait": {
"Type": "Wait",
"Seconds": 20,
"Next": "GetBaselineData"
},
"FailLoadBaselineData": {
"Type": "Fail",
"Cause": "Load Baseline Data Failed",
"Error": "Error"
},
"ParallelizeDimensionLoadJob": {
"Type": "Parallel",
"Next": "InitializeSalesFactLoadJob",
"ResultPath": "$.status",
"Branches": [
{
"StartAt": "InitializeCustomerAddressDimensionLoadJob",
"States": {
"InitializeCustomerAddressDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteCustomerAddressDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_customer_address",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_customer_address;",
"create table if not exists {0}.stg_customer_address",
"(ca_address_id varchar(16) encode zstd",
",ca_state varchar(2) encode zstd",
",ca_zip varchar(10) encode zstd",
",ca_country varchar(20) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"('AAAAAAAACFBBAAAA','NE','','United States'),",
"('AAAAAAAAGAEFAAAA','NE','61749','United States'),",
"('AAAAAAAAPJKKAAAA','OK','','United States'),",
"('AAAAAAAAMIHGAAAA','AL','','United States');",
"/* Perform UPDATE for existing data with refreshed attribute values */",
"update {0}.customer_address",
" set ca_state = stg_customer_address.ca_state,",
" ca_zip = stg_customer_address.ca_zip,",
" ca_country = stg_customer_address.ca_country",
" from {0}.stg_customer_address",
" where customer_address.ca_address_id = stg_customer_address.ca_address_id;",
"/* Perform insert for new rows */",
"insert into {0}.customer_address",
"(ca_address_sk",
",ca_address_id",
",ca_state",
",ca_zip",
",ca_country",
")",
"with max_customer_address_sk as",
"(select max(ca_address_sk) max_ca_address_sk",
"from {0}.customer_address)",
"select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk",
",stg_customer_address.ca_address_id",
",stg_customer_address.ca_state",
",stg_customer_address.ca_zip",
",stg_customer_address.ca_country",
"from {0}.stg_customer_address,",
"max_customer_address_sk",
"where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteCustomerAddressDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetCustomerAddressDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetCustomerAddressDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckCustomerAddressDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckCustomerAddressDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailCustomerAddressDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteCustomerAddressDimensionLoad"
}
],
"Default": "CustomerAddressWait"
},
"CustomerAddressWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetCustomerAddressDimensionLoadStatus"
},
"CompleteCustomerAddressDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailCustomerAddressDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
},
{
"StartAt": "InitializeItemDimensionLoadJob",
"States": {
"InitializeItemDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteItemDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_item",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_item;",
"create table if not exists {0}.stg_item",
"(i_item_id varchar(16) encode zstd",
",i_rec_start_date date encode zstd",
",i_rec_end_date date encode zstd",
",i_current_price numeric(7,2) encode zstd",
",i_category varchar(50) encode zstd",
",i_product_name varchar(50) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_item",
"(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),",
"('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),",
"('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),",
"('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),",
"('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),",
"('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),",
"('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),",
"('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');",
"/************************************************************************************************************************",
"** Type 2 is maintained for i_current_price column.",
"** Update all attributes for the item when the price is not changed",
"** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match",
"*************************************************************************************************************************/",
"update {0}.item",
" set i_category = stg_item.i_category,",
" i_product_name = stg_item.i_product_name",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price = stg_item.i_current_price;",
"insert into {0}.item",
"(i_item_sk",
",i_item_id",
",i_rec_start_date",
",i_rec_end_date",
",i_current_price",
",i_category",
",i_product_name",
")",
"with max_item_sk as",
"(select max(i_item_sk) max_item_sk",
" from {0}.item)",
"select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk",
" ,stg_item.i_item_id",
" ,trunc(sysdate) as i_rec_start_date",
" ,null as i_rec_end_date",
" ,stg_item.i_current_price",
" ,stg_item.i_category",
" ,stg_item.i_product_name",
" from {0}.stg_item, {0}.item, max_item_sk",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Sunset penultimate records that were inserted as type 2 */",
"update {0}.item",
" set i_rec_end_date = trunc(sysdate)",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteItemDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetItemDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetItemDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckItemDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckItemDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailItemDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteItemDimensionLoad"
}
],
"Default": "ItemWait"
},
"ItemWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetItemDimensionLoadStatus"
},
"CompleteItemDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailItemDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
}
]
},
"InitializeSalesFactLoadJob": {
"Type": "Pass",
"Next": "ExecuteSalesFactLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "load_sales_fact",
"sql_statement": [
"begin transaction;",
"/* Create a stg_store_sales staging table */",
"drop table if exists {0}.stg_store_sales;",
"create table {0}.stg_store_sales",
"(sold_date date encode zstd",
",i_item_id varchar(16) encode zstd",
",c_customer_id varchar(16) encode zstd",
",ca_address_id varchar(16) encode zstd",
",ss_ticket_number integer encode zstd",
",ss_quantity integer encode zstd",
",ss_net_paid numeric(7,2) encode zstd",
",ss_net_profit numeric(7,2) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_store_sales",
"(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)",
"values",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),",
"('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);",
"/* Delete any rows from target store_sales for the input date for idempotency */",
"delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');",
"/* Insert data from staging table to the target table */",
"insert into {0}.store_sales",
"(ss_sold_date_sk",
",ss_item_sk",
",ss_customer_sk",
",ss_addr_sk",
",ss_ticket_number",
",ss_quantity",
",ss_net_paid",
",ss_net_profit",
")",
"select date_dim.d_date_sk ss_sold_date_sk",
" ,item.i_item_sk ss_item_sk",
" ,customer.c_customer_sk ss_customer_sk",
" ,customer_address.ca_address_sk ss_addr_sk",
" ,ss_ticket_number",
" ,ss_quantity",
" ,ss_net_paid",
" ,ss_net_profit",
" from {0}.stg_store_sales as store_sales",
" inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date",
" left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null",
" left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id",
" left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;",
"/* Drop staging table */",
"drop table if exists {0}.stg_store_sales;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteSalesFactLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetSalesFactLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetSalesFactLoadStatus": {
"Type": "Task",
"Next": "CheckSalesFactLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckSalesFactLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesFactLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "SalesETLPipelineComplete"
}
],
"Default": "SalesWait"
},
"SalesWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetSalesFactLoadStatus"
},
"FailSalesFactLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
},
"ClusterUnavailable": {
"Type": "Fail",
"Cause": "Redshift cluster is not available",
"Error": "Error"
},
"SalesETLPipelineComplete": {
"Type": "Pass",
"Next": "ValidateSalesMetric",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "validate_sales_metric",
"sql_statement": [
"select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')"
]
}
}
},
"ValidateSalesMetric": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetValidateSalesMetricStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetValidateSalesMetricStatus": {
"Type": "Task",
"Next": "CheckValidateSalesMetricStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckValidateSalesMetricStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesMetricValidation"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "DataValidationComplete"
}
],
"Default": "SalesValidationWait"
},
"SalesValidationWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetValidateSalesMetricStatus"
},
"FailSalesMetricValidation": {
"Type": "Fail",
"Cause": "Data Validation Failed",
"Error": "Error"
},
"DataValidationComplete": {
"Type": "Pass",
"Next": "InitializePauseCluster"
},
"InitializePauseCluster": {
"Type": "Pass",
"Next": "PauseCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "pause"
}
}
},
"PauseCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "PauseClusterWait",
"InputPath": "$",
"ResultPath": "$.clusterStatus",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "ClusterPausedComplete"
}
]
},
"InitializeCheckPauseCluster": {
"Type": "Pass",
"Next": "GetStateOfPausedCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfPausedCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterPaused",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterPaused": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializePauseCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "ClusterPausedComplete"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "PauseClusterWait"
}
]
},
"PauseClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckPauseCluster"
},
"ClusterPausedComplete": {
"Type": "Pass",
"End": true
}
}
}
IAM정책 예시
다음 예시는 AWS Identity and Access Management 예제 프로젝트에서 생성된 (IAM) 정책에는 상태 머신 및 관련 리소스를 실행하는 데 필요한 최소 권한이 포함됩니다. IAM정책에 필요한 권한만 포함하는 것이 좋습니다.
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE"
],
"Effect": "Allow"
}
]
}
Step Functions를 다른 기능과 함께 사용할 IAM 때 구성하는 방법에 대한 자세한 내용은 AWS 서비스는 을 참조하십시오Step Functions가 통합 서비스를 위한 IAM 정책을 생성하는 방법.