Installazione di plugin personalizzati - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Installazione di plugin personalizzati

Amazon Managed Workflows for Apache Airflow supporta il gestore di plugin integrato di Apache Airflow, che consente di utilizzare operatori, hook, sensori o interfacce Apache Airflow personalizzati. Questa pagina descrive i passaggi per installare i plugin personalizzati Apache Airflow sul tuo MWAA ambiente Amazon utilizzando un file. plugins.zip

Prerequisiti

Avrai bisogno di quanto segue prima di completare i passaggi di questa pagina.

  • Autorizzazioni: l'amministratore deve aver concesso all' AWS account l'accesso alla politica di controllo degli mazonMWAAFull ConsoleAccess accessi A per l'ambiente in uso. Inoltre, il tuo MWAA ambiente Amazon deve essere autorizzato dal tuo ruolo di esecuzione ad accedere alle AWS risorse utilizzate dal tuo ambiente.

  • Accesso: se è necessario accedere agli archivi pubblici per installare le dipendenze direttamente sul server Web, l'ambiente deve essere configurato con l'accesso al server Web di rete pubblica. Per ulteriori informazioni, consulta Modalità di accesso Apache Airflow.

  • Configurazione Amazon S3 : il bucket Amazon S3 utilizzato per archiviare i plug-in plugins.zip personalizzati e DAGs le requirements.txt dipendenze Python deve essere configurato con Public Access Blocked e Versioning Enabled.

Come funziona

Per eseguire plugin personalizzati nel tuo ambiente, devi fare tre cose:

  1. Crea un plugins.zip file localmente.

  2. Carica il plugins.zip file locale nel tuo bucket Amazon S3.

  3. Specificare la versione di questo file nel campo File Plugins sulla MWAA console Amazon.

Nota

Se è la prima volta che carichi un plugins.zip file nel tuo bucket Amazon S3, devi anche specificare il percorso del file sulla console Amazon. MWAA Devi completare questo passaggio solo una volta.

Quando usare i plugin

I plugin sono necessari solo per estendere l'interfaccia utente di Apache Airflow, come indicato nella documentazione di Apache Airflow. Gli operatori personalizzati possono essere inseriti direttamente nella cartella insieme al codice. /dags DAG

Se devi creare le tue integrazioni con sistemi esterni, inseriscile nella dags cartella/o in una sottocartella al suo interno, ma non nella plugins.zip cartella. In Apache Airflow 2.x, i plugin vengono utilizzati principalmente per estendere l'interfaccia utente.

Allo stesso modo, non devono essere inserite altre dipendenze. plugins.zip Possono invece essere archiviati in una posizione all'interno della /dags cartella Amazon S3, dove verranno sincronizzati con ogni MWAA contenitore Amazon prima dell'avvio di Apache Airflow.

Nota

Qualsiasi file nella /dags cartella o in plugins.zip quella cartella che non definisce esplicitamente un DAG oggetto Apache Airflow deve essere elencato in un file. .airflowignore

Panoramica dei plugin personalizzati

Il gestore di plugin integrato di Apache Airflow può integrare funzionalità esterne al suo interno semplicemente trascinando i file in una cartella. $AIRFLOW_HOME/plugins Consente di utilizzare operatori, hook, sensori o interfacce Apache Airflow personalizzati. La sezione seguente fornisce un esempio di strutture di directory piatte e annidate in un ambiente di sviluppo locale e le istruzioni di importazione risultanti, che determinano la struttura delle directory all'interno di un plugins.zip.

Limiti di directory e dimensioni dei plugin personalizzati

