

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Amazon MWAA 的代码示例
<a name="sample-code"></a>

本指南包含可在适用于 Apache Airflow 的亚马逊托管工作流程环境中使用的代码示例，包括 DAGs 自定义插件。有关将 Apache Airflow AWS 用于服务的更多示例，请参阅 Apache Airflow 存储库中的目录。[https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags) GitHub 

**Topics**
+ [使用 DAG 在 CLI 中导入变量](samples-variables-import.md)
+ [使用 `SSHOperator` 创建 SSH 连接](samples-ssh.md)
+ [使用 AWS Secrets Manager 中的密钥进行 Apache Airflow Snowflake 连接](samples-sm-snowflake.md)
+ [使用 DAG 在 CloudWatch 中编写自定义指标](samples-custom-metrics.md)
+ [在 Amazon MWAA 环境中清理 Aurora PostgreSQL 数据库](samples-database-cleanup.md)
+ [将环境元数据导出到 Amazon S3 上的 CSV 文件](samples-dag-run-info-to-csv.md)
+ [为 Apache Airflow 变量使用 AWS Secrets Manager 中的密钥](samples-secrets-manager-var.md)
+ [使用 AWS Secrets Manager 中的密钥进行 Apache Airflow 连接](samples-secrets-manager.md)
+ [使用 Oracle 创建自定义插件](samples-oracle.md)
+ [在 Amazon MWAA 上更改 DAG 的时区](samples-plugins-timezone.md)
+ [刷新 CodeArtifact 令牌](samples-code-artifact.md)
+ [使用 Apache Hive 和 Hadoop 创建自定义插件](samples-hive.md)
+ [为 Apache Airflow PythonVirtualenvOperator 创建自定义插件](samples-virtualenv.md)
+ [使用 Lambda DAGs 函数进行调用](samples-lambda.md)
+ [DAGs 在不同的 Amazon MWAA 环境中调用](samples-invoke-dag.md)
+ [将 Amazon RDS for Microsoft SQL Server 与 Amazon MWAA 一起使用](samples-sql-server.md)
+ [将 Amazon MWAA 与 Amazon EKS 一起使用](mwaa-eks-example.md)
+ [使用 `ECSOperator` 连接 Amazon ECS](samples-ecs-operator.md)
+ [在 Amazon MWAA 中使用 dbt](samples-dbt.md)
+ [AWS 博客和教程](#samples-blogs-tutorials)

# 使用 DAG 在 CLI 中导入变量
<a name="samples-variables-import"></a>

以下示例代码会使用 Amazon MWAA 上的 CLI 导入变量。

**Topics**
+ [版本](#samples-variables-import-version)
+ [先决条件](#samples-variables-import-prereqs)
+ [权限](#samples-variables-import-permissions)
+ [依赖项](#samples-variables-import-dependencies)
+ [代码示例](#samples-variables-import-code)
+ [接下来做什么？](#samples-variables-import-next-up)

## 版本
<a name="samples-variables-import-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-variables-import-prereqs"></a>

无需其他权限即可使用本页上的代码示例。

## 权限
<a name="samples-variables-import-permissions"></a>

AWS 账户 需要访问 `AmazonMWAAAirflowCliAccess` 策略。要了解更多信息，请参阅 [Apache Airflow CLI 政策：亚马逊 MWAAAirflow CliAccess](access-policies.md)。

## 依赖项
<a name="samples-variables-import-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 安装 Apache Airflow。

## 代码示例
<a name="samples-variables-import-code"></a>

以下示例代码需要三个输入：Amazon MWAA 环境名称（在 `mwaa_env` 中）、环境 AWS 区域（在 `aws_region` 中）和包含要导入的变量的本地文件（在 `var_file` 中）。

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## 接下来做什么？
<a name="samples-variables-import-next-up"></a>
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。

# 使用 `SSHOperator` 创建 SSH 连接
<a name="samples-ssh"></a>

以下示例介绍如何使用有向无环图（DAG）中的 `SSHOperator` 从 Amazon MWAA 环境连接到远程 Amazon EC2 实例。您可以使用类似的方法连接到任何具有 SSH 访问权限的远程实例。

在以下示例中，您将 SSH 密钥 (`.pem`) 上传到在 Amazon S3 上的环境的 `dags` 目录中。然后，您可以使用 `requirements.txt` 安装必要的依赖项，并在 UI 中创建新的 Apache Airflow 连接。最后，您编写一个 DAG 来创建与远程实例的 SSH 连接。

**Topics**
+ [版本](#samples-ssh-version)
+ [先决条件](#samples-ssh-prereqs)
+ [权限](#samples-ssh-permissions)
+ [要求](#samples-ssh-dependencies)
+ [将密钥复制到 Amazon S3](#samples-ssh-secret)
+ [创建新的 Apache Airflow 连接](#samples-ssh-connection)
+ [代码示例](#samples-ssh-code)

## 版本
<a name="samples-ssh-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-ssh-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境](get-started.md)。
+ SSH 密钥。该代码示例假设您有 Amazon EC2 实例，并且与 Amazon MWAA 环境位于同一区域的 `.pem`。如果您没有密钥，请参阅*《Amazon EC2 用户指南》*中的[创建或导入密钥对](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)。

## 权限
<a name="samples-ssh-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 要求
<a name="samples-ssh-dependencies"></a>

将以下参数添加到 `requirements.txt`，以将 `apache-airflow-providers-ssh` 程序包安装到 Web 服务器上。当环境更新并且 Amazon MWAA 成功安装依赖项后，您将在 UI 中获得新的 **SSH** 连接类型。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**注意**  
`-c` 定义了 `requirements.txt` 中的约束 URL。这样可以确保 Amazon MWAA 为环境安装了正确的程序包版本。

## 将密钥复制到 Amazon S3
<a name="samples-ssh-secret"></a>

使用以下 AWS Command Line Interface 命令将 `.pem` 密钥复制到 Amazon S3 中的环境 `dags` 目录中。

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA 将包括 `.pem` 密钥在内的 `dags` 中的内容复制到本地 `/usr/local/airflow/dags/` 目录，这样，Apache Airflow 就可以访问密钥了。

## 创建新的 Apache Airflow 连接
<a name="samples-ssh-connection"></a>

**使用 Apache Airflow UI 创建新的 SSH 连接**

1. 在 Amazon MWAA 控制台上打开[环境页面](https://console.aws.amazon.com/mwaa/home#/environments)。

1. 从环境列表中，为环境选择**打开 Airflow UI**。

1. 在 Apache Airflow UI 页面上，从主导航栏中选择**管理员**以展开下拉列表，然后选择**连接**。

1. 在**列出连接**页面上，选择 **\$1** 或**添加新记录**按钮以添加新连接。

1. 在**连接到 AD** 页面上，提供以下信息：

   1. 在**连接名称**中，输入 **ssh\$1new**。

   1. 在**连接类型**中，从下拉列表中选择 **SSH**。
**注意**  
如果列表中没有 **SSH** 连接类型，则表示 Amazon MWAA 尚未安装所需的 `apache-airflow-providers-ssh` 程序包。请更新 `requirements.txt` 文件以包含此程序包，然后重试。

   1. 在**主机**中，请输入要连接的 Amazon EC2 实例的 IP 地址。例如 **12.345.67.89**。

   1. 在**用户名**中，请输入 **ec2-user**，以检查您是否正在连接到 Amazon EC2 实例。用户名可能会有所不同，具体取决于您希望 Apache Airflow 连接到的远程实例的类型。

   1. 在**附加**中，请以 JSON 格式输入以下键/值对：

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      此键值对指示 Apache Airflow 在本地 `/dags` 目录中搜索密钥。

## 代码示例
<a name="samples-ssh-code"></a>

以下 DAG 使用 `SSHOperator` 连接到目标 Amazon EC2 实例，然后运行 `hostname` Linux 命令来打印该实例的名称。您可以修改 DAG 以在远程实例上运行任何命令或脚本。

1. 打开终端，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `ssh.py`。

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow UI 触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您将在 `ssh_operator_example` DAG 的任务日志中看到输出，类似于 `ssh_task` 的任务日志中的以下内容：

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# 使用 AWS Secrets Manager 中的密钥进行 Apache Airflow Snowflake 连接
<a name="samples-sm-snowflake"></a>

以下示例调用 AWS Secrets Manager 在 Amazon MWAA 上获取 Apache Airflow Snowflake 连接的密钥。它假设您已完成 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 中的步骤。

**Topics**
+ [版本](#samples-sm-snowflake-version)
+ [先决条件](#samples-sm-snowflake-prereqs)
+ [权限](#samples-sm-snowflake-permissions)
+ [要求](#samples-sm-snowflake-dependencies)
+ [代码示例](#samples-sm-snowflake-code)
+ [接下来做什么？](#samples-sm-snowflake-next-up)

## 版本
<a name="samples-sm-snowflake-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-sm-snowflake-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ 创建 Secrets Manager 后端作为 Apache Airflow 配置选项，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。
+ Secrets Manager 中的 Apache Airflow 连接字符串，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 权限
<a name="samples-sm-snowflake-permissions"></a>
+ Secrets Manager 权限，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 要求
<a name="samples-sm-snowflake-dependencies"></a>

要使用本页上的示例代码，请将以下依赖项添加到 `requirements.txt`。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
apache-airflow-providers-snowflake==1.3.0
```

## 代码示例
<a name="samples-sm-snowflake-code"></a>

以下步骤描述了如何创建 DAG 代码，以便调用 Secrets Manager 来获取密钥。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `snowflake_connection.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## 接下来做什么？
<a name="samples-sm-snowflake-next-up"></a>
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。

# 使用 DAG 在 CloudWatch 中编写自定义指标
<a name="samples-custom-metrics"></a>

您可以使用以下代码示例编写有向无环图（DAG），该图运行 `PythonOperator` 以检索 Amazon MWAA 环境的操作系统级指标。DAG 随后将数据作为自定义指标发布到 Amazon CloudWatch。

自定义操作系统级指标可让您进一步了解环境工作线程如何使用虚拟内存和 CPU 等资源。您可以使用此信息来选择最适合您的工作负载的[环境类](environment-class.md)。

**Topics**
+ [版本](#samples-custom-metrics-version)
+ [先决条件](#samples-custom-metrics-prereqs)
+ [权限](#samples-custom-metrics-permissions)
+ [依赖项](#samples-custom-metrics-dependencies)
+ [代码示例](#samples-custom-metrics-code)

## 版本
<a name="samples-custom-metrics-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-custom-metrics-prereqs"></a>

要使用本页上的代码示例，您需要以下内容：
+ [Amazon MWAA 环境](get-started.md)。

## 权限
<a name="samples-custom-metrics-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 依赖项
<a name="samples-custom-metrics-dependencies"></a>
+ 无需其他依赖项即可使用本页上的代码示例。

## 代码示例
<a name="samples-custom-metrics-code"></a>

1. 在命令提示符下，导航到存储 DAG 代码的文件夹。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容并本地另存为 `dag-custom-metrics.py`。用环境名称替换 `MWAA-ENV-NAME`。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow UI 触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果 DAG 成功运行，您会在 Apache Airflow 日志中收到类似以下的内容：

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# 在 Amazon MWAA 环境中清理 Aurora PostgreSQL 数据库
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow 使用 Aurora PostgreSQL 数据库作为 DAG 运行并存储任务实例的 Apache Airflow 元数据库。以下示例代码会定期为 Amazon MWAA 环境清除专用 Aurora PostgreSQL 数据库中的条目。

**Topics**
+ [版本](#samples-database-cleanup-version)
+ [先决条件](#samples-database-cleanup-prereqs)
+ [依赖项](#samples-sql-server-dependencies)
+ [代码示例](#samples-database-cleanup-code)

## 版本
<a name="samples-database-cleanup-version"></a>

本页上的代码示例特定于 Amazon MWAA 支持的 Apache Airflow v2。请参阅[支持的 Apache Airflow 版本](airflow-versions.md)。

**提示**  
**对于 Apache Airflow v3 用户**：如果要清理数据库（从元存储表中清除旧记录），请运行 `db clean` CLI 命令。

## 先决条件
<a name="samples-database-cleanup-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)

## 依赖项
<a name="samples-sql-server-dependencies"></a>

要在 Apache Airflow v2 中使用此代码示例，无需附加依赖项。用于[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)安装 Apache Airflow。

## 代码示例
<a name="samples-database-cleanup-code"></a>

以下 DAG 会清理 `TABLES_TO_CLEAN` 中指定表的元数据数据库。该示例将删除指定表中存在超过 30 天的数据。要调整删除条目的存续时间，请将 `MAX_AGE_IN_DAYS` 设置为其他值。

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# 将环境元数据导出到 Amazon S3 上的 CSV 文件
<a name="samples-dag-run-info-to-csv"></a>

使用以下代码示例创建有向无环图 (DAG)，该图在数据库中查询一系列 DAG 运行信息，并将数据写入存储在 Amazon S3 上的 `.csv` 文件中。

您可能需要从环境的 Aurora PostgreSQL 数据库中导出信息，以便在本地检查数据，将其存档到对象存储中，或者将它们与诸如 [Amazon S3 到 Amazon Redshift 运算符](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html)和[数据库清理](samples-database-cleanup.md)之类的工具结合使用，以便将 Amazon MWAA 元数据移出环境，但保留它们以备将来分析。

您可以在数据库中查询 [Apache Airflow 模型](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models)中列出的任何对象。此代码示例使用三个模型：`DagRun`、`TaskFail` 和 `TaskInstance`，它们提供与 DAG 运行相关的信息。

**Topics**
+ [版本](#samples-dag-run-info-to-csv-version)
+ [先决条件](#samples-dag-run-info-to-csv-prereqs)
+ [Permissions](#samples-dag-run-info-to-csv-permissions)
+ [要求](#samples-dag-run-info-to-csv-dependencies)
+ [代码示例](#samples-dag-run-info-to-csv-code)

## 版本
<a name="samples-dag-run-info-to-csv-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-dag-run-info-to-csv-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境](get-started.md)。
+ 您想要将元数据导出到[新的 Amazon S3 存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。

## Permissions
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA 需要获得 Amazon S3 操作 `s3:PutObject` 的权限，才能将查询的元数据信息写入 Amazon S3。将以下策略声明添加到环境的执行角色中。

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

此政策将写入权限限制为*amzn-s3-demo-bucket*。

## 要求
<a name="samples-dag-run-info-to-csv-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。用于[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)安装 Apache Airflow。

## 代码示例
<a name="samples-dag-run-info-to-csv-code"></a>

以下步骤描述了如何创建 DAG，以查询 Aurora PostgreSQL 并将结果写入新的 Amazon S3 存储桶。

1. 在您的终端，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容并本地另存为 `metadata_to_csv.py`。您可以更改分配给 `MAX_AGE_IN_DAYS` 的值，以控制 DAG 从元数据数据库中查询的最早记录的龄期。

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow 用户界面触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，输出将类似于在 `export_db` 任务的任务日志中的以下内容：

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   现在，您可以在 `/files/export/` 中的新 Amazon S3 存储桶中访问和下载导出的 `.csv` 文件。

# 为 Apache Airflow 变量使用 AWS Secrets Manager 中的密钥
<a name="samples-secrets-manager-var"></a>

以下示例调用 AWS Secrets Manager 来获取 Amazon MWAA 上的 Apache Airflow 变量的密钥。它假设您已完成 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 中的步骤。

**Topics**
+ [版本](#samples-secrets-manager-var-version)
+ [先决条件](#samples-secrets-manager-var-prereqs)
+ [权限](#samples-secrets-manager-var-permissions)
+ [要求](#samples-hive-dependencies)
+ [代码示例](#samples-secrets-manager-var-code)
+ [接下来做什么？](#samples-secrets-manager-var-next-up)

## 版本
<a name="samples-secrets-manager-var-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-secrets-manager-var-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ 创建 Secrets Manager 后端作为 Apache Airflow 配置选项，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。
+ Secrets Manager 中的 Apache Airflow 变量字符串，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 权限
<a name="samples-secrets-manager-var-permissions"></a>
+ Secrets Manager 权限，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 要求
<a name="samples-hive-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 安装 Apache Airflow。

## 代码示例
<a name="samples-secrets-manager-var-code"></a>

以下步骤描述了如何创建 DAG 代码，以便调用 Secrets Manager 来获取密钥。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `secrets-manager-var.py`。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## 接下来做什么？
<a name="samples-secrets-manager-var-next-up"></a>
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。

# 使用 AWS Secrets Manager 中的密钥进行 Apache Airflow 连接
<a name="samples-secrets-manager"></a>

以下示例调用 AWS Secrets Manager 在 Amazon MWAA 上获取 Apache Airflow 连接的密钥。它假设您已完成 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 中的步骤。

**Topics**
+ [版本](#samples-secrets-manager-version)
+ [先决条件](#samples-secrets-manager-prereqs)
+ [权限](#samples-secrets-manager-permissions)
+ [要求](#samples-hive-dependencies)
+ [代码示例](#samples-secrets-manager-code)
+ [接下来做什么？](#samples-secrets-manager-next-up)

## 版本
<a name="samples-secrets-manager-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-secrets-manager-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ 创建 Secrets Manager 后端作为 Apache Airflow 配置选项，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。
+ Secrets Manager 中的 Apache Airflow 连接字符串，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 权限
<a name="samples-secrets-manager-permissions"></a>
+ Secrets Manager 权限，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 要求
<a name="samples-hive-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 安装 Apache Airflow。

## 代码示例
<a name="samples-secrets-manager-code"></a>

以下步骤描述了如何创建 DAG 代码，以便调用 Secrets Manager 来获取密钥。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `secrets-manager.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## 接下来做什么？
<a name="samples-secrets-manager-next-up"></a>
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。

# 使用 Oracle 创建自定义插件
<a name="samples-oracle"></a>

以下示例将引导您完成使用 Oracle 为 Amazon MWAA 创建自定义插件的步骤，该插件可以与 plugins.zip 文件中的其他自定义插件和二进制文件组合使用。

**Contents**
+ [版本](#samples-oracle-version)
+ [先决条件](#samples-oracle-prereqs)
+ [权限](#samples-oracle-permissions)
+ [要求](#samples-oracle-dependencies)
+ [代码示例](#samples-oracle-code)
+ [创建自定义插件](#samples-oracle-create-pluginszip-steps)
  + [下载依赖项](#samples-oracle-install)
  + [自定义插件](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Airflow 配置选项](#samples-oracle-airflow-config)
+ [接下来做什么？](#samples-oracle-next-up)

## 版本
<a name="samples-oracle-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-oracle-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)
+ 为环境启用任何日志级别、`CRITICAL` 或在上一部分中的工作线程日志记录。有关 Amazon MWAA 日志类型以及如何管理日志组的更多信息，请参阅 [访问 Amazon 中的 Airflow 日志 CloudWatch](monitoring-airflow.md)。

## 权限
<a name="samples-oracle-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 要求
<a name="samples-oracle-dependencies"></a>

要使用本页上的示例代码，请将以下依赖项添加到 `requirements.txt`。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## 代码示例
<a name="samples-oracle-code"></a>

以下步骤介绍如何创建用于测试自定义插件的 DAG 代码。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `oracle.py`。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## 创建自定义插件
<a name="samples-oracle-create-pluginszip-steps"></a>

本节介绍如何下载依赖项、创建自定义插件和 plugins.zip。

### 下载依赖项
<a name="samples-oracle-install"></a>

Amazon MWAA 会将 plugins.zip 的内容提取到每个 Amazon MWAA 计划程序和工作线程容器上的 `/usr/local/airflow/plugins`。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。

**拉取 Amazon Linux 容器镜像**

1. 在命令提示符下，提取 Amazon Linux 容器镜像，然后在本地运行该容器。例如：

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   命令提示符可以调用 bash 命令行。例如：

   ```
   bash-4.2#
   ```

1. 安装 Linux 原生异步 I/O 工具（libaio）。

   ```
   yum -y install libaio
   ```

1. 请将此窗口保持打开状态以供后续步骤使用。我们将在本地复制以下文件：`lib64/libaio.so.1`、`lib64/libaio.so.1.0.0`、`lib64/libaio.so.1.0.1`。

**下载客户端文件夹**

1. 在本地安装解压缩包。例如：

   ```
   sudo yum install unzip
   ```

1. 创建 `oracle_plugin` 目录。例如：

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. 使用以下 curl 命令[从适用于 Linux x86-64（64 位）的 Oracle 即时客户端下载](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html)中下载 [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip)。

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. 解压缩 `client.zip` 文件。例如：

   ```
   unzip *.zip
   ```

**从 Docker 中提取文件**

1. 在新的命令提示符下，显示并写下 Docker 容器 ID。例如：

   ```
   docker container ls
   ```

   命令提示符可以返回所有容器及其 ID。例如：

   ```
   debc16fd6970
   ```

1. 在 `oracle_plugin` 目录中，将 `lib64/libaio.so.1`、`lib64/libaio.so.1.0.0`、`lib64/libaio.so.1.0.1` 文件解压缩到本地 `instantclient_18_5` 文件夹。例如：

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### 自定义插件
<a name="samples-oracle-plugins-code"></a>

Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。
+ 复制以下代码示例的内容，并在本地另存为 `env_var_plugin_oracle.py`。

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

以下步骤说明如何创建 `plugins.zip`。此示例的内容可以与其他插件和二进制文件合并到单个 `plugins.zip` 文件中。

**压缩插件目录中的内容。**

1. 在命令行提示符中，导航到 `oracle_plugin` 目录。例如：

   ```
   cd oracle_plugin
   ```

1. 将 `instantclient_18_5` 目录压缩到 plugins.zip 中。例如：

   ```
   zip -r ../plugins.zip ./
   ```

   您的命令提示符将显示：

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. 移除该 `client.zip` 文件。例如：

   ```
   rm client.zip
   ```

**压缩 env\$1var\$1plugin\$1oracle.py 文件**

1. 将 `env_var_plugin_oracle.py` 文件添加到 plugins.zip 文件的根目录。例如：

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. plugins.zip 现在包含以下内容：

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Airflow 配置选项
<a name="samples-oracle-airflow-config"></a>

如果您使用的是 Apache Airflow v2，请添加 `core.lazy_load_plugins : False` 为 Apache Airflow 配置选项。要了解更多信息，请参阅 [2 中的使用配置选项加载插件](configuring-env-variables.md#configuring-2.0-airflow-override)。

## 接下来做什么？
<a name="samples-oracle-next-up"></a>
+ 要了解如何将本示例中的 `requirements.txt` 文件上传到 Amazon S3 存储桶，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。
+ 要了解如何将本示例中的 `plugins.zip` 文件上传到 Amazon S3 存储桶，请参阅 [安装自定义插件](configuring-dag-import-plugins.md)。

# 在 Amazon MWAA 上更改 DAG 的时区
<a name="samples-plugins-timezone"></a>

默认情况下，Apache Airflow 将有向无环图（DAG）安排在 UTC\$10 中。以下步骤展示了如何使用 [Pendulum](https://pypi.org/project/pendulum/) 更改 Amazon MWAA 运行 DAG 所在的时区。或者，本主题演示如何创建自定义插件来更改环境的 Apache Airflow 日志的时区。

**Topics**
+ [版本](#samples-plugins-timezone-version)
+ [先决条件](#samples-plugins-timezone-prerequisites)
+ [权限](#samples-plugins-timezone-permissions)
+ [创建插件以更改 Airflow 日志中的时区](#samples-plugins-timezone-custom-plugin)
+ [创建 `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [代码示例](#samples-plugins-timezone-dag)
+ [接下来做什么？](#samples-plugins-timezone-plugins-next-up)

## 版本
<a name="samples-plugins-timezone-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-plugins-timezone-prerequisites"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)

## 权限
<a name="samples-plugins-timezone-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 创建插件以更改 Airflow 日志中的时区
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow 在启动时运行 `plugins` 目录中的 Python 文件。使用以下插件，您可以覆盖执行程序的时区，该时区会修改 Apache Airflow 写入日志的时区。

1. 创建名为 `plugins` 的目录并导航到其中。例如：

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. 复制以下代码示例的内容，并将其本地另存为 `dag-timezone-plugin.py`，保存在 `plugins` 文件夹中。

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. 在 `plugins` 目录中创建名为 `__init__.py` 的空 Python 文件。`plugins` 目录应类似于以下内容。

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## 创建 `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

以下步骤说明如何创建 `plugins.zip`。此示例的内容可以与其他插件和二进制文件组合成单个 `plugins.zip` 文件。

1. 在命令提示符下，导航到上一步中的 `plugins` 目录。例如：

   ```
   cd plugins
   ```

1. 将内容压缩到 `plugins` 目录中。

   ```
   zip -r ../plugins.zip ./
   ```

1. 将 `plugins.zip` 上传到 S3 存储桶。

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## 代码示例
<a name="samples-plugins-timezone-dag"></a>

要更改 DAG 运行的默认时区（UTC\$10），我们将使用一个名为 [Pendulum](https://pypi.org/project/pendulum/) 的库，这是一个用于处理时区感知日期时间的 Python 库。

1. 在命令提示符下，导航到存储 DAG 的目录。例如：

   ```
   cd dags
   ```

1. 复制以下示例的内容并另存为 `tz-aware-dag.py`。

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow UI 触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您将输出类似于在 `tz_test` DAG 中的 `tz_aware_task` 的任务日志中的以下内容：

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## 接下来做什么？
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ 要了解如何将本示例中的 `plugins.zip` 文件上传到 Amazon S3 存储桶，请参阅 [安装自定义插件](configuring-dag-import-plugins.md)。

# 刷新 CodeArtifact 令牌
<a name="samples-code-artifact"></a>

如果您使用 CodeArtifact 来安装 Python 依赖项，则 Amazon MWAA 需要有效的令牌。要允许 Amazon MWAA 在运行时访问 CodeArtifact 存储库，您可以使用[启动脚本](using-startup-script.md)并使用令牌设置 [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url)。

以下主题介绍如何创建启动脚本，该脚本使用 [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) CodeArtifact API 操作在每次环境启动或更新时检索新令牌。

**Topics**
+ [版本](#samples-code-artifact-version)
+ [先决条件](#samples-code-artifact-prereqs)
+ [权限](#samples-code-artifact-permissions)
+ [代码示例](#samples-code-artifact-code)
+ [接下来做什么？](#samples-code-artifact-next-up)

## 版本
<a name="samples-code-artifact-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-code-artifact-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境](get-started.md)。
+ [CodeArtifact 存储库](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html)，用于存储环境的依赖项。

## 权限
<a name="samples-code-artifact-permissions"></a>

要刷新 CodeArtifact 令牌并将结果写入 Amazon S3，Amazon MWAA 的执行角色必须具有以下权限。
+ 该 `codeartifact:GetAuthorizationToken` 操作允许 Amazon MWAA 从 CodeArtifact 中检索新令牌。以下策略为您创建的每个 CodeArtifact 域授予权限。您可以通过修改语句中的资源值并仅指定您希望环境访问的域来进一步限制对所有域的访问。

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ 该 `sts:GetServiceBearerToken` 操作是调用 CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) API 操作所必需的。此操作返回一个令牌，在将程序包管理器（例如 `pip`）与 CodeArtifact 配合使用时，必须使用该令牌。要将程序包管理器与 CodeArtifact 存储库配合使用，环境的执行角色必须允许 `sts:GetServiceBearerToken`，如以下策略声明所列。

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## 代码示例
<a name="samples-code-artifact-code"></a>

以下步骤描述了如何创建用于更新 CodeArtifact 令牌的启动脚本。

1. 复制以下代码示例的内容，并在本地另存为 `code_artifact_startup_script.sh`。

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. 导航到保存该脚本的文件夹。在新提示窗口中使用 `cp` 将脚本上传到存储桶。用您的信息替换 *amzn-s3-demo-bucket*。

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   如果成功，Amazon S3 会输出该对象的 URL 路径：

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   上传脚本后，环境会在启动时更新并运行脚本。

## 接下来做什么？
<a name="samples-code-artifact-next-up"></a>
+ 要了解如何使用启动脚本自定义环境，请参阅 [在 Amazon MWAA 中使用启动脚本](using-startup-script.md)。
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。
+ 要了解如何将本示例中的 `plugins.zip` 文件上传到 Amazon S3 存储桶，请参阅 [安装自定义插件](configuring-dag-import-plugins.md)。

# 使用 Apache Hive 和 Hadoop 创建自定义插件
<a name="samples-hive"></a>

Amazon MWAA 将 `plugins.zip` 的内容提取到 `/usr/local/airflow/plugins`。这可以用来向容器中添加二进制文件。此外，Apache Airflow 会在*启动*时执行 `plugins` 文件夹中的 Python 文件内容，使您能够设置和修改环境变量。以下示例将引导您完成在 Amazon MWAA 环境中使用 Apache Hive 和 Hadoop 创建自定义插件的步骤，该插件可以与其他自定义插件和二进制文件组合使用。

**Topics**
+ [版本](#samples-hive-version)
+ [先决条件](#samples-hive-prereqs)
+ [权限](#samples-hive-permissions)
+ [要求](#samples-hive-dependencies)
+ [下载依赖项](#samples-hive-install)
+ [自定义插件](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [代码示例](#samples-hive-code)
+ [Airflow 配置选项](#samples-hive-airflow-config)
+ [接下来做什么？](#samples-hive-next-up)

## 版本
<a name="samples-hive-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-hive-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)

## 权限
<a name="samples-hive-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 要求
<a name="samples-hive-dependencies"></a>

要使用本页上的示例代码，请将以下依赖项添加到 `requirements.txt`。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## 下载依赖项
<a name="samples-hive-install"></a>

Amazon MWAA 会将 plugins.zip 的内容提取到每个 Amazon MWAA 计划程序和工作线程容器上的 `/usr/local/airflow/plugins`。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。

1. 在命令提示符下，导航到要创建插件的目录。例如：

   ```
   cd plugins
   ```

1. 从[镜像](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz)中下载 [Hadoop](https://hadoop.apache.org/)，例如：

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. 从[镜像](https://www.apache.org/dyn/closer.cgi/hive/)中下载 [Hive](https://hive.apache.org/)，例如：

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. 创建目录。例如：

   ```
   mkdir hive_plugin
   ```

1. Hadoop 提取。

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Hive 提取。

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## 自定义插件
<a name="samples-hive-plugins-code"></a>

Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。

1. 在命令行提示符中，导航到 `hive_plugin` 目录。例如：

   ```
   cd hive_plugin
   ```

1. 复制以下代码示例的内容，并在 `hive_plugin` 目录中将其本地另存为 `hive_plugin.py`。

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. 复制以下文本的内容，并在 `hive_plugin` 目录中将其本地另存为 `.airflowignore`。

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

以下步骤说明如何创建 `plugins.zip`。此示例的内容可以与其他插件和二进制文件组合成一个 `plugins.zip` 文件。

1. 在命令提示符下，导航到上一步中的 `hive_plugin` 目录。例如：

   ```
   cd hive_plugin
   ```

1. 将内容压缩到 `plugins` 文件夹中。

   ```
   zip -r ../hive_plugin.zip ./
   ```

## 代码示例
<a name="samples-hive-code"></a>

以下步骤介绍如何创建用于测试自定义插件的 DAG 代码。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `hive.py`。

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Airflow 配置选项
<a name="samples-hive-airflow-config"></a>

如果您使用的是 Apache Airflow v2，请添加 `core.lazy_load_plugins : False` 为 Apache Airflow 配置选项。要了解更多信息，请参阅 [2 中的使用配置选项加载插件](configuring-env-variables.md#configuring-2.0-airflow-override)。

## 接下来做什么？
<a name="samples-hive-next-up"></a>
+ 要了解如何将本示例中的 `requirements.txt` 文件上传到 Amazon S3 存储桶，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。
+ 要了解如何将本示例中的 `plugins.zip` 文件上传到 Amazon S3 存储桶，请参阅 [安装自定义插件](configuring-dag-import-plugins.md)。

# 为 Apache Airflow PythonVirtualenvOperator 创建自定义插件
<a name="samples-virtualenv"></a>

以下示例说明了如何在 Amazon Managed Workflows for Apache Airflow 上使用自定义插件修补 Apache Airflow `PythonVirtualenvOperator`。

**Topics**
+ [版本](#samples-virtualenv-version)
+ [先决条件](#samples-virtualenv-prereqs)
+ [权限](#samples-virtualenv-permissions)
+ [要求](#samples-virtualenv-dependencies)
+ [自定义插件示例代码](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [代码示例](#samples-virtualenv-code)
+ [Airflow 配置选项](#samples-virtualenv-airflow-config)
+ [接下来做什么？](#samples-virtualenv-next-up)

## 版本
<a name="samples-virtualenv-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-virtualenv-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)

## 权限
<a name="samples-virtualenv-permissions"></a>

无需其他权限即可使用本页上的代码示例。

## 要求
<a name="samples-virtualenv-dependencies"></a>

要使用本页上的示例代码，请将以下依赖项添加到 `requirements.txt`。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
virtualenv
```

## 自定义插件示例代码
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。此插件将在启动过程中修补内置的 `PythonVirtualenvOperator`，使其与 Amazon MWAA 兼容。以下步骤显示了此自定义插件的示例代码。

1. 在命令提示符下，导航到上一部分中的 `plugins` 目录。例如：

   ```
   cd plugins
   ```

1. 复制以下代码示例的内容，并在本地另存为 `virtual_python_plugin.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

以下步骤说明如何创建 `plugins.zip`。

1. 在命令提示符下，导航到上一部分中包含 `virtual_python_plugin.py` 的目录。例如：

   ```
   cd plugins
   ```

1. 将内容压缩到 `plugins` 文件夹中。

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## 代码示例
<a name="samples-virtualenv-code"></a>

以下步骤介绍了如何创建自定义插件的 DAG 代码。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `virtualenv_test.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Airflow 配置选项
<a name="samples-virtualenv-airflow-config"></a>

如果您使用的是 Apache Airflow v2，请添加 `core.lazy_load_plugins : False` 为 Apache Airflow 配置选项。要了解更多信息，请参阅 [2 中的使用配置选项加载插件](configuring-env-variables.md#configuring-2.0-airflow-override)。

## 接下来做什么？
<a name="samples-virtualenv-next-up"></a>
+ 要了解如何将本示例中的 `requirements.txt` 文件上传到 Amazon S3 存储桶，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。
+ 要了解如何将本示例中的 `plugins.zip` 文件上传到 Amazon S3 存储桶，请参阅 [安装自定义插件](configuring-dag-import-plugins.md)。

# 使用 Lambda DAGs 函数进行调用
<a name="samples-lambda"></a>

以下代码示例使用 [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) 函数获取 Apache Airflow CLI 令牌并在 Amazon MWAA 环境中调用有向无环图（DAG）。

**Topics**
+ [版本](#samples-lambda-version)
+ [先决条件](#samples-lambda-prereqs)
+ [Permissions](#samples-lambda-permissions)
+ [依赖项](#samples-lambda-dependencies)
+ [代码示例](#samples-lambda-code)

## 版本
<a name="samples-lambda-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-lambda-prereqs"></a>

要使用此代码示例，您必须：
+ 使用 [Amazon MWAA](get-started.md) 环境[公共网络访问模式](configuring-networking.md#webserver-options-public-network-onconsole)。
+ 使用最新的 Python 运行时创建一个 [Lambda 函数](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html)。

**注意**  
如果 Lambda 函数和 Amazon MWAA 环境处于同一 VPC 中，则可以在私有网络上使用此代码。对于本配置，Lambda 函数的执行角色需要获得调用 Amazon Elastic Compute Cloud（Amazon EC2）**CreateNetworkInterface** API 操作的权限。您可以使用 [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS-managed 策略提供此权限。

## Permissions
<a name="samples-lambda-permissions"></a>

要使用本页上的代码示例，Amazon MWAA 环境的执行角色需要访问权限才能执行 `airflow:CreateCliToken` 操作。您可以使用 `AmazonMWAAAirflowCliAccess` AWS-managed 策略提供此权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

有关更多信息，请参阅[Apache Airflow CLI 政策：亚马逊 MWAAAirflow CliAccess](access-policies.md#cli-access)。

## 依赖项
<a name="samples-lambda-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。用于[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)安装 Apache Airflow。

## 代码示例
<a name="samples-lambda-code"></a>

1. 打开 AWS Lambda 控制台，网址为[https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/)。

1. 从 **Functions** 列表中选择 Lambda 函数。

1. 在函数页面上，复制以下代码并将以下代码替换为资源名称：
   + `YOUR_ENVIRONMENT_NAME` – Amazon MWAA 环境名称。
   + `YOUR_DAG_NAME` – 您想调用的 DAG 名称。

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. 选择**部署**。

1. 选择**测试**，使用 Lambda 控制台调用函数。

1. 要验证 Lambda 是否成功调用了 DAG，请使用 Amazon MWAA 控制台导航到环境的 Apache Airflow UI 界面，然后执行以下操作：

   1. 在该**DAGs**页面上，在列表中找到您的新目标 DAG DAGs。

   1. 在**上次运行**下，查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 `invoke_dag` 的最新时间戳非常匹配。

   1. 在**近期任务**下，检查上次运行是否成功。

# DAGs 在不同的 Amazon MWAA 环境中调用
<a name="samples-invoke-dag"></a>

以下代码示例创建了一个 Apache Airflow CLI 令牌。然后，该代码使用一个 Amazon MWAA 环境中的有向无环图（DAG）在另一个 Amazon MWAA 环境中调用 DAG。

**Topics**
+ [版本](#samples-invoke-dag-version)
+ [先决条件](#samples-invoke-dag-prereqs)
+ [Permissions](#samples-invoke-dag-permissions)
+ [依赖项](#samples-invoke-dag-dependencies)
+ [代码示例](#samples-invoke-dag-code)

## 版本
<a name="samples-invoke-dag-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-invoke-dag-prereqs"></a>

要使用本页上的代码示例，您需要以下内容：
+ 两个具有**公有网络** Web 服务器访问权限的 [Amazon MWAA 环境](get-started.md)，包括您当前的环境。
+ 上传到目标环境的 Amazon Simple Storage Service（Amazon S3）桶的示例 DAG。

## Permissions
<a name="samples-invoke-dag-permissions"></a>

要使用本页上的代码示例，环境的执行角色必须具有创建 Apache Airflow CLI 令牌的权限。您可以附加 AWS-managed 策略`AmazonMWAAAirflowCliAccess`来授予此权限。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

有关更多信息，请参阅[Apache Airflow CLI 政策：亚马逊 MWAAAirflow CliAccess](access-policies.md#cli-access)。

## 依赖项
<a name="samples-invoke-dag-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。用于[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)安装 Apache Airflow。

## 代码示例
<a name="samples-invoke-dag-code"></a>

以下代码示例假设您在当前环境中使用 DAG 在另一个环境中调用 DAG。

1. 在您的终端，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下示例的内容并本地另存为 `invoke_dag.py`。用您自己的信息替换以下值。
   + `your-new-environment-name`— 您要调用 DAG 的另一个环境的名称。
   + `your-target-dag-id`— 您要调用 DAG 的另一个环境中的 DAG ID。

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow 用户界面触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果 DAG 成功运行，您将看到类似于 `invoke_dag_task` 的任务日志中的以下内容的输出。

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   要验证 DAG 是否已成功调用，请导航到新环境的 Apache Airflow UI，然后执行以下操作：

   1. 在该**DAGs**页面上，在列表中找到您的新目标 DAG DAGs。

   1. 在**上次运行**下，查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 `invoke_dag` 的最新时间戳非常匹配。

   1. 在**近期任务**下，检查上次运行是否成功。

# 将 Amazon RDS for Microsoft SQL Server 与 Amazon MWAA 一起使用
<a name="samples-sql-server"></a>

您可以使用 Amazon MWAA 连接到 [RDS for SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html)。以下示例代码使用 Amazon Managed Workflows for Apache Airflow 环境中的 DAG 连接到 Amazon RDS for Microsoft SQL Server 并在其上执行查询。

**Topics**
+ [版本](#samples-sql-server-version)
+ [先决条件](#samples-sql-server-prereqs)
+ [依赖项](#samples-sql-server-dependencies)
+ [Apache Airflow v2 连接](#samples-sql-server-conn)
+ [代码示例](#samples-sql-server-code)
+ [接下来做什么？](#samples-sql-server-next-up)

## 版本
<a name="samples-sql-server-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-sql-server-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)
+ Amazon MWAA 和 RDS for SQL Server 在同一个 Amazon VPC/上运行
+ Amazon MWAA 和服务器的 VPC 安全组配置有以下连接：
  + Amazon MWAA 安全组中对 Amazon RDS `1433` 开放端口的入站规则
  + 或者是从 Amazon MWAA 到 RDS 开放端口 `1433` 的出站规则
+ RDS for SQL Server 的 Apache Airflow 连接反映了在之前过程中创建的 Amazon RDS SQL 服务器数据库的主机名、端口、用户名和密码。

## 依赖项
<a name="samples-sql-server-dependencies"></a>

要使用本节中的示例代码，请将以下依赖项添加到 `requirements.txt`。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Apache Airflow v2 连接
<a name="samples-sql-server-conn"></a>

如果您在 Apache Airflow v2 中使用连接，请确保 Airflow 连接对象包含以下键值对：

1. **连接 ID：**mssql\$1default

1. **连接类型：**Amazon Web Services

1. **主机：**`YOUR_DB_HOST`

1. **架构：**

1. **登录：**管理员

1. **密码：**

1. **端口：**1433

1. **附加依赖项：**

## 代码示例
<a name="samples-sql-server-code"></a>

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `sql-server.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## 接下来做什么？
<a name="samples-sql-server-next-up"></a>
+ 要了解如何将本示例中的 `requirements.txt` 文件上传到 Amazon S3 存储桶，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。
+ 浏览示例脚本和其他 [pymssql 模块示例](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html)。
+ 在*《Apache Airflow 参考指南》*中详细了解如何使用 [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) 在特定的 Microsoft SQL 数据库中执行 SQL 代码。

# 将 Amazon MWAA 与 Amazon EKS 一起使用
<a name="mwaa-eks-example"></a>

以下示例演示了如何将 Amazon MWAA 与 Amazon EKS 一起使用。

**Topics**
+ [版本](#mwaa-eks-example-version)
+ [先决条件](#eksctl-prereqs)
+ [创建 Amazon EC2 公有密钥](#eksctl-create-key)
+ [创建集群](#create-cluster-eksctl)
+ [创建 `mwaa` 命名空间](#eksctl-namespace)
+ [为 `mwaa` 命名空间创建角色](#eksctl-role)
+ [创建并附加 Amazon EKS 集群的 IAM 角色](#eksctl-iam-role)
+ [创建 requirements.txt 文件](#eksctl-requirements)
+ [为 Amazon EKS 创建身份映射](#eksctl-identity-map)
+ [创建 `kubeconfig`](#eksctl-kube-config)
+ [创建 DAG](#eksctl-create-dag)
+ [将 DAG 和 `kube_config.yaml` 添加到 Amazon S3 存储桶中](#eksctl-dag-bucket)
+ [启用并触发示例](#eksctl-trigger-pod)

## 版本
<a name="mwaa-eks-example-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="eksctl-prereqs"></a>

要使用本主题中的示例，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)
+ eksctl。要了解更多信息，请参阅[安装 eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl)。
+ kubectl。要了解更多信息，请参阅[安装和设置 kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)。在某些情况下，它是与 eksctl 一起安装的。
+ 在您创建 Amazon MWAA 环境的区域中的 EC2 密钥对。要了解更多信息，请参阅[创建或导入密钥对](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)。

**注意**  
使用 `eksctl` 命令时，可以包含 `--profile`，以指定默认配置文件以外的配置文件。

## 创建 Amazon EC2 公有密钥
<a name="eksctl-create-key"></a>

使用以下命令，以从私有密钥对中创建公有密钥。

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

要了解更新信息，请参阅[检索密钥对的公有密钥](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key)。

## 创建集群
<a name="create-cluster-eksctl"></a>

使用以下命令来创建集群。如果您想要为集群自定义名称或在其他区域创建集群，请替换名称和区域值。您必须在与您在创建 Amazon MWAA 环境的同一区域中创建集群。替换子网的值，使其与您用于 Amazon MWAA 的 Amazon VPC 网络中的子网相匹配。替换 `ssh-public-key` 的值以匹配您使用的密钥。您可以使用位于同一区域的 Amazon EC2 中的现有密钥，也可以在创建 Amazon MWAA 环境的同一区域创建新密钥。

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

完成集群的创建需要一段时间。完成后，您可以使用以下命令验证集群是否已成功创建并配置了 IAM OIDC 提供商：

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## 创建 `mwaa` 命名空间
<a name="eksctl-namespace"></a>

确认集群已成功创建后，使用以下命令为 pod 创建命名空间。

```
kubectl create namespace mwaa
```

## 为 `mwaa` 命名空间创建角色
<a name="eksctl-role"></a>

创建命名空间后，在 EKS 上为可在 MWAA 命名空间中运行 pod 的 Amazon MWAA 用户创建角色和角色绑定。如果您为命名空间使用了不同的名称，请将 `-n mwaa` 中的 mwaa 名称替换为您使用的名称。

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

运行以下命令来确认新角色可以访问 Amazon EKS 集群。如果您没有使用以下名称，请务必使用正确的名称*mwaa*：

```
kubectl get pods -n mwaa --as mwaa-service
```

您会收到一条含有如下内容的消息：

```
No resources found in mwaa namespace.
```

## 创建并附加 Amazon EKS 集群的 IAM 角色
<a name="eksctl-iam-role"></a>

您必须创建一个 IAM 角色，然后将其绑定到 Amazon EKS（k8s）集群，这样该角色才能通过 IAM 进行身份验证。该角色仅用于登录集群，没有任何控制台或 API 调用的权限。

使用 [Amazon MWAA 执行角色](mwaa-create-role.md) 中的步骤为 Amazon MWAA 环境创建新角色。但是，与其创建和附加该主题中描述的策略，不如附加以下策略：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

创建角色后，编辑 Amazon MWAA 环境，使用您创建的角色作为环境的执行角色。要更改角色，请编辑要使用的环境。您可以在“**权限**”下选择执行角色。

**已知问题：**
+ 角色 ARNs 存在一个已知问题，子路径无法通过 Amazon EKS 进行身份验证。解决方法是手动创建服务角色，而不是使用 Amazon MWAA 自己创建的服务角色。要了解更多信息，请参阅[在 aws-auth ConfigMap 中当 ARN 包含路径时，带有路径的角色不起作用](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ 如果 IAM 中没有 Amazon MWAA 服务列表，则需要选择备用服务策略，例如 Amazon EC2，然后更新该角色的信任策略以匹配以下内容：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  要了解更多信息，请参阅[如何在 IAM 角色中使用信任策略](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/)。

## 创建 requirements.txt 文件
<a name="eksctl-requirements"></a>

要使用本节中的示例代码，请确保已向 `requirements.txt` 中添加了以下数据库选项之一。要了解更多信息，请参阅 [安装 Python 依赖项](working-dags-dependencies.md)。

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## 为 Amazon EKS 创建身份映射
<a name="eksctl-identity-map"></a>

使用您在以下命令中创建的角色的 ARN 为 Amazon EKS 创建身份映射。将区域*us-east-1*更改为创建环境的区域。替换角色的 ARN，最后替换*mwaa-execution-role*为环境的执行角色。

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## 创建 `kubeconfig`
<a name="eksctl-kube-config"></a>

使用以下命令创建 `kubeconfig`：

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

如果您在运行 `update-kubeconfig` 时使用了特定的配置文件，则需要删除添加到 kube\$1config.yaml 文件中的 `env:` 部分，这样它才能在 Amazon MWAA 中正常运行。为此，请从文件中删除以下内容，然后将其保存：

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## 创建 DAG
<a name="eksctl-create-dag"></a>

使用以下代码示例创建 Python 文件，例如 DAG 的 `mwaa_pod_example.py` 文件。

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## 将 DAG 和 `kube_config.yaml` 添加到 Amazon S3 存储桶中
<a name="eksctl-dag-bucket"></a>

将您创建的 DAG 和 `kube_config.yaml` 文件放入 Amazon MWAA 环境的 Amazon S3 存储桶中。您可以使用 Amazon S3 控制台或 AWS Command Line Interface将所有文件放入存储桶中。

## 启用并触发示例
<a name="eksctl-trigger-pod"></a>

在 Apache Airflow 中，启用该示例，然后将其触发。

成功运行并完成后，使用以下命令验证 Pod：

```
kubectl get pods -n mwaa
```

您将获得与下内容类似的输出：

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

然后，您可以使用以下命令验证 Pod 的输出。请将名称值替换为上一个命令返回的值：

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# 使用 `ECSOperator` 连接 Amazon ECS
<a name="samples-ecs-operator"></a>

此主题介绍如何从 Amazon MWAA 使用 `ECSOperator` 连接到 Amazon Elastic Container Service（Amazon ECS）容器。在以下步骤中，您将向环境的执行角色添加所需的权限，使用 CloudFormation 模板创建 Amazon ECS Fargate 集群，最后创建并上传连接到新集群的 DAG。

**Topics**
+ [版本](#samples-ecs-operator-version)
+ [先决条件](#samples-ecs-operator-prereqs)
+ [Permissions](#samples-ecs-operator-permissions)
+ [创建 Amazon ECS 集群](#create-cfn-template)
+ [代码示例](#samples-ecs-operator-code)

## 版本
<a name="samples-ecs-operator-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-ecs-operator-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ [Amazon MWAA 环境。](get-started.md)

## Permissions
<a name="samples-ecs-operator-permissions"></a>
+ 环境的执行角色需要权限才能在 Amazon ECS 中运行任务。您可以将 [Amazonecs\$1 FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) AWS托管策略附加到您的执行角色，也可以创建以下策略并将其附加到您的执行角色。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ 除了添加在 Amazon ECS 中运行任务所需的权限外，您还必须修改 Amazon MWAA 执行角色中的 CloudWatch 日志策略声明，以允许访问 Amazon ECS 任务日志组，如下所示。Amazon ECS 日志组由中的 CloudFormation 模板创建[创建 Amazon ECS 集群](#create-cfn-template)。

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

有关 Amazon MWAA 执行角色，以及如何附加策略的更多信息，请参阅 [执行角色](mwaa-create-role.md)。

## 创建 Amazon ECS 集群
<a name="create-cfn-template"></a>

使用以下 CloudFormation 模板，您将构建一个 Amazon ECS Fargate 集群，用于您的 Amazon MWAA 工作流程。有关更多信息，请参阅*《Amazon Elastic Container Service 开发人员指南》*中的[创建一个任务定义](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition)。

1. 创建一个包含以下代码的 JSON 文件，并将该文件另存为 `ecs-mwaa-cfn.json`。

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. 在命令提示符下，使用以下 AWS CLI 命令创建新堆栈。您必须将 `SecurityGroups` 和 `SubnetIds` 的值替换为 Amazon MWAA 环境的安全组和子网的值。

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   或者，您可以使用以下 Shell 脚本：该脚本使用`[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI 命令检索环境的安全组和子网所需的值，然后相应地创建堆栈。要运行该脚本，请运行以下命令。

   1. 复制脚本并将其保存到与 CloudFormation 模板相同的目录中。`ecs-stack-helper.sh`

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. 使用以下命令运行该脚本。将 `environment-name` 和 `stack-name` 替换为您的信息。

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   如果成功，您将参考以下显示您的新 CloudFormation 堆栈 ID 的输出。

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

在 CloudFormation 堆栈完成并 AWS 预配置 Amazon ECS 资源后，您就可以创建和上传您的 DAG 了。

## 代码示例
<a name="samples-ecs-operator-code"></a>

1. 打开命令提示符，然后导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容并将其本地另存为 `mwaa-ecs-operator.py`，然后将新 DAG 上传到 Amazon S3。

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**注意**  
在 DAG 的示例中，对于 `awslogs_group`，您可能需要使用 Amazon ECS 任务日志组的名称修改日志组。示例假设名为 `mwaa-ecs-zero` 的日志组。对于 `awslogs_stream_prefix`，使用 Amazon ECS 任务日志流前缀。该示例假设日志流前缀为 `ecs`。

1.  运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow 用户界面触发 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您将在 `ecs_fargate_dag` DAG 的任务日志中看到输出，类似于 `ecs_operator_task` 的任务日志中的以下内容：

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# 在 Amazon MWAA 中使用 dbt
<a name="samples-dbt"></a>

本主题演示了如何在 Amazon MWAA 中使用 dbt 和 Postgres。在以下步骤中，您将所需的依赖项添加到 `requirements.txt` 中，并将示例 dbt 项目上传到环境的 Amazon S3 存储桶。然后，您将使用示例 DAG 来验证 Amazon MWAA 是否已安装依赖项，最后使用 `BashOperator` 来运行 dbt 项目。

**Topics**
+ [版本](#samples-dbt-version)
+ [先决条件](#samples-dbt-prereqs)
+ [依赖项](#samples-dbt-dependencies)
+ [将 dbt 项目上传到 Amazon S3](#samples-dbt-upload-project)
+ [使用 DAG 验证 dbt 依赖项的安装](#samples-dbt-test-dependencies)
+ [使用 DAG 来运行 dbt 项目](#samples-dbt-run-project)

## 版本
<a name="samples-dbt-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-dbt-prereqs"></a>

在完成以下步骤之前，您需要具备以下条件：
+ 使用 Apache Airflow v2.2.2 的 [Amazon MWAA 环境](get-started.md)。此示例已编写，并使用 v2.2.2 进行了测试。您可能需要修改示例以与其他 Apache Airflow 版本一起使用。
+ dbt 项目示例。要开始在 Amazon MWAA 中使用 dbt，你可以创建一个分支并从 dbt-labs 存储库中克隆 [dbt 入门项目](https://github.com/dbt-labs/dbt-starter-project)。 GitHub 

## 依赖项
<a name="samples-dbt-dependencies"></a>

要将 Amazon MWAA 与 dbt 配合使用，请将以下启动脚本添加到环境中。要了解更多信息，请参阅[在 Amazon MWAA 中使用启动脚本](using-startup-script.md)。

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

在以下章节中，您可将 dbt 项目目录上传到 Amazon S3 并运行 DAG 来验证 Amazon MWAA 是否已成功安装所需的 dbt 依赖项。

## 将 dbt 项目上传到 Amazon S3
<a name="samples-dbt-upload-project"></a>

为了能够在 Amazon MWAA 环境中使用 dbt 项目，您可以将整个项目目录上传到环境的 `dags` 文件夹中。当环境更新时，Amazon MWAA 会将 dbt 目录下载到本地 `usr/local/airflow/dags/` 文件夹。

**要将 dbt 项目上传到 Amazon S3，请执行以下操作**

1. 导航到您克隆 dbt 入门项目的目录。

1. 运行以下 Amazon S3 AWS CLI 命令，使用`--recursive`参数以递归方式将项目内容复制到您的环境`dags`文件夹。该命令会创建一个名为 `dbt` 的子目录，您可以将其用于所有 dbt 项目。如果子目录已经存在，则项目文件将被复制到现有目录中，并且不会创建新目录。该命令还会为该特定入门项目在 `dbt` 目录中创建一个子目录。

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   您可以为项目子目录使用不同的名称，以便在 `dbt` 父目录中组织多个 dbt 项目。

## 使用 DAG 验证 dbt 依赖项的安装
<a name="samples-dbt-test-dependencies"></a>

以下 DAG 使用 `BashOperator` 和 bash 命令来验证 Amazon MWAA 是否已成功安装 `requirements.txt` 中指定的 dbt 依赖项。

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

执行以下操作以访问任务日志并验证是否已安装 dbt 及其依赖项。

1. 导航到 Amazon MWAA 控制台，然后从可用环境列表中选择**打开 Airflow UI**。

1. 在 Apache Airflow UI 上，从列表中找到 `dbt-installation-test` DAG，然后在 `Last Run` 列中选择日期以打开上一个成功的任务。

1. 使用**图表视图**，选择 `bash_command` 任务以打开任务实例的详细信息。

1. 选择**日志**来打开任务日志，然后验证日志是否成功列出了我们在 `requirements.txt` 中指定的 dbt 版本。

## 使用 DAG 来运行 dbt 项目
<a name="samples-dbt-run-project"></a>

以下 DAG 使用 `BashOperator` 将您从本地 `usr/local/airflow/dags/` 目录上传到 Amazon S3 的 dbt 项目复制到可写入的 `/tmp` 目录，然后运行 dbt 项目。bash 命令假设一个名为 `dbt-starter-project` 的 入门 dbt 项目。根据您项目目录的名称修改目录名称。

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS 博客和教程
<a name="samples-blogs-tutorials"></a>
+ [使用 Amazon EKS 和 Apache Airflow v2.x 的 Amazon MWAA](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)