

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Aufrufen DAGs in verschiedenen Amazon MWAA-Umgebungen
<a name="samples-invoke-dag"></a>

Das folgende Codebeispiel erstellt ein Apache Airflow CLI-Token. Der Code verwendet dann einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung, um eine DAG in einer anderen Amazon MWAA-Umgebung aufzurufen.

**Topics**
+ [Version](#samples-invoke-dag-version)
+ [Voraussetzungen](#samples-invoke-dag-prereqs)
+ [Berechtigungen](#samples-invoke-dag-permissions)
+ [Abhängigkeiten](#samples-invoke-dag-dependencies)
+ [Codebeispiel](#samples-invoke-dag-code)

## Version
<a name="samples-invoke-dag-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-invoke-dag-prereqs"></a>

Um das Codebeispiel auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Zwei [Amazon MWAA-Umgebungen](get-started.md) mit Zugriff auf **öffentliche Netzwerk-Webserver**, einschließlich Ihrer aktuellen Umgebung.
+ Eine Beispiel-DAG, die in den Amazon Simple Storage Service (Amazon S3) -Bucket Ihrer Zielumgebung hochgeladen wurde.

## Berechtigungen
<a name="samples-invoke-dag-permissions"></a>

Um das Codebeispiel auf dieser Seite verwenden zu können, muss die Ausführungsrolle Ihrer Umgebung über die Berechtigung verfügen, ein Apache Airflow CLI-Token zu erstellen. Sie können die AWS-managed Policy anhängen, um diese `AmazonMWAAAirflowCliAccess` Berechtigung zu erteilen.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Weitere Informationen finden Sie unter [Apache Airflow CLI-Richtlinie: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Abhängigkeiten
<a name="samples-invoke-dag-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-invoke-dag-code"></a>

Im folgenden Codebeispiel wird davon ausgegangen, dass Sie eine DAG in Ihrer aktuellen Umgebung verwenden, um eine DAG in einer anderen Umgebung aufzurufen.

1. Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`invoke_dag.py`. Ersetzen Sie die folgenden Werte durch Ihre Informationen.
   + `your-new-environment-name`— Der Name der anderen Umgebung, in der Sie die DAG aufrufen möchten.
   + `your-target-dag-id`— Die ID der DAG in der anderen Umgebung, die Sie aufrufen möchten.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Wenn die DAG erfolgreich ausgeführt wird, erhalten Sie in den Aufgabenprotokollen für `invoke_dag_task` eine Ausgabe ähnlich der folgenden.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Um zu überprüfen, ob Ihre DAG erfolgreich aufgerufen wurde, navigieren Sie zur Apache Airflow Airflow-Benutzeroberfläche für Ihre neue Umgebung und gehen Sie dann wie folgt vor:

   1. Suchen Sie auf der **DAGs**Seite Ihre neue Ziel-DAG in der Liste von. DAGs

   1. Überprüfen Sie unter **Letzte Ausführung** den Zeitstempel für die letzte DAG-Ausführung. Dieser Zeitstempel sollte genau mit dem letzten Zeitstempel für Ihre andere `invoke_dag` Umgebung übereinstimmen.

   1. Überprüfen Sie unter **Letzte Aufgaben**, ob die letzte Ausführung erfolgreich war.