DAGDie Zeitzone von a bei Amazon ändern MWAA - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

DAGDie Zeitzone von a bei Amazon ändern MWAA

Apache Airflow legt Ihren gerichteten azyklischen Graphen (DAG) standardmäßig auf +0 fest. UTC Die folgenden Schritte zeigen, wie Sie die Zeitzone ändern können, in der Amazon Ihr DAGs mit Pendulum MWAA ausführt. Optional zeigt dieses Thema, wie Sie ein benutzerdefiniertes Plugin erstellen können, um die Zeitzone für die Apache Airflow-Protokolle Ihrer Umgebung zu ändern.

Version

  • Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10 verwenden.

Voraussetzungen

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:

Berechtigungen

  • Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

Erstellen Sie ein Plugin, um die Zeitzone in Airflow-Protokollen zu ändern

Apache Airflow führt die Python-Dateien beim Start im plugins Verzeichnis aus. Mit dem folgenden Plugin können Sie die Zeitzone des Executors überschreiben, wodurch die Zeitzone geändert wird, in der Apache Airflow Protokolle schreibt.

  1. Erstellen Sie ein Verzeichnis, das plugins nach Ihrem benutzerdefinierten Plugin benannt ist, und navigieren Sie zu dem Verzeichnis. Beispielsweise:

    $ mkdir plugins $ cd plugins
  2. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal wie dag-timezone-plugin.py im plugins Ordner.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. Erstellen Sie im plugins Verzeichnis eine leere Python-Datei mit dem Namen__init__.py. Ihr plugins Verzeichnis sollte dem folgenden ähnlich sein:

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

Erstellen eines plugins.zip

Die folgenden Schritte zeigen, wie Sie erstellenplugins.zip. Der Inhalt dieses Beispiels kann mit anderen Plugins und Binärdateien in einer einzigen plugins.zip Datei kombiniert werden.

  1. Navigieren Sie in der Befehlszeile zu dem plugins Verzeichnis aus dem vorherigen Schritt. Beispielsweise:

    cd plugins
  2. Komprimieren Sie den Inhalt Ihres plugins Verzeichnisses.

    zip -r ../plugins.zip ./
  3. Laden Sie plugins.zip es in Ihren S3-Bucket hoch

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

Codebeispiel

Um die Standardzeitzone (UTC+0) zu ändern, in der das DAG ausgeführt wird, verwenden wir eine Bibliothek namens Pendulum, eine Python-Bibliothek für die Arbeit mit zeitzonensensitiver Datetime.

  1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Sie gespeichert sind. DAGs Beispielsweise:

    $ cd dags
  2. Kopieren Sie den Inhalt des folgenden Beispiels und speichern Sie ihn untertz-aware-dag.py.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. Führen Sie den folgenden AWS CLI Befehl aus, um das in den Bucket Ihrer Umgebung DAG zu kopieren, und lösen Sie es dann DAG mithilfe der Apache Airflow-Benutzeroberfläche aus.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Wenn der Vorgang erfolgreich ist, geben Sie in den Task-Logs eine Ausgabe ähnlich tz_aware_task der tz_test DAG folgenden aus:

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

Als nächstes