Installation de plugins personnalisés - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Installation de plugins personnalisés

Amazon Managed Workflows pour Apache Airflow prend en charge le gestionnaire de plugins intégré d'Apache Airflow, qui vous permet d'utiliser des opérateurs, des hooks, des capteurs ou des interfaces Apache Airflow personnalisés. Cette page décrit les étapes à suivre pour installer les plugins personnalisés Apache Airflow sur votre MWAA environnement Amazon à l'aide d'un plugins.zip fichier.

Prérequis

Vous aurez besoin des éléments suivants avant de pouvoir effectuer les étapes indiquées sur cette page.

  • Autorisations — Votre AWS compte doit avoir été autorisé par votre administrateur à accéder à la politique de contrôle d'mazonMWAAFullConsoleAccessaccès A pour votre environnement. En outre, votre MWAA environnement Amazon doit être autorisé par votre rôle d'exécution à accéder aux AWS ressources utilisées par votre environnement.

  • Accès : si vous devez accéder à des référentiels publics pour installer des dépendances directement sur le serveur Web, votre environnement doit être configuré avec un accès au serveur Web du réseau public. Pour de plus amples informations, veuillez consulter Modes d'accès à Apache Airflow.

  • Configuration Amazon S3 — Le compartiment Amazon S3 utilisé pour stocker vos DAGs plugins personnalisés et vos dépendances Python requirements.txt doit être configuré avec l'accès public bloqué et le versionnage activé. plugins.zip

Fonctionnement

Pour exécuter des plugins personnalisés sur votre environnement, vous devez effectuer trois opérations :

  1. Créez un plugins.zip fichier localement.

  2. Téléchargez le plugins.zip fichier local dans votre compartiment Amazon S3.

  3. Spécifiez la version de ce fichier dans le champ Fichier de plugins de la MWAA console Amazon.

Note

Si c'est la première fois que vous chargez un plugins.zip fichier dans votre compartiment Amazon S3, vous devez également spécifier le chemin d'accès au fichier sur la MWAA console Amazon. Vous ne devez effectuer cette étape qu'une seule fois.

Quand utiliser les plugins

Les plug-ins ne sont nécessaires que pour étendre l'interface utilisateur d'Apache Airflow, comme indiqué dans la documentation d'Apache Airflow. Les opérateurs personnalisés peuvent être placés directement dans le /dags dossier à côté de votre DAG code.

Si vous devez créer vos propres intégrations avec des systèmes externes, placez-les dans le dags dossier/ou dans l'un de ses sous-dossiers, mais pas dans le plugins.zip dossier. Dans Apache Airflow 2.x, les plugins sont principalement utilisés pour étendre l'interface utilisateur.

De même, aucune autre dépendance ne doit être inséréeplugins.zip. Ils peuvent plutôt être stockés dans un emplacement situé sous le /dags dossier Amazon S3, où ils seront synchronisés avec chaque MWAA conteneur Amazon avant le démarrage d'Apache Airflow.

Note

Tout fichier du /dags dossier ou plugins.zip qui ne définit pas explicitement un DAG objet Apache Airflow doit être répertorié dans un .airflowignore fichier.

Vue d'ensemble des plugins personnalisés

Le gestionnaire de plugins intégré à Apache Airflow peut intégrer des fonctionnalités externes à son cœur en déposant simplement des fichiers dans un $AIRFLOW_HOME/plugins dossier. Il vous permet d'utiliser des opérateurs, des hooks, des capteurs ou des interfaces Apache Airflow personnalisés. La section suivante fournit un exemple de structures de répertoires plates et imbriquées dans un environnement de développement local et les instructions d'importation qui en résultent, qui déterminent la structure de répertoire dans un fichier plugins.zip.

Répertoire des plugins personnalisés et limites de taille

