This page is a machine translation of the English version. If anything is ambiguous or inconsistent, the English version prevails.
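Run ETL/ELT workflows using Step Functions and the Amazon Redshift API
This sample project demonstrates how to use Step Functions and the Amazon Redshift Data API to run an ETL/ELT workflow that loads data into an Amazon Redshift data warehouse.
In this project, Step Functions uses AWS Lambda functions and the Amazon Redshift Data API to create the required database objects and generate a set of sample data. It then runs two jobs in parallel to load the dimension tables. Once both dimension load jobs finish successfully, Step Functions runs the load job for the fact table, runs a validation job, and then pauses the Amazon Redshift cluster.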
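The ordering described above (two dimension loads in parallel, then the fact load, validation, and pausing the cluster) can be sketched in plain Python. This is only an illustration of the control flow, not the sample project's code: `run_job` and `run_workflow` are hypothetical names, and in the real workflow each step is delegated to Lambda and the Amazon Redshift Data API.

```python
from concurrent.futures import ThreadPoolExecutor


def run_job(name, log):
    """Hypothetical stand-in for one ETL step; it only records that it ran."""
    log.append(name)
    return name


def run_workflow():
    """Run the steps in the order the sample project describes."""
    log = []
    run_job("build_database", log)
    run_job("load_baseline_data", log)

    # The two dimension loads are independent, so they run concurrently.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(run_job, "load_customer_address", log),
            pool.submit(run_job, "load_item", log),
        ]
        # result() re-raises any exception, so a failed dimension load
        # stops the workflow before the fact load starts.
        for future in futures:
            future.result()

    # The fact load starts only after both dimension loads have finished.
    run_job("load_sales_fact", log)
    run_job("validate_sales_metric", log)
    run_job("pause_cluster", log)
    return log
```

The fact table references surrogate keys from the dimension tables, which is why the fact load waits for both dimension loads instead of running alongside them.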
Note
You can modify the ETL logic to receive data from other sources, such as Amazon S3, by using the COPY command to copy data from Amazon S3 into an Amazon Redshift table.
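As a rough sketch of what such a change could look like, the helper below builds a COPY statement that might replace one of the inline `insert ... values` statements used to fill a staging table. The function name, the bucket, and the IAM role ARN are hypothetical placeholders, and the CSV options are an assumption about the source files.

```python
def build_copy_statement(schema, table, s3_uri, iam_role_arn):
    """Build a Redshift COPY statement that loads CSV files from Amazon S3."""
    for identifier in (schema, table):
        # Accept only simple identifiers so they are safe to embed in SQL.
        if not identifier.isidentifier():
            raise ValueError(f"unexpected identifier: {identifier!r}")
    for value in (s3_uri, iam_role_arn):
        # Reject quotes so the values cannot break out of the SQL literals.
        if "'" in value:
            raise ValueError("single quotes are not allowed in COPY arguments")
    return (
        f"copy {schema}.{table} "
        f"from '{s3_uri}' "
        f"iam_role '{iam_role_arn}' "
        "format as csv ignoreheader 1;"
    )


# Hypothetical bucket and role, shown only to illustrate the call.
statement = build_copy_statement(
    "tpcds",
    "stg_customer_address",
    "s3://amzn-s3-demo-bucket/customer_address/",
    "arn:aws:iam::111122223333:role/ExampleRedshiftCopyRole",
)
```

The role named in `iam_role` would need read access to the bucket, which is not part of the permissions that this sample project creates.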
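For more information about Amazon Redshift and Step Functions service integrations, see the following:
Note
This sample project may incur charges.
For new AWS users, a free usage tier is available. On this tier, services are free below a certain level of usage. For more information about AWS costs and the Free Tier, see AWS Step Functions pricing.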
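Step 1: Create the state machine
- Open the Step Functions console, and then choose Create state machine.
- Enter ETL job in Amazon Redshift in the search box, and then choose ETL job in Amazon Redshift from the search results.
- Choose Next to continue.
- Choose Run a demo to create a read-only, ready-to-deploy workflow, or choose Build on it to create an editable state machine definition that you can build on and deploy later.
  This sample project deploys the following resources:
  - An Amazon Redshift cluster
  - Two Lambda functions
  - An Amazon Redshift schema
  - Five Amazon Redshift tables
  - An AWS Step Functions state machine
  - Related AWS Identity and Access Management (IAM) roles
  You can view the workflow graph for the ETL job in Amazon Redshift sample project in the Step Functions console.
- Choose Use template to continue with your selection.
Next steps depend on your previous choice:
- Run a demo: You can review the state machine before you create a read-only project with resources that AWS CloudFormation deploys to your AWS account.
  You can view the state machine definition, and when you are ready, choose Deploy and run to deploy the project and create the resources.
  Deployment can take up to 10 minutes while resources and permissions are created. You can use the Stack ID link to monitor progress in AWS CloudFormation.
  After deployment completes, you should see your new state machine in the console.
- Build on it: You can review and edit the workflow definition. You might need to set values for placeholders in the sample project before you try to run your custom workflow.
Note
Standard charges might apply for services deployed to your account.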
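Step 2: Run the state machine
- On the State machines page, choose your sample project.
- On the sample project page, choose Start execution.
- In the Start execution dialog box, do the following:
  - (Optional) Enter a custom execution name to override the generated default.
    Non-ASCII names and logging
    Step Functions accepts names for state machines, executions, activities, and labels that contain non-ASCII characters. Because such characters do not work with Amazon CloudWatch, we recommend using only ASCII characters so that you can track metrics in CloudWatch.
  - (Optional) In the Input box, enter input values as JSON. You can skip this step if you are running a demo.
  - Choose Start execution.
    The Step Functions console directs you to the Execution Details page, where you can choose states in the Graph view to explore related information in the Step details pane.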
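If you start executions from code instead of the console, the same two optional values (a custom name and a JSON input) become request parameters. The sketch below only assembles those parameters with the standard library; `build_start_execution_args` and the state machine ARN are hypothetical, and the resulting dictionary is shaped so that it could be passed as keyword arguments to the `start_execution` call of the boto3 Step Functions client.

```python
import json


def build_start_execution_args(state_machine_arn, name=None, payload=None):
    """Assemble keyword arguments for a StartExecution request."""
    args = {"stateMachineArn": state_machine_arn}
    if name is not None:
        # Step Functions accepts non-ASCII names, but they do not work with
        # CloudWatch, so this sketch rejects them up front.
        if not name.isascii():
            raise ValueError("use only ASCII characters in execution names")
        args["name"] = name
    # The execution input is sent as a JSON string; default to an empty object.
    args["input"] = json.dumps(payload if payload is not None else {})
    return args


# Hypothetical ARN, shown only to illustrate the call.
args = build_start_execution_args(
    "arn:aws:states:us-east-1:111122223333:stateMachine:ExampleEtlStateMachine",
    name="etl-run-001",
)
```

Omitting `name` leaves the default generated name in place, which matches skipping the optional field in the dialog box.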
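Example state machine code
The state machine in this sample project integrates with AWS Lambda by passing the ETL logic as input (through InputPath) directly to those resources, and runs the SQL asynchronously by using the Amazon Redshift Data API.
Browse the following state machine definition to see how Step Functions controls AWS Lambda and the Amazon Redshift Data API.
For more information about how Step Functions can control other AWS services, see Integrating services with Step Functions.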
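Because the SQL runs asynchronously, each load in the definition is followed by a small polling loop: a Task state that fetches the status, a Choice state that branches on `FAILED` or `FINISHED`, and a Wait state as the default branch. A minimal Python sketch of that loop is shown here. The `get_status` callable is a hypothetical stand-in for the status Lambda function, and the `max_polls` limit is an addition of this sketch that the state machine itself does not have.

```python
import time


def wait_for_statement(get_status, wait_seconds=5, max_polls=60, sleep=time.sleep):
    """Poll get_status() until the statement finishes or fails.

    get_status is a caller-supplied function (hypothetical here) that
    returns a status string such as "STARTED", "FINISHED", or "FAILED".
    """
    for _ in range(max_polls):
        status = get_status()
        if status == "FINISHED":
            return status
        if status == "FAILED":
            raise RuntimeError("statement failed")
        # Any other status takes the default branch: wait, then check again.
        sleep(wait_seconds)
    raise TimeoutError("statement did not finish in time")
```

Passing `sleep` as a parameter keeps the loop easy to test without real delays. In the state machine, the Wait state plays that role.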
{
"Comment": "An ETL workflow for loading dimension and fact tables",
"StartAt": "InitializeCheckCluster",
"States": {
"InitializeCheckCluster": {
"Type": "Pass",
"Next": "GetStateOfCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterAvailable",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterAvailable": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializeBuildDB"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "InitializeResumeCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "ClusterWait"
}
]
},
"ClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckCluster"
},
"InitializeResumeCluster": {
"Type": "Pass",
"Next": "ResumeCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "resume"
}
}
},
"ResumeCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "ClusterWait",
"InputPath": "$",
"ResultPath": "$"
},
"InitializeBuildDB": {
"Type": "Pass",
"Next": "BuildDB",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "build_database",
"sql_statement": [
"create schema if not exists {0} authorization {1};",
"create table if not exists {0}.customer",
"(c_customer_sk int4 not null encode az64",
",c_customer_id char(16) not null encode zstd",
",c_current_addr_sk int4 encode az64",
",c_first_name char(20) encode zstd",
",c_last_name char(30) encode zstd",
",primary key (c_customer_sk)",
") distkey(c_customer_sk);",
"--",
"create table if not exists {0}.customer_address",
"(ca_address_sk int4 not null encode az64",
",ca_address_id char(16) not null encode zstd",
",ca_state char(2) encode zstd",
",ca_zip char(10) encode zstd",
",ca_country varchar(20) encode zstd",
",primary key (ca_address_sk)",
") distkey(ca_address_sk);",
"--",
"create table if not exists {0}.date_dim",
"(d_date_sk integer not null encode az64",
",d_date_id char(16) not null encode zstd",
",d_date date encode az64",
",d_day_name char(9) encode zstd",
",primary key (d_date_sk)",
") diststyle all;",
"--",
"create table if not exists {0}.item",
"(i_item_sk int4 not null encode az64",
",i_item_id char(16) not null encode zstd",
",i_rec_start_date date encode az64",
",i_rec_end_date date encode az64",
",i_current_price numeric(7,2) encode az64",
",i_category char(50) encode zstd",
",i_product_name char(50) encode zstd",
",primary key (i_item_sk)",
") distkey(i_item_sk) sortkey(i_category);",
"--",
"create table if not exists {0}.store_sales",
"(ss_sold_date_sk int4",
",ss_item_sk int4 not null encode az64",
",ss_customer_sk int4 encode az64",
",ss_addr_sk int4 encode az64",
",ss_store_sk int4 encode az64",
",ss_ticket_number int8 not null encode az64",
",ss_quantity int4 encode az64",
",ss_net_paid numeric(7,2) encode az64",
",ss_net_profit numeric(7,2) encode az64",
",primary key (ss_item_sk, ss_ticket_number)",
") distkey(ss_item_sk) sortkey(ss_sold_date_sk);"
]
}
}
},
"BuildDB": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBuildDBStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetBuildDBStatus": {
"Type": "Task",
"Next": "CheckBuildDBStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBuildDBStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailBuildDB"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "InitializeBaselineData"
}
],
"Default": "BuildDBWait"
},
"BuildDBWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "GetBuildDBStatus"
},
"FailBuildDB": {
"Type": "Fail",
"Cause": "Database Build Failed",
"Error": "Error"
},
"InitializeBaselineData": {
"Type": "Pass",
"Next": "LoadBaselineData",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_baseline_data",
"sql_statement": [
"begin transaction;",
"truncate table {0}.customer;",
"insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)",
"values",
"(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),",
"(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),",
"(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),",
"(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');",
"analyze {0}.customer;",
"--",
"truncate table {0}.customer_address;",
"insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),",
"(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),",
"(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');",
"analyze {0}.customer_address;",
"--",
"truncate table {0}.item;",
"insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),",
"(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),",
"(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),",
"(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),",
"(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),",
"(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');",
"--",
"analyze {0}.item;",
"truncate table {0}.date_dim;",
"insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)",
"values",
"(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),",
"(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),",
"(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),",
"(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),",
"(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),",
"(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),",
"(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),",
"(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),",
"(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');",
"--",
"analyze {0}.date_dim;",
"-- commit and End transaction",
"commit;",
"end transaction;"
]
}
}
},
"LoadBaselineData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBaselineData",
"InputPath": "$",
"ResultPath": "$"
},
"GetBaselineData": {
"Type": "Task",
"Next": "CheckBaselineData",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBaselineData": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailLoadBaselineData"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "ParallelizeDimensionLoadJob"
}
],
"Default": "BaselineDataWait"
},
"BaselineDataWait": {
"Type": "Wait",
"Seconds": 20,
"Next": "GetBaselineData"
},
"FailLoadBaselineData": {
"Type": "Fail",
"Cause": "Load Baseline Data Failed",
"Error": "Error"
},
"ParallelizeDimensionLoadJob": {
"Type": "Parallel",
"Next": "InitializeSalesFactLoadJob",
"ResultPath": "$.status",
"Branches": [
{
"StartAt": "InitializeCustomerAddressDimensionLoadJob",
"States": {
"InitializeCustomerAddressDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteCustomerAddressDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_customer_address",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_customer_address;",
"create table if not exists {0}.stg_customer_address",
"(ca_address_id varchar(16) encode zstd",
",ca_state varchar(2) encode zstd",
",ca_zip varchar(10) encode zstd",
",ca_country varchar(20) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"('AAAAAAAACFBBAAAA','NE','','United States'),",
"('AAAAAAAAGAEFAAAA','NE','61749','United States'),",
"('AAAAAAAAPJKKAAAA','OK','','United States'),",
"('AAAAAAAAMIHGAAAA','AL','','United States');",
"/* Perform UPDATE for existing data with refreshed attribute values */",
"update {0}.customer_address",
" set ca_state = stg_customer_address.ca_state,",
" ca_zip = stg_customer_address.ca_zip,",
" ca_country = stg_customer_address.ca_country",
" from {0}.stg_customer_address",
" where customer_address.ca_address_id = stg_customer_address.ca_address_id;",
"/* Perform insert for new rows */",
"insert into {0}.customer_address",
"(ca_address_sk",
",ca_address_id",
",ca_state",
",ca_zip",
",ca_country",
")",
"with max_customer_address_sk as",
"(select max(ca_address_sk) max_ca_address_sk",
"from {0}.customer_address)",
"select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk",
",stg_customer_address.ca_address_id",
",stg_customer_address.ca_state",
",stg_customer_address.ca_zip",
",stg_customer_address.ca_country",
"from {0}.stg_customer_address,",
"max_customer_address_sk",
"where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteCustomerAddressDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetCustomerAddressDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetCustomerAddressDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckCustomerAddressDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckCustomerAddressDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailCustomerAddressDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteCustomerAddressDimensionLoad"
}
],
"Default": "CustomerAddressWait"
},
"CustomerAddressWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetCustomerAddressDimensionLoadStatus"
},
"CompleteCustomerAddressDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailCustomerAddressDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
},
{
"StartAt": "InitializeItemDimensionLoadJob",
"States": {
"InitializeItemDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteItemDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_item",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_item;",
"create table if not exists {0}.stg_item",
"(i_item_id varchar(16) encode zstd",
",i_rec_start_date date encode zstd",
",i_rec_end_date date encode zstd",
",i_current_price numeric(7,2) encode zstd",
",i_category varchar(50) encode zstd",
",i_product_name varchar(50) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_item",
"(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),",
"('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),",
"('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),",
"('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),",
"('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),",
"('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),",
"('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),",
"('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');",
"/************************************************************************************************************************",
"** Type 2 is maintained for i_current_price column.",
"** Update all attributes for the item when the price is not changed",
"** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match",
"*************************************************************************************************************************/",
"update {0}.item",
" set i_category = stg_item.i_category,",
" i_product_name = stg_item.i_product_name",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price = stg_item.i_current_price;",
"insert into {0}.item",
"(i_item_sk",
",i_item_id",
",i_rec_start_date",
",i_rec_end_date",
",i_current_price",
",i_category",
",i_product_name",
")",
"with max_item_sk as",
"(select max(i_item_sk) max_item_sk",
" from {0}.item)",
"select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk",
" ,stg_item.i_item_id",
" ,trunc(sysdate) as i_rec_start_date",
" ,null as i_rec_end_date",
" ,stg_item.i_current_price",
" ,stg_item.i_category",
" ,stg_item.i_product_name",
" from {0}.stg_item, {0}.item, max_item_sk",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Sunset penultimate records that were inserted as type 2 */",
"update {0}.item",
" set i_rec_end_date = trunc(sysdate)",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteItemDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetItemDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetItemDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckItemDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckItemDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailItemDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteItemDimensionLoad"
}
],
"Default": "ItemWait"
},
"ItemWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetItemDimensionLoadStatus"
},
"CompleteItemDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailItemDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
}
]
},
"InitializeSalesFactLoadJob": {
"Type": "Pass",
"Next": "ExecuteSalesFactLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "load_sales_fact",
"sql_statement": [
"begin transaction;",
"/* Create a stg_store_sales staging table */",
"drop table if exists {0}.stg_store_sales;",
"create table {0}.stg_store_sales",
"(sold_date date encode zstd",
",i_item_id varchar(16) encode zstd",
",c_customer_id varchar(16) encode zstd",
",ca_address_id varchar(16) encode zstd",
",ss_ticket_number integer encode zstd",
",ss_quantity integer encode zstd",
",ss_net_paid numeric(7,2) encode zstd",
",ss_net_profit numeric(7,2) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_store_sales",
"(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)",
"values",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),",
"('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);",
"/* Delete any rows from target store_sales for the input date for idempotency */",
"delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');",
"/* Insert data from staging table to the target table */",
"insert into {0}.store_sales",
"(ss_sold_date_sk",
",ss_item_sk",
",ss_customer_sk",
",ss_addr_sk",
",ss_ticket_number",
",ss_quantity",
",ss_net_paid",
",ss_net_profit",
")",
"select date_dim.d_date_sk ss_sold_date_sk",
" ,item.i_item_sk ss_item_sk",
" ,customer.c_customer_sk ss_customer_sk",
" ,customer_address.ca_address_sk ss_addr_sk",
" ,ss_ticket_number",
" ,ss_quantity",
" ,ss_net_paid",
" ,ss_net_profit",
" from {0}.stg_store_sales as store_sales",
" inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date",
" left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null",
" left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id",
" left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;",
"/* Drop staging table */",
"drop table if exists {0}.stg_store_sales;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteSalesFactLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetSalesFactLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetSalesFactLoadStatus": {
"Type": "Task",
"Next": "CheckSalesFactLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckSalesFactLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesFactLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "SalesETLPipelineComplete"
}
],
"Default": "SalesWait"
},
"SalesWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetSalesFactLoadStatus"
},
"FailSalesFactLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
},
"ClusterUnavailable": {
"Type": "Fail",
"Cause": "Redshift cluster is not available",
"Error": "Error"
},
"SalesETLPipelineComplete": {
"Type": "Pass",
"Next": "ValidateSalesMetric",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "validate_sales_metric",
"sql_statement": [
"select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')"
]
}
}
},
"ValidateSalesMetric": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetValidateSalesMetricStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetValidateSalesMetricStatus": {
"Type": "Task",
"Next": "CheckValidateSalesMetricStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckValidateSalesMetricStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesMetricValidation"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "DataValidationComplete"
}
],
"Default": "SalesValidationWait"
},
"SalesValidationWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetValidateSalesMetricStatus"
},
"FailSalesMetricValidation": {
"Type": "Fail",
"Cause": "Data Validation Failed",
"Error": "Error"
},
"DataValidationComplete": {
"Type": "Pass",
"Next": "InitializePauseCluster"
},
"InitializePauseCluster": {
"Type": "Pass",
"Next": "PauseCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "pause"
}
}
},
"PauseCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "PauseClusterWait",
"InputPath": "$",
"ResultPath": "$.clusterStatus",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "ClusterPausedComplete"
}
]
},
"InitializeCheckPauseCluster": {
"Type": "Pass",
"Next": "GetStateOfPausedCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfPausedCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterPaused",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterPaused": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializePauseCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "ClusterPausedComplete"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "PauseClusterWait"
}
]
},
"PauseClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckPauseCluster"
},
"ClusterPausedComplete": {
"Type": "Pass",
"End": true
}
}
}
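The `sql_statement` arrays in the definition contain positional placeholders (`{0}` and `{1}`) that must be filled in before the SQL is submitted. The Lambda function code is not part of this page, so the following is only a guess at how that substitution might work. It assumes that `{0}` is the schema and that `{1}` is the snapshot date when one is supplied and the database user otherwise. The name `render_sql` is hypothetical.

```python
def render_sql(event):
    """Join the statement fragments and fill in the positional placeholders.

    Assumption: {0} is the schema, and {1} is the snapshot date when one is
    present, otherwise the database user.
    """
    params = event["input"]
    second = params.get("snapshot_date", params["redshift_user"])
    lines = [
        line.format(params["redshift_schema"], second)
        for line in params["sql_statement"]
    ]
    # The fragments are partial lines of SQL, so they are joined with
    # newlines to rebuild the full script.
    return "\n".join(lines)
```

This relies on the SQL fragments containing no other curly braces, which holds for the statements in this definition.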
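IAM policy example
This example AWS Identity and Access Management (IAM) policy generated by the sample project includes the least privilege necessary to run the state machine and related resources. We recommend that you include only the necessary permissions in your IAM policies.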
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE"
],
"Effect": "Allow"
}
]
}
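If you adapt the workflow to call different Lambda functions, a policy with the same shape can be generated from the list of function ARNs. The helper below is a minimal sketch under that assumption. `build_invoke_policy` is a hypothetical name, and the output mirrors the structure of the policy shown above without adding any other permissions.

```python
import json


def build_invoke_policy(function_arns):
    """Return an IAM policy document that only allows invoking the given functions."""
    if not function_arns:
        raise ValueError("at least one Lambda function ARN is required")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["lambda:InvokeFunction"],
                # Sorting keeps the generated document stable between runs.
                "Resource": sorted(function_arns),
            }
        ],
    }


policy = build_invoke_policy([
    "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
    "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
])
policy_json = json.dumps(policy, indent=2)
```

Listing explicit function ARNs, instead of a wildcard resource, keeps the policy limited to the functions that the state machine actually calls.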
For information about how to configure IAM when using Step Functions with other AWS services, see How Step Functions generates IAM policies for integrated services.