Execute um ELT fluxo de trabalhoETL//usando Step Functions e o Amazon Redshift API - AWS Step Functions

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Execute um ELT fluxo de trabalhoETL//usando Step Functions e o Amazon Redshift API

Este exemplo de projeto demonstra como usar o Step Functions e os API dados do Amazon Redshift para executar ETL um fluxo de trabalhoELT//que carrega dados no armazém de dados do Amazon Redshift.

Neste projeto, Step Functions usa um AWS Lambda função e os dados do Amazon Redshift API para criar os objetos de banco de dados necessários e gerar um conjunto de dados de exemplo e, em seguida, executa dois trabalhos em paralelo que executam o carregamento de tabelas de dimensões, seguidos por uma tabela de fatos. Quando as duas tarefas de carregamento de dimensões terminam com sucesso, o Step Functions executa o trabalho de carregamento da tabela de fatos, executa a tarefa de validação e, em seguida, pausa o cluster do Amazon Redshift.

nota

Você pode modificar a ETL lógica para receber dados de outras fontes, como o Amazon S3, que pode usar o COPYcomando para copiar dados do Amazon S3 para uma tabela do Amazon Redshift.

Para obter mais informações sobre integrações de serviços do Amazon Redshift e do Step Functions, consulte o seguinte:

nota

Este projeto de exemplo pode incorrer em cobranças.

Para novos AWS usuários, um nível de uso gratuito está disponível. Neste nível, os serviços são gratuitos abaixo de um determinado nível de uso. Para obter mais informações sobre AWS custos e o nível gratuito, consulte AWS Step Functions preços.

Etapa 1: Criar a máquina de estado

  1. Abra o console do Step Functions e clique em Criar máquina de estado.

  2. Digite ETL job in Amazon Redshift na caixa de pesquisa e, em seguida, escolha ETL emprego em Amazon Redshiftdos resultados da pesquisa que são retornados.

  3. Escolha Próximo para continuar.

  4. Escolha Executar uma demonstração para criar um ready-to-deploy fluxo de trabalho e somente leitura, ou escolha Criar nele para criar uma definição de máquina de estado editável na qual você possa criar e implantar posteriormente.

    ‎Este projeto de exemplo implementa os recursos a seguir.

    • Uma Amazon Redshift cluster

    • Dois Lambda funções

    • Uma Amazon Redshift Esquema

    • Cinco Amazon Redshift tabelas

    • Uma AWS Step Functions máquina de estado

    • Relacionado AWS Identity and Access Management (IAM) funções.

    Você pode explorar o gráfico do fluxo de trabalho para o ETL trabalho em Amazon Redshiftno console Step Functions.

  5. Escolha Usar modelo para continuar com a seleção.

As próximas etapas dependem da sua escolha anterior:

  1. Execute uma demonstração — Você pode revisar a máquina de estado antes de criar um projeto somente para leitura com recursos implantados pelo AWS CloudFormation para o seu Conta da AWS.

    Você pode visualizar a definição da máquina de estado e, quando estiver pronto, escolher Implantar e executar para implantar o projeto e criar os recursos.

    A implantação pode levar até 10 minutos para criar recursos e permissões. Você pode usar o link Stack ID para monitorar o progresso no AWS CloudFormation.

    Após a conclusão da implantação, você deverá ver sua nova máquina de estado no console.

  2. Desenvolva com base nisso — você pode revisar e editar a definição do fluxo de trabalho. Talvez seja necessário definir valores para espaços reservados no projeto de amostra antes de tentar executar seu fluxo de trabalho personalizado.

nota

Cobranças padrão podem ser aplicadas aos serviços implantados em sua conta.

Etapa 2: Executar a máquina de estado

  1. Na página Máquinas de estado, escolha seu projeto de exemplo.

  2. Na página do projeto de exemplo, escolha Iniciar execução.

  3. Na caixa de diálogo Iniciar execução, faça o seguinte:

    1. (Opcional) Insira um nome de execução personalizado para substituir o padrão gerado.

      ASCIINão-nomes e registro

      Step Functions aceita nomes para máquinas de estado, execuções, atividades e rótulos que não contenham ASCII caracteres. Como esses caracteres não funcionarão com a Amazon CloudWatch, recomendamos usar somente ASCII caracteres para que você possa acompanhar as métricas CloudWatch.

    2. (Opcional) Na caixa Entrada, insira os valores de entrada comoJSON. Você pode pular essa etapa se estiver executando uma demonstração.

    3. Selecione Iniciar execução.

    O console Step Functions direcionará você para uma página de detalhes da execução, na qual você pode escolher estados na visualização do gráfico para explorar informações relacionadas no Detalhes da etapa painel.

