

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Codebeispiele für Amazon Managed Workflows für Apache Airflow
<a name="sample-code"></a>

Dieses Handbuch enthält Codebeispiele, einschließlich DAGs benutzerdefinierter Plugins, die Sie in einer Amazon Managed Workflows for Apache Airflow Airflow-Umgebung verwenden können. Weitere Beispiele für die Verwendung von Apache Airflow mit AWS Diensten finden Sie im [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags)Verzeichnis im Apache Airflow Airflow-Repository GitHub .

**Topics**
+ [Verwenden einer DAG zum Importieren von Variablen in die CLI](samples-variables-import.md)
+ [Erstellen einer SSH-Verbindung mit dem `SSHOperator`](samples-ssh.md)
+ [Verwenden eines geheimen Schlüssels AWS Secrets Manager für eine Apache Airflow Snowflake-Verbindung](samples-sm-snowflake.md)
+ [Verwenden einer DAG zum Schreiben benutzerdefinierter Metriken in CloudWatch](samples-custom-metrics.md)
+ [Säuberung der Aurora PostgreSQL-Datenbank in einer Amazon MWAA-Umgebung](samples-database-cleanup.md)
+ [Exportieren von Umgebungsmetadaten in CSV-Dateien auf Amazon S3](samples-dag-run-info-to-csv.md)
+ [Verwendung eines geheimen Schlüssels AWS Secrets Manager für eine Apache Airflow Airflow-Variable](samples-secrets-manager-var.md)
+ [Verwendung eines geheimen Schlüssels AWS Secrets Manager für eine Apache Airflow Airflow-Verbindung](samples-secrets-manager.md)
+ [Ein benutzerdefiniertes Plugin mit Oracle erstellen](samples-oracle.md)
+ [Ändern der Zeitzone einer DAG auf Amazon MWAA](samples-plugins-timezone.md)
+ [Ein CodeArtifact Token aktualisieren](samples-code-artifact.md)
+ [Erstellen eines benutzerdefinierten Plugins mit Apache Hive und Hadoop](samples-hive.md)
+ [Ein benutzerdefiniertes Plugin für Apache Airflow erstellen PythonVirtualenvOperator](samples-virtualenv.md)
+ [Aufrufen DAGs mit einer Lambda-Funktion](samples-lambda.md)
+ [Aufrufen DAGs in verschiedenen Amazon MWAA-Umgebungen](samples-invoke-dag.md)
+ [Verwenden von Amazon MWAA mit Amazon RDS for Microsoft SQL Server](samples-sql-server.md)
+ [Verwenden von Amazon MWAA mit Amazon EKS](mwaa-eks-example.md)
+ [Herstellen einer Verbindung zu Amazon ECS mithilfe der `ECSOperator`](samples-ecs-operator.md)
+ [Verwenden von dbt mit Amazon MWAA](samples-dbt.md)
+ [AWS Blogs und Tutorials](#samples-blogs-tutorials)

# Verwenden einer DAG zum Importieren von Variablen in die CLI
<a name="samples-variables-import"></a>

Der folgende Beispielcode importiert Variablen mithilfe der CLI auf Amazon Managed Workflows for Apache Airflow.

**Topics**
+ [Version](#samples-variables-import-version)
+ [Voraussetzungen](#samples-variables-import-prereqs)
+ [Berechtigungen](#samples-variables-import-permissions)
+ [Abhängigkeiten](#samples-variables-import-dependencies)
+ [Codebeispiel](#samples-variables-import-code)
+ [Als nächstes](#samples-variables-import-next-up)

## Version
<a name="samples-variables-import-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-variables-import-prereqs"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Berechtigungen
<a name="samples-variables-import-permissions"></a>

Sie AWS-Konto benötigen Zugriff auf die `AmazonMWAAAirflowCliAccess` Richtlinie. Weitere Informationen finden Sie unter[Apache Airflow CLI-Richtlinie: Amazon MWAAAirflow CliAccess](access-policies.md).

## Abhängigkeiten
<a name="samples-variables-import-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-variables-import-code"></a>

Der folgende Beispielcode benötigt drei Eingaben: Ihren Amazon MWAA-Umgebungsnamen (in`mwaa_env`), den AWS-Region Ihrer Umgebung (in`aws_region`) und die lokale Datei, die die Variablen enthält, die Sie importieren möchten (in`var_file`).

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## Als nächstes
<a name="samples-variables-import-next-up"></a>
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).

# Erstellen einer SSH-Verbindung mit dem `SSHOperator`
<a name="samples-ssh"></a>

Das folgende Beispiel beschreibt, wie Sie den `SSHOperator` in a Directed Acyclic Graph (DAG) verwenden können, um von Ihrer Amazon Managed Workflows for Apache Airflow EC2 Airflow-Umgebung aus eine Verbindung zu einer Amazon-Remote-Instance herzustellen. Sie können einen ähnlichen Ansatz verwenden, um eine Verbindung zu einer beliebigen Remote-Instance mit SSH-Zugriff herzustellen.

Im folgenden Beispiel laden Sie einen geheimen SSH-Schlüssel (`.pem`) in das `dags` Verzeichnis Ihrer Umgebung auf Amazon S3 hoch. Anschließend installieren Sie die erforderlichen Abhängigkeiten mithilfe der Benutzeroberfläche `requirements.txt` und erstellen eine neue Apache Airflow Airflow-Verbindung. Schließlich schreiben Sie eine DAG, die eine SSH-Verbindung zur Remote-Instanz herstellt.

**Topics**
+ [Version](#samples-ssh-version)
+ [Voraussetzungen](#samples-ssh-prereqs)
+ [Berechtigungen](#samples-ssh-permissions)
+ [Voraussetzungen](#samples-ssh-dependencies)
+ [Kopieren Sie Ihren geheimen Schlüssel zu Amazon S3](#samples-ssh-secret)
+ [Erstellen Sie eine neue Apache Airflow Airflow-Verbindung](#samples-ssh-connection)
+ [Codebeispiel](#samples-ssh-code)

## Version
<a name="samples-ssh-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-ssh-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ Ein geheimer SSH-Schlüssel. Das Codebeispiel geht davon aus, dass Sie über eine EC2 Amazon-Instance verfügen und sich `.pem` in derselben Region wie Ihre Amazon MWAA-Umgebung befinden. Wenn Sie keinen Schlüssel haben, finden Sie weitere Informationen unter [key pair erstellen oder importieren](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) im * EC2 Amazon-Benutzerhandbuch*.

## Berechtigungen
<a name="samples-ssh-permissions"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Voraussetzungen
<a name="samples-ssh-dependencies"></a>

Fügen Sie den folgenden Parameter hinzu`requirements.txt`, um das `apache-airflow-providers-ssh` Paket auf dem Webserver zu installieren. Sobald Ihre Umgebung aktualisiert wurde und Amazon MWAA die Abhängigkeit erfolgreich installiert hat, erhalten Sie in der Benutzeroberfläche einen neuen **SSH-Verbindungstyp**.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**Anmerkung**  
`-c`definiert die URL der Einschränkungen in. `requirements.txt` Dadurch wird sichergestellt, dass Amazon MWAA die richtige Paketversion für Ihre Umgebung installiert.

## Kopieren Sie Ihren geheimen Schlüssel zu Amazon S3
<a name="samples-ssh-secret"></a>

Verwenden Sie den folgenden AWS Command Line Interface Befehl, um Ihren `.pem` Schlüssel in das `dags` Verzeichnis Ihrer Umgebung in Amazon S3 zu kopieren.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA kopiert den Inhalt einschließlich des `.pem` Schlüssels in `dags` das lokale `/usr/local/airflow/dags/` Verzeichnis. Auf diese Weise kann Apache Airflow auf den Schlüssel zugreifen.

## Erstellen Sie eine neue Apache Airflow Airflow-Verbindung
<a name="samples-ssh-connection"></a>

**So erstellen Sie eine neue SSH-Verbindung mit der Apache Airflow Airflow-Benutzeroberfläche**

1. Öffnen Sie die Seite [Umgebungen](https://console.aws.amazon.com/mwaa/home#/environments) auf der Amazon MWAA-Konsole.

1. Wählen Sie aus der Liste der Umgebungen **Open Airflow UI** für Ihre Umgebung aus.

1. **Wählen Sie auf der Apache Airflow Airflow-UI-Seite in der Hauptnavigationsleiste **Admin** aus, um die Dropdownliste zu erweitern, und wählen Sie dann Connections aus.**

1. Wählen Sie auf der Seite **Verbindungen auflisten** die Option **\$1** oder die Schaltfläche **Neuen Datensatz hinzufügen**, um eine neue Verbindung hinzuzufügen.

1. **Fügen Sie auf der Seite „Verbindung** hinzufügen“ die folgenden Informationen hinzu:

   1. Geben Sie als **Verbindungs-ID** ein**ssh\$1new**.

   1. Wählen Sie als **Verbindungstyp** **SSH** aus der Dropdownliste aus.
**Anmerkung**  
Wenn der **SSH-Verbindungstyp** nicht in der Liste verfügbar ist, hat Amazon MWAA das erforderliche Paket nicht installiert. `apache-airflow-providers-ssh` Aktualisieren Sie Ihre `requirements.txt` Datei, sodass sie dieses Paket enthält, und versuchen Sie es erneut.

   1. Geben Sie für **Host** die IP-Adresse der EC2 Amazon-Instance ein, zu der Sie eine Verbindung herstellen möchten. Beispiel, **12.345.67.89**.

   1. Geben Sie **unter Nutzername** ein, **ec2-user** ob Sie eine Verbindung zu einer EC2 Amazon-Instance herstellen. Ihr Benutzername kann je nach Art der Remote-Instance, mit der Apache Airflow eine Verbindung herstellen soll, unterschiedlich sein.

   1. Geben Sie für **Extra** das folgende Schlüssel-Wert-Paar im JSON-Format ein:

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      Dieses Schlüssel-Wert-Paar weist Apache Airflow an, im lokalen Verzeichnis nach dem geheimen Schlüssel zu suchen. `/dags`

## Codebeispiel
<a name="samples-ssh-code"></a>

Die folgende DAG verwendet die`SSHOperator`, um eine Verbindung zu Ihrer EC2 Amazon-Zielinstanz herzustellen, und führt dann den `hostname` Linux-Befehl aus, um den Namen der Instance auszudrucken. Sie können die DAG so ändern, dass sie jeden Befehl oder jedes Skript auf der Remote-Instance ausführt.

1. Öffnen Sie ein Terminal und navigieren Sie zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`ssh.py`.

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Bei Erfolg erhalten Sie in den Aufgabenprotokollen für die `ssh_operator_example` DAG eine Ausgabe, die `ssh_task` der folgenden ähnelt:

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Verwenden eines geheimen Schlüssels AWS Secrets Manager für eine Apache Airflow Snowflake-Verbindung
<a name="samples-sm-snowflake"></a>

Das folgende Beispiel ruft AWS Secrets Manager auf, um einen geheimen Schlüssel für eine Apache Airflow Snowflake-Verbindung auf Amazon Managed Workflows for Apache Airflow abzurufen. Es wird davon ausgegangen, dass Sie die Schritte unter abgeschlossen haben. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)

**Topics**
+ [Version](#samples-sm-snowflake-version)
+ [Voraussetzungen](#samples-sm-snowflake-prereqs)
+ [Berechtigungen](#samples-sm-snowflake-permissions)
+ [Voraussetzungen](#samples-sm-snowflake-dependencies)
+ [Codebeispiel](#samples-sm-snowflake-code)
+ [Als nächstes](#samples-sm-snowflake-next-up)

## Version
<a name="samples-sm-snowflake-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-sm-snowflake-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Das Secrets Manager Manager-Backend als Apache Airflow Airflow-Konfigurationsoption, wie unter aufgeführt. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)
+ Eine Apache Airflow Airflow-Verbindungszeichenfolge in Secrets Manager, wie unter aufgeführt. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)

## Berechtigungen
<a name="samples-sm-snowflake-permissions"></a>
+ Secrets Manager Manager-Berechtigungen wie unter aufgeführt[Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md).

## Voraussetzungen
<a name="samples-sm-snowflake-dependencies"></a>

Um den Beispielcode auf dieser Seite zu verwenden, fügen Sie Ihrem die folgenden Abhängigkeiten hinzu`requirements.txt`. Weitere Informationen finden Sie unter[Python-Abhängigkeiten installieren](working-dags-dependencies.md).

```
apache-airflow-providers-snowflake==1.3.0
```

## Codebeispiel
<a name="samples-sm-snowflake-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie den DAG-Code erstellen, der Secrets Manager aufruft, um das Geheimnis abzurufen.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`snowflake_connection.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## Als nächstes
<a name="samples-sm-snowflake-next-up"></a>
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).

# Verwenden einer DAG zum Schreiben benutzerdefinierter Metriken in CloudWatch
<a name="samples-custom-metrics"></a>

Sie können das folgende Codebeispiel verwenden, um einen Directed Acyclic Graph (DAG) zu schreiben, der eine ausführt, `PythonOperator` um Metriken auf Betriebssystemebene für eine Amazon MWAA-Umgebung abzurufen. Die DAG veröffentlicht die Daten dann als benutzerdefinierte Metriken für Amazon CloudWatch.

Benutzerdefinierte Metriken auf Betriebssystemebene bieten Ihnen zusätzliche Einblicke in die Nutzung von Ressourcen wie virtuellen Speicher und CPU durch Ihre Mitarbeiter in der Umgebung. Sie können diese Informationen verwenden, um die [Umgebungsklasse](environment-class.md) auszuwählen, die am besten zu Ihrer Arbeitslast passt.

**Topics**
+ [Version](#samples-custom-metrics-version)
+ [Voraussetzungen](#samples-custom-metrics-prereqs)
+ [Berechtigungen](#samples-custom-metrics-permissions)
+ [Abhängigkeiten](#samples-custom-metrics-dependencies)
+ [Codebeispiel](#samples-custom-metrics-code)

## Version
<a name="samples-custom-metrics-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-custom-metrics-prereqs"></a>

Um das Codebeispiel auf dieser Seite verwenden zu können, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).

## Berechtigungen
<a name="samples-custom-metrics-permissions"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Abhängigkeiten
<a name="samples-custom-metrics-dependencies"></a>
+ Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Abhängigkeiten erforderlich.

## Codebeispiel
<a name="samples-custom-metrics-code"></a>

1. Navigieren Sie in der Befehlszeile zu dem Ordner, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`dag-custom-metrics.py`. `MWAA-ENV-NAME`Ersetzen Sie es durch Ihren Umgebungsnamen.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Wenn die DAG erfolgreich ausgeführt wird, erhalten Sie in Ihren Apache Airflow Airflow-Protokollen etwas Ähnliches wie das Folgende:

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Säuberung der Aurora PostgreSQL-Datenbank in einer Amazon MWAA-Umgebung
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow verwendet eine Aurora PostgreSQL-Datenbank als Apache Airflow-Metadatendatenbank, in der DAG-Ausführungen ausgeführt und Task-Instances gespeichert werden. Der folgende Beispielcode löscht regelmäßig Einträge aus der speziellen Aurora PostgreSQL-Datenbank für Ihre Amazon MWAA-Umgebung.

**Topics**
+ [Version](#samples-database-cleanup-version)
+ [Voraussetzungen](#samples-database-cleanup-prereqs)
+ [Abhängigkeiten](#samples-sql-server-dependencies)
+ [Codebeispiel](#samples-database-cleanup-code)

## Version
<a name="samples-database-cleanup-version"></a>

Die Codebeispiele auf dieser Seite sind spezifisch für Apache Airflow v2, das auf Amazon MWAA unterstützt wird. Sehen Sie sich die [unterstützten Apache Airflow Airflow-Versionen](airflow-versions.md) an.

**Tipp**  
**Für Apache Airflow v3-Benutzer**: Wenn Sie eine Datenbank bereinigen möchten (alte Datensätze aus Metastore-Tabellen löschen), führen Sie den `db clean` CLI-Befehl aus.

## Voraussetzungen
<a name="samples-database-cleanup-prereqs"></a>

Um den Beispielcode auf dieser Seite verwenden zu können, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).

## Abhängigkeiten
<a name="samples-sql-server-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-database-cleanup-code"></a>

Die folgende DAG bereinigt die Metadaten-Datenbank nach den in angegebenen Tabellen. `TABLES_TO_CLEAN` Das Beispiel löscht Daten aus den angegebenen Tabellen, die älter als 30 Tage sind. Um anzupassen, wie weit die Einträge gelöscht werden, legen Sie `MAX_AGE_IN_DAYS` einen anderen Wert fest.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Exportieren von Umgebungsmetadaten in CSV-Dateien auf Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Verwenden Sie das folgende Codebeispiel, um einen gerichteten azyklischen Graph (DAG) zu erstellen, der die Datenbank nach einer Reihe von DAG-Ausführungsinformationen abfragt und die Daten in `.csv` Dateien schreibt, die auf Amazon S3 gespeichert sind.

Möglicherweise möchten Sie Informationen aus der Aurora PostgreSQL-Datenbank Ihrer Umgebung exportieren, um die Daten lokal zu untersuchen, sie im Objektspeicher zu archivieren oder sie mit Tools wie dem [Amazon S3 S3-to-Amazon-Redshift-Operator und der [Datenbankbereinigung](samples-database-cleanup.md) zu](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) kombinieren, um Amazon MWAA-Metadaten aus der Umgebung zu verschieben, sie aber für future Analysen aufzubewahren.

Sie können die Datenbank nach allen Objekten abfragen, die in [Apache Airflow Airflow-Modellen](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models) aufgeführt sind. In diesem Codebeispiel werden drei Modelle,, und`DagRun`, verwendet`TaskFail`, die Informationen bereitstellen`TaskInstance`, die für DAG-Läufe relevant sind.

**Topics**
+ [Version](#samples-dag-run-info-to-csv-version)
+ [Voraussetzungen](#samples-dag-run-info-to-csv-prereqs)
+ [Berechtigungen](#samples-dag-run-info-to-csv-permissions)
+ [Voraussetzungen](#samples-dag-run-info-to-csv-dependencies)
+ [Codebeispiel](#samples-dag-run-info-to-csv-code)

## Version
<a name="samples-dag-run-info-to-csv-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ Ein [neuer Amazon S3 S3-Bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html), in den Sie Ihre Metadateninformationen exportieren möchten.

## Berechtigungen
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA benötigt für die Amazon S3-Aktion die Erlaubnis, die abgefragten Metadateninformationen in Amazon S3 `s3:PutObject` zu schreiben. Fügen Sie der Ausführungsrolle Ihrer Umgebung die folgende Richtlinienerklärung hinzu.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Diese Richtlinie beschränkt den Schreibzugriff nur auf*amzn-s3-demo-bucket*.

## Voraussetzungen
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-dag-run-info-to-csv-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie eine DAG erstellen können, die Aurora PostgreSQL abfragt und das Ergebnis in Ihren neuen Amazon S3 S3-Bucket schreibt.

1. Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`metadata_to_csv.py`. Sie können den zugewiesenen Wert ändern, `MAX_AGE_IN_DAYS` um das Alter der ältesten Datensätze zu steuern, die Ihre DAG aus der Metadatendatenbank abfragt.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Wenn der Vorgang erfolgreich ist, geben Sie in den Aufgabenprotokollen für die Aufgabe eine Ausgabe ähnlich der `export_db` folgenden aus:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Sie können jetzt auf die exportierten `.csv` Dateien in Ihrem neuen Amazon S3 S3-Bucket in zugreifen und sie herunterladen`/files/export/`.

# Verwendung eines geheimen Schlüssels AWS Secrets Manager für eine Apache Airflow Airflow-Variable
<a name="samples-secrets-manager-var"></a>

Das folgende Beispiel ruft AWS Secrets Manager auf, um einen geheimen Schlüssel für eine Apache Airflow-Variable in Amazon Managed Workflows for Apache Airflow abzurufen. Es wird davon ausgegangen, dass Sie die Schritte unter abgeschlossen haben. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)

**Topics**
+ [Version](#samples-secrets-manager-var-version)
+ [Voraussetzungen](#samples-secrets-manager-var-prereqs)
+ [Berechtigungen](#samples-secrets-manager-var-permissions)
+ [Voraussetzungen](#samples-hive-dependencies)
+ [Codebeispiel](#samples-secrets-manager-var-code)
+ [Als nächstes](#samples-secrets-manager-var-next-up)

## Version
<a name="samples-secrets-manager-var-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-secrets-manager-var-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Das Secrets Manager Manager-Backend als Apache Airflow Airflow-Konfigurationsoption, wie unter aufgeführt. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)
+ Eine Apache Airflow Airflow-Variablenzeichenfolge in Secrets Manager, wie unter aufgeführt. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)

## Berechtigungen
<a name="samples-secrets-manager-var-permissions"></a>
+ Secrets Manager Manager-Berechtigungen wie unter aufgeführt[Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md).

## Voraussetzungen
<a name="samples-hive-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-secrets-manager-var-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie den DAG-Code erstellen, der Secrets Manager aufruft, um das Geheimnis abzurufen.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`secrets-manager-var.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## Als nächstes
<a name="samples-secrets-manager-var-next-up"></a>
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).

# Verwendung eines geheimen Schlüssels AWS Secrets Manager für eine Apache Airflow Airflow-Verbindung
<a name="samples-secrets-manager"></a>

Das folgende Beispiel ruft AWS Secrets Manager auf, um einen geheimen Schlüssel für eine Apache Airflow-Verbindung auf Amazon Managed Workflows for Apache Airflow abzurufen. Es wird davon ausgegangen, dass Sie die Schritte unter abgeschlossen haben. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)

**Topics**
+ [Version](#samples-secrets-manager-version)
+ [Voraussetzungen](#samples-secrets-manager-prereqs)
+ [Berechtigungen](#samples-secrets-manager-permissions)
+ [Voraussetzungen](#samples-hive-dependencies)
+ [Codebeispiel](#samples-secrets-manager-code)
+ [Als nächstes](#samples-secrets-manager-next-up)

## Version
<a name="samples-secrets-manager-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-secrets-manager-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Das Secrets Manager Manager-Backend als Apache Airflow Airflow-Konfigurationsoption, wie unter aufgeführt. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)
+ Eine Apache Airflow Airflow-Verbindungszeichenfolge in Secrets Manager, wie unter aufgeführt. [Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md)

## Berechtigungen
<a name="samples-secrets-manager-permissions"></a>
+ Secrets Manager Manager-Berechtigungen wie unter aufgeführt[Konfiguration einer Apache Airflow Airflow-Verbindung mithilfe eines Geheimnisses AWS Secrets Manager](connections-secrets-manager.md).

## Voraussetzungen
<a name="samples-hive-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-secrets-manager-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie den DAG-Code erstellen, der Secrets Manager aufruft, um das Geheimnis abzurufen.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`secrets-manager.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## Als nächstes
<a name="samples-secrets-manager-next-up"></a>
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).

# Ein benutzerdefiniertes Plugin mit Oracle erstellen
<a name="samples-oracle"></a>

Das folgende Beispiel führt Sie durch die Schritte zum Erstellen eines benutzerdefinierten Plugins mit Oracle for Amazon MWAA, das mit anderen benutzerdefinierten Plugins und Binärdateien in Ihrer Datei plugins.zip kombiniert werden kann.

**Contents**
+ [Version](#samples-oracle-version)
+ [Voraussetzungen](#samples-oracle-prereqs)
+ [Berechtigungen](#samples-oracle-permissions)
+ [Voraussetzungen](#samples-oracle-dependencies)
+ [Codebeispiel](#samples-oracle-code)
+ [Erstellen Sie das benutzerdefinierte Plugin](#samples-oracle-create-pluginszip-steps)
  + [Laden Sie Abhängigkeiten herunter](#samples-oracle-install)
  + [Benutzerdefiniertes Plug-in](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Konfigurationsoptionen für Airflow](#samples-oracle-airflow-config)
+ [Als nächstes](#samples-oracle-next-up)

## Version
<a name="samples-oracle-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-oracle-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ Die Worker-Protokollierung ist auf jeder Protokollebene `CRITICAL` oder im vorherigen Abschnitt für Ihre Umgebung aktiviert. Weitere Informationen zu Amazon MWAA-Protokolltypen und zur Verwaltung Ihrer Protokollgruppen finden Sie unter [Zugreifen auf Airflow-Protokolle in Amazon CloudWatch](monitoring-airflow.md)

## Berechtigungen
<a name="samples-oracle-permissions"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Voraussetzungen
<a name="samples-oracle-dependencies"></a>

Um den Beispielcode auf dieser Seite zu verwenden, fügen Sie Ihrem die folgenden Abhängigkeiten hinzu`requirements.txt`. Weitere Informationen finden Sie unter[Python-Abhängigkeiten installieren](working-dags-dependencies.md).

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## Codebeispiel
<a name="samples-oracle-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie den DAG-Code erstellen, mit dem das benutzerdefinierte Plugin getestet wird.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`oracle.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## Erstellen Sie das benutzerdefinierte Plugin
<a name="samples-oracle-create-pluginszip-steps"></a>

In diesem Abschnitt wird beschrieben, wie Sie die Abhängigkeiten herunterladen, das benutzerdefinierte Plugin und die Datei plugins.zip erstellen.

### Laden Sie Abhängigkeiten herunter
<a name="samples-oracle-install"></a>

Amazon MWAA extrahiert den Inhalt von plugins.zip in jeden `/usr/local/airflow/plugins` Amazon MWAA-Scheduler und Worker-Container. Dies wird verwendet, um Binärdateien zu Ihrer Umgebung hinzuzufügen. In den folgenden Schritten wird beschrieben, wie Sie die für das benutzerdefinierte Plugin benötigten Dateien zusammenstellen.

**Rufen Sie das Amazon Linux-Container-Image ab**

1. Rufen Sie in Ihrer Befehlszeile das Amazon Linux-Container-Image ab und führen Sie den Container lokal aus. Beispiel:

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   Ihre Eingabeaufforderung kann eine Bash-Befehlszeile aufrufen. Beispiel:

   ```
   bash-4.2#
   ```

1. Installieren Sie die native asynchrone I/O Linux-Einrichtung (libaio).

   ```
   yum -y install libaio
   ```

1. Lassen Sie dieses Fenster für nachfolgende Schritte geöffnet. Wir werden die folgenden Dateien lokal kopieren:`lib64/libaio.so.1`,`lib64/libaio.so.1.0.0`,`lib64/libaio.so.1.0.1`.

**Laden Sie den Kundenordner herunter**

1. Installieren Sie das Entpackungspaket lokal. Beispiel:

   ```
   sudo yum install unzip
   ```

1. Erstellen Sie ein `oracle_plugin`-Verzeichnis. Beispiel:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. [Verwenden Sie den folgenden curl-Befehl, um die [instantclient-basic-linuxDatei .x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) von Oracle Instant Client Downloads für Linux x86-64 (64-Bit) herunterzuladen.](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html)

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. Entpacken Sie die Datei `client.zip`. Beispiel:

   ```
   unzip *.zip
   ```

**Extrahieren Sie Dateien aus Docker**

1. Zeigen Sie in einer neuen Befehlszeile Ihre Docker-Container-ID an und notieren Sie sie. Beispiel:

   ```
   docker container ls
   ```

   Ihre Eingabeaufforderung kann alle Container und ihre IDs zurückgeben. Beispiel:

   ```
   debc16fd6970
   ```

1. Extrahieren Sie in Ihrem `oracle_plugin` Verzeichnis die `lib64/libaio.so.1``lib64/libaio.so.1.0.0`,, `lib64/libaio.so.1.0.1` Dateien in den lokalen `instantclient_18_5` Ordner. Beispiel:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### Benutzerdefiniertes Plug-in
<a name="samples-oracle-plugins-code"></a>

Apache Airflow führt beim Start den Inhalt der Python-Dateien im Plugins-Ordner aus. Dies wird verwendet, um Umgebungsvariablen festzulegen und zu ändern. In den folgenden Schritten wird der Beispielcode für das benutzerdefinierte Plugin beschrieben.
+ Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`env_var_plugin_oracle.py`.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

In den folgenden Schritten wird erklärt, wie Sie das erstellen`plugins.zip`. Der Inhalt dieses Beispiels kann mit Ihren anderen Plugins und Binärdateien in einer einzigen `plugins.zip` Datei kombiniert werden.

**Den Inhalt des Plugin-Verzeichnisses komprimieren**

1. Navigieren Sie in der Befehlszeile zu dem `oracle_plugin` Verzeichnis. Beispiel:

   ```
   cd oracle_plugin
   ```

1. Komprimieren Sie das `instantclient_18_5` Verzeichnis in der Datei plugins.zip. Beispiel:

   ```
   zip -r ../plugins.zip ./
   ```

   Ihre Eingabeaufforderung zeigt an:

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. Entfernen Sie die `client.zip` Datei. Beispiel:

   ```
   rm client.zip
   ```

**Komprimieren Sie die Datei env\$1var\$1plugin\$1oracle.py**

1. Fügen Sie die `env_var_plugin_oracle.py` Datei dem Stammverzeichnis der Datei plugins.zip hinzu. Beispiel:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. Ihre Datei plugins.zip enthält jetzt Folgendes:

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Konfigurationsoptionen für Airflow
<a name="samples-oracle-airflow-config"></a>

Wenn Sie Apache Airflow v2 verwenden, fügen Sie es `core.lazy_load_plugins : False` als Apache Airflow Airflow-Konfigurationsoption hinzu. Weitere Informationen finden Sie unter [Verwenden von Konfigurationsoptionen zum Laden von Plugins in 2.](configuring-env-variables.md#configuring-2.0-airflow-override)

## Als nächstes
<a name="samples-oracle-next-up"></a>
+ Erfahren Sie unter, wie Sie die `requirements.txt` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket hochladen[Python-Abhängigkeiten installieren](working-dags-dependencies.md).
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).
+ Weitere Informationen zum Hochladen der `plugins.zip` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket finden Sie unter[Installation benutzerdefinierter Plugins](configuring-dag-import-plugins.md).

# Ändern der Zeitzone einer DAG auf Amazon MWAA
<a name="samples-plugins-timezone"></a>

Apache Airflow plant deinen gerichteten azyklischen Graphen (DAG) standardmäßig in UTC\$10. [In den folgenden Schritten erfahren Sie, wie Sie die Zeitzone ändern können, in der Amazon MWAA Ihr DAGs Gerät mit Pendulum ausführt.](https://pypi.org/project/pendulum/) Optional zeigt dieses Thema, wie Sie ein benutzerdefiniertes Plugin erstellen können, um die Zeitzone für die Apache Airflow Airflow-Protokolle Ihrer Umgebung zu ändern.

**Topics**
+ [Version](#samples-plugins-timezone-version)
+ [Voraussetzungen](#samples-plugins-timezone-prerequisites)
+ [Berechtigungen](#samples-plugins-timezone-permissions)
+ [Erstellen Sie ein Plugin, um die Zeitzone in Airflow-Protokollen zu ändern](#samples-plugins-timezone-custom-plugin)
+ [Erstellen eines `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [Codebeispiel](#samples-plugins-timezone-dag)
+ [Als nächstes](#samples-plugins-timezone-plugins-next-up)

## Version
<a name="samples-plugins-timezone-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-plugins-timezone-prerequisites"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).

## Berechtigungen
<a name="samples-plugins-timezone-permissions"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Erstellen Sie ein Plugin, um die Zeitzone in Airflow-Protokollen zu ändern
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow führt die Python-Dateien beim Start im `plugins` Verzeichnis aus. Mit dem folgenden Plugin können Sie die Zeitzone des Executors überschreiben, wodurch die Zeitzone geändert wird, in der Apache Airflow Logs schreibt.

1. Erstellen Sie ein Verzeichnis, das `plugins` nach Ihrem benutzerdefinierten Plugin benannt ist, und navigieren Sie zu dem Verzeichnis. Zum Beispiel:

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal wie `dag-timezone-plugin.py` im `plugins` Ordner.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. Erstellen Sie im `plugins` Verzeichnis eine leere Python-Datei mit dem Namen`__init__.py`. Ihr `plugins` Verzeichnis sollte dem folgenden ähnlich sein:

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## Erstellen eines `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

In den folgenden Schritten wird erklärt, wie Sie erstellen`plugins.zip`. Der Inhalt dieses Beispiels kann mit anderen Plugins und Binärdateien in einer einzigen `plugins.zip` Datei kombiniert werden.

1. Navigieren Sie in der Befehlszeile zu dem `plugins` Verzeichnis aus dem vorherigen Schritt. Zum Beispiel:

   ```
   cd plugins
   ```

1. Komprimieren Sie den Inhalt Ihres `plugins` Verzeichnisses.

   ```
   zip -r ../plugins.zip ./
   ```

1. Laden Sie `plugins.zip` es in Ihren S3-Bucket hoch

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## Codebeispiel
<a name="samples-plugins-timezone-dag"></a>

Um die Standardzeitzone (UTC\$10) zu ändern, in der die DAG ausgeführt wird, verwenden wir eine Bibliothek namens [Pendulum](https://pypi.org/project/pendulum/), eine Python-Bibliothek für die Arbeit mit zeitzonensensitiver Datetime.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Sie gespeichert sind. DAGs Zum Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Beispiels und speichern Sie ihn unter`tz-aware-dag.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Wenn der Vorgang erfolgreich ist, geben Sie in den Task-Logs für die in der `tz_test` DAG eine ähnliche Ausgabe wie folgt aus: `tz_aware_task`

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## Als nächstes
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ Weitere Informationen zum Hochladen der `plugins.zip` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket finden Sie unter[Installation benutzerdefinierter Plugins](configuring-dag-import-plugins.md).

# Ein CodeArtifact Token aktualisieren
<a name="samples-code-artifact"></a>

Wenn Sie Python-Abhängigkeiten CodeArtifact zur Installation verwenden, benötigt Amazon MWAA ein aktives Token. Damit Amazon MWAA zur Laufzeit auf ein CodeArtifact Repository zugreifen kann, können Sie ein [Startskript](using-startup-script.md) verwenden und das [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url)mit dem Token festlegen.

Im folgenden Thema wird beschrieben, wie Sie ein Startskript erstellen können, das mithilfe der [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token)CodeArtifact API-Operation bei jedem Start oder jeder Aktualisierung Ihrer Umgebung ein neues Token abruft.

**Topics**
+ [Version](#samples-code-artifact-version)
+ [Voraussetzungen](#samples-code-artifact-prereqs)
+ [Berechtigungen](#samples-code-artifact-permissions)
+ [Codebeispiel](#samples-code-artifact-code)
+ [Als nächstes](#samples-code-artifact-next-up)

## Version
<a name="samples-code-artifact-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-code-artifact-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ Ein [CodeArtifact Repository](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html), in dem Sie Abhängigkeiten für Ihre Umgebung speichern.

## Berechtigungen
<a name="samples-code-artifact-permissions"></a>

Um das CodeArtifact Token zu aktualisieren und das Ergebnis in Amazon S3 zu schreiben, muss Amazon MWAA über die folgenden Berechtigungen in der Ausführungsrolle verfügen.
+ Die `codeartifact:GetAuthorizationToken` Aktion ermöglicht Amazon MWAA, ein neues Token von abzurufen. CodeArtifact Die folgende Richtlinie gewährt Berechtigungen für jede CodeArtifact Domain, die Sie erstellen. Sie können den Zugriff auf Ihre Domänen weiter einschränken, indem Sie den Ressourcenwert in der Anweisung ändern und nur die Domänen angeben, auf die Ihre Umgebung zugreifen soll.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ Die `sts:GetServiceBearerToken` Aktion ist erforderlich, um den CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html)API-Vorgang aufzurufen. Dieser Vorgang gibt ein Token zurück, das verwendet werden muss, wenn Sie einen Paketmanager wie `pip` with verwenden CodeArtifact. Um einen Paketmanager mit einem CodeArtifact Repository zu verwenden, muss die Ausführungsrolle Ihrer Umgebung dies zulassen, `sts:GetServiceBearerToken` wie in der folgenden Richtlinienerklärung aufgeführt.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## Codebeispiel
<a name="samples-code-artifact-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie ein Startskript erstellen können, das das CodeArtifact Token aktualisiert.

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`code_artifact_startup_script.sh`.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. Navigieren Sie zu dem Ordner, in dem Sie das Skript gespeichert haben. Verwenden Sie es `cp` in einem neuen Eingabeaufforderungsfenster, um das Skript in Ihren Bucket hochzuladen. *amzn-s3-demo-bucket*Ersetzen Sie es durch Ihre Informationen.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Bei Erfolg gibt Amazon S3 den URL-Pfad zum Objekt aus:

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Nachdem Sie das Skript hochgeladen haben, wird Ihre Umgebung aktualisiert und das Skript beim Start ausgeführt.

## Als nächstes
<a name="samples-code-artifact-next-up"></a>
+ Erfahren Sie unter, wie Sie Startskripts verwenden, um Ihre Umgebung anzupassen[Verwenden eines Startup-Skripts mit Amazon MWAA](using-startup-script.md).
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).
+ Weitere Informationen zum Hochladen der `plugins.zip` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket finden Sie unter[Installation benutzerdefinierter Plugins](configuring-dag-import-plugins.md).

# Erstellen eines benutzerdefinierten Plugins mit Apache Hive und Hadoop
<a name="samples-hive"></a>

Amazon MWAA extrahiert den Inhalt eines `plugins.zip` bis. `/usr/local/airflow/plugins` Dies kann verwendet werden, um Binärdateien zu Ihren Containern hinzuzufügen. Darüber hinaus führt Apache Airflow *beim Start* den Inhalt der Python-Dateien im `plugins` Ordner aus, sodass Sie Umgebungsvariablen festlegen und ändern können. Das folgende Beispiel führt Sie durch die Schritte zur Erstellung eines benutzerdefinierten Plugins mit Apache Hive und Hadoop in einer Amazon Managed Workflows for Apache Airflow Airflow-Umgebung, das mit anderen benutzerdefinierten Plugins und Binärdateien kombiniert werden kann.

**Topics**
+ [Version](#samples-hive-version)
+ [Voraussetzungen](#samples-hive-prereqs)
+ [Berechtigungen](#samples-hive-permissions)
+ [Voraussetzungen](#samples-hive-dependencies)
+ [Laden Sie Abhängigkeiten herunter](#samples-hive-install)
+ [Benutzerdefiniertes Plug-in](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [Codebeispiel](#samples-hive-code)
+ [Konfigurationsoptionen für Airflow](#samples-hive-airflow-config)
+ [Als nächstes](#samples-hive-next-up)

## Version
<a name="samples-hive-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-hive-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).

## Berechtigungen
<a name="samples-hive-permissions"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Voraussetzungen
<a name="samples-hive-dependencies"></a>

Um den Beispielcode auf dieser Seite zu verwenden, fügen Sie Ihrem die folgenden Abhängigkeiten hinzu`requirements.txt`. Weitere Informationen finden Sie unter[Python-Abhängigkeiten installieren](working-dags-dependencies.md).

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## Laden Sie Abhängigkeiten herunter
<a name="samples-hive-install"></a>

Amazon MWAA extrahiert den Inhalt von plugins.zip in jeden `/usr/local/airflow/plugins` Amazon MWAA-Scheduler und Worker-Container. Dies wird verwendet, um Binärdateien zu Ihrer Umgebung hinzuzufügen. In den folgenden Schritten wird beschrieben, wie Sie die für das benutzerdefinierte Plugin benötigten Dateien zusammenstellen.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Sie Ihr Plugin erstellen möchten. Beispiel:

   ```
   cd plugins
   ```

1. Laden Sie [Hadoop](https://hadoop.apache.org/) von einem [Mirror](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz) herunter, zum Beispiel:

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. Laden Sie [Hive](https://hive.apache.org/) von einem [Mirror](https://www.apache.org/dyn/closer.cgi/hive/) herunter, zum Beispiel:

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. Erstellen Sie ein Verzeichnis. Beispiel:

   ```
   mkdir hive_plugin
   ```

1. Extrahieren Sie Hadoop.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Extrahieren Sie Hive.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## Benutzerdefiniertes Plug-in
<a name="samples-hive-plugins-code"></a>

Apache Airflow führt beim Start den Inhalt der Python-Dateien im Plugins-Ordner aus. Dies wird verwendet, um Umgebungsvariablen festzulegen und zu ändern. In den folgenden Schritten wird der Beispielcode für das benutzerdefinierte Plugin beschrieben.

1. Navigieren Sie in der Befehlszeile zu dem `hive_plugin` Verzeichnis. Beispiel:

   ```
   cd hive_plugin
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal wie `hive_plugin.py` im `hive_plugin` Verzeichnis.

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. Kopieren Sie den Inhalt des folgenden Textes und speichern Sie ihn lokal wie `.airflowignore` im `hive_plugin` Verzeichnis.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

In den folgenden Schritten wird erklärt, wie Sie erstellen`plugins.zip`. Der Inhalt dieses Beispiels kann mit anderen Plugins und Binärdateien in einer einzigen `plugins.zip` Datei kombiniert werden.

1. Navigieren Sie in der Befehlszeile zu dem `hive_plugin` Verzeichnis aus dem vorherigen Schritt. Beispiel:

   ```
   cd hive_plugin
   ```

1. Komprimieren Sie den Inhalt Ihres `plugins` Ordners.

   ```
   zip -r ../hive_plugin.zip ./
   ```

## Codebeispiel
<a name="samples-hive-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie den DAG-Code erstellen, mit dem das benutzerdefinierte Plugin getestet wird.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`hive.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Konfigurationsoptionen für Airflow
<a name="samples-hive-airflow-config"></a>

Wenn Sie Apache Airflow v2 verwenden, fügen Sie es `core.lazy_load_plugins : False` als Apache Airflow Airflow-Konfigurationsoption hinzu. Weitere Informationen finden Sie unter [Verwenden von Konfigurationsoptionen zum Laden von Plugins in 2.](configuring-env-variables.md#configuring-2.0-airflow-override)

## Als nächstes
<a name="samples-hive-next-up"></a>
+ Erfahren Sie unter, wie Sie die `requirements.txt` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket hochladen[Python-Abhängigkeiten installieren](working-dags-dependencies.md).
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).
+ Weitere Informationen zum Hochladen der `plugins.zip` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket finden Sie unter[Installation benutzerdefinierter Plugins](configuring-dag-import-plugins.md).

# Ein benutzerdefiniertes Plugin für Apache Airflow erstellen PythonVirtualenvOperator
<a name="samples-virtualenv"></a>

Das folgende Beispiel erklärt, wie der Apache Airflow `PythonVirtualenvOperator` mit einem benutzerdefinierten Plugin auf Amazon Managed Workflows für Apache Airflow gepatcht wird.

**Topics**
+ [Version](#samples-virtualenv-version)
+ [Voraussetzungen](#samples-virtualenv-prereqs)
+ [Berechtigungen](#samples-virtualenv-permissions)
+ [Voraussetzungen](#samples-virtualenv-dependencies)
+ [Benutzerdefinierter Plugin-Beispielcode](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [Codebeispiel](#samples-virtualenv-code)
+ [Konfigurationsoptionen für Airflow](#samples-virtualenv-airflow-config)
+ [Als nächstes](#samples-virtualenv-next-up)

## Version
<a name="samples-virtualenv-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-virtualenv-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).

## Berechtigungen
<a name="samples-virtualenv-permissions"></a>

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

## Voraussetzungen
<a name="samples-virtualenv-dependencies"></a>

Um den Beispielcode auf dieser Seite zu verwenden, fügen Sie Ihrem die folgenden Abhängigkeiten hinzu`requirements.txt`. Weitere Informationen finden Sie unter[Python-Abhängigkeiten installieren](working-dags-dependencies.md).

```
virtualenv
```

## Benutzerdefinierter Plugin-Beispielcode
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow führt beim Start den Inhalt der Python-Dateien im Plugins-Ordner aus. Dieses Plugin patcht das integrierte Modul `PythonVirtualenvOperator` während des Startvorgangs, um es mit Amazon MWAA kompatibel zu machen. In den folgenden Schritten wird der Beispielcode für das benutzerdefinierte Plugin angezeigt.

1. Navigieren Sie in der Befehlszeile zu dem `plugins` Verzeichnis im vorherigen Abschnitt. Beispiel:

   ```
   cd plugins
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`virtual_python_plugin.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

In den folgenden Schritten wird erklärt, wie Sie das erstellen`plugins.zip`.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, das `virtual_python_plugin.py` im vorherigen Abschnitt enthalten ist. Beispiel:

   ```
   cd plugins
   ```

1. Komprimieren Sie den Inhalt Ihres `plugins` Ordners.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## Codebeispiel
<a name="samples-virtualenv-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie den DAG-Code für das benutzerdefinierte Plugin erstellen.

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`virtualenv_test.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Konfigurationsoptionen für Airflow
<a name="samples-virtualenv-airflow-config"></a>

Wenn Sie Apache Airflow v2 verwenden, fügen Sie es `core.lazy_load_plugins : False` als Apache Airflow Airflow-Konfigurationsoption hinzu. Weitere Informationen finden Sie unter [Verwenden von Konfigurationsoptionen zum Laden von Plugins in 2.](configuring-env-variables.md#configuring-2.0-airflow-override)

## Als nächstes
<a name="samples-virtualenv-next-up"></a>
+ Erfahren Sie unter, wie Sie die `requirements.txt` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket hochladen[Python-Abhängigkeiten installieren](working-dags-dependencies.md).
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).
+ Weitere Informationen zum Hochladen der `plugins.zip` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket finden Sie unter[Installation benutzerdefinierter Plugins](configuring-dag-import-plugins.md).

# Aufrufen DAGs mit einer Lambda-Funktion
<a name="samples-lambda"></a>

Das folgende Codebeispiel verwendet eine [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)Funktion, um ein Apache Airflow-CLI-Token abzurufen und einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung aufzurufen.

**Topics**
+ [Version](#samples-lambda-version)
+ [Voraussetzungen](#samples-lambda-prereqs)
+ [Berechtigungen](#samples-lambda-permissions)
+ [Abhängigkeiten](#samples-lambda-dependencies)
+ [Codebeispiel](#samples-lambda-code)

## Version
<a name="samples-lambda-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-lambda-prereqs"></a>

Um dieses Codebeispiel zu verwenden, müssen Sie:
+ Verwenden Sie den [öffentlichen Netzwerkzugriffsmodus](configuring-networking.md#webserver-options-public-network-onconsole) für Ihre [Amazon MWAA-Umgebung](get-started.md).
+ Verwenden Sie eine [Lambda-Funktion](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html), die die neueste Python-Laufzeit verwendet.

**Anmerkung**  
Wenn sich die Lambda-Funktion und Ihre Amazon MWAA-Umgebung in derselben VPC befinden, können Sie diesen Code in einem privaten Netzwerk verwenden. Für diese Konfiguration benötigt die Ausführungsrolle der Lambda-Funktion die Erlaubnis, den **CreateNetworkInterface** API-Vorgang Amazon Elastic Compute Cloud (Amazon EC2) aufzurufen. Sie können diese Berechtigung mithilfe der [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS-managed Policy erteilen.

## Berechtigungen
<a name="samples-lambda-permissions"></a>

Um das Codebeispiel auf dieser Seite verwenden zu können, benötigt die Ausführungsrolle Ihrer Amazon MWAA-Umgebung Zugriff, um die `airflow:CreateCliToken` Aktion auszuführen. Sie können diese Berechtigung mithilfe der Richtlinie `AmazonMWAAAirflowCliAccess` AWS-managed erteilen:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Weitere Informationen finden Sie unter [Apache Airflow CLI-Richtlinie: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Abhängigkeiten
<a name="samples-lambda-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-lambda-code"></a>

1. Öffnen Sie die AWS Lambda Konsole unter. [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/)

1. Wählen Sie Ihre Lambda-Funktion aus der **Funktionsliste** aus.

1. Kopieren Sie auf der Funktionsseite den folgenden Code und ersetzen Sie ihn durch die Namen Ihrer Ressourcen:
   + `YOUR_ENVIRONMENT_NAME`— Der Name Ihrer Amazon MWAA-Umgebung.
   + `YOUR_DAG_NAME`— Der Name der DAG, die Sie aufrufen möchten.

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. Wählen Sie **Bereitstellen**.

1. Wählen Sie **Test**, um Ihre Funktion mit der Lambda-Konsole aufzurufen.

1. Um zu überprüfen, ob Ihr Lambda Ihre DAG erfolgreich aufgerufen hat, verwenden Sie die Amazon MWAA-Konsole, um zur Apache Airflow Airflow-Benutzeroberfläche Ihrer Umgebung zu navigieren, und gehen Sie dann wie folgt vor:

   1. Suchen Sie auf der **DAGs**Seite Ihre neue Ziel-DAG in der Liste von. DAGs

   1. Überprüfen Sie unter **Letzte Ausführung** den Zeitstempel für die letzte DAG-Ausführung. Dieser Zeitstempel sollte genau mit dem letzten Zeitstempel für Ihre andere `invoke_dag` Umgebung übereinstimmen.

   1. Überprüfen Sie unter **Letzte Aufgaben**, ob die letzte Ausführung erfolgreich war.

# Aufrufen DAGs in verschiedenen Amazon MWAA-Umgebungen
<a name="samples-invoke-dag"></a>

Das folgende Codebeispiel erstellt ein Apache Airflow CLI-Token. Der Code verwendet dann einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung, um eine DAG in einer anderen Amazon MWAA-Umgebung aufzurufen.

**Topics**
+ [Version](#samples-invoke-dag-version)
+ [Voraussetzungen](#samples-invoke-dag-prereqs)
+ [Berechtigungen](#samples-invoke-dag-permissions)
+ [Abhängigkeiten](#samples-invoke-dag-dependencies)
+ [Codebeispiel](#samples-invoke-dag-code)

## Version
<a name="samples-invoke-dag-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-invoke-dag-prereqs"></a>

Um das Codebeispiel auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Zwei [Amazon MWAA-Umgebungen](get-started.md) mit Zugriff auf **öffentliche Netzwerk-Webserver**, einschließlich Ihrer aktuellen Umgebung.
+ Eine Beispiel-DAG, die in den Amazon Simple Storage Service (Amazon S3) -Bucket Ihrer Zielumgebung hochgeladen wurde.

## Berechtigungen
<a name="samples-invoke-dag-permissions"></a>

Um das Codebeispiel auf dieser Seite verwenden zu können, muss die Ausführungsrolle Ihrer Umgebung über die Berechtigung verfügen, ein Apache Airflow CLI-Token zu erstellen. Sie können die AWS-managed Policy anhängen, um diese `AmazonMWAAAirflowCliAccess` Berechtigung zu erteilen.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Weitere Informationen finden Sie unter [Apache Airflow CLI-Richtlinie: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Abhängigkeiten
<a name="samples-invoke-dag-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-invoke-dag-code"></a>

Im folgenden Codebeispiel wird davon ausgegangen, dass Sie eine DAG in Ihrer aktuellen Umgebung verwenden, um eine DAG in einer anderen Umgebung aufzurufen.

1. Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`invoke_dag.py`. Ersetzen Sie die folgenden Werte durch Ihre Informationen.
   + `your-new-environment-name`— Der Name der anderen Umgebung, in der Sie die DAG aufrufen möchten.
   + `your-target-dag-id`— Die ID der DAG in der anderen Umgebung, die Sie aufrufen möchten.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Wenn die DAG erfolgreich ausgeführt wird, erhalten Sie in den Aufgabenprotokollen für `invoke_dag_task` eine Ausgabe ähnlich der folgenden.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Um zu überprüfen, ob Ihre DAG erfolgreich aufgerufen wurde, navigieren Sie zur Apache Airflow Airflow-Benutzeroberfläche für Ihre neue Umgebung und gehen Sie dann wie folgt vor:

   1. Suchen Sie auf der **DAGs**Seite Ihre neue Ziel-DAG in der Liste von. DAGs

   1. Überprüfen Sie unter **Letzte Ausführung** den Zeitstempel für die letzte DAG-Ausführung. Dieser Zeitstempel sollte genau mit dem letzten Zeitstempel für Ihre andere `invoke_dag` Umgebung übereinstimmen.

   1. Überprüfen Sie unter **Letzte Aufgaben**, ob die letzte Ausführung erfolgreich war.

# Verwenden von Amazon MWAA mit Amazon RDS for Microsoft SQL Server
<a name="samples-sql-server"></a>

Sie können Amazon Managed Workflows for Apache Airflow verwenden, um eine Verbindung zu einem [RDS für SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html) herzustellen. Der folgende Beispielcode wird in einer DAGs Amazon Managed Workflows for Apache Airflow Airflow-Umgebung verwendet, um eine Verbindung zu einem Amazon RDS for Microsoft SQL Server herzustellen und Abfragen auf diesem auszuführen.

**Topics**
+ [Version](#samples-sql-server-version)
+ [Voraussetzungen](#samples-sql-server-prereqs)
+ [Abhängigkeiten](#samples-sql-server-dependencies)
+ [Apache Airflow v2-Verbindung](#samples-sql-server-conn)
+ [Codebeispiel](#samples-sql-server-code)
+ [Als nächstes](#samples-sql-server-next-up)

## Version
<a name="samples-sql-server-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-sql-server-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ Amazon MWAA und der RDS für SQL Server laufen in derselben Amazon VPC/
+ Die VPC-Sicherheitsgruppen von Amazon MWAA und dem Server sind mit den folgenden Verbindungen konfiguriert:
  + Eine eingehende Regel für den Port, der für Amazon RDS in der Sicherheitsgruppe von Amazon MWAA `1433` geöffnet ist
  + Oder eine ausgehende Regel für den Port von `1433` Open from Amazon MWAA To RDS
+ Apache Airflow Connection for RDS for SQL Server spiegelt den Hostnamen, den Port, den Benutzernamen und das Passwort aus der Amazon RDS-SQL-Serverdatenbank wider, die im vorherigen Prozess erstellt wurde.

## Abhängigkeiten
<a name="samples-sql-server-dependencies"></a>

Um den Beispielcode in diesem Abschnitt zu verwenden, fügen Sie Ihrer die folgende Abhängigkeit hinzu. `requirements.txt` Weitere Informationen finden Sie unter[Python-Abhängigkeiten installieren](working-dags-dependencies.md).

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Apache Airflow v2-Verbindung
<a name="samples-sql-server-conn"></a>

Wenn Sie eine Verbindung in Apache Airflow v2 verwenden, stellen Sie sicher, dass das Airflow-Verbindungsobjekt die folgenden Schlüssel-Wert-Paare enthält:

1. **Verbindungs-ID: mssql\$1default**

1. **Verbindungstyp:** Amazon Web Services

1. **Gastgeber:** `YOUR_DB_HOST`

1. **Schema:**

1. **Einloggen:** admin

1. **Passwort:**

1. **Hafen:** 1433

1. **Zusätzlich:**

## Codebeispiel
<a name="samples-sql-server-code"></a>

1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Zum Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`sql-server.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## Als nächstes
<a name="samples-sql-server-next-up"></a>
+ Erfahren Sie unter, wie Sie die `requirements.txt` Datei in diesem Beispiel in Ihren Amazon S3 S3-Bucket hochladen[Python-Abhängigkeiten installieren](working-dags-dependencies.md).
+ Erfahren Sie unter, wie Sie den DAG-Code in diesem Beispiel in den `dags` Ordner in Ihrem Amazon S3 S3-Bucket hochladen[Hinzufügen oder Aktualisieren DAGs](configuring-dag-folder.md).
+ Erkunden Sie Beispielskripte und andere Beispiele für [Pymssql-Module.](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html)
+ Weitere Informationen zum Ausführen von SQL-Code in einer bestimmten Microsoft SQL-Datenbank mithilfe des [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) finden Sie im *Apache* Airflow Airflow-Referenzhandbuch.

# Verwenden von Amazon MWAA mit Amazon EKS
<a name="mwaa-eks-example"></a>

Das folgende Beispiel zeigt, wie Amazon Managed Workflows für Apache Airflow mit Amazon EKS verwendet werden.

**Topics**
+ [Version](#mwaa-eks-example-version)
+ [Voraussetzungen](#eksctl-prereqs)
+ [Erstellen Sie einen öffentlichen Schlüssel für Amazon EC2](#eksctl-create-key)
+ [Den Cluster erstellen](#create-cluster-eksctl)
+ [Erstellen Sie einen Namespace `mwaa`](#eksctl-namespace)
+ [Erstellen Sie eine Rolle für den Namespace `mwaa`](#eksctl-role)
+ [Eine IAM-Rolle für den Amazon EKS-Cluster erstellen und anhängen](#eksctl-iam-role)
+ [Erstellen Sie die Datei requirements.txt](#eksctl-requirements)
+ [Erstellen Sie eine Identitätszuweisung für Amazon EKS](#eksctl-identity-map)
+ [Erstellen der `kubeconfig`](#eksctl-kube-config)
+ [Erstellen Sie eine DAG](#eksctl-create-dag)
+ [DAG und `kube_config.yaml` zum Amazon S3 S3-Bucket hinzufügen](#eksctl-dag-bucket)
+ [Aktivieren und lösen Sie das Beispiel aus](#eksctl-trigger-pod)

## Version
<a name="mwaa-eks-example-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="eksctl-prereqs"></a>

Um das Beispiel in diesem Thema zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ eksctl. Weitere Informationen finden Sie unter [Installieren](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl) von eksctl.
+ kubectl. Weitere Informationen finden Sie unter [Kubectl installieren und einrichten](https://kubernetes.io/docs/tasks/tools/install-kubectl/). In einigen Fällen wird dies mit eksctl installiert.
+ Ein EC2-Schlüsselpaar in der Region, in der Sie Ihre Amazon MWAA-Umgebung erstellen. Weitere Informationen finden Sie unter [key pair erstellen oder importieren](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair).

**Anmerkung**  
Wenn Sie einen `eksctl` Befehl verwenden, können Sie a einschließen, `--profile` um ein anderes Profil als das Standardprofil anzugeben.

## Erstellen Sie einen öffentlichen Schlüssel für Amazon EC2
<a name="eksctl-create-key"></a>

Verwenden Sie den folgenden Befehl, um einen öffentlichen Schlüssel aus Ihrem privaten key pair zu erstellen.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

Weitere Informationen finden Sie unter [Abrufen des öffentlichen Schlüssels für Ihr key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key).

## Den Cluster erstellen
<a name="create-cluster-eksctl"></a>

Verwenden Sie den folgenden Befehl, um den Cluster zu erstellen. Wenn Sie einen benutzerdefinierten Namen für den Cluster verwenden oder ihn in einer anderen Region erstellen möchten, ersetzen Sie die Werte für Name und Region. Sie müssen den Cluster in derselben Region erstellen, in der Sie die Amazon MWAA-Umgebung erstellen. Ersetzen Sie die Werte für die Subnetze so, dass sie den Subnetzen in Ihrem Amazon VPC-Netzwerk entsprechen, die Sie für Amazon MWAA verwenden. Ersetzen Sie den Wert für so, dass er dem von Ihnen `ssh-public-key` verwendeten Schlüssel entspricht. Sie können einen vorhandenen Schlüssel aus Amazon EC2 verwenden, der sich in derselben Region befindet, oder einen neuen Schlüssel in derselben Region erstellen, in der Sie Ihre Amazon MWAA-Umgebung erstellen.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

Es dauert einige Zeit, bis die Erstellung des Clusters abgeschlossen ist. Sobald der Vorgang abgeschlossen ist, können Sie mithilfe des folgenden Befehls überprüfen, ob der Cluster erfolgreich erstellt wurde und ob der IAM OIDC-Anbieter konfiguriert wurde:

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## Erstellen Sie einen Namespace `mwaa`
<a name="eksctl-namespace"></a>

Nachdem Sie bestätigt haben, dass der Cluster erfolgreich erstellt wurde, verwenden Sie den folgenden Befehl, um einen Namespace für die Pods zu erstellen.

```
kubectl create namespace mwaa
```

## Erstellen Sie eine Rolle für den Namespace `mwaa`
<a name="eksctl-role"></a>

Nachdem Sie den Namespace erstellt haben, erstellen Sie eine Rolle und eine Rollenbindung für einen Amazon MWAA-Benutzer auf EKS, der Pods in einem MWAA-Namespace ausführen kann. Wenn Sie einen anderen Namen für den Namespace verwendet haben, ersetzen Sie mwaa in durch den Namen, den Sie verwendet haben. `-n mwaa`

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

Vergewissern Sie sich, dass die neue Rolle auf den Amazon EKS-Cluster zugreifen kann, indem Sie den folgenden Befehl ausführen. Achten Sie darauf, den richtigen Namen zu verwenden, falls Sie Folgendes nicht verwendet haben*mwaa*:

```
kubectl get pods -n mwaa --as mwaa-service
```

Sie erhalten eine Nachricht zurück, die besagt:

```
No resources found in mwaa namespace.
```

## Eine IAM-Rolle für den Amazon EKS-Cluster erstellen und anhängen
<a name="eksctl-iam-role"></a>

Sie müssen eine IAM-Rolle erstellen und sie dann an den Amazon EKS (k8s) -Cluster binden, damit sie für die Authentifizierung über IAM verwendet werden kann. Die Rolle wird nur für die Anmeldung am Cluster verwendet und hat keine Berechtigungen für die Konsole oder API-Aufrufe.

Erstellen Sie mithilfe der Schritte unter eine neue Rolle für die Amazon MWAA-Umgebung. [Amazon MWAA-Ausführungsrolle](mwaa-create-role.md) Anstatt jedoch die in diesem Thema beschriebenen Richtlinien zu erstellen und anzuhängen, fügen Sie die folgende Richtlinie bei:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

Nachdem Sie die Rolle erstellt haben, bearbeiten Sie Ihre Amazon MWAA-Umgebung, um die von Ihnen erstellte Rolle als Ausführungsrolle für die Umgebung zu verwenden. Um die Rolle zu ändern, bearbeiten Sie die zu verwendende Umgebung. Sie wählen die Ausführungsrolle unter **Berechtigungen** aus.

**Bekannte Probleme:**
+ Es gibt ein bekanntes Problem ARNs mit Rollen, bei denen Unterpfade sich nicht bei Amazon EKS authentifizieren können. Die Lösung hierfür besteht darin, die Servicerolle manuell zu erstellen, anstatt die von Amazon MWAA selbst erstellte Rolle zu verwenden. Weitere Informationen finden Sie unter [Rollen mit Pfaden funktionieren nicht, wenn der Pfad in ihrem ARN in der aws-auth-Configmap enthalten ist](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268).
+ Wenn die Amazon MWAA-Serviceliste in IAM nicht verfügbar ist, müssen Sie eine alternative Servicerichtlinie wie Amazon EC2 auswählen und dann die Vertrauensrichtlinie der Rolle so aktualisieren, dass sie den folgenden Anforderungen entspricht:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  Weitere Informationen finden Sie unter [So verwenden Sie Vertrauensrichtlinien mit](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/) IAM-Rollen.

## Erstellen Sie die Datei requirements.txt
<a name="eksctl-requirements"></a>

Um den Beispielcode in diesem Abschnitt verwenden zu können, stellen Sie sicher, dass Sie Ihrem eine der folgenden Datenbankoptionen hinzugefügt haben`requirements.txt`. Weitere Informationen finden Sie unter[Python-Abhängigkeiten installieren](working-dags-dependencies.md).

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Erstellen Sie eine Identitätszuweisung für Amazon EKS
<a name="eksctl-identity-map"></a>

Verwenden Sie den ARN für die Rolle, die Sie im folgenden Befehl erstellt haben, um eine Identitätszuweisung für Amazon EKS zu erstellen. Ändern Sie die Region in *us-east-1* die Region, in der Sie die Umgebung erstellt haben. Ersetzen Sie den ARN für die Rolle und schließlich *mwaa-execution-role* durch die Ausführungsrolle Ihrer Umgebung.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## Erstellen der `kubeconfig`
<a name="eksctl-kube-config"></a>

Verwenden Sie den folgenden Befehl, um Folgendes zu erstellen`kubeconfig`:

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

Wenn Sie bei der Ausführung ein bestimmtes Profil verwendet haben, müssen `update-kubeconfig` Sie den `env:` Abschnitt entfernen, der der Datei kube\$1config.yaml hinzugefügt wurde, damit er ordnungsgemäß mit Amazon MWAA funktioniert. Löschen Sie dazu Folgendes aus der Datei und speichern Sie sie anschließend:

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## Erstellen Sie eine DAG
<a name="eksctl-create-dag"></a>

Verwenden Sie das folgende Codebeispiel, um eine Python-Datei zu erstellen, z. B. `mwaa_pod_example.py` für die DAG.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## DAG und `kube_config.yaml` zum Amazon S3 S3-Bucket hinzufügen
<a name="eksctl-dag-bucket"></a>

Fügen Sie die von Ihnen erstellte DAG und die `kube_config.yaml` Datei in den Amazon S3 S3-Bucket für die Amazon MWAA-Umgebung ein. Sie können Dateien entweder mit der Amazon S3 S3-Konsole oder mit dem in Ihren Bucket einfügen AWS Command Line Interface.

## Aktivieren und lösen Sie das Beispiel aus
<a name="eksctl-trigger-pod"></a>

Aktivieren Sie das Beispiel in Apache Airflow und lösen Sie es dann aus.

Nachdem es erfolgreich ausgeführt und abgeschlossen wurde, verwenden Sie den folgenden Befehl, um den Pod zu verifizieren:

```
kubectl get pods -n mwaa
```

Sie erhalten eine Ausgabe, die der folgenden ähnelt:

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

Anschließend können Sie die Ausgabe des Pods mit dem folgenden Befehl überprüfen. Ersetzen Sie den Namenswert durch den Wert, der vom vorherigen Befehl zurückgegeben wurde:

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# Herstellen einer Verbindung zu Amazon ECS mithilfe der `ECSOperator`
<a name="samples-ecs-operator"></a>

In diesem Thema wird beschrieben, wie Sie den verwenden können`ECSOperator`, um eine Verbindung zu einem Amazon Elastic Container Service (Amazon ECS) -Container von Amazon MWAA herzustellen. In den folgenden Schritten fügen Sie der Ausführungsrolle Ihrer Umgebung die erforderlichen Berechtigungen hinzu, verwenden eine CloudFormation Vorlage, um einen Amazon ECS Fargate-Cluster zu erstellen, und laden schließlich eine DAG hoch, die eine Verbindung zu Ihrem neuen Cluster herstellt.

**Topics**
+ [Version](#samples-ecs-operator-version)
+ [Voraussetzungen](#samples-ecs-operator-prereqs)
+ [Berechtigungen](#samples-ecs-operator-permissions)
+ [Erstellen Sie einen Amazon ECS-Cluster](#create-cfn-template)
+ [Codebeispiel](#samples-ecs-operator-code)

## Version
<a name="samples-ecs-operator-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-ecs-operator-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).

## Berechtigungen
<a name="samples-ecs-operator-permissions"></a>
+ Die Ausführungsrolle für Ihre Umgebung benötigt die Erlaubnis, Aufgaben in Amazon ECS auszuführen. Sie können entweder die von [AmazonECS\$1 FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) AWS verwaltete Richtlinie an Ihre Ausführungsrolle anhängen oder die folgende Richtlinie erstellen und an Ihre Ausführungsrolle anhängen.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Sie müssen nicht nur die erforderlichen Berechtigungen für die Ausführung von Aufgaben in Amazon ECS hinzufügen, sondern auch die CloudWatch Protokollrichtlinie in Ihrer Amazon MWAA-Ausführungsrolle ändern, um Zugriff auf die Amazon ECS-Aufgabenprotokollgruppe zu gewähren, wie im Folgenden aufgeführt. Die Amazon ECS-Protokollgruppe wird durch die CloudFormation Vorlage in erstellt[Erstellen Sie einen Amazon ECS-Cluster](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Weitere Informationen zur Amazon MWAA-Ausführungsrolle und zum Anhängen einer Richtlinie finden Sie unter. [Rolle bei der Ausführung](mwaa-create-role.md)

## Erstellen Sie einen Amazon ECS-Cluster
<a name="create-cfn-template"></a>

Mithilfe der folgenden CloudFormation Vorlage erstellen Sie einen Amazon ECS Fargate-Cluster zur Verwendung mit Ihrem Amazon MWAA-Workflow. Weitere Informationen finden Sie unter [Erstellen einer Aufgabendefinition](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) im *Amazon Elastic Container Service Developer Guide*.

1. Erstellen Sie eine JSON-Datei mit dem folgenden Code und speichern Sie sie unter`ecs-mwaa-cfn.json`.

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. Verwenden Sie in Ihrer Befehlszeile den folgenden AWS CLI Befehl, um einen neuen Stack zu erstellen. Sie müssen die Werte `SecurityGroups` und `SubnetIds` durch Werte für die Sicherheitsgruppen und Subnetze Ihrer Amazon MWAA-Umgebung ersetzen.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   Alternativ können Sie das folgende Shell-Skript verwenden. Das Skript ruft mithilfe des `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI Befehls die erforderlichen Werte für die Sicherheitsgruppen und Subnetze Ihrer Umgebung ab und erstellt dann den Stack entsprechend. Gehen Sie wie folgt vor, um das Skript auszuführen.

   1. Kopieren und speichern Sie das Skript `ecs-stack-helper.sh` in demselben Verzeichnis wie Ihre CloudFormation Vorlage.

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. Führen Sie das Skript mit den folgenden Befehlen aus. Ersetzen Sie `environment-name` und `stack-name` durch Ihre Informationen.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   Bei Erfolg verweisen Sie auf die folgende Ausgabe, in der Ihre neue CloudFormation Stack-ID angezeigt wird.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

Nachdem Ihr CloudFormation Stack AWS fertiggestellt und Ihre Amazon ECS-Ressourcen bereitgestellt wurden, können Sie Ihre DAG erstellen und hochladen.

## Codebeispiel
<a name="samples-ecs-operator-code"></a>

1. Öffnen Sie eine Befehlszeile und navigieren Sie zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter. Laden Sie dann Ihre neue DAG auf Amazon S3 hoch. `mwaa-ecs-operator.py`

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**Anmerkung**  
In der Beispiel-DAG für `awslogs_group` müssen Sie möglicherweise die Protokollgruppe mit dem Namen Ihrer Amazon ECS-Aufgabenprotokollgruppe ändern. Das Beispiel geht von einer Protokollgruppe mit dem Namen aus`mwaa-ecs-zero`. Verwenden Sie für `awslogs_stream_prefix` das Amazon ECS-Aufgabenprotokoll-Stream-Präfix. Das Beispiel geht von einem Log-Stream-Präfix aus,`ecs`.

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Bei Erfolg erhalten Sie in den Aufgabenprotokollen für die `ecs_fargate_dag` DAG eine Ausgabe, die `ecs_operator_task` der folgenden ähnelt:

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Verwenden von dbt mit Amazon MWAA
<a name="samples-dbt"></a>

Dieses Thema zeigt, wie Sie dbt und Postgres mit Amazon MWAA verwenden können. In den folgenden Schritten fügen Sie Ihrem `requirements.txt` die erforderlichen Abhängigkeiten hinzu und laden ein DBT-Beispielprojekt in den Amazon S3 S3-Bucket Ihrer Umgebung hoch. Anschließend überprüfen Sie anhand einer Beispiel-DAG, ob Amazon MWAA die Abhängigkeiten installiert hat, und verwenden die schließlich, `BashOperator` um das dbt-Projekt auszuführen.

**Topics**
+ [Version](#samples-dbt-version)
+ [Voraussetzungen](#samples-dbt-prereqs)
+ [Abhängigkeiten](#samples-dbt-dependencies)
+ [Laden Sie ein dbt-Projekt auf Amazon S3 hoch](#samples-dbt-upload-project)
+ [Verwenden Sie eine DAG, um die Installation der DBT-Abhängigkeit zu überprüfen](#samples-dbt-test-dependencies)
+ [Verwenden Sie eine DAG, um ein DBT-Projekt auszuführen](#samples-dbt-run-project)

## Version
<a name="samples-dbt-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-dbt-prereqs"></a>

Bevor Sie die folgenden Schritte ausführen können, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung, die Apache Airflow](get-started.md) v2.2.2 verwendet. Dieses Beispiel wurde mit Version 2.2.2 geschrieben und getestet. Möglicherweise müssen Sie das Beispiel ändern, um es mit anderen Apache Airflow Airflow-Versionen zu verwenden.
+ Ein DBT-Beispielprojekt. Um mit der Verwendung von dbt mit Amazon MWAA zu beginnen, können Sie einen Fork erstellen und das [dbt-Starter-Projekt aus dem dbt-labs-Repository](https://github.com/dbt-labs/dbt-starter-project) klonen. GitHub 

## Abhängigkeiten
<a name="samples-dbt-dependencies"></a>

Um Amazon MWAA mit dbt zu verwenden, fügen Sie Ihrer Umgebung das folgende Startskript hinzu. Weitere Informationen finden Sie unter [Verwenden eines Startskripts mit Amazon MWAA](using-startup-script.md).

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

In den folgenden Abschnitten laden Sie Ihr dbt-Projektverzeichnis auf Amazon S3 hoch und führen eine DAG aus, die überprüft, ob Amazon MWAA die erforderlichen dbt-Abhängigkeiten erfolgreich installiert hat.

## Laden Sie ein dbt-Projekt auf Amazon S3 hoch
<a name="samples-dbt-upload-project"></a>

Um ein dbt-Projekt mit Ihrer Amazon MWAA-Umgebung verwenden zu können, können Sie das gesamte Projektverzeichnis in den Ordner Ihrer Umgebung hochladen. `dags` Wenn die Umgebung aktualisiert wird, lädt Amazon MWAA das dbt-Verzeichnis in den lokalen Ordner herunter. `usr/local/airflow/dags/`

**Um ein dbt-Projekt auf Amazon S3 hochzuladen**

1. Navigieren Sie zu dem Verzeichnis, in dem Sie das dbt-Starterprojekt geklont haben.

1. Führen Sie den folgenden Amazon S3 AWS CLI S3-Befehl aus, um den Inhalt des Projekts mithilfe des `--recursive` Parameters rekursiv in den `dags` Ordner Ihrer Umgebung zu kopieren. Der Befehl erstellt ein Unterverzeichnis namens`dbt`, das Sie für all Ihre DBT-Projekte verwenden können. Wenn das Unterverzeichnis bereits existiert, werden die Projektdateien in das bestehende Verzeichnis kopiert, und es wird kein neues Verzeichnis erstellt. Der Befehl erstellt auch ein Unterverzeichnis innerhalb des `dbt` Verzeichnisses für dieses spezielle Starterprojekt.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   Sie können verschiedene Namen für Projektunterverzeichnisse verwenden, um mehrere DBT-Projekte innerhalb des übergeordneten Verzeichnisses zu organisieren. `dbt`

## Verwenden Sie eine DAG, um die Installation der DBT-Abhängigkeit zu überprüfen
<a name="samples-dbt-test-dependencies"></a>

Die folgende DAG verwendet einen `BashOperator` und einen Bash-Befehl, um zu überprüfen, ob Amazon MWAA die in angegebenen dbt-Abhängigkeiten erfolgreich installiert hat. `requirements.txt`

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

Gehen Sie wie folgt vor, um auf die Task-Protokolle zuzugreifen und zu überprüfen, ob dbt und seine Abhängigkeiten installiert wurden.

1. Navigieren Sie zur Amazon MWAA-Konsole und wählen Sie dann **Open Airflow UI** aus der Liste der verfügbaren Umgebungen aus.

1. Suchen Sie auf der Apache Airflow Airflow-Benutzeroberfläche die `dbt-installation-test` DAG in der Liste und wählen Sie dann das Datum in der `Last Run` Spalte aus, an der die letzte erfolgreiche Aufgabe geöffnet werden soll.

1. Wählen Sie in der **Diagrammansicht** die `bash_command` Aufgabe aus, um die Details der Aufgabeninstanz zu öffnen.

1. Wählen Sie **Log**, um die Task-Logs zu öffnen, und vergewissern Sie sich dann, dass die Logs die von uns angegebene DBT-Version erfolgreich auflisten. `requirements.txt`

## Verwenden Sie eine DAG, um ein DBT-Projekt auszuführen
<a name="samples-dbt-run-project"></a>

Die folgende DAG verwendet a`BashOperator`, um die dbt-Projekte, die Sie auf Amazon S3 hochgeladen haben, aus dem lokalen `usr/local/airflow/dags/` Verzeichnis in das Verzeichnis mit Schreibzugriff zu `/tmp` kopieren, und führt dann das dbt-Projekt aus. Die Bash-Befehle gehen von einem DBT-Startprojekt mit dem Titel aus. `dbt-starter-project` Ändern Sie den Verzeichnisnamen entsprechend dem Namen Ihres Projektverzeichnisses.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS Blogs und Tutorials
<a name="samples-blogs-tutorials"></a>
+ [Arbeiten mit Amazon EKS und Amazon MWAA für Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)