

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在不同的 Amazon MWAA 環境中叫用 DAGs
<a name="samples-invoke-dag"></a>

下列程式碼範例會建立 Apache Airflow CLI 字符。然後，程式碼會在一個 Amazon MWAA 環境中使用定向無環圖 (DAG)，在不同的 Amazon MWAA 環境中叫用 DAG。

**Topics**
+ [版本](#samples-invoke-dag-version)
+ [先決條件](#samples-invoke-dag-prereqs)
+ [許可](#samples-invoke-dag-permissions)
+ [相依性](#samples-invoke-dag-dependencies)
+ [程式碼範例](#samples-invoke-dag-code)

## 版本
<a name="samples-invoke-dag-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-invoke-dag-prereqs"></a>

若要使用此頁面上的程式碼範例，您需要下列項目：
+ 兩個具有**公有網路** Web 伺服器存取權的 [Amazon MWAA 環境](get-started.md)，包括您目前的環境。
+ 上傳至目標環境 Amazon Simple Storage Service (Amazon S3) 儲存貯體的範例 DAG。

## 許可
<a name="samples-invoke-dag-permissions"></a>

若要使用此頁面上的程式碼範例，您環境的執行角色必須具有建立 Apache Airflow CLI 權杖的許可。您可以連接 AWS受管政策`AmazonMWAAAirflowCliAccess`來授予此許可。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

如需詳細資訊，請參閱 [Apache Airflow CLI 政策：AmazonMWAAAirflowCliAccess](access-policies.md#cli-access)。

## 相依性
<a name="samples-invoke-dag-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 程式碼範例
<a name="samples-invoke-dag-code"></a>

下列程式碼範例假設您在目前環境中使用 DAG，以在另一個環境中叫用 DAG。

1. 在終端機中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並將其儲存為 `invoke_dag.py`。將下列值取代為您的資訊。
   + `your-new-environment-name` – 您要叫用 DAG 的其他環境名稱。
   + `your-target-dag-id` – 您要叫用之其他環境中的 DAG ID。

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果 DAG 成功執行，您會在 的任務日誌中收到類似以下的輸出`invoke_dag_task`。

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   若要確認您的 DAG 已成功調用，請導覽至新環境的 Apache Airflow UI，然後執行下列動作：

   1. 在 **DAGs**頁面上，在 DAG 清單中找到您的新目標 DAGs。

   1. 在**上次執行**下，檢查最新 DAG 執行的時間戳記。此時間戳記應緊密符合您`invoke_dag`其他環境中 的最新時間戳記。

   1. 在**最近任務**下，檢查上次執行是否成功。