

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon Managed Workflows for Apache Airflow 的程式碼範例
<a name="sample-code"></a>

本指南包含程式碼範例，包括 DAGs和自訂外掛程式，您可以在 Amazon Managed Workflows for Apache Airflow 環境中使用。如需搭配 AWS 服務使用 Apache Airflow 的更多範例，請參閱 Apache Airflow GitHub 儲存庫中的 [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags)目錄。

**Topics**
+ [使用 DAG 在 CLI 中匯入變數](samples-variables-import.md)
+ [使用 建立 SSH 連線 `SSHOperator`](samples-ssh.md)
+ [在 中使用私密金鑰 AWS Secrets Manager 進行 Apache Airflow Snowflake 連線](samples-sm-snowflake.md)
+ [使用 DAG 在 CloudWatch 中寫入自訂指標](samples-custom-metrics.md)
+ [Amazon MWAA 環境上的 Aurora PostgreSQL 資料庫清除](samples-database-cleanup.md)
+ [將環境中繼資料匯出至 Amazon S3 上的 CSV 檔案](samples-dag-run-info-to-csv.md)
+ [針對 AWS Secrets Manager Apache Airflow 變數在 中使用私密金鑰](samples-secrets-manager-var.md)
+ [在 中使用私密金鑰 AWS Secrets Manager 進行 Apache Airflow 連線](samples-secrets-manager.md)
+ [使用 Oracle 建立自訂外掛程式](samples-oracle.md)
+ [在 Amazon MWAA 上變更 DAG 的時區](samples-plugins-timezone.md)
+ [重新整理 CodeArtifact 權杖](samples-code-artifact.md)
+ [使用 Apache Hive 和 Hadoop 建立自訂外掛程式](samples-hive.md)
+ [為 Apache Airflow PythonVirtualenvOperator 建立自訂外掛程式](samples-virtualenv.md)
+ [使用 Lambda 函數叫用 DAGs](samples-lambda.md)
+ [在不同的 Amazon MWAA 環境中叫用 DAGs](samples-invoke-dag.md)
+ [搭配使用 Amazon MWAA 與 Amazon RDS for Microsoft SQL Server](samples-sql-server.md)
+ [搭配使用 Amazon MWAA 與 Amazon EKS](mwaa-eks-example.md)
+ [使用 連線至 Amazon ECS `ECSOperator`](samples-ecs-operator.md)
+ [搭配 Amazon MWAA 使用 dbt](samples-dbt.md)
+ [AWS 部落格和教學課程](#samples-blogs-tutorials)

# 使用 DAG 在 CLI 中匯入變數
<a name="samples-variables-import"></a>

下列範例程式碼會使用 Amazon Managed Workflows for Apache Airflow 上的 CLI 匯入變數。

**Topics**
+ [版本](#samples-variables-import-version)
+ [先決條件](#samples-variables-import-prereqs)
+ [權限](#samples-variables-import-permissions)
+ [相依性](#samples-variables-import-dependencies)
+ [範例程式碼](#samples-variables-import-code)
+ [後續步驟？](#samples-variables-import-next-up)

## 版本
<a name="samples-variables-import-version"></a>

您可以在 Python 3.10 中使用 **Apache Airflow v2** 和 Python 3.1[1](https://peps.python.org/pep-0664/) ****中使用此頁面的程式碼範例。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/)

## 先決條件
<a name="samples-variables-import-prereqs"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 權限
<a name="samples-variables-import-permissions"></a>

您的 AWS 帳戶 需要存取 `AmazonMWAAAirflowCliAccess`政策。若要進一步了解，請參閱 [Apache Airflow CLI 政策：AmazonMWAAAirflowCliAccess](access-policies.md)。

## 相依性
<a name="samples-variables-import-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 範例程式碼
<a name="samples-variables-import-code"></a>

下列範例程式碼需要三個輸入：您的 Amazon MWAA 環境名稱 （在 中`mwaa_env`)、您環境 AWS 區域 的 （在 中`aws_region`)，以及包含您要匯入之變數的本機檔案 （在 中`var_file`)。

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## 後續步驟？
<a name="samples-variables-import-next-up"></a>
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。

# 使用 建立 SSH 連線 `SSHOperator`
<a name="samples-ssh"></a>

下列範例說明如何使用定向無環圖 (DAG) `SSHOperator`中的 ，從 Amazon Managed Workflows for Apache Airflow 環境連線至遠端 Amazon EC2 執行個體。您可以使用類似的方法來連線至具有 SSH 存取的任何遠端執行個體。

在下列範例中，您將 SSH 私密金鑰 (`.pem`) 上傳至 Amazon S3 上的環境`dags`目錄。然後，您可以使用 安裝必要的相依性，`requirements.txt`並在 UI 中建立新的 Apache Airflow 連線。最後，您撰寫的 DAG 會建立與遠端執行個體的 SSH 連線。

**Topics**
+ [版本](#samples-ssh-version)
+ [先決條件](#samples-ssh-prereqs)
+ [權限](#samples-ssh-permissions)
+ [要求](#samples-ssh-dependencies)
+ [將您的私密金鑰複製到 Amazon S3](#samples-ssh-secret)
+ [建立新的 Apache Airflow 連線](#samples-ssh-connection)
+ [範例程式碼](#samples-ssh-code)

## 版本
<a name="samples-ssh-version"></a>

您可以在 Python 3.10 中使用 **Apache Airflow v2** 和 Python 3.1[1](https://peps.python.org/pep-0664/) ****中使用此頁面的程式碼範例。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/)

## 先決條件
<a name="samples-ssh-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ SSH 私密金鑰。程式碼範例假設您在與 Amazon MWAA 環境相同的區域中有 Amazon EC2 執行個體和 。 `.pem`如果您沒有金鑰，請參閱《*Amazon EC2 使用者指南*》中的[建立或匯入金鑰對](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)。

## 權限
<a name="samples-ssh-permissions"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 要求
<a name="samples-ssh-dependencies"></a>

將下列參數新增至 `requirements.txt`，以在 Web 伺服器上安裝`apache-airflow-providers-ssh`套件。一旦您的環境更新且 Amazon MWAA 成功安裝相依性，您將在 UI 中取得新的 **SSH** 連線類型。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**注意**  
`-c` 定義 中的限制條件 URL`requirements.txt`。這可確保 Amazon MWAA 為您的環境安裝正確的套件版本。

## 將您的私密金鑰複製到 Amazon S3
<a name="samples-ssh-secret"></a>

使用下列 AWS Command Line Interface 命令將您的`.pem`金鑰複製到 Amazon S3 中的環境`dags`目錄。

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA 會將 中的內容`dags`，包括`.pem`金鑰，複製到本機`/usr/local/airflow/dags/`目錄。透過這樣做，Apache Airflow 可以存取金鑰。

## 建立新的 Apache Airflow 連線
<a name="samples-ssh-connection"></a>

**使用 Apache Airflow UI 建立新的 SSH 連線**

1. 在 Amazon MWAA 主控台上開啟[環境](https://console.aws.amazon.com/mwaa/home#/environments)頁面。

1. 從環境清單中，為您的環境選擇 **Open Airflow UI**。

1. 在 Apache Airflow UI 頁面上，從主導覽列選擇**管理員**以展開下拉式清單，然後選擇**連線**。

1. 在**列出連線**頁面上，選擇 **＋**，或**新增記錄**按鈕以新增連線。

1. 在**新增連線**頁面上，新增下列資訊：

   1. 針對**連線 ID**，輸入 **ssh\$1new**。

   1. 針對**連線類型**，從下拉式清單中選擇 **SSH**。
**注意**  
如果清單中沒有 **SSH** 連線類型，Amazon MWAA 尚未安裝必要的`apache-airflow-providers-ssh`套件。更新您的`requirements.txt`檔案以包含此套件，然後再試一次。

   1. 針對**主機**，輸入您要連線之 Amazon EC2 執行個體的 IP 地址。例如 **12.345.67.89**。

   1. 針對**使用者名稱**，**ec2-user**如果您要連線至 Amazon EC2 執行個體，請輸入 。您的使用者名稱可能不同，取決於您希望 Apache Airflow 連線的遠端執行個體類型。

   1. 對於 **Extra**，以 JSON 格式輸入下列鍵/值對：

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      此鍵/值對會指示 Apache Airflow 在本機`/dags`目錄中搜尋私密金鑰。

## 範例程式碼
<a name="samples-ssh-code"></a>

下列 DAG 使用 `SSHOperator` 連線到您的目標 Amazon EC2 執行個體，然後執行 `hostname` Linux 命令來列印執行個體的名稱。您可以修改 DAG 以在遠端執行個體上執行任何命令或指令碼。

1. 開啟終端機，然後導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `ssh.py`。

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您會收到類似 `ssh_operator_example` DAG 中 任務日誌中下列項目`ssh_task`的輸出：

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# 在 中使用私密金鑰 AWS Secrets Manager 進行 Apache Airflow Snowflake 連線
<a name="samples-sm-snowflake"></a>

下列範例呼叫 AWS Secrets Manager 會在 Amazon Managed Workflows for Apache Airflow 上取得 Apache Airflow Snowflake 連線的私密金鑰。它假設您已完成 中的步驟[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

**Topics**
+ [版本](#samples-sm-snowflake-version)
+ [先決條件](#samples-sm-snowflake-prereqs)
+ [權限](#samples-sm-snowflake-permissions)
+ [要求](#samples-sm-snowflake-dependencies)
+ [範例程式碼](#samples-sm-snowflake-code)
+ [後續步驟？](#samples-sm-snowflake-next-up)

## 版本
<a name="samples-sm-snowflake-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-sm-snowflake-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ Secrets Manager 後端做為 Apache Airflow 組態選項，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。
+ Secrets Manager 中的 Apache Airflow 連線字串，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

## 權限
<a name="samples-sm-snowflake-permissions"></a>
+ Secrets Manager 許可，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

## 要求
<a name="samples-sm-snowflake-dependencies"></a>

若要使用此頁面上的範例程式碼，請將下列相依性新增至您的 `requirements.txt`。若要進一步了解，請參閱 [安裝 Python 相依性](working-dags-dependencies.md)。

```
apache-airflow-providers-snowflake==1.3.0
```

## 範例程式碼
<a name="samples-sm-snowflake-code"></a>

下列步驟說明如何建立 DAG 程式碼，呼叫 Secrets Manager 來取得秘密。

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `snowflake_connection.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## 後續步驟？
<a name="samples-sm-snowflake-next-up"></a>
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。

# 使用 DAG 在 CloudWatch 中寫入自訂指標
<a name="samples-custom-metrics"></a>

您可以使用下列程式碼範例來撰寫執行 的導向無環圖 (DAG)，`PythonOperator`以擷取 Amazon MWAA 環境的作業系統層級指標。然後，DAG 會將資料作為自訂指標發佈至 Amazon CloudWatch。

自訂作業系統層級指標可讓您進一步了解環境工作者如何利用虛擬記憶體和 CPU 等資源。您可以使用此資訊來選取最適合工作負載[的環境類別](environment-class.md)。

**Topics**
+ [版本](#samples-custom-metrics-version)
+ [先決條件](#samples-custom-metrics-prereqs)
+ [權限](#samples-custom-metrics-permissions)
+ [相依性](#samples-custom-metrics-dependencies)
+ [程式碼範例](#samples-custom-metrics-code)

## 版本
<a name="samples-custom-metrics-version"></a>

您可以在 Python 3.10 中使用 **Apache Airflow v2** 和 Python 3.1[1](https://peps.python.org/pep-0664/) ****中使用此頁面的程式碼範例。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/)

## 先決條件
<a name="samples-custom-metrics-prereqs"></a>

若要使用此頁面上的程式碼範例，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。

## 權限
<a name="samples-custom-metrics-permissions"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 相依性
<a name="samples-custom-metrics-dependencies"></a>
+ 使用此頁面上的程式碼範例不需要額外的相依性。

## 程式碼範例
<a name="samples-custom-metrics-code"></a>

1. 在命令提示中，導覽至存放 DAG 程式碼的資料夾。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並將其儲存為 `dag-custom-metrics.py`。將 取代`MWAA-ENV-NAME`為您的環境名稱。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果 DAG 執行成功，您會在 Apache Airflow 日誌中取得類似以下內容的內容：

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Amazon MWAA 環境上的 Aurora PostgreSQL 資料庫清除
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow 使用 Aurora PostgreSQL 資料庫做為 Apache Airflow 中繼資料資料庫，其中存放 DAG 執行和任務執行個體。下列範例程式碼會定期從 Amazon MWAA 環境的專用 Aurora PostgreSQL 資料庫清除項目。

**Topics**
+ [版本](#samples-database-cleanup-version)
+ [先決條件](#samples-database-cleanup-prereqs)
+ [相依性](#samples-sql-server-dependencies)
+ [範例程式碼](#samples-database-cleanup-code)

## 版本
<a name="samples-database-cleanup-version"></a>

此頁面上的程式碼範例專屬於 Amazon MWAA 上支援的 Apache Airflow v2。請參閱[支援的 Apache Airflow 版本](airflow-versions.md)。

**提示**  
**對於 Apache Airflow v3 使用者**：如果您想要清除資料庫 （從中繼存放區資料表清除舊記錄），請執行 `db clean` CLI 命令。

## 先決條件
<a name="samples-database-cleanup-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。

## 相依性
<a name="samples-sql-server-dependencies"></a>

若要搭配 Apache Airflow v2 使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 範例程式碼
<a name="samples-database-cleanup-code"></a>

下列 DAG 會清除 中指定資料表的中繼資料資料庫`TABLES_TO_CLEAN`。此範例會從超過 30 天的指定資料表中刪除資料。若要調整項目的刪除時間，請將 `MAX_AGE_IN_DAYS`設定為不同的值。

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# 將環境中繼資料匯出至 Amazon S3 上的 CSV 檔案
<a name="samples-dag-run-info-to-csv"></a>

使用下列程式碼範例建立定向無環圖 (DAG)，查詢資料庫以取得一系列 DAG 執行資訊，並將資料寫入 Amazon S3 上存放`.csv`的檔案。

您可能想要從環境的 Aurora PostgreSQL 資料庫匯出資訊，以在本機檢查資料、將其封存在物件儲存中，或將其與 [Amazon S3 等工具結合到 Amazon Redshift 運算子](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html)和[資料庫清除](samples-database-cleanup.md)，以將 Amazon MWAA 中繼資料移出環境，但保留以供未來分析。

您可以查詢資料庫，尋找 [Apache Airflow 模型](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models)中列出的任何物件。此程式碼範例使用三種模型 `DagRun`、 `TaskFail`和 `TaskInstance`，可提供與 DAG 執行相關的資訊。

**Topics**
+ [版本](#samples-dag-run-info-to-csv-version)
+ [先決條件](#samples-dag-run-info-to-csv-prereqs)
+ [許可](#samples-dag-run-info-to-csv-permissions)
+ [要求](#samples-dag-run-info-to-csv-dependencies)
+ [範例程式碼](#samples-dag-run-info-to-csv-code)

## 版本
<a name="samples-dag-run-info-to-csv-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-dag-run-info-to-csv-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ 您要匯出中繼資料資訊[的新 Amazon S3 儲存貯](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)體。

## 許可
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA 需要 Amazon S3 動作的許可`s3:PutObject`，才能將查詢的中繼資料資訊寫入 Amazon S3。將下列政策陳述式新增至您環境的執行角色。

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

此政策僅限制對 *amzn-s3-demo-bucket* 的寫入存取。

## 要求
<a name="samples-dag-run-info-to-csv-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 範例程式碼
<a name="samples-dag-run-info-to-csv-code"></a>

下列步驟說明如何建立查詢 Aurora PostgreSQL 的 DAG，並將結果寫入新的 Amazon S3 儲存貯體。

1. 在終端機中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並將其儲存為 `metadata_to_csv.py`。您可以變更指派給 的值`MAX_AGE_IN_DAYS`，以控制 DAG 從中繼資料資料庫查詢的最舊記錄的存留期。

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您將在任務的任務日誌中輸出類似以下內容`export_db`：

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   您現在可以在 的新 Amazon S3 儲存貯體中存取和下載匯出`.csv`的檔案`/files/export/`。

# 針對 AWS Secrets Manager Apache Airflow 變數在 中使用私密金鑰
<a name="samples-secrets-manager-var"></a>

下列範例呼叫 AWS Secrets Manager 會在 Amazon Managed Workflows for Apache Airflow 上取得 Apache Airflow 變數的私密金鑰。它假設您已完成 中的步驟[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

**Topics**
+ [版本](#samples-secrets-manager-var-version)
+ [先決條件](#samples-secrets-manager-var-prereqs)
+ [權限](#samples-secrets-manager-var-permissions)
+ [要求](#samples-hive-dependencies)
+ [範例程式碼](#samples-secrets-manager-var-code)
+ [後續步驟？](#samples-secrets-manager-var-next-up)

## 版本
<a name="samples-secrets-manager-var-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-secrets-manager-var-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ Secrets Manager 後端做為 Apache Airflow 組態選項，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。
+ Secrets Manager 中的 Apache Airflow 變數字串，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

## 權限
<a name="samples-secrets-manager-var-permissions"></a>
+ Secrets Manager 許可，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

## 要求
<a name="samples-hive-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 範例程式碼
<a name="samples-secrets-manager-var-code"></a>

下列步驟說明如何建立 DAG 程式碼，呼叫 Secrets Manager 來取得秘密。

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `secrets-manager-var.py`。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## 後續步驟？
<a name="samples-secrets-manager-var-next-up"></a>
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。

# 在 中使用私密金鑰 AWS Secrets Manager 進行 Apache Airflow 連線
<a name="samples-secrets-manager"></a>

下列範例呼叫 AWS Secrets Manager 會在 Amazon Managed Workflows for Apache Airflow 上取得 Apache Airflow 連線的私密金鑰。它假設您已完成 中的步驟[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

**Topics**
+ [版本](#samples-secrets-manager-version)
+ [先決條件](#samples-secrets-manager-prereqs)
+ [權限](#samples-secrets-manager-permissions)
+ [要求](#samples-hive-dependencies)
+ [範例程式碼](#samples-secrets-manager-code)
+ [後續步驟？](#samples-secrets-manager-next-up)

## 版本
<a name="samples-secrets-manager-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-secrets-manager-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ Secrets Manager 後端做為 Apache Airflow 組態選項，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。
+ Secrets Manager 中的 Apache Airflow 連線字串，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

## 權限
<a name="samples-secrets-manager-permissions"></a>
+ Secrets Manager 許可，如 所列[使用 AWS Secrets Manager 秘密設定 Apache Airflow 連線](connections-secrets-manager.md)。

## 要求
<a name="samples-hive-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 範例程式碼
<a name="samples-secrets-manager-code"></a>

下列步驟說明如何建立 DAG 程式碼，呼叫 Secrets Manager 來取得秘密。

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `secrets-manager.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## 後續步驟？
<a name="samples-secrets-manager-next-up"></a>
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。

# 使用 Oracle 建立自訂外掛程式
<a name="samples-oracle"></a>

下列範例會逐步解說使用 Oracle for Amazon MWAA 建立自訂外掛程式的步驟，並可與您的 plugins.zip 檔案中的其他自訂外掛程式和二進位檔結合使用。

**Contents**
+ [版本](#samples-oracle-version)
+ [先決條件](#samples-oracle-prereqs)
+ [權限](#samples-oracle-permissions)
+ [要求](#samples-oracle-dependencies)
+ [範例程式碼](#samples-oracle-code)
+ [建立自訂外掛程式](#samples-oracle-create-pluginszip-steps)
  + [下載相依性](#samples-oracle-install)
  + [自訂外掛程式](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Airflow 組態選項](#samples-oracle-airflow-config)
+ [後續步驟？](#samples-oracle-next-up)

## 版本
<a name="samples-oracle-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-oracle-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ 在任何日誌層級`CRITICAL`或環境的上一節中啟用工作者記錄。如需 Amazon MWAA 日誌類型以及如何管理日誌群組的詳細資訊，請參閱 [在 Amazon CloudWatch 中存取 Airflow 日誌](monitoring-airflow.md)

## 權限
<a name="samples-oracle-permissions"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 要求
<a name="samples-oracle-dependencies"></a>

若要使用此頁面上的範例程式碼，請將下列相依性新增至您的 `requirements.txt`。若要進一步了解，請參閱 [安裝 Python 相依性](working-dags-dependencies.md)。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## 範例程式碼
<a name="samples-oracle-code"></a>

下列步驟說明如何建立 DAG 程式碼來測試自訂外掛程式。

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `oracle.py`。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## 建立自訂外掛程式
<a name="samples-oracle-create-pluginszip-steps"></a>

本節說明如何下載相依性、建立自訂外掛程式和 plugins.zip。

### 下載相依性
<a name="samples-oracle-install"></a>

Amazon MWAA 會在每個 Amazon MWAA 排程器和工作者容器`/usr/local/airflow/plugins`上，將 plugins.zip 的內容擷取至 。這是用來將二進位檔新增至您的環境。下列步驟說明如何組合自訂外掛程式所需的檔案。

**提取 Amazon Linux 容器映像**

1. 在您的命令提示字元中，提取 Amazon Linux 容器映像，並在本機執行容器。例如：

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   您的命令提示可以叫用 bash 命令列。例如：

   ```
   bash-4.2#
   ```

1. 安裝 Linux 原生非同步 I/O 設施 (libaio)。

   ```
   yum -y install libaio
   ```

1. 保持此視窗開啟以進行後續步驟。我們將在本機複製下列檔案：`lib64/libaio.so.1`、`lib64/libaio.so.1.0.0`、`lib64/libaio.so.1.0.1`。

**下載用戶端資料夾**

1. 在本機安裝 unzip 套件。例如：

   ```
   sudo yum install unzip
   ```

1. 建立 `oracle_plugin` 目錄。例如：

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. 使用以下 curl 命令從 Oracle Instant Client Downloads for Linux x8[6-64 (64 位元） 下載 instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip)。 [https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html)

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. 解壓縮 `client.zip` 檔案。例如：

   ```
   unzip *.zip
   ```

**從 Docker 擷取檔案**

1. 在新的命令提示字元中，顯示並寫下您的 Docker 容器 ID。例如：

   ```
   docker container ls
   ```

   您的命令提示可以傳回所有容器及其 IDs。例如：

   ```
   debc16fd6970
   ```

1. 在您的`oracle_plugin`目錄中，將 `lib64/libaio.so.1`、`lib64/libaio.so.1.0.0`、 `lib64/libaio.so.1.0.1` 檔案擷取至本機`instantclient_18_5`資料夾。例如：

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### 自訂外掛程式
<a name="samples-oracle-plugins-code"></a>

Apache Airflow 會在啟動時執行外掛程式資料夾中 Python 檔案的內容。這用於設定和修改環境變數。下列步驟說明自訂外掛程式的範例程式碼。
+ 複製下列程式碼範例的內容，並在本機儲存為 `env_var_plugin_oracle.py`。

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

下列步驟說明如何建立 `plugins.zip`。此範例的內容可以與其他外掛程式和二進位檔合併為單一`plugins.zip`檔案。

**壓縮外掛程式目錄的內容**

1. 在您的命令提示字元中，導覽至 `oracle_plugin`目錄。例如：

   ```
   cd oracle_plugin
   ```

1. 在 plugins.zip 中壓縮`instantclient_18_5`目錄。例如：

   ```
   zip -r ../plugins.zip ./
   ```

   您的命令提示字元會顯示：

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. 移除 `client.zip` 檔案。例如：

   ```
   rm client.zip
   ```

**壓縮 env\$1var\$1plugin\$1oracle.py 檔案**

1. 將 `env_var_plugin_oracle.py` 檔案新增至 plugins.zip 的根目錄。例如：

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. 您的 plugins.zip 現在包含下列項目：

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Airflow 組態選項
<a name="samples-oracle-airflow-config"></a>

如果您使用的是 Apache Airflow v2，請將 新增`core.lazy_load_plugins : False`為 Apache Airflow 組態選項。若要進一步了解，請參閱[使用組態選項載入 2 中的外掛程式](configuring-env-variables.md#configuring-2.0-airflow-override)。

## 後續步驟？
<a name="samples-oracle-next-up"></a>
+ 了解如何在此範例中將 `requirements.txt` 檔案上傳至 中的 Amazon S3 儲存貯體[安裝 Python 相依性](working-dags-dependencies.md)。
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。
+ 進一步了解如何在此範例中將`plugins.zip`檔案上傳至 中的 Amazon S3 儲存貯體[安裝自訂外掛程式](configuring-dag-import-plugins.md)。

# 在 Amazon MWAA 上變更 DAG 的時區
<a name="samples-plugins-timezone"></a>

Apache Airflow 預設會以 UTC\$10 排程您的導向無環圖 (DAG)。下列步驟說明如何變更 Amazon MWAA 使用 [Pendulum](https://pypi.org/project/pendulum/) 執行 DAGs時區。或者，本主題示範如何建立自訂外掛程式，以變更您環境 Apache Airflow 日誌的時區。

**Topics**
+ [版本](#samples-plugins-timezone-version)
+ [先決條件](#samples-plugins-timezone-prerequisites)
+ [許可](#samples-plugins-timezone-permissions)
+ [建立外掛程式以變更 Airflow 日誌中的時區](#samples-plugins-timezone-custom-plugin)
+ [建立 `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [範例程式碼](#samples-plugins-timezone-dag)
+ [後續步驟？](#samples-plugins-timezone-plugins-next-up)

## 版本
<a name="samples-plugins-timezone-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-plugins-timezone-prerequisites"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。

## 許可
<a name="samples-plugins-timezone-permissions"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 建立外掛程式以變更 Airflow 日誌中的時區
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow 會在啟動時在 `plugins`目錄中執行 Python 檔案。使用下列外掛程式，您可以覆寫執行器的時區，以修改 Apache Airflow 寫入日誌的時區。

1. `plugins` 為您的自訂外掛程式建立名為 的目錄，然後導覽至 目錄。例如：

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `dag-timezone-plugin.py` 資料夾中的 `plugins`。

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. 在 `plugins`目錄中，建立名為 的空白 Python 檔案`__init__.py`。您的`plugins`目錄應該類似於以下內容：

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## 建立 `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

下列步驟說明如何建立 `plugins.zip`。此範例的內容可以與其他外掛程式和二進位檔合併為單一`plugins.zip`檔案。

1. 在您的命令提示字元中，導覽至上一個步驟的 `plugins` 目錄。例如：

   ```
   cd plugins
   ```

1. 壓縮`plugins`目錄中的內容。

   ```
   zip -r ../plugins.zip ./
   ```

1. `plugins.zip` 上傳至您的 S3 儲存貯體

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## 範例程式碼
<a name="samples-plugins-timezone-dag"></a>

若要變更 DAG 執行的預設時區 (UTC\$10)，我們將使用名為 [Pendulum](https://pypi.org/project/pendulum/) 的程式庫，Python 程式庫可用來使用時區感知日期時間。

1. 在命令提示中，導覽至存放 DAGs目錄。例如：

   ```
   cd dags
   ```

1. 複製下列範例的內容並儲存為 `tz-aware-dag.py`。

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您將輸出類似 `tz_test` DAG 中 的任務日誌`tz_aware_task`中的以下內容：

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## 後續步驟？
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ 進一步了解如何在此範例中將`plugins.zip`檔案上傳至 中的 Amazon S3 儲存貯體[安裝自訂外掛程式](configuring-dag-import-plugins.md)。

# 重新整理 CodeArtifact 權杖
<a name="samples-code-artifact"></a>

如果您使用 CodeArtifact 安裝 Python 相依性，Amazon MWAA 需要作用中的字符。若要允許 Amazon MWAA 在執行時間存取 CodeArtifact 儲存庫，您可以使用[啟動指令碼](using-startup-script.md)並使用[https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url)字符設定 。

下列主題說明如何建立啟動指令碼，該指令碼使用 [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) CodeArtifact API 操作在每次環境啟動或更新時擷取新的字符。

**Topics**
+ [版本](#samples-code-artifact-version)
+ [先決條件](#samples-code-artifact-prereqs)
+ [權限](#samples-code-artifact-permissions)
+ [範例程式碼](#samples-code-artifact-code)
+ [後續步驟？](#samples-code-artifact-next-up)

## 版本
<a name="samples-code-artifact-version"></a>

您可以在 Python 3.10 中使用 **Apache Airflow v2** 和 Python 3.1[1](https://peps.python.org/pep-0664/) ****中使用此頁面的程式碼範例。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/)

## 先決條件
<a name="samples-code-artifact-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ [CodeArtifact 儲存庫](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html)，您可以在其中存放您環境的相依性。

## 權限
<a name="samples-code-artifact-permissions"></a>

若要重新整理 CodeArtifact 字符並將結果寫入 Amazon S3 Amazon MWAA，必須在執行角色中具有下列許可。
+ `codeartifact:GetAuthorizationToken` 動作允許 Amazon MWAA 從 CodeArtifact 擷取新權杖。下列政策會為您建立的每個 CodeArtifact 網域授予許可。您可以透過修改 陳述式中的資源值，並僅指定您希望環境存取的網域，進一步限制對網域的存取。

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ 呼叫 CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) API 操作需要 `sts:GetServiceBearerToken`動作。此操作會傳回權杖，在`pip`搭配 CodeArtifact 使用 等套件管理員時必須使用該權杖。若要搭配 CodeArtifact 儲存庫使用套件管理員，您環境的執行角色角色必須允許 `sts:GetServiceBearerToken`，如下列政策陳述式所列。

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## 範例程式碼
<a name="samples-code-artifact-code"></a>

下列步驟說明如何建立更新 CodeArtifact 字符的啟動指令碼。

1. 複製下列程式碼範例的內容，並在本機儲存為 `code_artifact_startup_script.sh`。

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. 導覽至您儲存指令碼的資料夾。在`cp`新的提示視窗中使用 ，將指令碼上傳至您的儲存貯體。將 *amzn-s3-demo-bucket* 取代為您的資訊。

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   如果成功，Amazon S3 會輸出物件的 URL 路徑：

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   上傳指令碼後，您的環境會在啟動時更新並執行指令碼。

## 後續步驟？
<a name="samples-code-artifact-next-up"></a>
+ 了解如何使用啟動指令碼在 中自訂您的環境[搭配 Amazon MWAA 使用啟動指令碼](using-startup-script.md)。
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。
+ 進一步了解如何在此範例中將`plugins.zip`檔案上傳至 中的 Amazon S3 儲存貯體[安裝自訂外掛程式](configuring-dag-import-plugins.md)。

# 使用 Apache Hive 和 Hadoop 建立自訂外掛程式
<a name="samples-hive"></a>

Amazon MWAA 會將 的內容擷取`plugins.zip`至 `/usr/local/airflow/plugins`。這可用於將二進位檔新增至您的容器。此外，Apache Airflow 會在*啟動*時在 `plugins` 資料夾中執行 Python 檔案的內容，讓您能夠設定和修改環境變數。下列範例會逐步解說在 Amazon Managed Workflows for Apache Airflow 環境上使用 Apache Hive 和 Hadoop 建立自訂外掛程式的步驟，並可與其他自訂外掛程式和二進位檔結合使用。

**Topics**
+ [版本](#samples-hive-version)
+ [先決條件](#samples-hive-prereqs)
+ [權限](#samples-hive-permissions)
+ [要求](#samples-hive-dependencies)
+ [下載相依性](#samples-hive-install)
+ [自訂外掛程式](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [範例程式碼](#samples-hive-code)
+ [Airflow 組態選項](#samples-hive-airflow-config)
+ [後續步驟？](#samples-hive-next-up)

## 版本
<a name="samples-hive-version"></a>

您可以在 Python 3.10 中使用 **Apache Airflow v2** 和 Python 3.1[1](https://peps.python.org/pep-0664/) ****中使用此頁面的程式碼範例。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/)

## 先決條件
<a name="samples-hive-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。

## 權限
<a name="samples-hive-permissions"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 要求
<a name="samples-hive-dependencies"></a>

若要使用此頁面上的範例程式碼，請將下列相依性新增至您的 `requirements.txt`。若要進一步了解，請參閱 [安裝 Python 相依性](working-dags-dependencies.md)。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## 下載相依性
<a name="samples-hive-install"></a>

Amazon MWAA 會在每個 Amazon MWAA 排程器和工作者容器`/usr/local/airflow/plugins`上，將 plugins.zip 的內容擷取至 。這是用來將二進位檔新增至您的環境。下列步驟說明如何組合自訂外掛程式所需的檔案。

1. 在命令提示字元中，導覽至您要建立外掛程式的目錄。例如：

   ```
   cd plugins
   ```

1. 從[鏡像](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz)下載 [Hadoop](https://hadoop.apache.org/)，例如：

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. 從[鏡像](https://www.apache.org/dyn/closer.cgi/hive/)下載 [Hive](https://hive.apache.org/)，例如：

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. 建立目錄。例如：

   ```
   mkdir hive_plugin
   ```

1. 擷取 Hadoop。

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. 擷取 Hive。

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## 自訂外掛程式
<a name="samples-hive-plugins-code"></a>

Apache Airflow 會在啟動時執行外掛程式資料夾中 Python 檔案的內容。這用於設定和修改環境變數。下列步驟說明自訂外掛程式的範例程式碼。

1. 在您的命令提示字元中，導覽至 `hive_plugin`目錄。例如：

   ```
   cd hive_plugin
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `hive_plugin.py` `hive_plugin`目錄中的 。

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. 處理下列文字的內容，並在本機儲存為 `.airflowignore` `hive_plugin`目錄中的 。

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

下列步驟說明如何建立 `plugins.zip`。此範例的內容可以與其他外掛程式和二進位檔合併為單一`plugins.zip`檔案。

1. 在您的命令提示字元中，導覽至上一個步驟的 `hive_plugin` 目錄。例如：

   ```
   cd hive_plugin
   ```

1. 壓縮`plugins`資料夾中的內容。

   ```
   zip -r ../hive_plugin.zip ./
   ```

## 範例程式碼
<a name="samples-hive-code"></a>

下列步驟說明如何建立 DAG 程式碼來測試自訂外掛程式。

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `hive.py`。

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Airflow 組態選項
<a name="samples-hive-airflow-config"></a>

如果您使用的是 Apache Airflow v2，請將 新增`core.lazy_load_plugins : False`為 Apache Airflow 組態選項。若要進一步了解，請參閱[使用組態選項載入 2 中的外掛程式](configuring-env-variables.md#configuring-2.0-airflow-override)。

## 後續步驟？
<a name="samples-hive-next-up"></a>
+ 了解如何在此範例中將`requirements.txt`檔案上傳至 中的 Amazon S3 儲存貯體[安裝 Python 相依性](working-dags-dependencies.md)。
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。
+ 進一步了解如何在此範例中將`plugins.zip`檔案上傳至 中的 Amazon S3 儲存貯體[安裝自訂外掛程式](configuring-dag-import-plugins.md)。

# 為 Apache Airflow PythonVirtualenvOperator 建立自訂外掛程式
<a name="samples-virtualenv"></a>

下列範例說明如何在 Amazon Managed Workflows for Apache Airflow 上使用`PythonVirtualenvOperator`自訂外掛程式修補 Apache Airflow。

**Topics**
+ [版本](#samples-virtualenv-version)
+ [先決條件](#samples-virtualenv-prereqs)
+ [權限](#samples-virtualenv-permissions)
+ [要求](#samples-virtualenv-dependencies)
+ [自訂外掛程式範例程式碼](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [範例程式碼](#samples-virtualenv-code)
+ [Airflow 組態選項](#samples-virtualenv-airflow-config)
+ [後續步驟？](#samples-virtualenv-next-up)

## 版本
<a name="samples-virtualenv-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-virtualenv-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。

## 權限
<a name="samples-virtualenv-permissions"></a>

使用此頁面上的程式碼範例不需要額外的許可。

## 要求
<a name="samples-virtualenv-dependencies"></a>

若要使用此頁面上的範例程式碼，請將下列相依性新增至您的 `requirements.txt`。若要進一步了解，請參閱 [安裝 Python 相依性](working-dags-dependencies.md)。

```
virtualenv
```

## 自訂外掛程式範例程式碼
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow 會在啟動時執行外掛程式資料夾中 Python 檔案的內容。此外掛程式會在該啟動程序`PythonVirtualenvOperator`期間修補內建 ，使其與 Amazon MWAA 相容。下列步驟顯示自訂外掛程式的範例程式碼。

1. 在您的命令提示字元中，導覽至上一節中的 `plugins` 目錄。例如：

   ```
   cd plugins
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `virtual_python_plugin.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

下列步驟說明如何建立 `plugins.zip`。

1. 在您的命令提示字元中，導覽至上一節`virtual_python_plugin.py`中包含 的目錄。例如：

   ```
   cd plugins
   ```

1. 壓縮`plugins`資料夾中的內容。

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## 範例程式碼
<a name="samples-virtualenv-code"></a>

下列步驟說明如何建立自訂外掛程式的 DAG 程式碼。

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `virtualenv_test.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Airflow 組態選項
<a name="samples-virtualenv-airflow-config"></a>

如果您使用的是 Apache Airflow v2，請將 新增`core.lazy_load_plugins : False`為 Apache Airflow 組態選項。若要進一步了解，請參閱[使用組態選項載入 2 中的外掛程式](configuring-env-variables.md#configuring-2.0-airflow-override)。

## 後續步驟？
<a name="samples-virtualenv-next-up"></a>
+ 了解如何在此範例中將`requirements.txt`檔案上傳至 中的 Amazon S3 儲存貯體[安裝 Python 相依性](working-dags-dependencies.md)。
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。
+ 進一步了解如何在此範例中將`plugins.zip`檔案上傳至 中的 Amazon S3 儲存貯體[安裝自訂外掛程式](configuring-dag-import-plugins.md)。

# 使用 Lambda 函數叫用 DAGs
<a name="samples-lambda"></a>

下列程式碼範例使用 [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)函數來取得 Apache Airflow CLI 字符，並在 Amazon MWAA 環境中調用定向無環圖 (DAG)。

**Topics**
+ [版本](#samples-lambda-version)
+ [先決條件](#samples-lambda-prereqs)
+ [許可](#samples-lambda-permissions)
+ [相依性](#samples-lambda-dependencies)
+ [程式碼範例](#samples-lambda-code)

## 版本
<a name="samples-lambda-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-lambda-prereqs"></a>

若要使用此程式碼範例，您必須：
+ 為您的 [Amazon MWAA 環境](get-started.md)使用[公有網路存取模式](configuring-networking.md#webserver-options-public-network-onconsole)。
+ 使用最新的 Python 執行時間來擁有 [Lambda 函數](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html)。

**注意**  
如果 Lambda 函數和您的 Amazon MWAA 環境位於相同的 VPC 中，您可以在私有網路上使用此程式碼。對於此組態，Lambda 函數的執行角色需要呼叫 Amazon Elastic Compute Cloud (Amazon EC2) **CreateNetworkInterface** API 操作的許可。您可以使用 [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS受管政策提供此許可。

## 許可
<a name="samples-lambda-permissions"></a>

若要使用此頁面上的程式碼範例，Amazon MWAA 環境的執行角色需要存取 才能執行`airflow:CreateCliToken`動作。您可以使用 `AmazonMWAAAirflowCliAccess` AWS受管政策提供此許可：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

如需詳細資訊，請參閱 [Apache Airflow CLI 政策：AmazonMWAAAirflowCliAccess](access-policies.md#cli-access)。

## 相依性
<a name="samples-lambda-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 程式碼範例
<a name="samples-lambda-code"></a>

1. 在 https：//[https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/) 開啟 AWS Lambda 主控台。

1. 從函數清單中選擇您的 Lambda **函數**。

1. 在函數頁面上，複製下列程式碼，並以您的資源名稱取代下列程式碼：
   + `YOUR_ENVIRONMENT_NAME` – Amazon MWAA 環境的名稱。
   + `YOUR_DAG_NAME` – 您要叫用的 DAG 名稱。

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. 選擇**部署**。

1. 選擇**測試**以使用 Lambda 主控台叫用函數。

1. 若要驗證您的 Lambda 是否已成功調用 DAG，請使用 Amazon MWAA 主控台導覽至您環境的 Apache Airflow UI，然後執行下列動作：

   1. 在 **DAGs**頁面上，在 DAG 清單中找到您的新目標 DAGs。

   1. 在**上次執行**下，檢查最新 DAG 執行的時間戳記。此時間戳記應緊密符合您`invoke_dag`其他環境中 的最新時間戳記。

   1. 在**最近任務**下，檢查上次執行是否成功。

# 在不同的 Amazon MWAA 環境中叫用 DAGs
<a name="samples-invoke-dag"></a>

下列程式碼範例會建立 Apache Airflow CLI 字符。然後，程式碼會在一個 Amazon MWAA 環境中使用定向無環圖 (DAG)，在不同的 Amazon MWAA 環境中叫用 DAG。

**Topics**
+ [版本](#samples-invoke-dag-version)
+ [先決條件](#samples-invoke-dag-prereqs)
+ [許可](#samples-invoke-dag-permissions)
+ [相依性](#samples-invoke-dag-dependencies)
+ [程式碼範例](#samples-invoke-dag-code)

## 版本
<a name="samples-invoke-dag-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-invoke-dag-prereqs"></a>

若要使用此頁面上的程式碼範例，您需要下列項目：
+ 兩個具有**公有網路** Web 伺服器存取權的 [Amazon MWAA 環境](get-started.md)，包括您目前的環境。
+ 上傳至目標環境 Amazon Simple Storage Service (Amazon S3) 儲存貯體的範例 DAG。

## 許可
<a name="samples-invoke-dag-permissions"></a>

若要使用此頁面上的程式碼範例，您環境的執行角色必須具有建立 Apache Airflow CLI 權杖的許可。您可以連接 AWS受管政策`AmazonMWAAAirflowCliAccess`來授予此許可。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

如需詳細資訊，請參閱 [Apache Airflow CLI 政策：AmazonMWAAAirflowCliAccess](access-policies.md#cli-access)。

## 相依性
<a name="samples-invoke-dag-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 程式碼範例
<a name="samples-invoke-dag-code"></a>

下列程式碼範例假設您在目前環境中使用 DAG，以在另一個環境中叫用 DAG。

1. 在終端機中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並將其儲存為 `invoke_dag.py`。將下列值取代為您的資訊。
   + `your-new-environment-name` – 您要叫用 DAG 的其他環境名稱。
   + `your-target-dag-id` – 您要叫用之其他環境中的 DAG ID。

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果 DAG 成功執行，您會在 的任務日誌中收到類似以下的輸出`invoke_dag_task`。

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   若要確認您的 DAG 已成功調用，請導覽至新環境的 Apache Airflow UI，然後執行下列動作：

   1. 在 **DAGs**頁面上，在 DAG 清單中找到您的新目標 DAGs。

   1. 在**上次執行**下，檢查最新 DAG 執行的時間戳記。此時間戳記應緊密符合您`invoke_dag`其他環境中 的最新時間戳記。

   1. 在**最近任務**下，檢查上次執行是否成功。

# 搭配使用 Amazon MWAA 與 Amazon RDS for Microsoft SQL Server
<a name="samples-sql-server"></a>

您可以使用 Amazon Managed Workflows for Apache Airflow 連線到 [RDS for SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html)。下列範例程式碼使用 Amazon Managed Workflows for Apache Airflow 環境上的 DAGs 來連線至 Amazon RDS for Microsoft SQL Server 並執行查詢。

**Topics**
+ [版本](#samples-sql-server-version)
+ [先決條件](#samples-sql-server-prereqs)
+ [相依性](#samples-sql-server-dependencies)
+ [Apache Airflow v2 連線](#samples-sql-server-conn)
+ [範例程式碼](#samples-sql-server-code)
+ [後續步驟？](#samples-sql-server-next-up)

## 版本
<a name="samples-sql-server-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-sql-server-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ Amazon MWAA 和 RDS for SQL Server 在相同的 Amazon VPC/ 中執行
+ Amazon MWAA 和伺服器的 VPC 安全群組已設定下列連線：
  + Amazon MWAA 安全群組中為 Amazon RDS `1433`開啟之連接埠的傳入規則
  + 或從 Amazon MWAA 至 RDS `1433`開啟之連接埠的傳出規則
+ RDS for SQL Server 的 Apache Airflow Connection 會反映先前程序中建立之 Amazon RDS SQL Server 資料庫的主機名稱、連接埠、使用者名稱和密碼。

## 相依性
<a name="samples-sql-server-dependencies"></a>

若要使用本節中的範例程式碼，請將下列相依性新增至您的 `requirements.txt`。若要進一步了解，請參閱 [安裝 Python 相依性](working-dags-dependencies.md)。

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Apache Airflow v2 連線
<a name="samples-sql-server-conn"></a>

如果您在 Apache Airflow v2 中使用連線，請確定 Airflow 連線物件包含下列鍵/值對：

1. **Conn ID：**mssql\$1default

1. **Conn 類型：**Amazon Web Services

1. **主機： ** `YOUR_DB_HOST`

1. **結構描述： **

1. **登入：**admin

1. **密碼： **

1. **連接埠：**1433

1. **額外： **

## 範例程式碼
<a name="samples-sql-server-code"></a>

1. 在命令提示中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `sql-server.py`。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## 後續步驟？
<a name="samples-sql-server-next-up"></a>
+ 了解如何在此範例中將 `requirements.txt` 檔案上傳至 中的 Amazon S3 儲存貯體[安裝 Python 相依性](working-dags-dependencies.md)。
+ 了解如何在此範例中將 DAG 程式碼上傳至 Amazon S3 儲存貯體中的 `dags` 資料夾[新增或更新 DAGs](configuring-dag-folder.md)。
+ 探索範例指令碼和其他 [pymssql 模組範例](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html)。
+ 進一步了解如何使用 *Apache Airflow 參考指南*中的 [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) 在特定 Microsoft SQL 資料庫中執行 SQL 程式碼。

# 搭配使用 Amazon MWAA 與 Amazon EKS
<a name="mwaa-eks-example"></a>

下列範例示範如何使用 Amazon Managed Workflows for Apache Airflow 搭配 Amazon EKS。

**Topics**
+ [版本](#mwaa-eks-example-version)
+ [先決條件](#eksctl-prereqs)
+ [建立 Amazon EC2 的公有金鑰](#eksctl-create-key)
+ [建立叢集](#create-cluster-eksctl)
+ [建立`mwaa`命名空間](#eksctl-namespace)
+ [建立 `mwaa` 命名空間的角色](#eksctl-role)
+ [建立並連接 Amazon EKS 叢集的 IAM 角色](#eksctl-iam-role)
+ [建立 requirements.txt 檔案](#eksctl-requirements)
+ [建立 Amazon EKS 的身分映射](#eksctl-identity-map)
+ [建立 `kubeconfig`](#eksctl-kube-config)
+ [建立 DAG](#eksctl-create-dag)
+ [將 DAG 和 `kube_config.yaml` 新增至 Amazon S3 儲存貯體](#eksctl-dag-bucket)
+ [啟用並觸發範例](#eksctl-trigger-pod)

## 版本
<a name="mwaa-eks-example-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="eksctl-prereqs"></a>

若要使用本主題中的範例，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ eksctl。若要進一步了解，請參閱[安裝 eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl)。
+ kubectl。若要進一步了解，請參閱[安裝和設定 kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)。在某些情況下，這會與 eksctl 一起安裝。
+ 在您建立 Amazon MWAA 環境的區域中的 EC2 金鑰對。若要進一步了解，請參閱[建立或匯入金鑰對](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)。

**注意**  
當您使用 `eksctl`命令時，您可以包含 `--profile`來指定預設以外的設定檔。

## 建立 Amazon EC2 的公有金鑰
<a name="eksctl-create-key"></a>

使用下列命令，從您的私有金鑰對建立公有金鑰。

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

若要進一步了解，請參閱[擷取金鑰對的公有金鑰](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key)。

## 建立叢集
<a name="create-cluster-eksctl"></a>

使用下列命令來建立叢集。如果您想要叢集的自訂名稱，或在不同的區域中建立它，請取代名稱和區域值。您必須在建立 Amazon MWAA 環境的相同區域中建立叢集。取代子網路的值，以符合您用於 Amazon MWAA 的 Amazon VPC 網路中的子網路。取代 的值`ssh-public-key`，以符合您使用的金鑰。您可以使用相同區域中來自 Amazon EC2 的現有金鑰，或在建立 Amazon MWAA 環境的相同區域中建立新的金鑰。

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

完成建立叢集需要一些時間。完成後，您可以驗證叢集已成功建立，並使用下列命令設定 IAM OIDC 提供者：

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## 建立`mwaa`命名空間
<a name="eksctl-namespace"></a>

確認叢集已成功建立後，請使用下列命令來建立 Pod 的命名空間。

```
kubectl create namespace mwaa
```

## 建立 `mwaa` 命名空間的角色
<a name="eksctl-role"></a>

建立命名空間後，請在 EKS 上為可在 MWAA 命名空間中執行 Pod 的 Amazon MWAA 使用者建立角色和角色繫結關係。如果您使用不同的命名空間名稱，請將 中的 mwaa 取代為您使用`-n mwaa`的名稱。

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

執行下列命令，確認新角色可以存取 Amazon EKS 叢集。如果您未使用 *mwaa*，請務必使用正確的名稱：

```
kubectl get pods -n mwaa --as mwaa-service
```

您會收到一則訊息，指出：

```
No resources found in mwaa namespace.
```

## 建立並連接 Amazon EKS 叢集的 IAM 角色
<a name="eksctl-iam-role"></a>

您必須建立 IAM 角色，然後將其繫結至 Amazon EKS (k8s) 叢集，以便用於透過 IAM 進行身分驗證。此角色僅用於登入叢集，且沒有任何主控台或 API 呼叫的許可。

使用 中的步驟為 Amazon MWAA 環境建立新的角色[Amazon MWAA 執行角色](mwaa-create-role.md)。不過，與其建立和連接該主題中所述的政策，請連接下列政策：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

建立角色之後，請編輯 Amazon MWAA 環境，以使用您建立做為環境執行角色的角色。若要變更角色，請編輯要使用的環境。您可以在**許可**下選取執行角色。

**已知問題：**
+ 子路徑無法向 Amazon EKS 驗證的角色 ARNs 存在已知問題。解決方法是手動建立服務角色，而不是使用 Amazon MWAA 本身建立的服務角色。若要進一步了解，請參閱 [aws-auth configmap 中包含路徑的 角色無法運作](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ 如果 IAM 中沒有 Amazon MWAA 服務清單，您需要選擇替代服務政策，例如 Amazon EC2，然後更新角色的信任政策以符合下列項目：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  若要進一步了解，請參閱[如何搭配 IAM 角色使用信任政策](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/)。

## 建立 requirements.txt 檔案
<a name="eksctl-requirements"></a>

若要使用本節中的範例程式碼，請確定您已將下列其中一個資料庫選項新增至 `requirements.txt`。若要進一步了解，請參閱 [安裝 Python 相依性](working-dags-dependencies.md)。

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## 建立 Amazon EKS 的身分映射
<a name="eksctl-identity-map"></a>

針對您在下列命令中建立的角色使用 ARN，為 Amazon EKS 建立身分映射。將 Region *us-east-1* 變更為您建立環境的區域。取代角色的 ARN，最後以您環境的執行角色取代 *mwaa-execution-role*。

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## 建立 `kubeconfig`
<a name="eksctl-kube-config"></a>

使用下列命令來建立 `kubeconfig`：

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

如果您在執行時使用特定設定檔`update-kubeconfig`，則需要移除新增至 kube\$1config.yaml 檔案的 `env:`區段，以便其與 Amazon MWAA 搭配使用。若要這麼做，請從 檔案刪除下列項目，然後儲存它：

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## 建立 DAG
<a name="eksctl-create-dag"></a>

使用下列程式碼範例來建立 Python 檔案，例如 `mwaa_pod_example.py` DAG 的 。

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## 將 DAG 和 `kube_config.yaml` 新增至 Amazon S3 儲存貯體
<a name="eksctl-dag-bucket"></a>

將您建立的 DAG 和 `kube_config.yaml` 檔案放入 Amazon MWAA 環境的 Amazon S3 儲存貯體。您可以使用 Amazon S3 主控台或 將檔案放入儲存貯體 AWS Command Line Interface。

## 啟用並觸發範例
<a name="eksctl-trigger-pod"></a>

在 Apache Airflow 中，啟用範例，然後觸發該範例。

成功執行並完成之後，請使用下列命令來驗證 Pod：

```
kubectl get pods -n mwaa
```

您可以取得類似下列的輸出：

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

然後，您可以使用下列命令來驗證 Pod 的輸出。將名稱值取代為上一個命令傳回的值：

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# 使用 連線至 Amazon ECS `ECSOperator`
<a name="samples-ecs-operator"></a>

本主題說明如何使用 從 Amazon MWAA `ECSOperator`連線至 Amazon Elastic Container Service (Amazon ECS) 容器。在下列步驟中，您將新增必要的許可到環境的執行角色、使用 CloudFormation 範本建立 Amazon ECS Fargate 叢集，最後建立並上傳連線至新叢集的 DAG。

**Topics**
+ [版本](#samples-ecs-operator-version)
+ [先決條件](#samples-ecs-operator-prereqs)
+ [許可](#samples-ecs-operator-permissions)
+ [建立 Amazon ECS 叢集](#create-cfn-template)
+ [範例程式碼](#samples-ecs-operator-code)

## 版本
<a name="samples-ecs-operator-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-ecs-operator-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。

## 許可
<a name="samples-ecs-operator-permissions"></a>
+ 您環境的執行角色需要許可，才能在 Amazon ECS 中執行任務。您可以將 [AmazonECS\$1FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) 受 AWS管政策連接至您的執行角色，或建立下列政策並將其連接至您的執行角色。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ 除了新增在 Amazon ECS 中執行任務所需的許可之外，您還必須修改 Amazon MWAA 執行角色中的 CloudWatch Logs 政策陳述式，以允許存取 Amazon ECS 任務日誌群組，如下所示。Amazon ECS 日誌群組是由 CloudFormation 範本在 中建立[建立 Amazon ECS 叢集](#create-cfn-template)。

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

如需 Amazon MWAA 執行角色以及如何連接政策的詳細資訊，請參閱 [執行角色](mwaa-create-role.md)。

## 建立 Amazon ECS 叢集
<a name="create-cfn-template"></a>

使用下列 CloudFormation 範本，您將建置 Amazon ECS Fargate 叢集以搭配 Amazon MWAA 工作流程使用。如需詳細資訊，請參閱《*Amazon Elastic Container Service 開發人員指南*》中的[建立任務定義](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition)。

1. 使用下列程式碼建立 JSON 檔案，並將其儲存為 `ecs-mwaa-cfn.json`。

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. 在您的命令提示字元中，使用下列 AWS CLI 命令來建立新的堆疊。您必須將 `SecurityGroups`和 值取代`SubnetIds`為 Amazon MWAA 環境安全群組和子網路的值。

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   或者，您可以使用下列 shell 指令碼。指令碼會使用 `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI 命令擷取您環境安全群組和子網路的必要值，然後相應地建立堆疊。若要執行指令碼，請執行下列動作。

   1. 複製指令碼，並將指令碼儲存為與 CloudFormation 範本`ecs-stack-helper.sh`相同的目錄。

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. 使用以下命令執行指令碼。將 `environment-name`和 取代`stack-name`為您的資訊。

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   如果成功，您會參考以下顯示新 CloudFormation 堆疊 ID 的輸出。

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

 CloudFormation 堆疊完成並 AWS 佈建 Amazon ECS 資源後，您就可以建立和上傳 DAG。

## 範例程式碼
<a name="samples-ecs-operator-code"></a>

1. 開啟命令提示，然後導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並在本機儲存為 `mwaa-ecs-operator.py`，然後將新的 DAG 上傳至 Amazon S3。

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**注意**  
在範例 DAG 中，對於 `awslogs_group`，您可能需要使用 Amazon ECS 任務日誌群組的名稱來修改日誌群組。此範例假設名為 的日誌群組`mwaa-ecs-zero`。對於 `awslogs_stream_prefix`，請使用 Amazon ECS 任務日誌串流字首。此範例假設日誌串流字首 `ecs`。

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您會在 `ecs_fargate_dag` DAG 中的 任務日誌`ecs_operator_task`中取得類似下列的輸出：

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# 搭配 Amazon MWAA 使用 dbt
<a name="samples-dbt"></a>

本主題示範如何搭配 Amazon MWAA 使用 dbt 和 Postgres。在下列步驟中，您將新增必要的相依性到您的 `requirements.txt`，並將範例 dbt 專案上傳至您環境的 Amazon S3 儲存貯體。然後，您將使用範例 DAG 來驗證 Amazon MWAA 已安裝相依性，最後使用 `BashOperator`來執行 dbt 專案。

**Topics**
+ [版本](#samples-dbt-version)
+ [先決條件](#samples-dbt-prereqs)
+ [相依性](#samples-dbt-dependencies)
+ [將 dbt 專案上傳至 Amazon S3](#samples-dbt-upload-project)
+ [使用 DAG 驗證 dbt 相依性安裝](#samples-dbt-test-dependencies)
+ [使用 DAG 執行 dbt 專案](#samples-dbt-run-project)

## 版本
<a name="samples-dbt-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-dbt-prereqs"></a>

在您可以完成下列步驟之前，您將需要下列項目：
+ 使用 Apache Airflow 2.2.2 版的 [Amazon MWAA 環境](get-started.md)。此範例已撰寫，並使用 v2.2.2 進行測試。您可能需要修改範例，以與其他 Apache Airflow 版本搭配使用。
+ 範例 dbt 專案。若要開始使用 dbt 搭配 Amazon MWAA，您可以建立分支，並從 [dbt-labs GitHub 儲存庫複製 dbt 入門專案](https://github.com/dbt-labs/dbt-starter-project)。 GitHub 

## 相依性
<a name="samples-dbt-dependencies"></a>

若要將 Amazon MWAA 與 dbt 搭配使用，請將下列啟動指令碼新增至您的環境。若要進一步了解，請參閱[搭配 Amazon MWAA 使用啟動指令碼](using-startup-script.md)。

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

在下列各節中，您將上傳 dbt 專案目錄至 Amazon S3，並執行 DAG，以驗證 Amazon MWAA 是否已成功安裝所需的 dbt 相依性。

## 將 dbt 專案上傳至 Amazon S3
<a name="samples-dbt-upload-project"></a>

若要將 dbt 專案與 Amazon MWAA 環境搭配使用，您可以將整個專案目錄上傳至環境的`dags`資料夾。當環境更新時，Amazon MWAA 會將 dbt 目錄下載至本機`usr/local/airflow/dags/`資料夾。

**將 dbt 專案上傳至 Amazon S3**

1. 導覽至您複製 dbt 入門專案的目錄。

1. 執行下列 Amazon S3 AWS CLI command，使用 `--recursive` 參數以遞迴方式將專案內容複製到您環境的`dags`資料夾。命令會建立名為 的子目錄`dbt`，可用於所有 dbt 專案。如果子目錄已存在，專案檔案會複製到現有目錄，而且不會建立新的目錄。命令也會在此特定入門專案的 `dbt` 目錄中建立子目錄。

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   您可以針對專案子目錄使用不同的名稱，在父`dbt`目錄中組織多個 dbt 專案。

## 使用 DAG 驗證 dbt 相依性安裝
<a name="samples-dbt-test-dependencies"></a>

下列 DAG 使用 `BashOperator`和 bash 命令來驗證 Amazon MWAA 是否已成功安裝 中指定的 dbt 相依性`requirements.txt`。

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

執行下列動作來存取任務日誌，並確認已安裝 dbt 及其相依性。

1. 導覽至 Amazon MWAA 主控台，然後從可用環境清單中選擇 **Open Airflow UI**。

1. 在 Apache Airflow UI 上，從清單中尋找 `dbt-installation-test` DAG，然後在`Last Run`欄中選擇日期以開啟最後一個成功任務。

1. 使用**圖形檢視**，選擇`bash_command`任務以開啟任務執行個體詳細資訊。

1. 選擇**日誌**以開啟任務日誌，然後驗證日誌是否成功列出我們在 中指定的 dbt 版本`requirements.txt`。

## 使用 DAG 執行 dbt 專案
<a name="samples-dbt-run-project"></a>

下列 DAG 使用 `BashOperator`將上傳到 Amazon S3 的 dbt 專案從本機`usr/local/airflow/dags/`目錄複製到可寫入`/tmp`目錄，然後執行 dbt 專案。bash 命令會假設名為 的入門 dbt 專案`dbt-starter-project`。根據專案目錄的名稱修改目錄名稱。

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS 部落格和教學課程
<a name="samples-blogs-tutorials"></a>
+ [使用 Amazon EKS 和 Amazon MWAA for Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)