Instalación de complementos personalizados - Amazon Managed Workflows para Apache Airflow

Instalación de complementos personalizados

Amazon Managed Workflows para Apache Airflow es compatible con el administrador de complementos integrado en Apache Airflow, lo que le permite utilizar operadores, enlaces, sensores o interfaces personalizados de Apache Airflow. En esta página se describen los pasos para instalar los complementos personalizados de Apache Airflow en su entorno de Amazon Managed Workflows usando un archivo plugins.zip.

Requisitos previos

Para poder llevar a cabo los pasos de esta página, necesitará lo siguiente.

  • Permisos: el administrador debe haber concedido a su cuenta de AWS acceso a la política de control de acceso de AmazonMWAAFullConsoleAccess para su entorno. Además, su rol de ejecución debe permitir que su entorno de Amazon MWAA acceda a los recursos de AWS que utiliza su entorno.

  • Acceso: si tiene que acceder a los repositorios públicos para instalar dependencias directamente en el servidor web, su entorno debe estar configurado con acceso a un servidor web de red pública. Para obtener más información, consulte Modos de acceso de Apache Airflow.

  • Configuración de Amazon S3: el bucket de Amazon S3 que se utiliza para almacenar los DAG, los complementos personalizados en plugins.zip y las dependencias de Python en requirements.txt deben estar configurados con el acceso público bloqueado y el control de versiones activado.

Funcionamiento

Para ejecutar complementos personalizados en su entorno, debe hacer tres cosas:

  1. Cree un archivo plugins.zip local.

  2. Cargue el archivo plugins.zip en su bucket de Amazon S3.

  3. Especifique la versión de este archivo en el campo Archivo de complementos de la consola de Amazon MWAA.

nota

Si es la primera vez que sube un archivo plugins.zip a su bucket de Amazon S3, también tendrá que especificar la ruta al archivo en la consola de Amazon MWAA. Solo necesita realizar este paso una vez.

Cuándo usar los complementos

Los complementos solo son necesarios para ampliar la interfaz de usuario de Apache Airflow, como se describe en la documentación de Apache Airflow. Los operadores personalizados se pueden colocar directamente en la carpeta /dags junto con el código DAG.

Si necesita crear sus propias integraciones con sistemas externos, colóquelas en la carpeta /dags o en una de sus subcarpetas, pero no en la carpeta plugins.zip. En Apache Airflow 2.x, los complementos se utilizan principalmente para ampliar la interfaz de usuario.

Del mismo modo, no se deben colocar otras dependencias en plugins.zip. En su lugar, se pueden almacenar en una ubicación dentro de la carpeta /dags de Amazon S3, donde se sincronizarán con cada contenedor de Amazon MWAA antes de que se inicie Apache Airflow.

nota

Cualquier archivo de la carpeta /dags o en plugins.zip que no defina explícitamente un objeto DAG de Apache Airflow debe figurar en un archivo .airflowignore.

Información general de los complementos personalizados

El administrador de complementos integrado de Apache Airflow puede integrar características externas en su núcleo simplemente colocando archivos en una carpeta $AIRFLOW_HOME/plugins. Le permite utilizar operadores, enlaces, sensores o interfaces personalizados de Apache Airflow. La siguiente sección proporciona un ejemplo de estructuras de directorios planas y anidadas en un entorno de desarrollo local y las instrucciones de importación resultantes, que determinan la estructura de directorios dentro de un plugins.zip.

Directorio de complementos personalizados y límites de tamaño

