翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Step Functions と Amazon Redshift を使用して ETL/ELT ワークフローを実行する API
このサンプルプロジェクトでは、Step Functions と Amazon Redshift Data を使用して、Amazon Redshift データウェアハウスにデータをロードする ETL/ELT ワークフローAPIを実行する方法を示します。
このプロジェクトでは、Step Functions は を使用します。 AWS Lambda 関数と Amazon Redshift Data APIを使用して、必要なデータベースオブジェクトを作成し、一連のサンプルデータを生成し、ディメンションテーブルのロードを実行する 2 つのジョブを並行して実行し、その後にファクトテーブルを実行します。両方のディメンションロードジョブが正常に終了すると、Step Functions はファクトテーブルのロードジョブを実行し、検証ジョブを実行し、Amazon Redshift クラスターを一時停止します。
注記
Amazon S3 などの他のソースからデータを受信するようにETLロジックを変更できます。このロジックでは、 COPY コマンドを使用して Amazon S3 から Amazon Redshift テーブルにデータをコピーできます。 Amazon S3
Amazon Redshift とStep Functions サービスの統合の詳細については、以下を参照してください。
注記
このサンプルプロジェクトでは、料金が発生する場合があります。
新しい の場合 AWS ユーザーには、無料利用枠があります。この枠では、サービスを利用しても一定のレベル以下であれば無料です。の詳細については、「」を参照してください。 AWS のコストと無料利用枠については、「」を参照してください。 AWS Step Functions の料金
ステップ 1: ステートマシンを作成する
-
Step Functions コンソール
を開き、[ステートマシンの作成] を選択します。 -
ETL job in Amazon Redshift
検索ボックスに「」と入力し、「」でETLジョブを選択します。Amazon Redshift 返される検索結果から。 -
[次へ] を選択して続行します。
-
デモの実行 を選択して読み取り専用と ready-to-deploy ワークフローを作成するか、ビルド を選択して編集可能なステートマシン定義を作成し、構築後にデプロイできます。
このサンプルプロジェクトは、以下のリソースをデプロイします。
-
An Amazon Redshift クラスター
-
2 Lambda 関数
-
An Amazon Redshift スキーマ
-
Five Amazon Redshift テーブル
-
An AWS Step Functions ステートマシン
-
関連 AWS Identity and Access Management (IAM) ロール。
でETLジョブのワークフローグラフを調べることができます。Amazon Redshift Step Functions コンソールの 。
-
-
[テンプレートの使用] を選択して選択を続行します。
次のステップは、前の選択肢によって異なります。
-
デモの実行 – によってデプロイされたリソースを使用して読み取り専用プロジェクトを作成する前に、ステートマシンを確認できます。 AWS CloudFormation を に AWS アカウント.
ステートマシンの定義を表示でき、準備ができたら、デプロイと実行を選択してプロジェクトをデプロイし、リソースを作成します。
デプロイでは、リソースとアクセス許可の作成に最大 10 分かかる場合があります。スタック ID リンクを使用して、 の進行状況をモニタリングできます。 AWS CloudFormation.
デプロイが完了すると、コンソールに新しいステートマシンが表示されます。
-
構築 – ワークフロー定義を確認して編集できます。カスタムワークフローの実行を試みる前に、サンプルプロジェクトのプレースホルダーの値を設定する必要がある場合があります。
注記
アカウントにデプロイされたサービスには、標準料金が適用される場合があります。
ステップ 2: ステートマシンを実行する
-
[ステートマシン] ページで、サンプルプロジェクトを選択します。
-
サンプルプロジェクトページで、[実行を開始] を選択します。
-
[実行を開始] ダイアログボックスで、以下の操作を行います。
-
(オプション) 生成されたデフォルトを上書きするカスタム実行名を入力します。
非ASCII名前とログ記録
Step Functions は、 ASCII以外の文字を含むステートマシン、実行、アクティビティ、およびラベルの名前を受け入れます。このような文字は Amazon では機能しないため CloudWatch、 でメトリクスを追跡できるようにASCII、文字のみを使用することをお勧めします CloudWatch。
-
(オプション) 入力ボックスに、入力値を として入力しますJSON。デモを実行している場合は、このステップをスキップできます。
-
[実行のスタート] を選択します。
Step Functions コンソールで実行の詳細ページに移動し、グラフビューで状態を選択してペインの関連情報を確認できますステップの詳細。
-
ステートマシンのコード例
このサンプルプロジェクトのステートマシンは、 と統合されます。 AWS Lambda ロジックを ETL としてそれらのリソース InputPath に直接渡し、Amazon Redshift Data を使用して非同期的に実行されるAPI。
次のステートマシン定義を参照して、Step Functions による制御方法を確認します。 AWS Lambda および Amazon Redshift データ API。
Step Functions が他の を制御する方法の詳細については、「」を参照してください。 AWS サービスについては、「」を参照してくださいサービスと Step Functions の統合。
{
"Comment": "An ETL workflow for loading dimension and fact tables",
"StartAt": "InitializeCheckCluster",
"States": {
"InitializeCheckCluster": {
"Type": "Pass",
"Next": "GetStateOfCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterAvailable",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterAvailable": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializeBuildDB"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "InitializeResumeCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "ClusterWait"
}
]
},
"ClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckCluster"
},
"InitializeResumeCluster": {
"Type": "Pass",
"Next": "ResumeCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "resume"
}
}
},
"ResumeCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "ClusterWait",
"InputPath": "$",
"ResultPath": "$"
},
"InitializeBuildDB": {
"Type": "Pass",
"Next": "BuildDB",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "build_database",
"sql_statement": [
"create schema if not exists {0} authorization {1};",
"create table if not exists {0}.customer",
"(c_customer_sk int4 not null encode az64",
",c_customer_id char(16) not null encode zstd",
",c_current_addr_sk int4 encode az64",
",c_first_name char(20) encode zstd",
",c_last_name char(30) encode zstd",
",primary key (c_customer_sk)",
") distkey(c_customer_sk);",
"--",
"create table if not exists {0}.customer_address",
"(ca_address_sk int4 not null encode az64",
",ca_address_id char(16) not null encode zstd",
",ca_state char(2) encode zstd",
",ca_zip char(10) encode zstd",
",ca_country varchar(20) encode zstd",
",primary key (ca_address_sk)",
") distkey(ca_address_sk);",
"--",
"create table if not exists {0}.date_dim",
"(d_date_sk integer not null encode az64",
",d_date_id char(16) not null encode zstd",
",d_date date encode az64",
",d_day_name char(9) encode zstd",
",primary key (d_date_sk)",
") diststyle all;",
"--",
"create table if not exists {0}.item",
"(i_item_sk int4 not null encode az64",
",i_item_id char(16) not null encode zstd",
",i_rec_start_date date encode az64",
",i_rec_end_date date encode az64",
",i_current_price numeric(7,2) encode az64",
",i_category char(50) encode zstd",
",i_product_name char(50) encode zstd",
",primary key (i_item_sk)",
") distkey(i_item_sk) sortkey(i_category);",
"--",
"create table if not exists {0}.store_sales",
"(ss_sold_date_sk int4",
",ss_item_sk int4 not null encode az64",
",ss_customer_sk int4 encode az64",
",ss_addr_sk int4 encode az64",
",ss_store_sk int4 encode az64",
",ss_ticket_number int8 not null encode az64",
",ss_quantity int4 encode az64",
",ss_net_paid numeric(7,2) encode az64",
",ss_net_profit numeric(7,2) encode az64",
",primary key (ss_item_sk, ss_ticket_number)",
") distkey(ss_item_sk) sortkey(ss_sold_date_sk);"
]
}
}
},
"BuildDB": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBuildDBStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetBuildDBStatus": {
"Type": "Task",
"Next": "CheckBuildDBStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBuildDBStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailBuildDB"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "InitializeBaselineData"
}
],
"Default": "BuildDBWait"
},
"BuildDBWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "GetBuildDBStatus"
},
"FailBuildDB": {
"Type": "Fail",
"Cause": "Database Build Failed",
"Error": "Error"
},
"InitializeBaselineData": {
"Type": "Pass",
"Next": "LoadBaselineData",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_baseline_data",
"sql_statement": [
"begin transaction;",
"truncate table {0}.customer;",
"insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)",
"values",
"(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),",
"(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),",
"(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),",
"(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');",
"analyze {0}.customer;",
"--",
"truncate table {0}.customer_address;",
"insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),",
"(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),",
"(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');",
"analyze {0}.customer_address;",
"--",
"truncate table {0}.item;",
"insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),",
"(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),",
"(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),",
"(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),",
"(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),",
"(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');",
"--",
"analyze {0}.item;",
"truncate table {0}.date_dim;",
"insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)",
"values",
"(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),",
"(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),",
"(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),",
"(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),",
"(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),",
"(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),",
"(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),",
"(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),",
"(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');",
"--",
"analyze {0}.date_dim;",
"-- commit and End transaction",
"commit;",
"end transaction;"
]
}
}
},
"LoadBaselineData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBaselineData",
"InputPath": "$",
"ResultPath": "$"
},
"GetBaselineData": {
"Type": "Task",
"Next": "CheckBaselineData",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBaselineData": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailLoadBaselineData"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "ParallelizeDimensionLoadJob"
}
],
"Default": "BaselineDataWait"
},
"BaselineDataWait": {
"Type": "Wait",
"Seconds": 20,
"Next": "GetBaselineData"
},
"FailLoadBaselineData": {
"Type": "Fail",
"Cause": "Load Baseline Data Failed",
"Error": "Error"
},
"ParallelizeDimensionLoadJob": {
"Type": "Parallel",
"Next": "InitializeSalesFactLoadJob",
"ResultPath": "$.status",
"Branches": [
{
"StartAt": "InitializeCustomerAddressDimensionLoadJob",
"States": {
"InitializeCustomerAddressDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteCustomerAddressDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_customer_address",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_customer_address;",
"create table if not exists {0}.stg_customer_address",
"(ca_address_id varchar(16) encode zstd",
",ca_state varchar(2) encode zstd",
",ca_zip varchar(10) encode zstd",
",ca_country varchar(20) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"('AAAAAAAACFBBAAAA','NE','','United States'),",
"('AAAAAAAAGAEFAAAA','NE','61749','United States'),",
"('AAAAAAAAPJKKAAAA','OK','','United States'),",
"('AAAAAAAAMIHGAAAA','AL','','United States');",
"/* Perform UPDATE for existing data with refreshed attribute values */",
"update {0}.customer_address",
" set ca_state = stg_customer_address.ca_state,",
" ca_zip = stg_customer_address.ca_zip,",
" ca_country = stg_customer_address.ca_country",
" from {0}.stg_customer_address",
" where customer_address.ca_address_id = stg_customer_address.ca_address_id;",
"/* Perform insert for new rows */",
"insert into {0}.customer_address",
"(ca_address_sk",
",ca_address_id",
",ca_state",
",ca_zip",
",ca_country",
")",
"with max_customer_address_sk as",
"(select max(ca_address_sk) max_ca_address_sk",
"from {0}.customer_address)",
"select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk",
",stg_customer_address.ca_address_id",
",stg_customer_address.ca_state",
",stg_customer_address.ca_zip",
",stg_customer_address.ca_country",
"from {0}.stg_customer_address,",
"max_customer_address_sk",
"where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteCustomerAddressDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetCustomerAddressDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetCustomerAddressDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckCustomerAddressDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckCustomerAddressDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailCustomerAddressDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteCustomerAddressDimensionLoad"
}
],
"Default": "CustomerAddressWait"
},
"CustomerAddressWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetCustomerAddressDimensionLoadStatus"
},
"CompleteCustomerAddressDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailCustomerAddressDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
},
{
"StartAt": "InitializeItemDimensionLoadJob",
"States": {
"InitializeItemDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteItemDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_item",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_item;",
"create table if not exists {0}.stg_item",
"(i_item_id varchar(16) encode zstd",
",i_rec_start_date date encode zstd",
",i_rec_end_date date encode zstd",
",i_current_price numeric(7,2) encode zstd",
",i_category varchar(50) encode zstd",
",i_product_name varchar(50) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_item",
"(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),",
"('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),",
"('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),",
"('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),",
"('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),",
"('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),",
"('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),",
"('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');",
"/************************************************************************************************************************",
"** Type 2 is maintained for i_current_price column.",
"** Update all attributes for the item when the price is not changed",
"** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match",
"*************************************************************************************************************************/",
"update {0}.item",
" set i_category = stg_item.i_category,",
" i_product_name = stg_item.i_product_name",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price = stg_item.i_current_price;",
"insert into {0}.item",
"(i_item_sk",
",i_item_id",
",i_rec_start_date",
",i_rec_end_date",
",i_current_price",
",i_category",
",i_product_name",
")",
"with max_item_sk as",
"(select max(i_item_sk) max_item_sk",
" from {0}.item)",
"select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk",
" ,stg_item.i_item_id",
" ,trunc(sysdate) as i_rec_start_date",
" ,null as i_rec_end_date",
" ,stg_item.i_current_price",
" ,stg_item.i_category",
" ,stg_item.i_product_name",
" from {0}.stg_item, {0}.item, max_item_sk",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Sunset penultimate records that were inserted as type 2 */",
"update {0}.item",
" set i_rec_end_date = trunc(sysdate)",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteItemDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetItemDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetItemDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckItemDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckItemDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailItemDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteItemDimensionLoad"
}
],
"Default": "ItemWait"
},
"ItemWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetItemDimensionLoadStatus"
},
"CompleteItemDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailItemDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
}
]
},
"InitializeSalesFactLoadJob": {
"Type": "Pass",
"Next": "ExecuteSalesFactLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "load_sales_fact",
"sql_statement": [
"begin transaction;",
"/* Create a stg_store_sales staging table */",
"drop table if exists {0}.stg_store_sales;",
"create table {0}.stg_store_sales",
"(sold_date date encode zstd",
",i_item_id varchar(16) encode zstd",
",c_customer_id varchar(16) encode zstd",
",ca_address_id varchar(16) encode zstd",
",ss_ticket_number integer encode zstd",
",ss_quantity integer encode zstd",
",ss_net_paid numeric(7,2) encode zstd",
",ss_net_profit numeric(7,2) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_store_sales",
"(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)",
"values",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),",
"('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);",
"/* Delete any rows from target store_sales for the input date for idempotency */",
"delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');",
"/* Insert data from staging table to the target table */",
"insert into {0}.store_sales",
"(ss_sold_date_sk",
",ss_item_sk",
",ss_customer_sk",
",ss_addr_sk",
",ss_ticket_number",
",ss_quantity",
",ss_net_paid",
",ss_net_profit",
")",
"select date_dim.d_date_sk ss_sold_date_sk",
" ,item.i_item_sk ss_item_sk",
" ,customer.c_customer_sk ss_customer_sk",
" ,customer_address.ca_address_sk ss_addr_sk",
" ,ss_ticket_number",
" ,ss_quantity",
" ,ss_net_paid",
" ,ss_net_profit",
" from {0}.stg_store_sales as store_sales",
" inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date",
" left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null",
" left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id",
" left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;",
"/* Drop staging table */",
"drop table if exists {0}.stg_store_sales;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteSalesFactLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetSalesFactLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetSalesFactLoadStatus": {
"Type": "Task",
"Next": "CheckSalesFactLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckSalesFactLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesFactLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "SalesETLPipelineComplete"
}
],
"Default": "SalesWait"
},
"SalesWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetSalesFactLoadStatus"
},
"FailSalesFactLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
},
"ClusterUnavailable": {
"Type": "Fail",
"Cause": "Redshift cluster is not available",
"Error": "Error"
},
"SalesETLPipelineComplete": {
"Type": "Pass",
"Next": "ValidateSalesMetric",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "validate_sales_metric",
"sql_statement": [
"select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')"
]
}
}
},
"ValidateSalesMetric": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetValidateSalesMetricStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetValidateSalesMetricStatus": {
"Type": "Task",
"Next": "CheckValidateSalesMetricStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckValidateSalesMetricStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesMetricValidation"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "DataValidationComplete"
}
],
"Default": "SalesValidationWait"
},
"SalesValidationWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetValidateSalesMetricStatus"
},
"FailSalesMetricValidation": {
"Type": "Fail",
"Cause": "Data Validation Failed",
"Error": "Error"
},
"DataValidationComplete": {
"Type": "Pass",
"Next": "InitializePauseCluster"
},
"InitializePauseCluster": {
"Type": "Pass",
"Next": "PauseCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "pause"
}
}
},
"PauseCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "PauseClusterWait",
"InputPath": "$",
"ResultPath": "$.clusterStatus",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "ClusterPausedComplete"
}
]
},
"InitializeCheckPauseCluster": {
"Type": "Pass",
"Next": "GetStateOfPausedCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfPausedCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterPaused",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterPaused": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializePauseCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "ClusterPausedComplete"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "PauseClusterWait"
}
]
},
"PauseClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckPauseCluster"
},
"ClusterPausedComplete": {
"Type": "Pass",
"End": true
}
}
}
IAM ポリシーの例
次の例 AWS Identity and Access Management サンプルプロジェクトによって生成された (IAM) ポリシーには、ステートマシンおよび関連リソースの実行に必要な最小権限が含まれています。IAM ポリシーに必要なアクセス許可のみを含めることをお勧めします。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE"
],
"Effect": "Allow"
}
]
}
他の で Step Functions を使用するIAMときに を設定する方法の詳細については、「」を参照してください。 AWS サービスについては、「」を参照してくださいStep Functions が統合サービスのIAMポリシーを生成する方法。