Esegui un ELT flusso di lavoroETL/utilizzando Step Functions e Amazon Redshift API - AWS Step Functions

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esegui un ELT flusso di lavoroETL/utilizzando Step Functions e Amazon Redshift API

Questo progetto di esempio dimostra come utilizzare Step Functions e Amazon Redshift API Data per eseguire ETL un flusso di lavoroELT/che carica i dati nel data warehouse Amazon Redshift.

In questo progetto, Step Functions utilizza un AWS Lambda funzione e Amazon Redshift Data API per creare gli oggetti di database richiesti e generare un set di dati di esempio, quindi esegue due processi in parallelo che eseguono il caricamento delle tabelle delle dimensioni, seguiti da una tabella dei fatti. Una volta terminati correttamente i processi di caricamento di entrambe le dimensioni, Step Functions esegue il processo di caricamento per la tabella dei fatti, esegue il processo di convalida, quindi mette in pausa il cluster Amazon Redshift.

Nota

Puoi modificare la ETL logica per ricevere dati da altre fonti come Amazon S3, che può utilizzare il COPYcomando per copiare i dati da Amazon S3 a una tabella Amazon Redshift.

Per ulteriori informazioni sulle integrazioni dei servizi Amazon Redshift e Step Functions, consulta quanto segue:

Nota

Questo progetto di esempio potrebbe comportare costi.

Per nuovi AWS per gli utenti, è disponibile un piano di utilizzo gratuito. In questo piano, i servizi sono gratuiti al di sotto di un determinato livello di utilizzo. Per ulteriori informazioni sull' AWS costi e il piano gratuito, vedi AWS Step Functions prezzi.

Fase 1: Creare la macchina a stati

  1. Apri la console Step Functions e scegli Crea macchina a stati.

  2. Digita ETL job in Amazon Redshift nella casella di ricerca, quindi scegli ETL job in Amazon Redshiftdai risultati di ricerca che vengono restituiti.

  3. Seleziona Successivo per continuare.

  4. Scegli Esegui una demo per creare un ready-to-deploy flusso di lavoro di sola lettura o scegli Crea su di esso per creare una definizione di macchina a stati modificabile su cui basarti e distribuire successivamente.

    Questo progetto di esempio utilizza le seguenti risorse:

    • Un record Amazon Redshift cluster

    • Due Lambda funzioni

    • Un record Amazon Redshift schema

    • Cinque Amazon Redshift tabelle

    • Un record AWS Step Functions macchina a stati

    • Correlati AWS Identity and Access Management (IAM) ruoli.

    Puoi esplorare il grafico del flusso di lavoro relativo al ETL lavoro in Amazon Redshiftnella console Step Functions.

  5. Scegli Usa modello per continuare con la selezione.

I passaggi successivi dipendono dalla scelta precedente:

  1. Esegui una demo: puoi esaminare la macchina a stati prima di creare un progetto di sola lettura con risorse distribuite da AWS CloudFormation al tuo Account AWS.

    Puoi visualizzare la definizione della macchina a stati e, quando sei pronto, scegli Implementa ed esegui per distribuire il progetto e creare le risorse.

    La creazione di risorse e autorizzazioni può richiedere fino a 10 minuti per la distribuzione. Puoi utilizzare il link Stack ID per monitorare i progressi in AWS CloudFormation.

    Una volta completata la distribuzione, dovresti vedere la tua nuova macchina a stati nella console.

  2. Sviluppala: puoi rivedere e modificare la definizione del flusso di lavoro. Potrebbe essere necessario impostare i valori per i segnaposto nel progetto di esempio prima di provare a eseguire il flusso di lavoro personalizzato.

Nota

Potrebbero essere applicati costi standard per i servizi distribuiti sul tuo account.

Passaggio 2: Esegui la macchina a stati

  1. Nella pagina Macchine a stati, scegli il tuo progetto di esempio.

  2. Nella pagina del progetto di esempio, scegli Avvia esecuzione.

  3. Nella finestra di dialogo Avvia esecuzione, effettuate le seguenti operazioni:

    1. (Facoltativo) Immettete un nome di esecuzione personalizzato per sovrascrivere il valore predefinito generato.

      ASCIINomi diversi e registrazione

      Step Functions accetta nomi per macchine a stati, esecuzioni, attività ed etichette che contengono non ASCII caratteri. Poiché tali caratteri non funzioneranno con Amazon CloudWatch, ti consigliamo di utilizzare solo ASCII caratteri in modo da poter tenere traccia delle metriche. CloudWatch

    2. (Facoltativo) Nella casella Input, inserisci i valori di input comeJSON. Puoi saltare questo passaggio se stai eseguendo una demo.

    3. Selezionare Start execution (Avvia esecuzione).

    La console Step Functions ti indirizzerà a una pagina dei dettagli di esecuzione in cui puoi scegliere gli stati nella vista Graph per esplorare le informazioni correlate nel Dettagli del passaggio riquadro.

