Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Ejecute un ELT flujo de trabajoETL//con Step Functions y Amazon Redshift API
Este proyecto de ejemplo muestra cómo utilizar Step Functions y Amazon Redshift Data API para ejecutar un ELT flujo de trabajoETL/que carga datos en el almacén de datos de Amazon Redshift.
En este proyecto, Step Functions utiliza un AWS Lambda y Amazon Redshift Data API para crear los objetos de base de datos necesarios y generar un conjunto de datos de ejemplo. A continuación, ejecuta dos trabajos en paralelo que cargan tablas de dimensiones, seguidas de una tabla de hechos. Cuando ambos trabajos de carga de dimensiones finalicen correctamente, Step Functions ejecuta el trabajo de carga de la tabla de hechos, ejecuta el trabajo de validación y, posteriormente, detiene el clúster de Amazon Redshift.
nota
Puede modificar la ETL lógica para recibir datos de otras fuentes, como Amazon S3, que puede utilizar el COPYcomando para copiar datos de Amazon S3 a una tabla de Amazon Redshift.
Para obtener más información acerca de las integraciones de servicios de Amazon Redshift y Step Functions, consulte los siguientes temas:
nota
Este proyecto de muestra puede generar cargos.
Para nuevos AWS para los usuarios, hay disponible un nivel de uso gratuito. En esta capa, los servicios son gratuitos por debajo de determinado nivel de uso. Para obtener más información acerca de AWS los costos y el nivel gratuito, consulte AWS Step Functions precios
Paso 1: Crear la máquina de estado
-
Abra la consola de Step Functions
y seleccione Crear máquina de estado. -
Escribe
ETL job in Amazon Redshift
en el cuadro de búsqueda y, a continuación, selecciona ETL trabajo en Amazon Redshiftde los resultados de búsqueda que se muestran. -
Elija Siguiente para continuar.
-
Elija Ejecutar una demostración para crear un ready-to-deploy flujo de trabajo y de solo lectura, o elija Crear a partir de ella para crear una definición de máquina de estados editable sobre la que pueda crear e implementar posteriormente.
En este proyecto de muestra se implementan los siguientes recursos:
-
Un registro Amazon Redshift Clúster
-
Dos Lambda funciones
-
Un registro Amazon Redshift Esquema
-
Cinco Amazon Redshift tablas
-
Un registro AWS Step Functions máquina de estado
-
Relacionado AWS Identity and Access Management (IAM) roles.
Puede explorar el gráfico de flujo de ETL trabajo del trabajo en Amazon Redshiften la consola Step Functions.
-
-
Elija Utilizar plantilla para continuar con la selección.
Los siguientes pasos dependen de la elección anterior:
-
Realice una demostración: puede revisar la máquina de estados antes de crear un proyecto de solo lectura con los recursos desplegados por AWS CloudFormation a tu Cuenta de AWS.
Puede ver la definición de la máquina de estados y, cuando esté listo, elija Implementar y ejecutar para implementar el proyecto y crear los recursos.
La creación de recursos y permisos puede tardar hasta 10 minutos en implementarse. Puede utilizar el enlace Stack ID para supervisar el progreso en AWS CloudFormation.
Una vez completada la implementación, deberías ver tu nueva máquina de estados en la consola.
-
Aproveche esta información: puede revisar y editar la definición del flujo de trabajo. Es posible que tengas que establecer valores para los marcadores de posición en el proyecto de ejemplo antes de intentar ejecutar tu flujo de trabajo personalizado.
nota
Es posible que se apliquen cargos estándar por los servicios implementados en tu cuenta.
Paso 2: Ejecutar la máquina de estado
-
En la página Máquina de estado, elija su proyecto de muestra.
-
En la página del proyecto de muestra, seleccione Iniciar ejecución.
-
En el cuadro de diálogo Iniciar ejecución, haga lo siguiente:
-
(Opcional) Introduce un nombre de ejecución personalizado para anular el valor predeterminado generado.
ASCIINombres no identificables y registro
Step Functions acepta nombres para máquinas de estados, ejecuciones, actividades y etiquetas que no contengan ASCII caracteres. Como estos personajes no funcionan con Amazon CloudWatch, te recomendamos que utilices solo ASCII caracteres para poder hacer un seguimiento de las métricas CloudWatch.
-
(Opcional) En el cuadro de entrada, introduce los valores de entrada comoJSON. Puede omitir este paso si está realizando una demostración.
-
Seleccione Iniciar ejecución.
La consola Step Functions lo dirigirá a una página de detalles de ejecución en la que puede elegir estados en la vista de gráficos para explorar la información relacionada en el Detalles del paso panel.
-
Código de la máquina de estado de ejemplo
La máquina de estados de este proyecto de ejemplo se integra con AWS Lambda pasando la ETL lógica InputPath directamente a esos recursos y ejecutándola de forma asíncrona con Amazon Redshift Data. API
Examine la siguiente definición de máquina de estados para ver cómo Step Functions controla AWS Lambda y Amazon Redshift Data. API
Para obtener información sobre cómo Step Functions puede controlar otros AWS servicios, consulteIntegración de servicios con Step Functions.
{
"Comment": "An ETL workflow for loading dimension and fact tables",
"StartAt": "InitializeCheckCluster",
"States": {
"InitializeCheckCluster": {
"Type": "Pass",
"Next": "GetStateOfCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterAvailable",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterAvailable": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializeBuildDB"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "InitializeResumeCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "ClusterWait"
}
]
},
"ClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckCluster"
},
"InitializeResumeCluster": {
"Type": "Pass",
"Next": "ResumeCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "resume"
}
}
},
"ResumeCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "ClusterWait",
"InputPath": "$",
"ResultPath": "$"
},
"InitializeBuildDB": {
"Type": "Pass",
"Next": "BuildDB",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "build_database",
"sql_statement": [
"create schema if not exists {0} authorization {1};",
"create table if not exists {0}.customer",
"(c_customer_sk int4 not null encode az64",
",c_customer_id char(16) not null encode zstd",
",c_current_addr_sk int4 encode az64",
",c_first_name char(20) encode zstd",
",c_last_name char(30) encode zstd",
",primary key (c_customer_sk)",
") distkey(c_customer_sk);",
"--",
"create table if not exists {0}.customer_address",
"(ca_address_sk int4 not null encode az64",
",ca_address_id char(16) not null encode zstd",
",ca_state char(2) encode zstd",
",ca_zip char(10) encode zstd",
",ca_country varchar(20) encode zstd",
",primary key (ca_address_sk)",
") distkey(ca_address_sk);",
"--",
"create table if not exists {0}.date_dim",
"(d_date_sk integer not null encode az64",
",d_date_id char(16) not null encode zstd",
",d_date date encode az64",
",d_day_name char(9) encode zstd",
",primary key (d_date_sk)",
") diststyle all;",
"--",
"create table if not exists {0}.item",
"(i_item_sk int4 not null encode az64",
",i_item_id char(16) not null encode zstd",
",i_rec_start_date date encode az64",
",i_rec_end_date date encode az64",
",i_current_price numeric(7,2) encode az64",
",i_category char(50) encode zstd",
",i_product_name char(50) encode zstd",
",primary key (i_item_sk)",
") distkey(i_item_sk) sortkey(i_category);",
"--",
"create table if not exists {0}.store_sales",
"(ss_sold_date_sk int4",
",ss_item_sk int4 not null encode az64",
",ss_customer_sk int4 encode az64",
",ss_addr_sk int4 encode az64",
",ss_store_sk int4 encode az64",
",ss_ticket_number int8 not null encode az64",
",ss_quantity int4 encode az64",
",ss_net_paid numeric(7,2) encode az64",
",ss_net_profit numeric(7,2) encode az64",
",primary key (ss_item_sk, ss_ticket_number)",
") distkey(ss_item_sk) sortkey(ss_sold_date_sk);"
]
}
}
},
"BuildDB": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBuildDBStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetBuildDBStatus": {
"Type": "Task",
"Next": "CheckBuildDBStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBuildDBStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailBuildDB"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "InitializeBaselineData"
}
],
"Default": "BuildDBWait"
},
"BuildDBWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "GetBuildDBStatus"
},
"FailBuildDB": {
"Type": "Fail",
"Cause": "Database Build Failed",
"Error": "Error"
},
"InitializeBaselineData": {
"Type": "Pass",
"Next": "LoadBaselineData",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_baseline_data",
"sql_statement": [
"begin transaction;",
"truncate table {0}.customer;",
"insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)",
"values",
"(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),",
"(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),",
"(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),",
"(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');",
"analyze {0}.customer;",
"--",
"truncate table {0}.customer_address;",
"insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),",
"(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),",
"(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');",
"analyze {0}.customer_address;",
"--",
"truncate table {0}.item;",
"insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),",
"(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),",
"(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),",
"(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),",
"(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),",
"(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');",
"--",
"analyze {0}.item;",
"truncate table {0}.date_dim;",
"insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)",
"values",
"(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),",
"(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),",
"(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),",
"(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),",
"(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),",
"(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),",
"(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),",
"(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),",
"(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');",
"--",
"analyze {0}.date_dim;",
"-- commit and End transaction",
"commit;",
"end transaction;"
]
}
}
},
"LoadBaselineData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBaselineData",
"InputPath": "$",
"ResultPath": "$"
},
"GetBaselineData": {
"Type": "Task",
"Next": "CheckBaselineData",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBaselineData": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailLoadBaselineData"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "ParallelizeDimensionLoadJob"
}
],
"Default": "BaselineDataWait"
},
"BaselineDataWait": {
"Type": "Wait",
"Seconds": 20,
"Next": "GetBaselineData"
},
"FailLoadBaselineData": {
"Type": "Fail",
"Cause": "Load Baseline Data Failed",
"Error": "Error"
},
"ParallelizeDimensionLoadJob": {
"Type": "Parallel",
"Next": "InitializeSalesFactLoadJob",
"ResultPath": "$.status",
"Branches": [
{
"StartAt": "InitializeCustomerAddressDimensionLoadJob",
"States": {
"InitializeCustomerAddressDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteCustomerAddressDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_customer_address",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_customer_address;",
"create table if not exists {0}.stg_customer_address",
"(ca_address_id varchar(16) encode zstd",
",ca_state varchar(2) encode zstd",
",ca_zip varchar(10) encode zstd",
",ca_country varchar(20) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"('AAAAAAAACFBBAAAA','NE','','United States'),",
"('AAAAAAAAGAEFAAAA','NE','61749','United States'),",
"('AAAAAAAAPJKKAAAA','OK','','United States'),",
"('AAAAAAAAMIHGAAAA','AL','','United States');",
"/* Perform UPDATE for existing data with refreshed attribute values */",
"update {0}.customer_address",
" set ca_state = stg_customer_address.ca_state,",
" ca_zip = stg_customer_address.ca_zip,",
" ca_country = stg_customer_address.ca_country",
" from {0}.stg_customer_address",
" where customer_address.ca_address_id = stg_customer_address.ca_address_id;",
"/* Perform insert for new rows */",
"insert into {0}.customer_address",
"(ca_address_sk",
",ca_address_id",
",ca_state",
",ca_zip",
",ca_country",
")",
"with max_customer_address_sk as",
"(select max(ca_address_sk) max_ca_address_sk",
"from {0}.customer_address)",
"select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk",
",stg_customer_address.ca_address_id",
",stg_customer_address.ca_state",
",stg_customer_address.ca_zip",
",stg_customer_address.ca_country",
"from {0}.stg_customer_address,",
"max_customer_address_sk",
"where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteCustomerAddressDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetCustomerAddressDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetCustomerAddressDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckCustomerAddressDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckCustomerAddressDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailCustomerAddressDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteCustomerAddressDimensionLoad"
}
],
"Default": "CustomerAddressWait"
},
"CustomerAddressWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetCustomerAddressDimensionLoadStatus"
},
"CompleteCustomerAddressDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailCustomerAddressDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
},
{
"StartAt": "InitializeItemDimensionLoadJob",
"States": {
"InitializeItemDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteItemDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_item",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_item;",
"create table if not exists {0}.stg_item",
"(i_item_id varchar(16) encode zstd",
",i_rec_start_date date encode zstd",
",i_rec_end_date date encode zstd",
",i_current_price numeric(7,2) encode zstd",
",i_category varchar(50) encode zstd",
",i_product_name varchar(50) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_item",
"(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),",
"('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),",
"('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),",
"('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),",
"('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),",
"('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),",
"('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),",
"('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');",
"/************************************************************************************************************************",
"** Type 2 is maintained for i_current_price column.",
"** Update all attributes for the item when the price is not changed",
"** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match",
"*************************************************************************************************************************/",
"update {0}.item",
" set i_category = stg_item.i_category,",
" i_product_name = stg_item.i_product_name",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price = stg_item.i_current_price;",
"insert into {0}.item",
"(i_item_sk",
",i_item_id",
",i_rec_start_date",
",i_rec_end_date",
",i_current_price",
",i_category",
",i_product_name",
")",
"with max_item_sk as",
"(select max(i_item_sk) max_item_sk",
" from {0}.item)",
"select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk",
" ,stg_item.i_item_id",
" ,trunc(sysdate) as i_rec_start_date",
" ,null as i_rec_end_date",
" ,stg_item.i_current_price",
" ,stg_item.i_category",
" ,stg_item.i_product_name",
" from {0}.stg_item, {0}.item, max_item_sk",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Sunset penultimate records that were inserted as type 2 */",
"update {0}.item",
" set i_rec_end_date = trunc(sysdate)",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteItemDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetItemDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetItemDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckItemDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckItemDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailItemDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteItemDimensionLoad"
}
],
"Default": "ItemWait"
},
"ItemWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetItemDimensionLoadStatus"
},
"CompleteItemDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailItemDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
}
]
},
"InitializeSalesFactLoadJob": {
"Type": "Pass",
"Next": "ExecuteSalesFactLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "load_sales_fact",
"sql_statement": [
"begin transaction;",
"/* Create a stg_store_sales staging table */",
"drop table if exists {0}.stg_store_sales;",
"create table {0}.stg_store_sales",
"(sold_date date encode zstd",
",i_item_id varchar(16) encode zstd",
",c_customer_id varchar(16) encode zstd",
",ca_address_id varchar(16) encode zstd",
",ss_ticket_number integer encode zstd",
",ss_quantity integer encode zstd",
",ss_net_paid numeric(7,2) encode zstd",
",ss_net_profit numeric(7,2) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_store_sales",
"(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)",
"values",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),",
"('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);",
"/* Delete any rows from target store_sales for the input date for idempotency */",
"delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');",
"/* Insert data from staging table to the target table */",
"insert into {0}.store_sales",
"(ss_sold_date_sk",
",ss_item_sk",
",ss_customer_sk",
",ss_addr_sk",
",ss_ticket_number",
",ss_quantity",
",ss_net_paid",
",ss_net_profit",
")",
"select date_dim.d_date_sk ss_sold_date_sk",
" ,item.i_item_sk ss_item_sk",
" ,customer.c_customer_sk ss_customer_sk",
" ,customer_address.ca_address_sk ss_addr_sk",
" ,ss_ticket_number",
" ,ss_quantity",
" ,ss_net_paid",
" ,ss_net_profit",
" from {0}.stg_store_sales as store_sales",
" inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date",
" left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null",
" left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id",
" left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;",
"/* Drop staging table */",
"drop table if exists {0}.stg_store_sales;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteSalesFactLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetSalesFactLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetSalesFactLoadStatus": {
"Type": "Task",
"Next": "CheckSalesFactLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckSalesFactLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesFactLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "SalesETLPipelineComplete"
}
],
"Default": "SalesWait"
},
"SalesWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetSalesFactLoadStatus"
},
"FailSalesFactLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
},
"ClusterUnavailable": {
"Type": "Fail",
"Cause": "Redshift cluster is not available",
"Error": "Error"
},
"SalesETLPipelineComplete": {
"Type": "Pass",
"Next": "ValidateSalesMetric",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "validate_sales_metric",
"sql_statement": [
"select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')"
]
}
}
},
"ValidateSalesMetric": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetValidateSalesMetricStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetValidateSalesMetricStatus": {
"Type": "Task",
"Next": "CheckValidateSalesMetricStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckValidateSalesMetricStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesMetricValidation"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "DataValidationComplete"
}
],
"Default": "SalesValidationWait"
},
"SalesValidationWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetValidateSalesMetricStatus"
},
"FailSalesMetricValidation": {
"Type": "Fail",
"Cause": "Data Validation Failed",
"Error": "Error"
},
"DataValidationComplete": {
"Type": "Pass",
"Next": "InitializePauseCluster"
},
"InitializePauseCluster": {
"Type": "Pass",
"Next": "PauseCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "pause"
}
}
},
"PauseCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "PauseClusterWait",
"InputPath": "$",
"ResultPath": "$.clusterStatus",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "ClusterPausedComplete"
}
]
},
"InitializeCheckPauseCluster": {
"Type": "Pass",
"Next": "GetStateOfPausedCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfPausedCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterPaused",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterPaused": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializePauseCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "ClusterPausedComplete"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "PauseClusterWait"
}
]
},
"PauseClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckPauseCluster"
},
"ClusterPausedComplete": {
"Type": "Pass",
"End": true
}
}
}
Ejemplo IAM de política
El siguiente ejemplo AWS Identity and Access Management La política (IAM) generada por el proyecto de ejemplo incluye el mínimo de privilegios necesarios para ejecutar la máquina de estados y los recursos relacionados. Le recomendamos que incluya solo los permisos que sean necesarios en sus IAM políticas.
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE"
],
"Effect": "Allow"
}
]
}
Para obtener información sobre cómo configurar el uso IAM de Step Functions con otros AWS servicios, consulteCómo Step Functions genera IAM políticas para servicios integrados.