Cómo cambiar la zona horaria de un DAG en Amazon MWAA - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Cómo cambiar la zona horaria de un DAG en Amazon MWAA

Apache Airflow programa su gráfico acíclico dirigido (DAG) en UTC+0 de forma predeterminada. Los siguientes pasos muestran cómo puede cambiar la zona horaria en la que Amazon MWAA ejecuta sus DAG con Pendulum. Opcionalmente, en este tema se muestra cómo puede crear un complemento personalizado para cambiar la zona horaria de los registros de Apache Airflow de su entorno.

Versión

  • Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10.

Requisitos previos

Para usar el código de muestra de esta página, necesitará lo siguiente:

Permisos

  • No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

Creación de un complemento para cambiar la zona horaria en los registros de Airflow

Apache Airflow ejecutará los archivos de Python en el directorio de plugins al inicio. Con el siguiente complemento, puede anular la zona horaria del ejecutor, lo que modifica la zona horaria en la que Apache Airflow escribe los registros.

  1. Cree un directorio llamado plugins para su complemento personalizado y vaya al directorio. Por ejemplo:

    $ mkdir plugins $ cd plugins
  2. Copie el contenido de la siguiente muestra de código y guárdelo localmente como dag-timezone-plugin.py en la carpeta plugins.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. En el directorio plugins, cree un archivo de Python vacío llamado __init__.py. Su directorio plugins debería ser similar a lo siguiente:

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

Creación de un plugins.zip

Los siguientes pasos muestran cómo crear plugins.zip. El contenido de esta muestra se puede combinar con otros complementos y binarios en un solo archivo plugins.zip.

  1. En el símbolo del sistema, vaya hasta el directorio plugins del paso anterior. Por ejemplo:

    cd plugins
  2. Comprima el contenido en su directorio plugins.

    zip -r ../plugins.zip ./
  3. Carga de plugins.zip en el bucket S3

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

Código de ejemplo

Para cambiar la zona horaria predeterminada (UTC+0) en la que se ejecuta el DAG, utilizaremos una biblioteca llamada Pendulum, una biblioteca de Python para trabajar con una datetime y conocer la zona horaria.

  1. En el símbolo del sistema, vaya hasta el directorio en el que están almacenados los DAG. Por ejemplo:

    $ cd dags
  2. Copie el contenido de la siguiente muestra y guárdelo como tz-aware-dag.py.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. Ejecute el siguiente comando de la AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si se ejecuta correctamente, obtendrá un resultado similar al siguiente en los registros para tz_aware_task en el tz_test DAG:

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

Siguientes pasos