Apache Airflow Scheduler e Workers cercano plugin personalizzati durante l'avvio sul contenitore Fargate gestito da AWS Fargate per il vostro ambiente in. /usr/local/airflow/plugins/*

  • Struttura delle directory. La struttura delle cartelle (at/*) si basa sul contenuto del plugins.zip file. Ad esempio, se la directory è plugins.zip operators contenuta come directory di primo livello, la directory verrà estratta nell'ambiente /usr/local/airflow/plugins/operators in cui si trova.

  • Limite di dimensione. Consigliamo un plugins.zip file inferiore a 1 GB. Maggiore è la dimensione di un plugins.zip file, maggiore è il tempo di avvio in un ambiente. Sebbene Amazon MWAA non limiti esplicitamente la dimensione di un plugins.zip file, se le dipendenze non possono essere installate entro dieci minuti, il servizio Fargate andrà in timeout e tenterà di ripristinare l'ambiente a uno stato stabile.

Nota

Per gli ambienti che utilizzano Apache Airflow v1.10.12 o Apache Airflow v2.0.2, MWAA Amazon limita il traffico in uscita sul server Web Apache Airflow e non consente di installare plugin o dipendenze Python direttamente sul server Web. A partire da Apache Airflow v2.2.2, MWAA Amazon può installare plugin e dipendenze direttamente sul server web.

Esempi di plugin personalizzati

La sezione seguente utilizza il codice di esempio contenuto nella guida di riferimento di Apache Airflow per mostrare come strutturare l'ambiente di sviluppo locale.

Esempio di utilizzo di una struttura di directory piatta in plugins.zip

Apache Airflow v2

L'esempio seguente mostra un plugins.zip file con una struttura di directory piatta per Apache Airflow v2.

Esempio directory piatta con plugins.zip PythonVirtualenvOperator

L'esempio seguente mostra l'albero di primo livello di un file plugins.zip per il plugin PythonVirtualenvOperator personalizzato inCreazione di un plugin personalizzato per Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
Esempio plugins/virtual_python_plugin.py

L'esempio seguente mostra il plugin PythonVirtualenvOperator personalizzato.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

L'esempio seguente mostra un plugins.zip file con una struttura di directory piatta per Apache Airflow v1.

Esempio directory piatta con plugins.zip PythonVirtualenvOperator

L'esempio seguente mostra l'albero di primo livello di un file plugins.zip per il plugin PythonVirtualenvOperator personalizzato inCreazione di un plugin personalizzato per Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
Esempio plugins/virtual_python_plugin.py

L'esempio seguente mostra il plugin PythonVirtualenvOperator personalizzato.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Esempio di utilizzo di una struttura di directory annidata in plugins.zip

Apache Airflow v2

L'esempio seguente mostra un plugins.zip file con directory separate per e una sensors directory per hooks Apache Airflow v2. operators

Esempio plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'esempio seguente mostra le istruzioni di importazione nella DAG (DAGscartella) che utilizza i plugin personalizzati.

Esempio dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Esempio plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

I seguenti esempi mostrano ciascuna delle istruzioni di importazione necessarie nei file dei plugin personalizzati.

Esempio hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Esempio sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Esempio operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Esempio operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Segui la procedura descritta in Testare i plugin personalizzati utilizzando l'MWAACLIutilità Amazon, quindi Creazione di un file plugins.zip per comprimere i contenuti all'interno della tua plugins directory. Ad esempio cd plugins.

Apache Airflow v1

L'esempio seguente mostra un plugins.zip file con directory separate per e una sensors directory per hooks Apache Airflow v1.10.12. operators

Esempio plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'esempio seguente mostra le istruzioni di importazione nella DAG (DAGscartella) che utilizza i plugin personalizzati.

Esempio dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Esempio plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

I seguenti esempi mostrano ciascuna delle istruzioni di importazione necessarie nei file dei plugin personalizzati.

Esempio hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Esempio sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Esempio operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Esempio operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Segui la procedura descritta in Testare i plugin personalizzati utilizzando l'MWAACLIutilità Amazon, quindi Creazione di un file plugins.zip per comprimere i contenuti all'interno della tua plugins directory. Ad esempio cd plugins.

Creazione di un file plugins.zip

I passaggi seguenti descrivono i passaggi consigliati per creare un file plugins.zip localmente.

Fase uno: testare i plugin personalizzati utilizzando l'utilità Amazon MWAA CLI

  • L'utilità Command Line Interface (CLI) replica localmente un ambiente Amazon Managed Workflows for Apache Airflow.

  • CLICrea localmente un'immagine del contenitore Docker simile a un'immagine di MWAA produzione Amazon. Ciò consente di eseguire un ambiente Apache Airflow locale per sviluppare e testare DAGs plug-in personalizzati e dipendenze prima della distribuzione su Amazon. MWAA

  • Per eseguire il, vedi onCLI. aws-mwaa-local-runner GitHub

Fase due: creare il file plugins.zip

È possibile utilizzare un'utilità di ZIP archiviazione integrata o qualsiasi altra ZIP utilità (ad esempio 7zip) per creare un file.zip.

Nota

L'utilità zip integrata per il sistema operativo Windows può aggiungere sottocartelle quando si crea un file con estensione zip. Ti consigliamo di verificare il contenuto del file plugins.zip prima di caricarlo nel tuo bucket Amazon S3 per assicurarti che non siano state aggiunte altre directory.

  1. Cambia le directory nella cartella locale dei plugin Airflow. Per esempio:

    myproject$ cd plugins
  2. Esegui il comando seguente per assicurarti che i contenuti abbiano autorizzazioni eseguibili (solo macOS e Linux).

    plugins$ chmod -R 755 .
  3. Comprimi il contenuto all'interno della cartellaplugins.

    plugins$ zip -r plugins.zip .

Caricamento plugins.zip su Amazon S3

Puoi utilizzare la console Amazon S3 o il AWS Command Line Interface (AWS CLI) per caricare un plugins.zip file nel tuo bucket Amazon S3.

Usando il AWS CLI

Il AWS Command Line Interface (AWS CLI) è uno strumento open source che consente di interagire con i AWS servizi utilizzando i comandi nella shell della riga di comando. Per completare la procedura descritta in questa pagina, è necessario quanto segue:

Per caricare utilizzando il AWS CLI
  1. Nel prompt dei comandi, accedi alla directory in cui è archiviato il plugins.zip file. Per esempio:

    cd plugins
  2. Usa il seguente comando per elencare tutti i tuoi bucket Amazon S3.

    aws s3 ls
  3. Usa il seguente comando per elencare i file e le cartelle nel bucket Amazon S3 per il tuo ambiente.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Usa il seguente comando per caricare il plugins.zip file nel bucket Amazon S3 per il tuo ambiente.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Utilizzo della console Amazon S3

La console Amazon S3 è un'interfaccia utente basata sul Web che consente di creare e gestire le risorse nel bucket Amazon S3.

Per caricare utilizzando la console Amazon S3
  1. Apri la pagina Ambienti sulla MWAA console Amazon.

  2. Scegli un ambiente.

  3. Seleziona il link al bucket S3 nel DAGcodice nel riquadro S3 per aprire il bucket di storage sulla console Amazon S3.

  4. Scegli Carica.

  5. Scegli Aggiungi file.

  6. Seleziona la copia locale del tuoplugins.zip, scegli Carica.

Installazione di plugin personalizzati nel tuo ambiente

Questa sezione descrive come installare i plugin personalizzati caricati nel bucket Amazon S3 specificando il percorso del file plugins.zip e specificando la versione del file plugins.zip ogni volta che il file zip viene aggiornato.

Specificazione del percorso plugins.zip sulla MWAA console Amazon (la prima volta)

Se è la prima volta che carichi un plugins.zip file nel tuo bucket Amazon S3, devi anche specificare il percorso del file sulla console Amazon. MWAA Devi completare questo passaggio solo una volta.

  1. Apri la pagina Ambienti sulla MWAA console Amazon.

  2. Scegli un ambiente.

  3. Scegli Modifica.

  4. Nel riquadro del DAGcodice nel riquadro Amazon S3, scegli Browse S3 accanto al file Plugins (campo opzionale).

  5. Seleziona il plugins.zip file nel tuo bucket Amazon S3.

  6. Scegliere Choose (Scegli).

  7. Scegli Avanti, Aggiorna ambiente.

Specificare la plugins.zip versione sulla console Amazon MWAA

Devi specificare la versione del tuo plugins.zip file sulla MWAA console Amazon ogni volta che carichi una nuova versione del tuo file plugins.zip nel tuo bucket Amazon S3.

  1. Apri la pagina Ambienti sulla MWAA console Amazon.

  2. Scegli un ambiente.

  3. Scegli Modifica.

  4. Nel riquadro del DAGcodice in Amazon S3, scegli una plugins.zip versione nell'elenco a discesa.

  5. Scegli Next (Successivo).

Esempi di casi d'uso per plugins.zip

Fasi successive

  • Testa DAGs i tuoi plugin personalizzati e le dipendenze Python localmente usando on. aws-mwaa-local-runner GitHub