El programador y los procesos de trabajo de Apache Airflow buscan complementos personalizados durante el inicio en el contenedor de Fargate administrado por AWS para su entorno en /usr/local/airflow/plugins/*.

  • Estructura de directorios. La estructura de directorios (en /*) se basa en el contenido del archivo plugins.zip. Por ejemplo, si su plugins.zip contiene el directorio operators como directorio de nivel superior, el directorio se extraerá a /usr/local/airflow/plugins/operators en su entorno.

  • Límite de tamaño. Se recomienda un archivo plugins.zip de menos de 1 GB. Cuanto mayor sea el tamaño del archivo plugins.zip, mayor será el tiempo de inicio en un entorno. Aunque Amazon MWAA no limita el tamaño de un archivo plugins.zip de forma explícita, si las dependencias no se pueden instalar en diez minutos, el servicio Fargate agotará el tiempo de espera e intentará revertir el entorno a un estado estable.

nota

Para los entornos que utilizan Apache Airflow v1.10.12 o Apache Airflow v2.0.2, Amazon MWAA limita el tráfico saliente en el servidor web Apache Airflow y no le permite instalar complementos ni dependencias de Python directamente en el servidor web. A partir de la versión 2.2.2 de Apache Airflow, Amazon MWAA puede instalar complementos y dependencias directamente en el servidor web.

Ejemplos de complementos personalizados

En la siguiente sección, se utiliza un código de muestra de la guía de referencia de Apache Airflow para mostrar cómo estructurar su entorno de desarrollo local.

Ejemplo de uso de una estructura de directorios plana en plugins.zip

Apache Airflow v2

El siguiente ejemplo muestra un archivo plugins.zip con una estructura de directorios plana para Apache Airflow v2.

ejemplo directorio plano con PythonVirtualEnvOperator plugins.zip

En el siguiente ejemplo se muestra el árbol de nivel superior de un archivo plugins.zip para el complemento personalizado PythonVirtualEnvOperator en Creación de un complemento personalizado para Apache Airflow PythonVirtualEnvOperator.

├── virtual_python_plugin.py
ejemplo plugins/virtual_python_plugin.py

En el siguiente ejemplo se muestra el complemento personalizado PythonVirtualEnvOperator.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

El siguiente ejemplo muestra un archivo plugins.zip con una estructura de directorios plana para Apache Airflow v1.

ejemplo directorio plano con PythonVirtualEnvOperator plugins.zip

En el siguiente ejemplo se muestra el árbol de nivel superior de un archivo plugins.zip para el complemento personalizado PythonVirtualEnvOperator en Creación de un complemento personalizado para Apache Airflow PythonVirtualEnvOperator.

├── virtual_python_plugin.py
ejemplo plugins/virtual_python_plugin.py

En el siguiente ejemplo se muestra el complemento personalizado PythonVirtualEnvOperator.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Ejemplo de uso de una estructura de directorios anidada en plugins.zip

Apache Airflow v2

El siguiente ejemplo muestra un archivo plugins.zip con directorios independientes para hooks, operators y un directorio sensors para Apache Airflow v2.

ejemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

El siguiente ejemplo muestra las instrucciones de importación en el DAG (carpeta DAG) que usa los complementos personalizados.

ejemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
ejemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Los siguientes ejemplos muestran cada una de las instrucciones de importación necesarias en los archivos de complementos personalizados.

ejemplo hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
ejemplo sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
ejemplo operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
ejemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Siga los pasos que se indican en Probar complementos personalizados con la utilidad CLI de Amazon MWAA y, a continuación, en Crear un archivo plugins.zip para comprimir el contenido en su directorio plugins. Por ejemplo, cd plugins.

Apache Airflow v1

El siguiente ejemplo muestra un archivo plugins.zip con directorios independientes para hooks, operators y un directorio sensors para Apache Airflow v1.10.12.

ejemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

El siguiente ejemplo muestra las instrucciones de importación en el DAG (carpeta DAG) que usa los complementos personalizados.

ejemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
ejemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Los siguientes ejemplos muestran cada una de las instrucciones de importación necesarias en los archivos de complementos personalizados.

ejemplo hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
ejemplo sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
ejemplo operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
ejemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Siga los pasos que se indican en Probar complementos personalizados con la utilidad CLI de Amazon MWAA y, a continuación, en Crear un archivo plugins.zip para comprimir el contenido en su directorio plugins. Por ejemplo, cd plugins.

Crear un archivo plugins.zip

En los pasos siguientes se describen los pasos que recomendamos para crear un archivo plugins.zip de forma local.

Paso uno: pruebe los complementos personalizados con la utilidad CLI de Amazon MWAA

  • La utilidad de la interfaz de la línea de comandos (CLI) replica entornos en Amazon Managed Workflows para Apache Airflow de forma local.

  • La CLI crea localmente una imagen de contenedor de Docker similar a una imagen de producción de Amazon MWAA. Esto le permite ejecutar un entorno local de Apache Airflow para desarrollar y probar los DAG, los complementos personalizados y las dependencias antes de implementarlos en Amazon MWAA.

  • Para ejecutar la CLI, consulte aws-mwaa-local-runner en GitHub.

Paso dos: crear el archivo plugins.zip

Puede utilizar una utilidad de archivado ZIP integrada o cualquier otra utilidad ZIP (como 7zip) para crear un archivo .zip.

nota

La utilidad zip integrada para el sistema operativo Windows puede agregar subcarpetas al crear un archivo .zip. Le recomendamos comprobar el contenido del archivo plugins.zip antes de subirlo a su bucket de Amazon S3 para asegurarse de que no se hayan añadido directorios adicionales.

  1. Cambie los directorios a su directorio local de complementos de Airflow. Por ejemplo:

    myproject$ cd plugins
  2. Ejecute el siguiente comando para asegurarse de que el contenido tiene permisos de ejecución (solo para macOS y Linux).

    plugins$ chmod -R 755 .
  3. Comprima el contenido de la carpeta plugins.

    plugins$ zip -r plugins.zip .

Cómo cargar plugins.zip a Amazon S3

Puede usar la consola de Amazon S3 o la AWS Command Line Interface (AWS CLI) para cargar un archivo plugins.zip a su bucket de Amazon S3.

Uso del AWS CLI

La AWS Command Line Interface (AWS CLI) es una herramienta de código abierto que le permite interactuar con los servicios de AWS mediante el uso de comandos en el intérprete de comandos de la línea de comandos. Para completar los pasos de esta página, necesita lo siguiente:

Carga mediante la AWS CLI
  1. En el símbolo del sistema, vaya hasta el directorio en el que está almacenado el archivo plugins.zip. Por ejemplo:

    cd plugins
  2. Use el siguiente comando para obtener una lista de todos los buckets de Amazon S3.

    aws s3 ls
  3. Utilice el comando siguiente para enumerar los archivos y las carpetas del bucket de Amazon S3 para su entorno.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Utilice el siguiente comando para cargar el archivo plugins.zip en el bucket de Amazon S3 para su entorno.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Uso de la consola de Amazon S3

La consola de Amazon S3 es una interfaz de usuario basada en la web que le permite crear y administrar los recursos de su bucket de Amazon S3.

Carga del contenido usando la consola de Amazon S3
  1. Abra la página Entornos en la consola de Amazon MWAA.

  2. Seleccione un entorno.

  3. Seleccione el enlace del bucket S3 en el panel de códigos de DAG en S3 para abrir el bucket de almacenamiento en la consola de Amazon S3.

  4. Seleccione Cargar.

  5. Elija Añadir archivo.

  6. Seleccione la copia local de su plugins.zip, elija Cargar.

Instalación de complementos personalizados en su entorno

En esta sección se describe cómo instalar los complementos personalizados que ha cargado en su bucket de Amazon S3 especificando la ruta al archivo plugins.zip y especificando la versión del archivo plugins.zip cada vez que se actualiza el archivo zip.

Especificación de la ruta a plugins.zip en la consola MWAA de Amazon (la primera vez)

Si es la primera vez que sube un archivo plugins.zip a su bucket de Amazon S3, también tendrá que especificar la ruta al archivo en la consola de Amazon MWAA. Solo necesita realizar este paso una vez.

  1. Abra la página Entornos en la consola de Amazon MWAA.

  2. Seleccione un entorno.

  3. Elija Editar.

  4. En el panel Código DAG en Amazon S3, elija Explorar S3 junto al campo Archivo de complementos: opcional.

  5. Seleccione el archivo plugins.zip en su bucket de Amazon S3.

  6. Seleccione Elegir.

  7. Seleccione Siguiente, Actualizar entorno.

Especificación de la versión de plugins.zip en la consola de Amazon MWAA

Debe especificar la versión de su archivo plugins.zip en la consola de Amazon MWAA cada vez que cargue una nueva versión de su plugins.zip en su bucket de Amazon S3.

  1. Abra la página Entornos en la consola de Amazon MWAA.

  2. Seleccione un entorno.

  3. Elija Editar.

  4. En el panel Código de DAG en Amazon S3, elija una versión de plugins.zip de la lista desplegable.

  5. Elija Siguiente.

Ejemplos de casos de uso de plugins.zip

Siguientes pasos

  • Pruebe sus DAG, complementos personalizados y dependencias de Python de forma local con aws-mwaa-local-runner en GitHub.