Le planificateur Apache Airflow et les Workers recherchent des plugins personnalisés lors du démarrage sur le conteneur AWS Fargate géré pour votre environnement sur. /usr/local/airflow/plugins/*

  • Structure du répertoire. La structure du répertoire (at/*) est basée sur le contenu de votre plugins.zip fichier. Par exemple, si vous plugins.zip incluez le operators répertoire en tant que répertoire de premier niveau, le répertoire sera extrait dans votre environnement. /usr/local/airflow/plugins/operators

  • Limite de taille. Nous recommandons un plugins.zip fichier de moins de 1 Go. Plus la taille d'un plugins.zip fichier est importante, plus le temps de démarrage d'un environnement est long. Bien qu'Amazon MWAA ne limite pas explicitement la taille d'un plugins.zip fichier, si les dépendances ne peuvent pas être installées dans les dix minutes, le service Fargate expirera et tentera de rétablir la stabilité de l'environnement.

Note

Pour les environnements utilisant Apache Airflow v1.10.12 ou Apache Airflow v2.0.2, Amazon MWAA limite le trafic sortant sur le serveur Web Apache Airflow et ne vous autorise pas à installer des plugins ni des dépendances Python directement sur le serveur Web. À partir d'Apache Airflow v2.2.2, Amazon MWAA peut installer des plugins et des dépendances directement sur le serveur Web.

Exemples de plugins personnalisés

La section suivante utilise un exemple de code du guide de référence Apache Airflow pour montrer comment structurer votre environnement de développement local.

Exemple d'utilisation d'une structure de répertoire plate dans plugins.zip

Apache Airflow v2

L'exemple suivant montre un plugins.zip fichier avec une structure de répertoire plate pour Apache Airflow v2.

Exemple répertoire plat avec PythonVirtualenvOperator plugins.zip

L'exemple suivant montre l'arborescence de niveau supérieur d'un fichier plugins.zip pour le plugin PythonVirtualenvOperator personnalisé dansCréation d'un plugin personnalisé pour Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
Exemple plugins/virtual_python_plugin.py

L'exemple suivant montre le plugin PythonVirtualenvOperator personnalisé.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

L'exemple suivant montre un plugins.zip fichier avec une structure de répertoire plate pour Apache Airflow v1.

Exemple répertoire plat avec PythonVirtualenvOperator plugins.zip

L'exemple suivant montre l'arborescence de niveau supérieur d'un fichier plugins.zip pour le plugin PythonVirtualenvOperator personnalisé dansCréation d'un plugin personnalisé pour Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
Exemple plugins/virtual_python_plugin.py

L'exemple suivant montre le plugin PythonVirtualenvOperator personnalisé.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Exemple d'utilisation d'une structure de répertoire imbriquée dans plugins.zip

Apache Airflow v2

L'exemple suivant montre un plugins.zip fichier avec des répertoires distincts pourhooks,operators, et un sensors répertoire pour Apache Airflow v2.

Exemple plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'exemple suivant montre les instructions d'importation dans le DAG (DAGsdossier) qui utilise les plugins personnalisés.

Exemple dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Exemple plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Les exemples suivants montrent chacune des instructions d'importation nécessaires dans les fichiers de plug-in personnalisés.

Exemple hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Exemple sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Exemple operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Exemple operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Suivez les étapes décrites dans Tester des plugins personnalisés à l'aide de l'MWAACLIutilitaire Amazon, puis dans Création d'un fichier plugins.zip pour compresser le contenu de votre pluginsrépertoire. Par exemple, cd plugins.

Apache Airflow v1

L'exemple suivant montre un plugins.zip fichier avec des répertoires distincts pour hooksoperators, et un sensors répertoire pour Apache Airflow v1.10.12.

Exemple plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'exemple suivant montre les instructions d'importation dans le DAG (DAGsdossier) qui utilise les plugins personnalisés.

Exemple dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Exemple plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Les exemples suivants montrent chacune des instructions d'importation nécessaires dans les fichiers de plug-in personnalisés.

Exemple hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Exemple sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Exemple operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Exemple operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Suivez les étapes décrites dans Tester des plugins personnalisés à l'aide de l'MWAACLIutilitaire Amazon, puis dans Création d'un fichier plugins.zip pour compresser le contenu de votre pluginsrépertoire. Par exemple, cd plugins.

Création d'un fichier plugins.zip

Les étapes suivantes décrivent les étapes que nous recommandons pour créer un fichier plugins.zip localement.

Première étape : tester les plugins personnalisés à l'aide de l'MWAACLIutilitaire Amazon

  • L'utilitaire d'interface de ligne de commande (CLI) réplique localement un environnement Amazon Managed Workflows pour Apache Airflow.

  • CLIcrée localement une image de conteneur Docker similaire à une image de MWAA production Amazon. Cela vous permet d'exécuter un environnement Apache Airflow local pour développer et tester DAGs des plugins personnalisés et des dépendances avant le déploiement sur AmazonMWAA.

  • Pour exécuter leCLI, reportez-vous à la section aws-mwaa-local-runnerActivé GitHub.

Deuxième étape : créer le fichier plugins.zip

Vous pouvez utiliser un utilitaire d'ZIParchivage intégré ou tout autre ZIP utilitaire (tel que 7zip) pour créer un fichier .zip.

Note

L'utilitaire zip intégré pour le système d'exploitation Windows peut ajouter des sous-dossiers lorsque vous créez un fichier .zip. Nous vous recommandons de vérifier le contenu du fichier plugins.zip avant de le télécharger dans votre compartiment Amazon S3 afin de vous assurer qu'aucun répertoire supplémentaire n'a été ajouté.

  1. Remplacez les répertoires par votre répertoire de plugins Airflow local. Par exemple :

    myproject$ cd plugins
  2. Exécutez la commande suivante pour vous assurer que le contenu dispose d'autorisations exécutables (macOS et Linux uniquement).

    plugins$ chmod -R 755 .
  3. Compressez le contenu de votre plugins dossier.

    plugins$ zip -r plugins.zip .

Téléchargement plugins.zip vers Amazon S3

Vous pouvez utiliser la console Amazon S3 ou le AWS Command Line Interface (AWS CLI) pour charger un plugins.zip fichier dans votre compartiment Amazon S3.

À l'aide du AWS CLI

The AWS Command Line Interface (AWS CLI) est un outil open source qui vous permet d'interagir avec les AWS services à l'aide de commandes dans votre shell de ligne de commande. Pour effectuer les étapes indiquées sur cette page, vous avez besoin des éléments suivants :

Pour effectuer un téléchargement à l'aide du AWS CLI
  1. Dans votre invite de commande, accédez au répertoire dans lequel votre plugins.zip fichier est stocké. Par exemple :

    cd plugins
  2. Utilisez la commande suivante pour répertorier tous vos compartiments Amazon S3.

    aws s3 ls
  3. Utilisez la commande suivante pour répertorier les fichiers et les dossiers du compartiment Amazon S3 de votre environnement.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Utilisez la commande suivante pour charger le plugins.zip fichier dans le compartiment Amazon S3 de votre environnement.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Utilisation de la console Amazon S3

La console Amazon S3 est une interface utilisateur Web qui vous permet de créer et de gérer les ressources de votre compartiment Amazon S3.

Pour charger à l'aide de la console Amazon S3
  1. Ouvrez la page Environnements sur la MWAA console Amazon.

  2. Choisissez un environnement.

  3. Sélectionnez le lien du compartiment S3 dans le DAGcode du volet S3 pour ouvrir votre compartiment de stockage sur la console Amazon S3.

  4. Sélectionnez Charger.

  5. Choisissez Ajouter un fichier.

  6. Sélectionnez la copie locale de votre fichierplugins.zip, puis choisissez Upload.

Installation de plugins personnalisés sur votre environnement

Cette section décrit comment installer les plugins personnalisés que vous avez chargés dans votre compartiment Amazon S3 en spécifiant le chemin d'accès au fichier plugins.zip et en spécifiant la version du fichier plugins.zip chaque fois que le fichier zip est mis à jour.

Spécifier le chemin d'accès plugins.zip sur la MWAA console Amazon (pour la première fois)

Si c'est la première fois que vous chargez un plugins.zip fichier dans votre compartiment Amazon S3, vous devez également spécifier le chemin d'accès au fichier sur la MWAA console Amazon. Vous ne devez effectuer cette étape qu'une seule fois.

  1. Ouvrez la page Environnements sur la MWAA console Amazon.

  2. Choisissez un environnement.

  3. Choisissez Modifier.

  4. Dans le DAG code du volet Amazon S3, choisissez Browse S3 à côté du champ facultatif « Fichier de plugins ».

  5. Sélectionnez le plugins.zip fichier dans votre compartiment Amazon S3.

  6. Choisissez Choisir.

  7. Choisissez Suivant, Mettre à jour l'environnement.

Spécification de la plugins.zip version sur la MWAA console Amazon

Vous devez spécifier la version de votre plugins.zip fichier sur la MWAA console Amazon chaque fois que vous chargez une nouvelle version de votre fichier plugins.zip dans votre compartiment Amazon S3.

  1. Ouvrez la page Environnements sur la MWAA console Amazon.

  2. Choisissez un environnement.

  3. Choisissez Modifier.

  4. Dans le DAG code du volet Amazon S3, choisissez une plugins.zip version dans la liste déroulante.

  5. Choisissez Suivant.

Exemples de cas d'utilisation pour plugins.zip

Quelle est la prochaine étape ?

  • Testez vos DAGs plugins personnalisés et vos dépendances Python localement à l'aide de l'option aws-mwaa-local-runneron GitHub.