Esempio di codice di macchina a stati

La macchina a stati in questo progetto di esempio si integra con AWS Lambda passando la ETL logica InputPath direttamente a tali risorse ed eseguendola in modo asincrono utilizzando Amazon Redshift Data. API

Sfoglia la seguente definizione di macchina a stati per vedere come Step Functions controlla AWS Lambda e i dati di Amazon Redshift. API

Per informazioni su come Step Functions può controllare altri AWS servizi, vediIntegrazione dei servizi con Step Functions.

{ "Comment": "An ETL workflow for loading dimension and fact tables", "StartAt": "InitializeCheckCluster", "States": { "InitializeCheckCluster": { "Type": "Pass", "Next": "GetStateOfCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterAvailable", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterAvailable": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializeBuildDB" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "InitializeResumeCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "ClusterWait" } ] }, "ClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckCluster" }, "InitializeResumeCluster": { "Type": "Pass", "Next": "ResumeCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "resume" } } }, "ResumeCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "ClusterWait", "InputPath": "$", "ResultPath": "$" }, "InitializeBuildDB": { "Type": "Pass", "Next": "BuildDB", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "build_database", "sql_statement": [ "create schema if not exists {0} authorization {1};", "create table if not exists {0}.customer", "(c_customer_sk int4 not null encode az64", ",c_customer_id char(16) not null encode zstd", ",c_current_addr_sk int4 encode az64", ",c_first_name char(20) encode zstd", ",c_last_name char(30) encode zstd", ",primary key (c_customer_sk)", ") distkey(c_customer_sk);", "--", "create table if not exists {0}.customer_address", "(ca_address_sk int4 not null encode az64", ",ca_address_id char(16) not null encode zstd", ",ca_state char(2) encode zstd", ",ca_zip char(10) encode zstd", ",ca_country varchar(20) encode zstd", ",primary key (ca_address_sk)", ") distkey(ca_address_sk);", "--", "create table if not exists {0}.date_dim", "(d_date_sk integer not null encode az64", ",d_date_id char(16) not null encode zstd", ",d_date date encode az64", ",d_day_name char(9) encode zstd", ",primary key (d_date_sk)", ") diststyle all;", "--", "create table if not exists {0}.item", "(i_item_sk int4 not null encode az64", ",i_item_id char(16) not null encode zstd", ",i_rec_start_date date encode az64", ",i_rec_end_date date encode az64", ",i_current_price numeric(7,2) encode az64", ",i_category char(50) encode zstd", ",i_product_name char(50) encode zstd", ",primary key (i_item_sk)", ") distkey(i_item_sk) sortkey(i_category);", "--", "create table if not exists {0}.store_sales", "(ss_sold_date_sk int4", ",ss_item_sk int4 not null encode az64", ",ss_customer_sk int4 encode az64", ",ss_addr_sk int4 encode az64", ",ss_store_sk int4 encode az64", ",ss_ticket_number int8 not null encode az64", ",ss_quantity int4 encode az64", ",ss_net_paid numeric(7,2) encode az64", ",ss_net_profit numeric(7,2) encode az64", ",primary key (ss_item_sk, ss_ticket_number)", ") distkey(ss_item_sk) sortkey(ss_sold_date_sk);" ] } } }, "BuildDB": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBuildDBStatus", "InputPath": "$", "ResultPath": "$" }, "GetBuildDBStatus": { "Type": "Task", "Next": "CheckBuildDBStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBuildDBStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailBuildDB" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "InitializeBaselineData" } ], "Default": "BuildDBWait" }, "BuildDBWait": { "Type": "Wait", "Seconds": 15, "Next": "GetBuildDBStatus" }, "FailBuildDB": { "Type": "Fail", "Cause": "Database Build Failed", "Error": "Error" }, "InitializeBaselineData": { "Type": "Pass", "Next": "LoadBaselineData", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_baseline_data", "sql_statement": [ "begin transaction;", "truncate table {0}.customer;", "insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)", "values", "(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),", "(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),", "(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),", "(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');", "analyze {0}.customer;", "--", "truncate table {0}.customer_address;", "insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)", "values", "(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),", "(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),", "(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');", "analyze {0}.customer_address;", "--", "truncate table {0}.item;", "insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),", "(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),", "(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),", "(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),", "(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),", "(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');", "--", "analyze {0}.item;", "truncate table {0}.date_dim;", "insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)", "values", "(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),", "(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),", "(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),", "(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),", "(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),", "(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),", "(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),", "(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),", "(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');", "--", "analyze {0}.date_dim;", "-- commit and End transaction", "commit;", "end transaction;" ] } } }, "LoadBaselineData": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBaselineData", "InputPath": "$", "ResultPath": "$" }, "GetBaselineData": { "Type": "Task", "Next": "CheckBaselineData", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBaselineData": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailLoadBaselineData" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "ParallelizeDimensionLoadJob" } ], "Default": "BaselineDataWait" }, "BaselineDataWait": { "Type": "Wait", "Seconds": 20, "Next": "GetBaselineData" }, "FailLoadBaselineData": { "Type": "Fail", "Cause": "Load Baseline Data Failed", "Error": "Error" }, "ParallelizeDimensionLoadJob": { "Type": "Parallel", "Next": "InitializeSalesFactLoadJob", "ResultPath": "$.status", "Branches": [ { "StartAt": "InitializeCustomerAddressDimensionLoadJob", "States": { "InitializeCustomerAddressDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteCustomerAddressDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_customer_address", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_customer_address;", "create table if not exists {0}.stg_customer_address", "(ca_address_id varchar(16) encode zstd", ",ca_state varchar(2) encode zstd", ",ca_zip varchar(10) encode zstd", ",ca_country varchar(20) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)", "values", "('AAAAAAAACFBBAAAA','NE','','United States'),", "('AAAAAAAAGAEFAAAA','NE','61749','United States'),", "('AAAAAAAAPJKKAAAA','OK','','United States'),", "('AAAAAAAAMIHGAAAA','AL','','United States');", "/* Perform UPDATE for existing data with refreshed attribute values */", "update {0}.customer_address", " set ca_state = stg_customer_address.ca_state,", " ca_zip = stg_customer_address.ca_zip,", " ca_country = stg_customer_address.ca_country", " from {0}.stg_customer_address", " where customer_address.ca_address_id = stg_customer_address.ca_address_id;", "/* Perform insert for new rows */", "insert into {0}.customer_address", "(ca_address_sk", ",ca_address_id", ",ca_state", ",ca_zip", ",ca_country", ")", "with max_customer_address_sk as", "(select max(ca_address_sk) max_ca_address_sk", "from {0}.customer_address)", "select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk", ",stg_customer_address.ca_address_id", ",stg_customer_address.ca_state", ",stg_customer_address.ca_zip", ",stg_customer_address.ca_country", "from {0}.stg_customer_address,", "max_customer_address_sk", "where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteCustomerAddressDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetCustomerAddressDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetCustomerAddressDimensionLoadStatus": { "Type": "Task", "Next": "CheckCustomerAddressDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckCustomerAddressDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailCustomerAddressDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteCustomerAddressDimensionLoad" } ], "Default": "CustomerAddressWait" }, "CustomerAddressWait": { "Type": "Wait", "Seconds": 5, "Next": "GetCustomerAddressDimensionLoadStatus" }, "CompleteCustomerAddressDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailCustomerAddressDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } }, { "StartAt": "InitializeItemDimensionLoadJob", "States": { "InitializeItemDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteItemDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_item", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_item;", "create table if not exists {0}.stg_item", "(i_item_id varchar(16) encode zstd", ",i_rec_start_date date encode zstd", ",i_rec_end_date date encode zstd", ",i_current_price numeric(7,2) encode zstd", ",i_category varchar(50) encode zstd", ",i_product_name varchar(50) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_item", "(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),", "('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),", "('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),", "('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),", "('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),", "('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),", "('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),", "('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');", "/************************************************************************************************************************", "** Type 2 is maintained for i_current_price column.", "** Update all attributes for the item when the price is not changed", "** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match", "*************************************************************************************************************************/", "update {0}.item", " set i_category = stg_item.i_category,", " i_product_name = stg_item.i_product_name", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price = stg_item.i_current_price;", "insert into {0}.item", "(i_item_sk", ",i_item_id", ",i_rec_start_date", ",i_rec_end_date", ",i_current_price", ",i_category", ",i_product_name", ")", "with max_item_sk as", "(select max(i_item_sk) max_item_sk", " from {0}.item)", "select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk", " ,stg_item.i_item_id", " ,trunc(sysdate) as i_rec_start_date", " ,null as i_rec_end_date", " ,stg_item.i_current_price", " ,stg_item.i_category", " ,stg_item.i_product_name", " from {0}.stg_item, {0}.item, max_item_sk", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Sunset penultimate records that were inserted as type 2 */", "update {0}.item", " set i_rec_end_date = trunc(sysdate)", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteItemDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetItemDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetItemDimensionLoadStatus": { "Type": "Task", "Next": "CheckItemDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckItemDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailItemDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteItemDimensionLoad" } ], "Default": "ItemWait" }, "ItemWait": { "Type": "Wait", "Seconds": 5, "Next": "GetItemDimensionLoadStatus" }, "CompleteItemDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailItemDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } } ] }, "InitializeSalesFactLoadJob": { "Type": "Pass", "Next": "ExecuteSalesFactLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "load_sales_fact", "sql_statement": [ "begin transaction;", "/* Create a stg_store_sales staging table */", "drop table if exists {0}.stg_store_sales;", "create table {0}.stg_store_sales", "(sold_date date encode zstd", ",i_item_id varchar(16) encode zstd", ",c_customer_id varchar(16) encode zstd", ",ca_address_id varchar(16) encode zstd", ",ss_ticket_number integer encode zstd", ",ss_quantity integer encode zstd", ",ss_net_paid numeric(7,2) encode zstd", ",ss_net_profit numeric(7,2) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_store_sales", "(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)", "values", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),", "('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);", "/* Delete any rows from target store_sales for the input date for idempotency */", "delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');", "/* Insert data from staging table to the target table */", "insert into {0}.store_sales", "(ss_sold_date_sk", ",ss_item_sk", ",ss_customer_sk", ",ss_addr_sk", ",ss_ticket_number", ",ss_quantity", ",ss_net_paid", ",ss_net_profit", ")", "select date_dim.d_date_sk ss_sold_date_sk", " ,item.i_item_sk ss_item_sk", " ,customer.c_customer_sk ss_customer_sk", " ,customer_address.ca_address_sk ss_addr_sk", " ,ss_ticket_number", " ,ss_quantity", " ,ss_net_paid", " ,ss_net_profit", " from {0}.stg_store_sales as store_sales", " inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date", " left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null", " left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id", " left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;", "/* Drop staging table */", "drop table if exists {0}.stg_store_sales;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteSalesFactLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetSalesFactLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetSalesFactLoadStatus": { "Type": "Task", "Next": "CheckSalesFactLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckSalesFactLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesFactLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "SalesETLPipelineComplete" } ], "Default": "SalesWait" }, "SalesWait": { "Type": "Wait", "Seconds": 5, "Next": "GetSalesFactLoadStatus" }, "FailSalesFactLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" }, "ClusterUnavailable": { "Type": "Fail", "Cause": "Redshift cluster is not available", "Error": "Error" }, "SalesETLPipelineComplete": { "Type": "Pass", "Next": "ValidateSalesMetric", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "validate_sales_metric", "sql_statement": [ "select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')" ] } } }, "ValidateSalesMetric": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetValidateSalesMetricStatus", "InputPath": "$", "ResultPath": "$" }, "GetValidateSalesMetricStatus": { "Type": "Task", "Next": "CheckValidateSalesMetricStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckValidateSalesMetricStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesMetricValidation" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "DataValidationComplete" } ], "Default": "SalesValidationWait" }, "SalesValidationWait": { "Type": "Wait", "Seconds": 5, "Next": "GetValidateSalesMetricStatus" }, "FailSalesMetricValidation": { "Type": "Fail", "Cause": "Data Validation Failed", "Error": "Error" }, "DataValidationComplete": { "Type": "Pass", "Next": "InitializePauseCluster" }, "InitializePauseCluster": { "Type": "Pass", "Next": "PauseCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "pause" } } }, "PauseCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "PauseClusterWait", "InputPath": "$", "ResultPath": "$.clusterStatus", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "ClusterPausedComplete" } ] }, "InitializeCheckPauseCluster": { "Type": "Pass", "Next": "GetStateOfPausedCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfPausedCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterPaused", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterPaused": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializePauseCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "ClusterPausedComplete" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "PauseClusterWait" } ] }, "PauseClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckPauseCluster" }, "ClusterPausedComplete": { "Type": "Pass", "End": true } } }

IAMPolitica di esempio

L'esempio seguente AWS Identity and Access Management (IAM) la politica generata dal progetto di esempio include il minimo privilegio necessario per eseguire la macchina a stati e le risorse correlate. Ti consigliamo di includere solo le autorizzazioni necessarie nelle tue IAM politiche.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE" ], "Effect": "Allow" } ] }

Per informazioni su come configurare l'IAMutilizzo di Step Functions con altri AWS servizi, vediIn che modo Step Functions genera IAM politiche per servizi integrati.