Amazon MWAA での DAG のタイムゾーンの変更 - Amazon Managed Workflows for Apache Airflow

Amazon MWAA での DAG のタイムゾーンの変更

Apache Airflow は、有向非巡回グラフ (DAG) をデフォルトで UTC+0 でスケジュールします。次のステップは、Amazon MWAA が Pendulum を使用して DAG を実行するタイムゾーンを変更する方法を示しています。このトピックでは、任意で、環境の Apache Airflow ログのタイムゾーンを変更するカスタムプラグインを作成する方法を示します。

Version

  • このページのコード例は、Python 3.10Apache Airflow v2 と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

アクセス許可

  • このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

Airflow ログのタイムゾーンを変更するプラグインを作成する

Apache Airflow は、起動時に plugins ディレクトリ内の Python ファイルを実行します。次のプラグインを使用すると、エグゼキューターのタイムゾーンをオーバーライドできます。これにより、Apache Airflow がログを書き込むタイムゾーンが変更されます。

  1. カスタムプラグイン用に plugins という名前のディレクトリを作成し、そのディレクトリに移動します。例:

    $ mkdir plugins $ cd plugins
  2. 以下のコードサンプルの内容をコピーし、ローカルに dag-timezone-plugin.py として plugins フォルダに保存します。

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. plugins ディレクトリに、__init__.py という名前の空の Python ファイルを作成します。plugins ディレクトリは以下のようになっているはずです。

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

plugins.zip を作成する

以下のステップは、plugins.zip を作成する方法を示しています。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの plugins.zip ファイルにすることができます。

  1. コマンドプロンプトで、前の手順で作成した plugins ディレクトリに移動してください。例:

    cd plugins
  2. plugins ディレクトリ内のコンテンツを圧縮します。

    zip -r ../plugins.zip ./
  3. plugins.zip を S3 バケットにアップロードします

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

コードサンプル

DAG が実行されるデフォルトのタイムゾーン (UTC+0) を変更するには、Pendulum というライブラリを使用します。これは、タイムゾーン対応の日時を処理するための Python ライブラリです。

  1. コマンドプロンプトで、DAG が保存されているディレクトリに移動します。例:

    $ cd dags
  2. 以下の例の内容をコピーして、tz-aware-dag.py として保存してください。

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. 以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 成功した場合、tz_test DAG で tz_aware_task のタスクログに以下のような出力が表示されます。

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

次のステップ