Exemplo de código da máquina de estado

A máquina de estado neste projeto de exemplo se integra com AWS Lambda passando a ETL lógica InputPath diretamente para esses recursos e sendo executada de forma assíncrona usando os dados do Amazon Redshift. API

Navegue pela seguinte definição de máquina de estado para ver como o Step Functions controla AWS Lambda e os dados do Amazon Redshift. API

Para obter informações sobre como o Step Functions pode controlar outros AWS serviços, vejaIntegrando serviços com Step Functions.

{ "Comment": "An ETL workflow for loading dimension and fact tables", "StartAt": "InitializeCheckCluster", "States": { "InitializeCheckCluster": { "Type": "Pass", "Next": "GetStateOfCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterAvailable", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterAvailable": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializeBuildDB" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "InitializeResumeCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "ClusterWait" } ] }, "ClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckCluster" }, "InitializeResumeCluster": { "Type": "Pass", "Next": "ResumeCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "resume" } } }, "ResumeCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "ClusterWait", "InputPath": "$", "ResultPath": "$" }, "InitializeBuildDB": { "Type": "Pass", "Next": "BuildDB", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "build_database", "sql_statement": [ "create schema if not exists {0} authorization {1};", "create table if not exists {0}.customer", "(c_customer_sk int4 not null encode az64", ",c_customer_id char(16) not null encode zstd", ",c_current_addr_sk int4 encode az64", ",c_first_name char(20) encode zstd", ",c_last_name char(30) encode zstd", ",primary key (c_customer_sk)", ") distkey(c_customer_sk);", "--", "create table if not exists {0}.customer_address", "(ca_address_sk int4 not null encode az64", ",ca_address_id char(16) not null encode zstd", ",ca_state char(2) encode zstd", ",ca_zip char(10) encode zstd", ",ca_country varchar(20) encode zstd", ",primary key (ca_address_sk)", ") distkey(ca_address_sk);", "--", "create table if not exists {0}.date_dim", "(d_date_sk integer not null encode az64", ",d_date_id char(16) not null encode zstd", ",d_date date encode az64", ",d_day_name char(9) encode zstd", ",primary key (d_date_sk)", ") diststyle all;", "--", "create table if not exists {0}.item", "(i_item_sk int4 not null encode az64", ",i_item_id char(16) not null encode zstd", ",i_rec_start_date date encode az64", ",i_rec_end_date date encode az64", ",i_current_price numeric(7,2) encode az64", ",i_category char(50) encode zstd", ",i_product_name char(50) encode zstd", ",primary key (i_item_sk)", ") distkey(i_item_sk) sortkey(i_category);", "--", "create table if not exists {0}.store_sales", "(ss_sold_date_sk int4", ",ss_item_sk int4 not null encode az64", ",ss_customer_sk int4 encode az64", ",ss_addr_sk int4 encode az64", ",ss_store_sk int4 encode az64", ",ss_ticket_number int8 not null encode az64", ",ss_quantity int4 encode az64", ",ss_net_paid numeric(7,2) encode az64", ",ss_net_profit numeric(7,2) encode az64", ",primary key (ss_item_sk, ss_ticket_number)", ") distkey(ss_item_sk) sortkey(ss_sold_date_sk);" ] } } }, "BuildDB": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBuildDBStatus", "InputPath": "$", "ResultPath": "$" }, "GetBuildDBStatus": { "Type": "Task", "Next": "CheckBuildDBStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBuildDBStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailBuildDB" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "InitializeBaselineData" } ], "Default": "BuildDBWait" }, "BuildDBWait": { "Type": "Wait", "Seconds": 15, "Next": "GetBuildDBStatus" }, "FailBuildDB": { "Type": "Fail", "Cause": "Database Build Failed", "Error": "Error" }, "InitializeBaselineData": { "Type": "Pass", "Next": "LoadBaselineData", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_baseline_data", "sql_statement": [ "begin transaction;", "truncate table {0}.customer;", "insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)", "values", "(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),", "(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),", "(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),", "(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');", "analyze {0}.customer;", "--", "truncate table {0}.customer_address;", "insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)", "values", "(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),", "(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),", "(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');", "analyze {0}.customer_address;", "--", "truncate table {0}.item;", "insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),", "(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),", "(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),", "(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),", "(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),", "(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');", "--", "analyze {0}.item;", "truncate table {0}.date_dim;", "insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)", "values", "(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),", "(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),", "(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),", "(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),", "(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),", "(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),", "(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),", "(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),", "(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');", "--", "analyze {0}.date_dim;", "-- commit and End transaction", "commit;", "end transaction;" ] } } }, "LoadBaselineData": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBaselineData", "InputPath": "$", "ResultPath": "$" }, "GetBaselineData": { "Type": "Task", "Next": "CheckBaselineData", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBaselineData": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailLoadBaselineData" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "ParallelizeDimensionLoadJob" } ], "Default": "BaselineDataWait" }, "BaselineDataWait": { "Type": "Wait", "Seconds": 20, "Next": "GetBaselineData" }, "FailLoadBaselineData": { "Type": "Fail", "Cause": "Load Baseline Data Failed", "Error": "Error" }, "ParallelizeDimensionLoadJob": { "Type": "Parallel", "Next": "InitializeSalesFactLoadJob", "ResultPath": "$.status", "Branches": [ { "StartAt": "InitializeCustomerAddressDimensionLoadJob", "States": { "InitializeCustomerAddressDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteCustomerAddressDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_customer_address", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_customer_address;", "create table if not exists {0}.stg_customer_address", "(ca_address_id varchar(16) encode zstd", ",ca_state varchar(2) encode zstd", ",ca_zip varchar(10) encode zstd", ",ca_country varchar(20) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)", "values", "('AAAAAAAACFBBAAAA','NE','','United States'),", "('AAAAAAAAGAEFAAAA','NE','61749','United States'),", "('AAAAAAAAPJKKAAAA','OK','','United States'),", "('AAAAAAAAMIHGAAAA','AL','','United States');", "/* Perform UPDATE for existing data with refreshed attribute values */", "update {0}.customer_address", " set ca_state = stg_customer_address.ca_state,", " ca_zip = stg_customer_address.ca_zip,", " ca_country = stg_customer_address.ca_country", " from {0}.stg_customer_address", " where customer_address.ca_address_id = stg_customer_address.ca_address_id;", "/* Perform insert for new rows */", "insert into {0}.customer_address", "(ca_address_sk", ",ca_address_id", ",ca_state", ",ca_zip", ",ca_country", ")", "with max_customer_address_sk as", "(select max(ca_address_sk) max_ca_address_sk", "from {0}.customer_address)", "select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk", ",stg_customer_address.ca_address_id", ",stg_customer_address.ca_state", ",stg_customer_address.ca_zip", ",stg_customer_address.ca_country", "from {0}.stg_customer_address,", "max_customer_address_sk", "where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteCustomerAddressDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetCustomerAddressDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetCustomerAddressDimensionLoadStatus": { "Type": "Task", "Next": "CheckCustomerAddressDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckCustomerAddressDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailCustomerAddressDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteCustomerAddressDimensionLoad" } ], "Default": "CustomerAddressWait" }, "CustomerAddressWait": { "Type": "Wait", "Seconds": 5, "Next": "GetCustomerAddressDimensionLoadStatus" }, "CompleteCustomerAddressDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailCustomerAddressDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } }, { "StartAt": "InitializeItemDimensionLoadJob", "States": { "InitializeItemDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteItemDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_item", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_item;", "create table if not exists {0}.stg_item", "(i_item_id varchar(16) encode zstd", ",i_rec_start_date date encode zstd", ",i_rec_end_date date encode zstd", ",i_current_price numeric(7,2) encode zstd", ",i_category varchar(50) encode zstd", ",i_product_name varchar(50) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_item", "(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),", "('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),", "('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),", "('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),", "('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),", "('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),", "('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),", "('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');", "/************************************************************************************************************************", "** Type 2 is maintained for i_current_price column.", "** Update all attributes for the item when the price is not changed", "** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match", "*************************************************************************************************************************/", "update {0}.item", " set i_category = stg_item.i_category,", " i_product_name = stg_item.i_product_name", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price = stg_item.i_current_price;", "insert into {0}.item", "(i_item_sk", ",i_item_id", ",i_rec_start_date", ",i_rec_end_date", ",i_current_price", ",i_category", ",i_product_name", ")", "with max_item_sk as", "(select max(i_item_sk) max_item_sk", " from {0}.item)", "select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk", " ,stg_item.i_item_id", " ,trunc(sysdate) as i_rec_start_date", " ,null as i_rec_end_date", " ,stg_item.i_current_price", " ,stg_item.i_category", " ,stg_item.i_product_name", " from {0}.stg_item, {0}.item, max_item_sk", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Sunset penultimate records that were inserted as type 2 */", "update {0}.item", " set i_rec_end_date = trunc(sysdate)", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteItemDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetItemDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetItemDimensionLoadStatus": { "Type": "Task", "Next": "CheckItemDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckItemDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailItemDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteItemDimensionLoad" } ], "Default": "ItemWait" }, "ItemWait": { "Type": "Wait", "Seconds": 5, "Next": "GetItemDimensionLoadStatus" }, "CompleteItemDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailItemDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } } ] }, "InitializeSalesFactLoadJob": { "Type": "Pass", "Next": "ExecuteSalesFactLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "load_sales_fact", "sql_statement": [ "begin transaction;", "/* Create a stg_store_sales staging table */", "drop table if exists {0}.stg_store_sales;", "create table {0}.stg_store_sales", "(sold_date date encode zstd", ",i_item_id varchar(16) encode zstd", ",c_customer_id varchar(16) encode zstd", ",ca_address_id varchar(16) encode zstd", ",ss_ticket_number integer encode zstd", ",ss_quantity integer encode zstd", ",ss_net_paid numeric(7,2) encode zstd", ",ss_net_profit numeric(7,2) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_store_sales", "(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)", "values", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),", "('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);", "/* Delete any rows from target store_sales for the input date for idempotency */", "delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');", "/* Insert data from staging table to the target table */", "insert into {0}.store_sales", "(ss_sold_date_sk", ",ss_item_sk", ",ss_customer_sk", ",ss_addr_sk", ",ss_ticket_number", ",ss_quantity", ",ss_net_paid", ",ss_net_profit", ")", "select date_dim.d_date_sk ss_sold_date_sk", " ,item.i_item_sk ss_item_sk", " ,customer.c_customer_sk ss_customer_sk", " ,customer_address.ca_address_sk ss_addr_sk", " ,ss_ticket_number", " ,ss_quantity", " ,ss_net_paid", " ,ss_net_profit", " from {0}.stg_store_sales as store_sales", " inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date", " left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null", " left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id", " left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;", "/* Drop staging table */", "drop table if exists {0}.stg_store_sales;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteSalesFactLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetSalesFactLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetSalesFactLoadStatus": { "Type": "Task", "Next": "CheckSalesFactLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckSalesFactLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesFactLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "SalesETLPipelineComplete" } ], "Default": "SalesWait" }, "SalesWait": { "Type": "Wait", "Seconds": 5, "Next": "GetSalesFactLoadStatus" }, "FailSalesFactLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" }, "ClusterUnavailable": { "Type": "Fail", "Cause": "Redshift cluster is not available", "Error": "Error" }, "SalesETLPipelineComplete": { "Type": "Pass", "Next": "ValidateSalesMetric", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "validate_sales_metric", "sql_statement": [ "select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')" ] } } }, "ValidateSalesMetric": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetValidateSalesMetricStatus", "InputPath": "$", "ResultPath": "$" }, "GetValidateSalesMetricStatus": { "Type": "Task", "Next": "CheckValidateSalesMetricStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckValidateSalesMetricStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesMetricValidation" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "DataValidationComplete" } ], "Default": "SalesValidationWait" }, "SalesValidationWait": { "Type": "Wait", "Seconds": 5, "Next": "GetValidateSalesMetricStatus" }, "FailSalesMetricValidation": { "Type": "Fail", "Cause": "Data Validation Failed", "Error": "Error" }, "DataValidationComplete": { "Type": "Pass", "Next": "InitializePauseCluster" }, "InitializePauseCluster": { "Type": "Pass", "Next": "PauseCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "pause" } } }, "PauseCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "PauseClusterWait", "InputPath": "$", "ResultPath": "$.clusterStatus", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "ClusterPausedComplete" } ] }, "InitializeCheckPauseCluster": { "Type": "Pass", "Next": "GetStateOfPausedCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfPausedCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterPaused", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterPaused": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializePauseCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "ClusterPausedComplete" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "PauseClusterWait" } ] }, "PauseClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckPauseCluster" }, "ClusterPausedComplete": { "Type": "Pass", "End": true } } }

Exemplo IAM de política

O exemplo a seguir AWS Identity and Access Management (IAM) a política gerada pelo projeto de exemplo inclui o menor privilégio necessário para executar a máquina de estado e os recursos relacionados. Recomendamos que você inclua somente as permissões necessárias em suas IAM políticas.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE" ], "Effect": "Allow" } ] }

Para obter informações sobre como configurar IAM ao usar Step Functions com outros AWS serviços, vejaComo o Step Functions gera IAM políticas para serviços integrados.