

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Exemplos de código para o Amazon Managed Workflows for Apache Airflow
<a name="sample-code"></a>

Este guia contém exemplos de código, incluindo DAGs plug-ins personalizados, que você pode usar em um ambiente Amazon Managed Workflows para Apache Airflow. Para obter mais exemplos de uso do Apache Airflow AWS com serviços, consulte [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags)o diretório no repositório do Apache Airflow. GitHub 

**Topics**
+ [Como usar um DAG para importar variáveis na CLI](samples-variables-import.md)
+ [Como criar uma conexão SSH usando o `SSHOperator`](samples-ssh.md)
+ [Como usar uma chave secreta em AWS Secrets Manager para uma conexão Snowflake do Apache Airflow](samples-sm-snowflake.md)
+ [Como usar um DAG para gravar métricas personalizadas no CloudWatch](samples-custom-metrics.md)
+ [Limpeza do banco de dados do Aurora PostgreSQL em um ambiente do Amazon MWAA](samples-database-cleanup.md)
+ [Como exportar metadados do ambiente para arquivos CSV no Amazon S3](samples-dag-run-info-to-csv.md)
+ [Como usar uma chave secreta em AWS Secrets Manager para uma variável do Apache Airflow](samples-secrets-manager-var.md)
+ [Como usar uma chave secreta em AWS Secrets Manager para uma conexão do Apache Airflow](samples-secrets-manager.md)
+ [Como criar um plugin personalizado com a Oracle](samples-oracle.md)
+ [Como alterar o fuso horário de um DAG no Amazon MWAA](samples-plugins-timezone.md)
+ [Como atualizar um token CodeArtifact](samples-code-artifact.md)
+ [Como criar de um plug-in personalizado com o Apache Hive e o Hadoop](samples-hive.md)
+ [Como criar um plug-in personalizado para PythonVirtualEnvOperator do Apache Airflow](samples-virtualenv.md)
+ [Invocando DAGs com uma função Lambda](samples-lambda.md)
+ [Invocação DAGs em diferentes ambientes do Amazon MWAA](samples-invoke-dag.md)
+ [Como usar o Amazon MWAA com Amazon RDS para Microsoft SQL Server](samples-sql-server.md)
+ [Como usar o Amazon MWAA com o Amazon EKS](mwaa-eks-example.md)
+ [Como conectar-se ao Amazon ECS usando o `ECSOperator`](samples-ecs-operator.md)
+ [Como usar DBT com o Amazon MWAA](samples-dbt.md)
+ [AWS blogs e tutoriais](#samples-blogs-tutorials)

# Como usar um DAG para importar variáveis na CLI
<a name="samples-variables-import"></a>

O código de exemplo a seguir importa variáveis usando a CLI no Amazon Managed Workflows for Apache Airflow.

**Topics**
+ [Versão](#samples-variables-import-version)
+ [Pré-requisitos](#samples-variables-import-prereqs)
+ [Permissões](#samples-variables-import-permissions)
+ [Dependências](#samples-variables-import-dependencies)
+ [Exemplo de código](#samples-variables-import-code)
+ [Próximas etapas](#samples-variables-import-next-up)

## Versão
<a name="samples-variables-import-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-variables-import-prereqs"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Permissões
<a name="samples-variables-import-permissions"></a>

Sua Conta da AWS precisa ter acesso à política `AmazonMWAAAirflowCliAccess`. Consulte [Política de CLI do Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md) para saber mais.

## Dependências
<a name="samples-variables-import-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-variables-import-code"></a>

O exemplo de código a seguir usa três entradas: o nome do seu ambiente Amazon MWAA (em `mwaa_env`), a Região da AWS do seu ambiente (em `aws_region`) e o arquivo local que contém as variáveis que você deseja importar (em `var_file`).

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## Próximas etapas
<a name="samples-variables-import-next-up"></a>
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).

# Como criar uma conexão SSH usando o `SSHOperator`
<a name="samples-ssh"></a>

O exemplo a seguir descreve como é possível usar `SSHOperator` em um gráfico acíclico direcionado (DAG) para se conectar a uma instância remota do Amazon EC2 a partir do seu ambiente do Amazon Managed Workflows for Apache Airflow. É possível usar uma abordagem semelhante para se conectar a qualquer instância remota com acesso SSH.

No exemplo a seguir, você faz upload de uma chave secreta SSH (`.pem`) para o diretório `dags` do seu ambiente no Amazon S3. Em seguida, você instala as dependências necessárias usando `requirements.txt` e cria uma nova conexão do Apache Airflow na interface do usuário. Por fim, você grava um DAG que cria uma conexão SSH com a instância remota.

**Topics**
+ [Versão](#samples-ssh-version)
+ [Pré-requisitos](#samples-ssh-prereqs)
+ [Permissões](#samples-ssh-permissions)
+ [Requisitos](#samples-ssh-dependencies)
+ [Copie sua chave secreta para o Amazon S3](#samples-ssh-secret)
+ [Crie uma nova conexão com o Apache Airflow](#samples-ssh-connection)
+ [Exemplo de código](#samples-ssh-code)

## Versão
<a name="samples-ssh-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-ssh-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).
+ Uma chave secreta SSH. A amostra de código pressupõe que você tenha uma instância do Amazon EC2 e uma `.pem` na mesma região do seu ambiente Amazon MWAA. Se você não tiver uma chave, consulte [Criar ou importar um par de chaves](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) no *Guia do usuário do Amazon EC2*.

## Permissões
<a name="samples-ssh-permissions"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Requisitos
<a name="samples-ssh-dependencies"></a>

Adicione o parâmetro a seguir a `requirements.txt` para instalar o pacote `apache-airflow-providers-ssh` no servidor Web. Depois que o ambiente for atualizado e o Amazon MWAA instalar com sucesso a dependência, você verá um novo tipo de conexão **SSH** na interface do usuário.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**nota**  
`-c` define o URL das restrições em `requirements.txt`. Isso garante que o Amazon MAA instale a versão correta do pacote para seu ambiente.

## Copie sua chave secreta para o Amazon S3
<a name="samples-ssh-secret"></a>

Use o seguinte comando AWS Command Line Interface para copiar sua chave `.pem` para o diretório `dags` do seu ambiente no Amazon S3.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

O Amazon MWAA copia o conteúdo em `dags`, incluindo a chave `.pem`, para o diretório local `/usr/local/airflow/dags/`. Ao fazer isso, o Apache Airflow pode acessar a chave.

## Crie uma nova conexão com o Apache Airflow
<a name="samples-ssh-connection"></a>

**Para criar uma nova conexão SSH usando a IU do Apache Airflow**

1. Abra a [página Ambientes](https://console.aws.amazon.com/mwaa/home#/environments) no console do Amazon MWAA.

1. Na lista de ambientes, escolha **Abrir a IU do Airflow** em seu ambiente.

1. Na página IU do Apache Airflow, selecione **Admin** na barra de navegação superior para expandir a lista suspensa e selecione **Conexões**.

1. Na página **Listar conexões**, escolha o botão **\$1** ou **Adicionar um novo registro** para adicionar uma nova conexão.

1. Na página **Adicionar conexão**, forneça as seguintes informações:

   1. Em **Id da conexão**, insira **ssh\$1new**.

   1. Em **Tipo de conexão**, selecione **SSH** na lista suspensa.
**nota**  
Se o tipo de conexão **SSH** não estiver disponível na lista, o Amazon MWAA não instalou o pacote de `apache-airflow-providers-ssh` necessário. Atualize seu arquivo `requirements.txt` para incluir esse pacote e tente novamente.

   1. Para **Host**, insira o endereço IP para a instância do Amazon EC2 à qual você deseja se conectar. Por exemplo, **12.345.67.89**.

   1. Em **Nome de usuário**, insira **ec2-user** se você está se conectando a uma instância do Amazon EC2. Seu nome de usuário pode ser diferente, dependendo do tipo de instância remota à qual você deseja que o Apache Airflow se conecte.

   1. Para **Extra**, insira o seguinte par de chave-valor no formato JSON:

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      Esse par de chave-valor instrui o Apache Airflow a procurar a chave secreta no diretório `/dags` local.

## Exemplo de código
<a name="samples-ssh-code"></a>

O DAG a seguir usa `SSHOperator` para se conectar à sua instância de destino do Amazon EC2 e, em seguida, executa o comando `hostname` do Linux para imprimir o nome da instância. É possível modificar o DAG para executar qualquer comando ou script na instância remota.

1. Abra um terminal e navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `ssh.py`.

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se tiver êxito, você verá um resultado semelhante ao seguinte nos logs de tarefas para `ssh_task` no DAG `ssh_operator_example`:

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Como usar uma chave secreta em AWS Secrets Manager para uma conexão Snowflake do Apache Airflow
<a name="samples-sm-snowflake"></a>

O exemplo a seguir chama AWS Secrets Manager para obter uma chave secreta para uma conexão Snowflake do Apache Airflow no Amazon Managed Workflows for Apache Airflow. Pressupõe-se que você tenha concluído as etapas em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Versão](#samples-sm-snowflake-version)
+ [Pré-requisitos](#samples-sm-snowflake-prereqs)
+ [Permissões](#samples-sm-snowflake-permissions)
+ [Requisitos](#samples-sm-snowflake-dependencies)
+ [Exemplo de código](#samples-sm-snowflake-code)
+ [Próximas etapas](#samples-sm-snowflake-next-up)

## Versão
<a name="samples-sm-snowflake-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-sm-snowflake-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ O backend do Secrets Manager como uma opção de configuração do Apache Airflow, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).
+ Uma string de conexão do Apache Airflow no Secrets Manager, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

## Permissões
<a name="samples-sm-snowflake-permissions"></a>
+ As permissões do Secrets Manager, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

## Requisitos
<a name="samples-sm-snowflake-dependencies"></a>

Para usar o código de amostra nesta página, adicione as seguintes dependências ao seu `requirements.txt`. Consulte [Como instalar dependências do Python](working-dags-dependencies.md) para saber mais.

```
apache-airflow-providers-snowflake==1.3.0
```

## Exemplo de código
<a name="samples-sm-snowflake-code"></a>

As etapas a seguir descrevem como criar o código DAG que chama o Secrets Manager para obter o segredo.

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `snowflake_connection.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## Próximas etapas
<a name="samples-sm-snowflake-next-up"></a>
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).

# Como usar um DAG para gravar métricas personalizadas no CloudWatch
<a name="samples-custom-metrics"></a>

Você pode usar o exemplo de código a seguir para gravar um gráfico acíclico direcionado (DAG) que executa `PythonOperator` para recuperar métricas em nível de SO para um ambiente Amazon MWAA. O DAG, então, publica os dados como métricas personalizadas para o Amazon CloudWatch.

As métricas personalizadas de nível de SO fornecem visibilidade adicional sobre como os operadores do ambiente estão utilizando recursos como memória virtual e CPU. Você pode usar essas informações para selecionar a [classe de ambiente](environment-class.md) mais adequada à sua workload.

**Topics**
+ [Versão](#samples-custom-metrics-version)
+ [Pré-requisitos](#samples-custom-metrics-prereqs)
+ [Permissões](#samples-custom-metrics-permissions)
+ [Dependências](#samples-custom-metrics-dependencies)
+ [Exemplo de código](#samples-custom-metrics-code)

## Versão
<a name="samples-custom-metrics-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-custom-metrics-prereqs"></a>

Para usar o código de exemplo nesta página, você precisará de:
+ Um [ambiente Amazon MWAA](get-started.md).

## Permissões
<a name="samples-custom-metrics-permissions"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Dependências
<a name="samples-custom-metrics-dependencies"></a>
+ Nenhuma dependência adicional é necessária para usar o exemplo de código desta página.

## Exemplo de código
<a name="samples-custom-metrics-code"></a>

1. No prompt de comando, navegue até a pasta em que seu código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como `dag-custom-metrics.py`. Substitua `MWAA-ENV-NAME` pelo nome do seu ambiente.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se o DAG for executado corretamente, você obterá algo semelhante ao seguinte em seus logs do Apache Airflow:

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Limpeza do banco de dados do Aurora PostgreSQL em um ambiente do Amazon MWAA
<a name="samples-database-cleanup"></a>

A solução Workflows gerenciados pela Amazon para Apache Airflow usa um banco de dados Aurora PostgreSQL como o banco de dados de metadados do Apache Airflow, onde o DAG é executado e as instâncias das tarefas são armazenadas. O código de exemplo a seguir limpa periodicamente as entradas do banco de dados Aurora PostgreSQL dedicado para seu ambiente Amazon MWAA.

**Topics**
+ [Versão](#samples-database-cleanup-version)
+ [Pré-requisitos](#samples-database-cleanup-prereqs)
+ [Dependências](#samples-sql-server-dependencies)
+ [Exemplo de código](#samples-database-cleanup-code)

## Versão
<a name="samples-database-cleanup-version"></a>

Os exemplos de código nesta página são específicos do Apache Airflow v2 compatível com o Amazon MWAA. Consulte as [versões compatíveis do Apache Airflow](airflow-versions.md).

**dica**  
**Para usuários do Apache Airflow v3**: se você quiser limpar um banco de dados (limpar registros antigos das tabelas do metastore), execute o comando da CLI `db clean`.

## Pré-requisitos
<a name="samples-database-cleanup-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).

## Dependências
<a name="samples-sql-server-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-database-cleanup-code"></a>

O DAG a seguir limpa o banco de dados de metadados das tabelas especificadas em `TABLES_TO_CLEAN`. O exemplo exclui dados das tabelas especificadas com mais de 30 dias. Para ajustar até que ponto as entradas são excluídas, defina `MAX_AGE_IN_DAYS` para um outro valor.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Como exportar metadados do ambiente para arquivos CSV no Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Use o exemplo de código a seguir para criar um gráfico acíclico direcionado (DAG) que consulta o banco de dados em busca de uma variedade de informações de execução do DAG e grava os dados em arquivos `.csv` armazenados no Amazon S3.

Talvez você queira exportar informações do banco de dados Aurora PostgreSQL do seu ambiente para inspecionar os dados localmente, arquivá-los no armazenamento de objetos ou combiná-los com ferramentas como o [operador Amazon S3 para Amazon Redshift](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) e a [limpeza do banco de dados](samples-database-cleanup.md), a fim de mover os metadados do Amazon MWAA do ambiente, mas preservá-los para análise futura.

É possível consultar o banco de dados para qualquer um dos objetos listados nos [Modelos do Apache Airflow](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models). Esse exemplo de código usa três modelos, `DagRun`, `TaskFail` e `TaskInstance`, que fornecem informações relevantes para executar o DAG.

**Topics**
+ [Versão](#samples-dag-run-info-to-csv-version)
+ [Pré-requisitos](#samples-dag-run-info-to-csv-prereqs)
+ [Permissões](#samples-dag-run-info-to-csv-permissions)
+ [Requisitos](#samples-dag-run-info-to-csv-dependencies)
+ [Exemplo de código](#samples-dag-run-info-to-csv-code)

## Versão
<a name="samples-dag-run-info-to-csv-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).
+ Um [novo bucket do Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) no qual você deseja exportar suas informações de metadados.

## Permissões
<a name="samples-dag-run-info-to-csv-permissions"></a>

O Amazon MWAA precisa de permissão para que a ação `s3:PutObject` do Amazon S3 grave as informações de metadados consultadas no Amazon S3. Adicione a seguinte declaração de política ao perfil de execução do seu ambiente.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Essa política limita somente o acesso de gravação *amzn-s3-demo-bucket* a.

## Requisitos
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-dag-run-info-to-csv-code"></a>

As etapas a seguir descrevem como é possível criar um DAG que consulta o Aurora PostgreSQL e grava o resultado em seu novo bucket do Amazon S3.

1. Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como `metadata_to_csv.py`. É possível alterar o valor atribuído para `MAX_AGE_IN_DAYS` para controlar a idade dos registros mais antigos que seu DAG consulta no banco de dados de metadados.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a interface do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se for bem-sucedido, você exibirá uma saída semelhante à seguinte nos logs de tarefas da tarefa `export_db`:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Agora é possível acessar e baixar os arquivos `.csv` exportados em seu novo bucket do Amazon S3 em `/files/export/`.

# Como usar uma chave secreta em AWS Secrets Manager para uma variável do Apache Airflow
<a name="samples-secrets-manager-var"></a>

O exemplo a seguir chama AWS Secrets Manager para obter uma chave secreta para uma variável Apache Airflow no Amazon Managed Workflows for Apache Airflow. Pressupõe-se que você tenha concluído as etapas em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Versão](#samples-secrets-manager-var-version)
+ [Pré-requisitos](#samples-secrets-manager-var-prereqs)
+ [Permissões](#samples-secrets-manager-var-permissions)
+ [Requisitos](#samples-hive-dependencies)
+ [Exemplo de código](#samples-secrets-manager-var-code)
+ [Próximas etapas](#samples-secrets-manager-var-next-up)

## Versão
<a name="samples-secrets-manager-var-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-secrets-manager-var-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ O backend do Secrets Manager como uma opção de configuração do Apache Airflow, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).
+ Uma string variável do Apache Airflow no Secrets Manager, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

## Permissões
<a name="samples-secrets-manager-var-permissions"></a>
+ As permissões do Secrets Manager, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

## Requisitos
<a name="samples-hive-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-secrets-manager-var-code"></a>

As etapas a seguir descrevem como criar o código DAG que chama o Secrets Manager para obter o segredo.

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `secrets-manager-var.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## Próximas etapas
<a name="samples-secrets-manager-var-next-up"></a>
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).

# Como usar uma chave secreta em AWS Secrets Manager para uma conexão do Apache Airflow
<a name="samples-secrets-manager"></a>

O exemplo a seguir chama AWS Secrets Manager para obter uma chave secreta para uma conexão Apache Airflow no Amazon Managed Workflows for Apache Airflow. Pressupõe-se que você tenha concluído as etapas em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Versão](#samples-secrets-manager-version)
+ [Pré-requisitos](#samples-secrets-manager-prereqs)
+ [Permissões](#samples-secrets-manager-permissions)
+ [Requisitos](#samples-hive-dependencies)
+ [Exemplo de código](#samples-secrets-manager-code)
+ [Próximas etapas](#samples-secrets-manager-next-up)

## Versão
<a name="samples-secrets-manager-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-secrets-manager-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ O backend do Secrets Manager como uma opção de configuração do Apache Airflow, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).
+ Uma string de conexão do Apache Airflow no Secrets Manager, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

## Permissões
<a name="samples-secrets-manager-permissions"></a>
+ As permissões do Secrets Manager, conforme mostrado em [Configurando uma conexão do Apache Airflow usando um segredo AWS Secrets Manager](connections-secrets-manager.md).

## Requisitos
<a name="samples-hive-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-secrets-manager-code"></a>

As etapas a seguir descrevem como criar o código DAG que chama o Secrets Manager para obter o segredo.

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `secrets-manager.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## Próximas etapas
<a name="samples-secrets-manager-next-up"></a>
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).

# Como criar um plugin personalizado com a Oracle
<a name="samples-oracle"></a>

O exemplo a seguir mostra as etapas para criar um plug-in personalizado usando a Oracle para Amazon MWAA e pode ser combinado com outros plug-ins e binários personalizados no arquivo plugins.zip.

**Contents**
+ [Versão](#samples-oracle-version)
+ [Pré-requisitos](#samples-oracle-prereqs)
+ [Permissões](#samples-oracle-permissions)
+ [Requisitos](#samples-oracle-dependencies)
+ [Exemplo de código](#samples-oracle-code)
+ [Criar o plugin personalizado](#samples-oracle-create-pluginszip-steps)
  + [Download de dependências](#samples-oracle-install)
  + [Plug-in personalizado](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Opções de configuração do Airflow](#samples-oracle-airflow-config)
+ [Próximas etapas](#samples-oracle-next-up)

## Versão
<a name="samples-oracle-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-oracle-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).
+ Registro de logs do operador habilitado em qualquer nível de log, `CRITICAL` ou na seção anterior para seu ambiente. Para obter mais informações sobre os tipos de log do Amazon MWAA e como gerenciar seus grupos de logs, consulte [Acessando registros do Airflow na Amazon CloudWatch](monitoring-airflow.md)

## Permissões
<a name="samples-oracle-permissions"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Requisitos
<a name="samples-oracle-dependencies"></a>

Para usar o código de amostra nesta página, adicione as seguintes dependências ao seu `requirements.txt`. Consulte [Como instalar dependências do Python](working-dags-dependencies.md) para saber mais.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## Exemplo de código
<a name="samples-oracle-code"></a>

As etapas a seguir descrevem como criar o código do DAG que testará o plugin personalizado.

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `oracle.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## Criar o plugin personalizado
<a name="samples-oracle-create-pluginszip-steps"></a>

Esta seção descreve como baixar as dependências, criar o plug-in personalizado e o plugins.zip.

### Download de dependências
<a name="samples-oracle-install"></a>

O Amazon MWAA extrairá o conteúdo do plugins.zip em `/usr/local/airflow/plugins` sobre cada contêiner de agendador e trabalho do Amazon MWAA. Isso é usado para adicionar binários ao seu ambiente. As etapas a seguir descrevem como montar os arquivos necessários para o plugin personalizado.

**Extraia a imagem de contêiner do Amazon Linux**

1. Em seu prompt de comando, extraia a imagem do contêiner Amazon Linux e execute o contêiner localmente. Por exemplo:

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   Seu prompt de comando deve invocar uma linha de comando bash. Por exemplo:

   ```
   bash-4.2#
   ```

1. Instale o recurso de I/O assíncrono nativo do Linux (libaio).

   ```
   yum -y install libaio
   ```

1. Mantenha essa janela aberta para as etapas subsequentes. Copiaremos os seguintes arquivos localmente: `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1`.

**Baixe a pasta do cliente**

1. Instale o pacote de descompactação localmente. Por exemplo:

   ```
   sudo yum install unzip
   ```

1. Crie um diretório `oracle_plugin`. Por exemplo:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. Use o seguinte comando cURL para baixar o [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) do [Oracle Instant Client Downloads for Linux x86-64 (64-bit)](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html).

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. Descompacte o arquivo `client.zip`. Por exemplo:

   ```
   unzip *.zip
   ```

**Extrair arquivos do Docker**

1. Em um novo prompt de comando, exiba e grave a ID do contêiner do Docker. Por exemplo:

   ```
   docker container ls
   ```

   Seu prompt de comando deve retornar todos os contêineres e as respectivas IDs. Por exemplo:

   ```
   debc16fd6970
   ```

1. Em seu diretório `oracle_plugin`, extraia os arquivos `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1` para a pasta local `instantclient_18_5`. Por exemplo:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### Plug-in personalizado
<a name="samples-oracle-plugins-code"></a>

O Apache Airflow executará o conteúdo dos arquivos Python na pasta de plugins na inicialização. Isto é usado para definir e modificar variáveis de ambiente. As seguintes etapas descrevem o código de exemplo para o plugin personalizado.
+ Copie o conteúdo da amostra de código a seguir e salve localmente como `env_var_plugin_oracle.py`.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

As etapas a seguir explicam como criar `plugins.zip`. O conteúdo deste exemplo pode ser combinado com seus outros plug-ins e binários em um único arquivo `plugins.zip`.

**Compacte o conteúdo do diretório do plug-in**

1. Em um prompt de comando, navegue até o diretório `oracle_plugin`. Por exemplo:

   ```
   cd oracle_plugin
   ```

1. Compacte o diretório `instantclient_18_5` em plugins.zip. Por exemplo:

   ```
   zip -r ../plugins.zip ./
   ```

   Seu prompt de comando exibe:

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. Remova o arquivo `client.zip`. Por exemplo:

   ```
   rm client.zip
   ```

**Compacte o arquivo env\$1var\$1plugin\$1oracle.py**

1. Adicione o arquivo `env_var_plugin_oracle.py` à raiz do plugins.zip. Por exemplo:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. Agora seu arquivo plugins.zip inclui o seguinte:

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Opções de configuração do Airflow
<a name="samples-oracle-airflow-config"></a>

Se você estiver usando o Apache Airflow v2, adicione `core.lazy_load_plugins : False` como uma opção de configuração do Apache Airflow. Para saber mais, consulte [Usar opções de configuração para carregar plug-ins em 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Próximas etapas
<a name="samples-oracle-next-up"></a>
+ Saiba como fazer o upload do `requirements.txt` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar dependências do Python](working-dags-dependencies.md).
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).
+ Saiba mais sobre como fazer o upload do `plugins.zip` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar plug-ins personalizados](configuring-dag-import-plugins.md).

# Como alterar o fuso horário de um DAG no Amazon MWAA
<a name="samples-plugins-timezone"></a>

Por padrão, o Apache Airflow programa seu gráfico acíclico direcionado (DAG) em UTC\$10. As etapas a seguir mostram como é possível alterar o fuso horário no qual o Amazon MWAA executa seus DAGs com o [Pendulum](https://pypi.org/project/pendulum/). Como opção, este tópico demonstra como é possível criar um plug-in personalizado para alterar o fuso horário dos logs do Apache Airflow do seu ambiente.

**Topics**
+ [Versão](#samples-plugins-timezone-version)
+ [Pré-requisitos](#samples-plugins-timezone-prerequisites)
+ [Permissões](#samples-plugins-timezone-permissions)
+ [Crie um plug-in para alterar o fuso horário nos logs do Airflow](#samples-plugins-timezone-custom-plugin)
+ [Crie um `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [Exemplo de código](#samples-plugins-timezone-dag)
+ [Próximas etapas](#samples-plugins-timezone-plugins-next-up)

## Versão
<a name="samples-plugins-timezone-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-plugins-timezone-prerequisites"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).

## Permissões
<a name="samples-plugins-timezone-permissions"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Crie um plug-in para alterar o fuso horário nos logs do Airflow
<a name="samples-plugins-timezone-custom-plugin"></a>

O Apache Airflow executará os arquivos Python no diretório `plugins` na inicialização. Com o plug-in a seguir, é possível substituir o fuso horário do executor, o que modifica o fuso horário no qual o Apache Airflow grava os logs.

1. Crie um diretório com o nome `plugins` para seu plug-in personalizado e navegue até o diretório. Por exemplo:

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. Copie o conteúdo do exemplo do código a seguir e salve localmente como `dag-timezone-plugin.py` no arquivo `plugins`.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. No diretório `plugins`, crie um arquivo Python vazio chamado `__init__.py`. Seu diretório `plugins` deve ser semelhante ao seguinte:

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## Crie um `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

As etapas a seguir explicam como criar `plugins.zip`. O conteúdo deste exemplo pode ser combinado com outros plug-ins e binários em um único arquivo `plugins.zip`.

1. Em sua prompt linha de comando, navegue até o diretório `plugins` da etapa anterior. Por exemplo:

   ```
   cd plugins
   ```

1. Compacte o conteúdo em seu diretório `plugins`.

   ```
   zip -r ../plugins.zip ./
   ```

1. Faça upload de `plugins.zip` para o seu bucket S3.

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## Exemplo de código
<a name="samples-plugins-timezone-dag"></a>

Para alterar o fuso horário padrão (UTC\$10) no qual o DAG é executado, usaremos uma biblioteca chamada [Pendulum](https://pypi.org/project/pendulum/), uma biblioteca Python para trabalhar com data e hora com reconhecimento de fuso horário.

1. No prompt de comando, navegue até o diretório em que seus DAGs estão armazenados. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo do exemplo a seguir e salve como `tz-aware-dag.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se tiver êxito, você exibirá uma saída semelhante à seguinte nos logs de tarefas para `tz_aware_task` no DAG `tz_test`:

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## Próximas etapas
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ Saiba mais sobre como fazer o upload do `plugins.zip` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar plug-ins personalizados](configuring-dag-import-plugins.md).

# Como atualizar um token CodeArtifact
<a name="samples-code-artifact"></a>

Se você estiver usando o CodeArtifact para instalar dependências do Python, o Amazon MWAA exigirá um token ativo. Para permitir que o Amazon MWAA acesse um repositório CodeArtifact em runtime, é possível usar um [script de startup](using-startup-script.md) e configurar [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url) com o token.

O tópico a seguir descreve como é possível criar um script de startup que usa a operação da API CodeArtifact [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) para recuperar um novo token sempre que seu ambiente for inicializado ou atualizado.

**Topics**
+ [Versão](#samples-code-artifact-version)
+ [Pré-requisitos](#samples-code-artifact-prereqs)
+ [Permissões](#samples-code-artifact-permissions)
+ [Exemplo de código](#samples-code-artifact-code)
+ [Próximas etapas](#samples-code-artifact-next-up)

## Versão
<a name="samples-code-artifact-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-code-artifact-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).
+ Um [repositório CodeArtifact](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html) onde você armazena dependências para seu ambiente.

## Permissões
<a name="samples-code-artifact-permissions"></a>

Para atualizar o token CodeArtifact e gravar o resultado no Amazon S3, o Amazon MWAA deve ter as seguintes permissões no perfil de execução.
+ A ação `codeartifact:GetAuthorizationToken` permite que o Amazon MWAA recupere um novo token do CodeArtifact. A política a seguir concede permissão para cada domínio do CodeArtifact que você criar. É possível restringir ainda mais o acesso aos seus domínios modificando o valor do recurso na instrução e especificando somente os domínios que você deseja que seu ambiente acesse.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ A ação `sts:GetServiceBearerToken` é necessária para chamar a operação da API CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html). Essa operação retorna um token que deve ser usado ao usar um gerenciador de pacotes, como `pip` com CodeArtifact. Para usar um gerenciador de pacotes com um repositório CodeArtifact, um perfil de execução do seu ambiente deve permitir `sts:GetServiceBearerToken`, conforme mostrado na instrução da política a seguir.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## Exemplo de código
<a name="samples-code-artifact-code"></a>

As etapas a seguir descrevem como é possível criar um script de inicialização que atualize o token CodeArtifact.

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `code_artifact_startup_script.sh`.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. Navegue até a pasta em que você salvou o script. Use `cp` em uma nova janela de prompt para fazer o upload do script em seu bucket. Substitua *amzn-s3-demo-bucket* por suas informações.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Se tiver êxito, o Amazon S3 envia o caminho da URL para o objeto:

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Depois de fazer o upload do script, seu ambiente atualiza e executa o script no startup.

## Próximas etapas
<a name="samples-code-artifact-next-up"></a>
+ Saiba como usar scripts de startup para personalizar seu ambiente em [Como usar um script de startup com o Amazon MWAA](using-startup-script.md).
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).
+ Saiba mais sobre como fazer o upload do `plugins.zip` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar plug-ins personalizados](configuring-dag-import-plugins.md).

# Como criar de um plug-in personalizado com o Apache Hive e o Hadoop
<a name="samples-hive"></a>

O Amazon MWAA extrai o conteúdo de um `plugins.zip` para `/usr/local/airflow/plugins`. Isso pode ser usado para adicionar binários aos seus contêineres. Além disso, o Apache Airflow executa o conteúdo dos arquivos Python na pasta `plugins` em *inicialização*, permitindo que você defina e modifique variáveis de ambiente. O exemplo a seguir mostra as etapas para criar um plug-in personalizado usando o Apache Hive e o Hadoop em um ambiente Amazon Managed Workflows for Apache Airflow e pode ser combinado com outros plug-ins e binários personalizados.

**Topics**
+ [Versão](#samples-hive-version)
+ [Pré-requisitos](#samples-hive-prereqs)
+ [Permissões](#samples-hive-permissions)
+ [Requisitos](#samples-hive-dependencies)
+ [Download de dependências](#samples-hive-install)
+ [Plug-in personalizado](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [Exemplo de código](#samples-hive-code)
+ [Opções de configuração do Airflow](#samples-hive-airflow-config)
+ [Próximas etapas](#samples-hive-next-up)

## Versão
<a name="samples-hive-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-hive-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).

## Permissões
<a name="samples-hive-permissions"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Requisitos
<a name="samples-hive-dependencies"></a>

Para usar o código de amostra nesta página, adicione as seguintes dependências ao seu `requirements.txt`. Consulte [Como instalar dependências do Python](working-dags-dependencies.md) para saber mais.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## Download de dependências
<a name="samples-hive-install"></a>

O Amazon MWAA extrairá o conteúdo do plugins.zip em `/usr/local/airflow/plugins` sobre cada contêiner de agendador e trabalho do Amazon MWAA. Isso é usado para adicionar binários ao seu ambiente. As etapas a seguir descrevem como montar os arquivos necessários para o plugin personalizado.

1. No prompt de comando, navegue até o diretório em que você gostaria de criar seu plug-in. Por exemplo:

   ```
   cd plugins
   ```

1. Baixe o [Hadoop](https://hadoop.apache.org/) de um [espelho](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz), por exemplo:

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. Baixe o [Hive](https://hive.apache.org/) de um [espelho](https://www.apache.org/dyn/closer.cgi/hive/), por exemplo:

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. Crie um diretório. Por exemplo:

   ```
   mkdir hive_plugin
   ```

1. Extraia o Hadoop.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Extraia o Hive.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## Plug-in personalizado
<a name="samples-hive-plugins-code"></a>

O Apache Airflow executará o conteúdo dos arquivos Python na pasta de plugins na inicialização. Isto é usado para definir e modificar variáveis de ambiente. As seguintes etapas descrevem o código de exemplo para o plugin personalizado.

1. Em um prompt de comando, navegue até o diretório `hive_plugin`. Por exemplo:

   ```
   cd hive_plugin
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `hive_plugin.py` no diretório `hive_plugin`.

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. Copie o conteúdo do texto a seguir e salve localmente como `.airflowignore` no diretório `hive_plugin`.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

As etapas a seguir explicam como criar `plugins.zip`. O conteúdo deste exemplo pode ser combinado com outros plug-ins e binários em um único arquivo `plugins.zip`.

1. Em sua prompt linha de comando, navegue até o diretório `hive_plugin` da etapa anterior. Por exemplo:

   ```
   cd hive_plugin
   ```

1. Compacte o conteúdo em sua pasta `plugins`.

   ```
   zip -r ../hive_plugin.zip ./
   ```

## Exemplo de código
<a name="samples-hive-code"></a>

As etapas a seguir descrevem como criar o código do DAG que testará o plugin personalizado.

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `hive.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Opções de configuração do Airflow
<a name="samples-hive-airflow-config"></a>

Se você estiver usando o Apache Airflow v2, adicione `core.lazy_load_plugins : False` como uma opção de configuração do Apache Airflow. Para saber mais, consulte [Usar opções de configuração para carregar plug-ins em 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Próximas etapas
<a name="samples-hive-next-up"></a>
+ Saiba como fazer o upload do `requirements.txt` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar dependências do Python](working-dags-dependencies.md).
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).
+ Saiba mais sobre como fazer o upload do `plugins.zip` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar plug-ins personalizados](configuring-dag-import-plugins.md).

# Como criar um plug-in personalizado para PythonVirtualEnvOperator do Apache Airflow
<a name="samples-virtualenv"></a>

O exemplo a seguir explica como corrigir o `PythonVirtualenvOperator` do Apache Airflow com um plug-in personalizado no Amazon Managed Workflows para o Apache Airflow.

**Topics**
+ [Versão](#samples-virtualenv-version)
+ [Pré-requisitos](#samples-virtualenv-prereqs)
+ [Permissões](#samples-virtualenv-permissions)
+ [Requisitos](#samples-virtualenv-dependencies)
+ [Código de exemplo de plugin personalizado](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [Exemplo de código](#samples-virtualenv-code)
+ [Opções de configuração do Airflow](#samples-virtualenv-airflow-config)
+ [Próximas etapas](#samples-virtualenv-next-up)

## Versão
<a name="samples-virtualenv-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-virtualenv-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).

## Permissões
<a name="samples-virtualenv-permissions"></a>

Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

## Requisitos
<a name="samples-virtualenv-dependencies"></a>

Para usar o código de amostra nesta página, adicione as seguintes dependências ao seu `requirements.txt`. Consulte [Como instalar dependências do Python](working-dags-dependencies.md) para saber mais.

```
virtualenv
```

## Código de exemplo de plugin personalizado
<a name="samples-virtualenv-plugins-code"></a>

O Apache Airflow executará o conteúdo dos arquivos Python na pasta de plugins na inicialização. Esse plug-in corrigirá o `PythonVirtualenvOperator` integrado durante o processo de inicialização para torná-lo compatível com o Amazon MWAA. As seguintes etapas exibem o exemplo de código para o plugin personalizado.

1. Em seu prompt de comando, navegue até o diretório `plugins` da etapa anterior. Por exemplo:

   ```
   cd plugins
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `virtual_python_plugin.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

As etapas a seguir explicam como criar `plugins.zip`.

1. Em seu prompt de comando, navegue até o diretório que contém `virtual_python_plugin.py` da etapa anterior. Por exemplo:

   ```
   cd plugins
   ```

1. Compacte o conteúdo em sua pasta `plugins`.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## Exemplo de código
<a name="samples-virtualenv-code"></a>

As etapas a seguir descrevem como criar o código do DAG para o plugin personalizado.

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `virtualenv_test.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Opções de configuração do Airflow
<a name="samples-virtualenv-airflow-config"></a>

Se você estiver usando o Apache Airflow v2, adicione `core.lazy_load_plugins : False` como uma opção de configuração do Apache Airflow. Para saber mais, consulte [Usar opções de configuração para carregar plug-ins em 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Próximas etapas
<a name="samples-virtualenv-next-up"></a>
+ Saiba como fazer o upload do `requirements.txt` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar dependências do Python](working-dags-dependencies.md).
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).
+ Saiba mais sobre como fazer o upload do `plugins.zip` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar plug-ins personalizados](configuring-dag-import-plugins.md).

# Invocando DAGs com uma função Lambda
<a name="samples-lambda"></a>

O exemplo de código a seguir usa uma função [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) para obter um token CLI do Apache Airflow e invocar um gráfico acíclico direcionado (DAG) em um ambiente Amazon MWAA.

**Topics**
+ [Versão](#samples-lambda-version)
+ [Pré-requisitos](#samples-lambda-prereqs)
+ [Permissões](#samples-lambda-permissions)
+ [Dependências](#samples-lambda-dependencies)
+ [Exemplo de código](#samples-lambda-code)

## Versão
<a name="samples-lambda-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-lambda-prereqs"></a>

Para usar esse exemplo de código, você deve:
+ Usar o [modo de acesso à rede pública](configuring-networking.md#webserver-options-public-network-onconsole) para seu ambiente [Amazon MWAA](get-started.md).
+ Ter uma [função do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) usando o runtime mais recente do Python.

**nota**  
Se a função do Lambda e seu ambiente Amazon MWAA estiverem na mesma VPC, você poderá usar esse código em uma rede privada. Para essa configuração, o perfil de execução da função do Lambda precisa de permissão para chamar a operação da API **CreateNetworkInterface** do Amazon Elastic Compute Cloud (Amazon EC2). Você pode fornecer essa permissão usando a política [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS gerenciada.

## Permissões
<a name="samples-lambda-permissions"></a>

Para usar o exemplo de código nesta página, o perfil de execução do seu ambiente Amazon MWAA precisa de acesso para realizar a ação `airflow:CreateCliToken`. Você pode fornecer essa permissão usando a política `AmazonMWAAAirflowCliAccess` AWS-managed:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Para obter mais informações, consulte [Política de CLI do Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependências
<a name="samples-lambda-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-lambda-code"></a>

1. Abra o AWS Lambda console em [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/).

1. Escolha sua função do Lambda na lista **Funções**.

1. Na página da função, copie o código a seguir e substitua-o pelos nomes dos seus recursos:
   + `YOUR_ENVIRONMENT_NAME`: o nome do seu ambiente do Amazon MWAA.
   + `YOUR_DAG_NAME`: o nome do DAG que você deseja invocar.

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. Escolha **Implantar**.

1. Escolha **Testar** para invocar sua função usando o console Lambda.

1. Para verificar se seu Lambda invocou seu DAG com sucesso, use o console Amazon MWAA para navegar até a IU do Apache Airflow do seu ambiente e faça o seguinte:

   1. Na **DAGs**página, localize seu novo DAG de destino na lista de DAGs.

   1. Em **Última execução**, verifique a data e hora da última execução do DAG. Esse carimbo de data/hora deve ser semelhante ao carimbo de data/hora mais recente para `invoke_dag` em seu outro ambiente.

   1. Em **Tarefas recentes**, verifique se a última execução foi bem-sucedida.

# Invocação DAGs em diferentes ambientes do Amazon MWAA
<a name="samples-invoke-dag"></a>

O exemplo de código a seguir cria um token da CLI do Apache Airflow. O código, usa um gráfico acíclico direcionado (directed acyclic graph, DAG) em um ambiente do Amazon MWAA para invocar um DAG em um ambiente diferente do Amazon MWAA.

**Topics**
+ [Versão](#samples-invoke-dag-version)
+ [Pré-requisitos](#samples-invoke-dag-prereqs)
+ [Permissões](#samples-invoke-dag-permissions)
+ [Dependências](#samples-invoke-dag-dependencies)
+ [Exemplo de código](#samples-invoke-dag-code)

## Versão
<a name="samples-invoke-dag-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-invoke-dag-prereqs"></a>

Para usar o código de exemplo nesta página, você precisará de:
+ Dois [ambientes Amazon MWAA](get-started.md) com acesso ao servidor Web de **rede pública**, incluindo seu ambiente atual.
+ Um exemplo de DAG que foi carregado para o bucket do Amazon Simple Storage Service (Amazon S3) do seu ambiente de destino.

## Permissões
<a name="samples-invoke-dag-permissions"></a>

Para usar o exemplo de código nesta página, o perfil de execução do seu ambiente deve ter permissão para criar um token da CLI do Apache Airflow. Você pode anexar a política AWS gerenciada `AmazonMWAAAirflowCliAccess` para conceder essa permissão.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Para obter mais informações, consulte [Política de CLI do Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependências
<a name="samples-invoke-dag-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-invoke-dag-code"></a>

O exemplo de código a seguir pressupõe que você esteja usando um DAG em seu ambiente atual para invocar um DAG em outro ambiente.

1. Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como `invoke_dag.py`. Substitua os seguintes valores por suas informações.
   + `your-new-environment-name`: o nome do outro ambiente onde deseja invocar o DAG.
   + `your-target-dag-id`: o ID do DAG no outro ambiente que você deseja invocar.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a interface do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se o DAG for executado com êxito, você verá um resultado semelhante ao seguinte nos logs de tarefas de `invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Para verificar se seu DAG foi invocado com sucesso, navegue até a IU do Apache Airflow para seu novo ambiente e faça o seguinte:

   1. Na **DAGs**página, localize seu novo DAG de destino na lista de DAGs.

   1. Em **Última execução**, verifique a data e hora da última execução do DAG. Esse carimbo de data/hora deve ser semelhante ao carimbo de data/hora mais recente para `invoke_dag` em seu outro ambiente.

   1. Em **Tarefas recentes**, verifique se a última execução foi bem-sucedida.

# Como usar o Amazon MWAA com Amazon RDS para Microsoft SQL Server
<a name="samples-sql-server"></a>

É possível usar o Amazon MWAA para se conectar a um [RDS para SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html). O exemplo do código a seguir usa DAGs em um ambiente Amazon Managed Workflows for Apache Airflow para se conectar e executar consultas em um Amazon RDS para Microsoft SQL Server.

**Topics**
+ [Versão](#samples-sql-server-version)
+ [Pré-requisitos](#samples-sql-server-prereqs)
+ [Dependências](#samples-sql-server-dependencies)
+ [Conexão Apache Airflow v2](#samples-sql-server-conn)
+ [Exemplo de código](#samples-sql-server-code)
+ [Próximas etapas](#samples-sql-server-next-up)

## Versão
<a name="samples-sql-server-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-sql-server-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).
+ O Amazon MWAA e o RDS para SQL Server estão sendo executados na mesma Amazon VPC/
+ Os grupos de segurança de VPC do Amazon MWAA e do servidor estão configurados com as seguintes conexões:
  + Uma regra de entrada para a porta `1433` aberta para o Amazon RDS no grupo de segurança do Amazon MWAA
  + Ou uma regra de saída para a porta `1433` aberta do Amazon MWAA para o RDS
+ A conexão do Apache Airflow de RDS para SQL Server reflete o nome do host, a porta, o nome de usuário e a senha do banco de dados do servidor SQL do Amazon RDS criado no processo anterior.

## Dependências
<a name="samples-sql-server-dependencies"></a>

Para usar o código de amostra nesta seção, adicione a seguinte dependência ao seu `requirements.txt`. Consulte [Como instalar dependências do Python](working-dags-dependencies.md) para saber mais.

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Conexão Apache Airflow v2
<a name="samples-sql-server-conn"></a>

Caso esteja usando uma conexão no Apache Airflow v2, certifique-se de que o objeto de conexão Airflow inclua os seguintes pares de chave-valor:

1. **ID de conexão: ** mssql\$1default

1. **Tipo de conexão: ** Amazon Web Services

1. **Host: ** `YOUR_DB_HOST`

1. **Esquema: **

1. **Login: ** admin

1. **Senha: **

1. **Porta: ** 1433

1. **Extra: **

## Exemplo de código
<a name="samples-sql-server-code"></a>

1. No prompt de comando, navegue até o diretório em que o código do DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `sql-server.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## Próximas etapas
<a name="samples-sql-server-next-up"></a>
+ Saiba como fazer o upload do `requirements.txt` arquivo neste exemplo para seu bucket do Amazon S3 em [Como instalar dependências do Python](working-dags-dependencies.md).
+ Saiba como fazer o upload do código DAG neste exemplo para a pasta `dags` em seu bucket do Amazon S3 em [Como adicionar ou atualizar DAGs](configuring-dag-folder.md).
+ Explore exemplos de scripts e outros [exemplos de módulos pymssql](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html).
+ Saiba mais sobre a execução do código SQL em um banco de dados Microsoft SQL específico usando [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) no *Guia de referência do Apache Airflow*.

# Como usar o Amazon MWAA com o Amazon EKS
<a name="mwaa-eks-example"></a>

O exemplo a seguir demonstra como usar o Amazon Managed Workflows for Apache Airflow com o Amazon EKS.

**Topics**
+ [Versão](#mwaa-eks-example-version)
+ [Pré-requisitos](#eksctl-prereqs)
+ [Criar uma chave pública para o Amazon EC2](#eksctl-create-key)
+ [Criar um cluster](#create-cluster-eksctl)
+ [Crie um namespace `mwaa`](#eksctl-namespace)
+ [Criar um perfil para o namespace `mwaa`](#eksctl-role)
+ [Crie e anexe um perfil do IAM para o cluster do Amazon EKS](#eksctl-iam-role)
+ [Crie o arquivo requirements.txt](#eksctl-requirements)
+ [Crie um mapeamento de identidade para o Amazon EKS](#eksctl-identity-map)
+ [Criar a `kubeconfig`](#eksctl-kube-config)
+ [Criar um DAG](#eksctl-create-dag)
+ [Adicione o DAG e `kube_config.yaml` ao bucket do Amazon S3](#eksctl-dag-bucket)
+ [Habilite e acione o exemplo](#eksctl-trigger-pod)

## Versão
<a name="mwaa-eks-example-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="eksctl-prereqs"></a>

Para usar o exemplo deste tópico, você precisará de:
+ Um [ambiente Amazon MWAA](get-started.md).
+ eksctl. Para saber mais, consulte [Instalar eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl).
+ kubectl. Para saber mais, consulte [Instalar e configurar o kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/). Em alguns casos, ele é instalado com o eksctl.
+ Um par de chaves do EC2 na região em que criar seu ambiente Amazon MWAA. Para saber mais, consulte [Criação ou importação de um par de chaves](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair).

**nota**  
Ao usar um comando `eksctl`, é possível incluir um `--profile` para especificar um perfil diferente do padrão.

## Criar uma chave pública para o Amazon EC2
<a name="eksctl-create-key"></a>

Use o comando a seguir para criar uma chave pública de seu par de chaves privadas.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

Para saber mais, consulte [Recuperar a chave pública para seu par de chaves](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key).

## Criar um cluster
<a name="create-cluster-eksctl"></a>

Use o seguinte comando para criar o cluster. Se quiser um nome personalizado para o cluster ou criá-lo em uma região diferente, substitua o nome e os valores da região. Você deve criar o cluster na mesma região em que criou o ambiente do Amazon MWAA. Substitua os valores das sub-redes para que correspondam às sub-redes na sua rede Amazon VPC que você usa para o Amazon MWAA. Substitua o valor de `ssh-public-key` para corresponder à chave que você usa. É possível usar uma chave existente do Amazon EC2 que esteja na mesma região ou criar uma nova chave na mesma região em que você criar seu ambiente Amazon MWAA.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

Leva algum tempo para concluir a criação do cluster. Depois de concluído, é possível verificar se o cluster foi criado com sucesso e se o provedor IAM OIDC está configurado usando o seguinte comando:

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## Crie um namespace `mwaa`
<a name="eksctl-namespace"></a>

Depois de confirmar que o cluster foi criado com sucesso, use o comando a seguir para criar um namespace para os pods.

```
kubectl create namespace mwaa
```

## Criar um perfil para o namespace `mwaa`
<a name="eksctl-role"></a>

Depois de criar o namespace, crie um perfil e uma associação de perfil para um usuário do Amazon MWAA no EKS que possa executar pods em um namespace do MWAA. Se você usou um nome diferente para o namespace, substitua mwaa em `-n mwaa` pelo nome usado.

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

Confirme se o novo perfil pode acessar o cluster do Amazon EKS ao executar o comando a seguir. Certifique-se de usar o nome correto se você não usou*mwaa*:

```
kubectl get pods -n mwaa --as mwaa-service
```

Você recebe uma mensagem de retorno dizendo:

```
No resources found in mwaa namespace.
```

## Crie e anexe um perfil do IAM para o cluster do Amazon EKS
<a name="eksctl-iam-role"></a>

Você deve criar um perfil do IAM e depois vinculá-lo ao cluster Amazon EKS (k8s) para que ele possa ser usado para autenticação por meio do IAM. O perfil é usado somente para fazer login no cluster e não tem nenhuma permissão para o console ou as chamadas de API.

Crie um novo perfil para o ambiente do Amazon MWAA usando as etapas em [Perfil de execução do Amazon MWAA](mwaa-create-role.md). No entanto, em vez de criar e anexar as políticas descritas neste tópico, anexe a seguinte política:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

Depois de criar o perfil, edite seu ambiente do Amazon MWAA para usar o perfil que você criou como o perfil de execução para o ambiente. Para alterar o perfil, edite o ambiente a ser usado. Você seleciona a função de execução em **Permissões**.

**Problemas conhecidos:**
+ Há um problema conhecido com a função de subcaminhos que não conseguem se autenticar ARNs com o Amazon EKS. A solução alternativa para isso é criar o perfil de serviço manualmente, em vez de usar o criado pelo próprio Amazon MWAA. Para saber mais, consulte [Funções com caminhos não funcionam quando o caminho é incluído em seu ARN no aws-auth configmap](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ Se a lista de serviços do Amazon MWAA não estiver disponível no IAM, você precisará escolher uma política de serviço alternativa, como o Amazon EC2, e depois atualizar a política de confiança do perfil de acordo com o seguinte:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  Para saber mais, consulte [Como usar políticas de confiança com perfis do IAM](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/).

## Crie o arquivo requirements.txt
<a name="eksctl-requirements"></a>

Para usar o código de amostra nesta seção, certifique-se de ter adicionado uma das seguintes opções de banco de dados ao seu `requirements.txt`. Consulte [Como instalar dependências do Python](working-dags-dependencies.md) para saber mais.

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Crie um mapeamento de identidade para o Amazon EKS
<a name="eksctl-identity-map"></a>

Use o ARN para o perfil que você criou no comando a seguir para criar um mapeamento de identidade para o Amazon EKS. Altere a região *us-east-1* para a região em que você criou o ambiente. Substitua o ARN da função e, por fim, *mwaa-execution-role* substitua pela função de execução do seu ambiente.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## Criar a `kubeconfig`
<a name="eksctl-kube-config"></a>

Use o comando a seguir para criar `kubeconfig`:

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

Se você usou um perfil específico ao executar `update-kubeconfig`, precisará remover a seção `env:` adicionada ao arquivo kube\$1config.yaml para que ela funcione corretamente com o Amazon MWAA. Para fazer isso, exclua do arquivo o seguinte e então salve-o:

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## Criar um DAG
<a name="eksctl-create-dag"></a>

Use o exemplo de código a seguir para criar um arquivo Python, como `mwaa_pod_example.py` para o DAG.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## Adicione o DAG e `kube_config.yaml` ao bucket do Amazon S3
<a name="eksctl-dag-bucket"></a>

Coloque o DAG criado e o arquivo `kube_config.yaml` no bucket do Amazon S3 para o ambiente Amazon MWAA. É possível colocar arquivos em seu bucket usando o console do Amazon S3 ou a AWS Command Line Interface.

## Habilite e acione o exemplo
<a name="eksctl-trigger-pod"></a>

No Apache Airflow, habilite o exemplo e, em seguida, acione-o.

Depois de ser executado e concluído com sucesso, use o comando a seguir para verificar o pod:

```
kubectl get pods -n mwaa
```

Você terá um resultado semelhante ao seguinte:

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

Em seguida, é possível verificar a saída do pod com o comando a seguir. Substitua o valor do nome pelo valor retornado do comando anterior:

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# Como conectar-se ao Amazon ECS usando o `ECSOperator`
<a name="samples-ecs-operator"></a>

O tópico descreve como é possível usar `ECSOperator` para se conectar a um contêiner do Amazon Elastic Container Service (Amazon ECS) do Amazon MWAA. Nas etapas a seguir, você adicionará as permissões necessárias à função de execução do seu ambiente, usará um CloudFormation modelo para criar um cluster Amazon ECS Fargate e, finalmente, criará e carregará um DAG que se conecta ao seu novo cluster.

**Topics**
+ [Versão](#samples-ecs-operator-version)
+ [Pré-requisitos](#samples-ecs-operator-prereqs)
+ [Permissões](#samples-ecs-operator-permissions)
+ [Criar um cluster do Amazon ECS ](#create-cfn-template)
+ [Exemplo de código](#samples-ecs-operator-code)

## Versão
<a name="samples-ecs-operator-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-ecs-operator-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).

## Permissões
<a name="samples-ecs-operator-permissions"></a>
+ O perfil de execução do seu ambiente precisa de permissão para executar tarefas no Amazon ECS. Você pode anexar a política FullAccess AWS gerenciada pelo [AmazoneCS\$1](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) à sua função de execução ou criar e anexar a política a seguir à sua função de execução.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Além de adicionar as permissões necessárias para executar tarefas no Amazon ECS, você também deve modificar a declaração de política de CloudWatch registros em sua função de execução do Amazon MWAA para permitir o acesso ao grupo de registros de tarefas do Amazon ECS, conforme listado a seguir. O grupo de registros do Amazon ECS é criado pelo CloudFormation modelo em[Criar um cluster do Amazon ECS ](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Para obter mais informações sobre o perfil de execução do Amazon MWAA e como anexar uma política, consulte [Perfil de execução](mwaa-create-role.md).

## Criar um cluster do Amazon ECS 
<a name="create-cfn-template"></a>

Usando o CloudFormation modelo a seguir, você criará um cluster Amazon ECS Fargate para usar com seu fluxo de trabalho do Amazon MWAA. Para obter mais informações, consulte [Como criar uma definição de tarefa](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) no *Guia do desenvolvedor do Amazon Elastic Container Service*.

1. Crie um arquivo JSON com o seguinte conteúdo e salve-o como `ecs-mwaa-cfn.json`.

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. No prompt de comando, use o AWS CLI comando a seguir para criar uma nova pilha. Você deve substituir os valores `SecurityGroups` e `SubnetIds` por valores para os grupos de segurança e sub-redes do seu ambiente do Amazon MWAA.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   Como alternativa, é possível usar o seguinte script shell. O script recupera os valores necessários para os grupos de segurança e sub-redes do seu ambiente usando o `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI comando e, em seguida, cria a pilha adequadamente. Para executar o script.

   1. Copie e salve o script `ecs-stack-helper.sh` no mesmo diretório do seu CloudFormation modelo.

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. Execute o script usando os comandos a seguir. Substitua `environment-name` e `stack-name` por suas informações.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   Se for bem-sucedido, você consultará o seguinte resultado mostrando seu novo ID de CloudFormation pilha.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

Depois que sua CloudFormation pilha estiver AWS concluída e provisionar seus recursos do Amazon ECS, você estará pronto para criar e carregar seu DAG.

## Exemplo de código
<a name="samples-ecs-operator-code"></a>

1. Abra um prompt de comando e navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo da amostra de código a seguir e salve localmente como `mwaa-ecs-operator.py` e, em seguida, faça o upload de seu novo DAG para o Amazon S3.

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**nota**  
No exemplo do DAG, para `awslogs_group`, talvez você precise modificar o grupo de logs com o nome do seu grupo de logs de tarefas do Amazon ECS. O exemplo pressupõe um grupo de logs chamado `mwaa-ecs-zero`. Para `awslogs_stream_prefix`, use o prefixo do fluxo de logs de tarefas do Amazon ECS. O exemplo pressupõe um prefixo de fluxo de log, `ecs`.

1.  Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a interface do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se tiver êxito, você verá um resultado semelhante ao seguinte nos logs de tarefas para `ecs_operator_task` no DAG `ecs_fargate_dag`:

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Como usar DBT com o Amazon MWAA
<a name="samples-dbt"></a>

Este tópico demonstra como é possível usar o DBT e o Postgres com o Amazon MWAA. Nas etapas a seguir, você adicionará as dependências necessárias ao seu `requirements.txt` e fará o upload de um exemplo de projeto de DBT para o bucket Amazon S3 do seu ambiente. Em seguida, você usará um exemplo de DAG para verificar se o Amazon MWAA instalou as dependências e, por fim, usará `BashOperator` para executar o projeto de DBT.

**Topics**
+ [Versão](#samples-dbt-version)
+ [Pré-requisitos](#samples-dbt-prereqs)
+ [Dependências](#samples-dbt-dependencies)
+ [Faça o upload de um projeto de DBT para o Amazon S3](#samples-dbt-upload-project)
+ [Use um DAG para verificar a instalação da dependência de DBT](#samples-dbt-test-dependencies)
+ [Use um DAG para executar um projeto de DBT](#samples-dbt-run-project)

## Versão
<a name="samples-dbt-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-dbt-prereqs"></a>

Antes de concluir as etapas a seguir, você precisará do seguinte:
+ Um [ambiente do Amazon MWAA](get-started.md) usando o Apache Airflow v2.2.2. Esse exemplo foi gravado e testado com a versão 2.2.2. Talvez seja necessário modificar o exemplo para usá-lo com outras versões do Apache Airflow.
+ Um exemplo de projeto de DBT. Para começar a usar o dbt com o Amazon MWAA, você pode criar uma bifurcação e clonar o projeto [dbt starter a partir do repositório dbt-labs](https://github.com/dbt-labs/dbt-starter-project). GitHub 

## Dependências
<a name="samples-dbt-dependencies"></a>

Para usar o Amazon MWAA com DBT, adicione o script de inicialização a seguir ao ambiente. Para saber mais, consulte [Usar um script de startup com o Amazon MWAA](using-startup-script.md).

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

Nas seções a seguir, você fará o upload do diretório do seu projeto de DBT para o Amazon S3 e executará um DAG que valida se o Amazon MWAA instalou com sucesso as dependências DBT necessárias.

## Faça o upload de um projeto de DBT para o Amazon S3
<a name="samples-dbt-upload-project"></a>

Para poder usar um projeto de DBT com seu ambiente Amazon MWAA, é possível fazer o upload todo o diretório do projeto na pasta `dags` do seu ambiente. Quando o ambiente é atualizado, o Amazon MWAA baixa o diretório DBT para a pasta local `usr/local/airflow/dags/`.

**Para fazer upload de um projeto de DBT no Amazon S3**

1. Navegue até o diretório em que você clonou o projeto inicial de DBT.

1. Execute o seguinte AWS CLI comando do Amazon S3 para copiar recursivamente o conteúdo do projeto para a `dags` pasta do seu ambiente usando o parâmetro. `--recursive` O comando cria um subdiretório chamado `dbt` que é possível usar para todos os seus projetos de DBT. Se o subdiretório já existir, os arquivos do projeto serão copiados para o diretório existente e um novo diretório não será criado. O comando também cria um subdiretório dentro do diretório `dbt` para esse projeto inicial específico.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   É possível usar nomes diferentes para os subdiretórios do projeto para organizar vários projetos de DBT dentro do diretório principal `dbt`.

## Use um DAG para verificar a instalação da dependência de DBT
<a name="samples-dbt-test-dependencies"></a>

O DAG a seguir usa um código `BashOperator` e um comando bash para verificar se o Amazon MWAA instalou com sucesso as dependências DBT especificadas em `requirements.txt`.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

Faça o seguinte para acessar os logs de tarefas e verificar se o dbt e as dependências dele foram instalados.

1. Navegue até o console do Amazon MWAA e escolha **Abrir IU do Airflow** na lista de ambientes disponíveis.

1. Na IU do Apache Airflow, encontre o DAG `dbt-installation-test` na lista e escolha a data na coluna `Last Run` para abrir a última tarefa bem-sucedida.

1. Usando a **Exibição de gráfico**, escolha a `bash_command` tarefa para abrir os detalhes da instância da tarefa.

1. Escolha **Log** para abrir os logs de tarefas e, em seguida, verifique se os logs listam com êxito a versão do DBT que especificamos em `requirements.txt`.

## Use um DAG para executar um projeto de DBT
<a name="samples-dbt-run-project"></a>

O DAG a seguir usa um código `BashOperator` para copiar os projetos de DBT que você fez o upload para o Amazon S3 do diretório local `usr/local/airflow/dags/` para o diretório acessível para gravação `/tmp` e, em seguida, executa o projeto de DBT. Os comandos bash pressupõem um projeto inicial de DBT intitulado `dbt-starter-project`. Modifique o nome do diretório de acordo com o nome do diretório do seu projeto.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS blogs e tutoriais
<a name="samples-blogs-tutorials"></a>
+ [Como trabalhar com o Amazon EKS e o Amazon MWAA for Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)