Como instalar plug-ins personalizados - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como instalar plug-ins personalizados

O Amazon Managed Workflows for Apache Airflow oferece suporte ao gerenciador de plug-ins integrado do Apache Airflow, permitindo o uso de operadores, hooks, sensores ou interfaces personalizados do Apache Airflow. Esta página descreve as etapas para instalar plugins personalizados do Apache Airflow em seu ambiente do Amazon MWAA usando um arquivo plugins.zip.

Pré-requisitos

Você precisará do seguinte antes de concluir as etapas nesta página.

  • Permissões: sua conta AWS deve ter o acesso concedido por seu administrador para a política de controle de acesso AmazonMWAAFullConsoleAccess para seu ambiente. Além disso, seu ambiente Amazon MWAA deve ser autorizado pela seu perfil de execução para acessar os recursos da AWS usados pelo seu ambiente.

  • Acesso: se você precisar de acesso a repositórios públicos para instalar dependências diretamente no servidor web, seu ambiente deverá ser configurado com acesso ao servidor web de rede pública. Para ter mais informações, consulte Modos de acesso do Apache Airflow.

  • Configuração do Amazon S3: o bucket do Amazon S3 usado para armazenar seus DAGs, plug-ins personalizados em plugins.zip e dependências do Python em requirements.txt deve ser configurado com Acesso público bloqueado e Versionamento habilitado.

Como funciona

Para executar plug-ins personalizados em seu ambiente, você deve fazer três coisas:

  1. Crie um arquivo plugins.zip localmente.

  2. Faça upload do arquivo plugins.zip local para seu bucket no Amazon S3.

  3. Especifique a versão desse arquivo no campo Arquivo de plug-ins no console do Amazon MWAA.

nota

Se for a primeira vez que você faz o upload de um plugins.zip para o seu bucket do Amazon S3, também será preciso especificar o caminho para o arquivo no console do Amazon MWAA. Você só precisa concluir esta etapa uma vez.

Quando usar os plug-ins

Os plug-ins são necessários somente para estender a interface de usuário do Apache Airflow, conforme descrito na documentação do Apache Airflow. Operadores personalizados podem ser colocados diretamente na pasta /dags ao lado do código DAG.

Caso precise criar suas próprias integrações com sistemas externos, coloque-as na pasta /dags ou em uma subpasta dentro dela, mas não na pasta plugins.zip. No Apache Airflow 2.x, os plug-ins são usados principalmente para estender a interface do usuário.

Da mesma forma, outras dependências não devem ser inseridas em plugins.zip. Em vez disso, elas podem ser armazenados em um local na pasta /dags do Amazon S3, onde serão sincronizadas com cada contêiner do Amazon MWAA antes do início do Apache Airflow.

nota

Qualquer arquivo na pasta /dags ou em plugins.zip que não defina explicitamente um objeto DAG do Apache Airflow deve ser listado em um arquivo .airflowignore.

Visão geral dos plug-ins personalizados

O gerenciador de plug-ins embutido do Apache Airflow pode integrar atributos externos ao núcleo simplesmente soltando arquivos em uma pasta $AIRFLOW_HOME/plugins. Ele permite que você use operadores, hooks, sensores ou interfaces personalizados do Apache Airflow. A seção a seguir fornece um exemplo de estruturas de diretórios simples e aninhadas em um ambiente de desenvolvimento local e as instruções de importação resultantes, que determinam a estrutura de diretórios em um plugins.zip.

Diretório de plug-ins personalizados e limites de tamanho

O programador e os operadores do Apache Airflow procuram plug-ins personalizados durante a inicialização no contêiner Fargate gerenciado pela AWS para seu ambiente em /usr/local/airflow/plugins/*.

  • Estrutura de diretório. A estrutura do diretório (em /*) é baseada no conteúdo do seu arquivo plugins.zip. Por exemplo, se seu plugins.zip contiver o diretório operators como um diretório de nível superior, o diretório será extraído para /usr/local/airflow/plugins/operators em seu ambiente.

  • Limites de tamanho. Recomendamos um arquivo plugins.zip com menos de 1 GB. Quanto maior o tamanho de um arquivo plugins.zip, maior o tempo de inicialização em um ambiente. Embora o Amazon MWAA não limite explicitamente o tamanho de um arquivo plugins.zip, se as dependências não puderem ser instaladas em dez minutos, o serviço Fargate atingirá o tempo limite e tentará reverter o ambiente para um estado estável.

nota

Para ambientes que usam o Apache Airflow v1.10.12 ou o Apache Airflow v2.0.2, o Amazon MWAA limita o tráfego de saída no servidor web Apache Airflow e não permite que você instale plug-ins nem dependências do Python diretamente no servidor web. A partir do Apache Airflow v2.2.2, o Amazon MWAA pode instalar plug-ins e dependências diretamente no servidor web.

Exemplos de plug-ins personalizados

A seção a seguir usa um exemplo de código no Guia de referência do Apache Airflow para mostrar como estruturar seu ambiente de desenvolvimento local.

Exemplo de uso de uma estrutura de diretórios simples em plugins.zip

Apache Airflow v2

O exemplo a seguir mostra um arquivo plugins.zip com uma estrutura de diretório simples para o Apache Airflow v2.

exemplo diretório simples com PythonVirtualEnvOperator plugins.zip

O exemplo a seguir mostra a árvore de nível superior de um arquivo plugins.zip para o plug-in personalizado PythonVirtualEnvOperator em Como criar um plug-in personalizado para PythonVirtualEnvOperator do Apache Airflow.

├── virtual_python_plugin.py
exemplo plugins/virtual_python_plugin.py

O exemplo a seguir mostra o plug-in personalizado do PythonVirtualEnvOperator.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

O exemplo a seguir mostra um arquivo plugins.zip com uma estrutura de diretório simples para o Apache Airflow v1.

exemplo diretório simples com PythonVirtualEnvOperator plugins.zip

O exemplo a seguir mostra a árvore de nível superior de um arquivo plugins.zip para o plug-in personalizado PythonVirtualEnvOperator em Como criar um plug-in personalizado para PythonVirtualEnvOperator do Apache Airflow.

├── virtual_python_plugin.py
exemplo plugins/virtual_python_plugin.py

O exemplo a seguir mostra o plug-in personalizado do PythonVirtualEnvOperator.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Exemplo de uso de uma estrutura de diretórios aninhados em plugins.zip

Apache Airflow v2

O exemplo a seguir mostra um arquivo plugins.zip com diretórios separados para hooks, operators e um diretório sensors para o Apache Airflow v2.

exemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

O exemplo a seguir mostra as instruções de importação no DAG (pasta DAGs) que usa os plug-ins personalizados.

exemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
exemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Os exemplos a seguir mostram cada uma das instruções de importação necessárias nos arquivos de plug-in personalizados.

exemplo hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
exemplo sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
exemplo operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
exemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Siga as etapas em Teste de plug-ins personalizados usando o utilitário Amazon MWAA CLI e Criando um arquivo plugins.zip para compactar o conteúdo dentro em seu diretório plugins. Por exemplo, cd plugins.

Apache Airflow v1

O exemplo a seguir mostra um arquivo plugins.zip com diretórios separados para hooks, operators e um diretório sensors para o Apache Airflow v1.10.12.

exemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

O exemplo a seguir mostra as instruções de importação no DAG (pasta DAGs) que usa os plug-ins personalizados.

exemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
exemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Os exemplos a seguir mostram cada uma das instruções de importação necessárias nos arquivos de plug-in personalizados.

exemplo hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
exemplo sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
exemplo operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
exemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Siga as etapas em Teste de plug-ins personalizados usando o utilitário Amazon MWAA CLI e Criando um arquivo plugins.zip para compactar o conteúdo dentro em seu diretório plugins. Por exemplo, cd plugins.

Criação de um arquivo plugins.zip

As etapas a seguir descrevem as etapas que recomendamos para criar um arquivo plugins.zip localmente.

Etapa 1: testar plug-ins personalizados usando o utilitário Amazon MWAA CLI

  • O utilitário da interface de linha de comandos (CLI) replica localmente um ambiente do Amazon Managed Workflows for Apache Airflow.

  • A CLI cria localmente uma imagem de contêiner Docker semelhante a uma imagem de produção do Amazon MWAA. Isso permite que você execute um ambiente Apache Airflow local para desenvolver e testar DAGs, plug-ins personalizados e dependências antes da implantação no Amazon MWAA.

  • Para executar a CLI, consulte aws-mwaa-local-runner no GitHub.

Etapa 2: criar o arquivo plugins.zip

É possível usar um utilitário de arquivamento ZIP incorporado ou qualquer outro utilitário ZIP (como 7zip) para criar um arquivo.zip.

nota

O utilitário zip integrado para o sistema operacional Windows pode adicionar subpastas quando você cria um arquivo .zip. Recomendamos verificar o conteúdo do arquivo plugins.zip antes de fazer o upload para o bucket do Amazon S3 para garantir que nenhum diretório adicional tenha sido adicionado.

  1. Altere os diretórios para o diretório local de plug-ins do Airflow. Por exemplo:

    myproject$ cd plugins
  2. Execute o comando a seguir para garantir que o conteúdo tenha permissões executáveis (somente macOS e Linux).

    plugins$ chmod -R 755 .
  3. Compacte o conteúdo em sua pasta plugins.

    plugins$ zip -r plugins.zip .

Como fazer upload de plugins.zip para o Amazon S3

É possível usar o console do Amazon S3 ou a AWS Command Line Interface (AWS CLI) para fazer upload do arquivo plugins.zip para o bucket do Amazon S3.

Uso do AWS CLI

A AWS Command Line Interface (AWS CLI) é uma ferramenta de código aberto que permite interagir com os serviços da AWS usando comandos no shell da linha de comando. Para concluir as etapas nesta página, é necessário o seguinte:

Para fazer o upload usando AWS CLI
  1. No prompt de comando, navegue até o diretório em que seu arquivo plugins.zip está armazenado. Por exemplo:

    cd plugins
  2. Use o comando a seguir para listar todos os seus buckets do Amazon S3.

    aws s3 ls
  3. Use o seguinte comando para listar os arquivos e pastas no bucket do Amazon S3 para seu ambiente.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Use o seguinte comando para carregar o plugins.zip arquivo no bucket do Amazon S3 para seu ambiente.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Usar o console do Amazon S3

O console do Amazon S3 é uma interface de usuário baseada na Web que permite criar e gerenciar os recursos no bucket do Amazon S3.

Fazer o upload usando o console do Amazon S3
  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Selecione o link do bucket do S3 no código do DAG no painel do S3 para abrir seu bucket de armazenamento no console do Amazon S3.

  4. Escolha Carregar.

  5. Escolha Adicionar arquivo.

  6. Selecione a cópia local do seu plugins.zip e escolha Carregar.

Instalando plug-ins personalizados em seu ambiente

Esta seção descreve como instalar os plug-ins personalizados que você carregou no seu bucket do Amazon S3 especificando o caminho para o arquivo plugins.zip e especificando a versão do arquivo plugins.zip sempre que o arquivo zip for atualizado.

Como especificar o caminho para plugins.zip no console Amazon MWAA (pela primeira vez)

Se for a primeira vez que você faz o upload de um plugins.zip para o seu bucket do Amazon S3, também será preciso especificar o caminho para o arquivo no console do Amazon MWAA. Você só precisa concluir esta etapa uma vez.

  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Selecione a opção Editar.

  4. No código DAG no painel Amazon S3, escolha Navegar S3 ao lado do campo Arquivos de plugin - opcional.

  5. Selecione o plugins.zip arquivo no bucket do Amazon S3.

  6. Selecione Escolher.

  7. Selecione Avançar, Atualizar ambiente.

Como especificar a versão plugins.zip no console do Amazon MWAA

É necessário especificar a versão do seu arquivo plugins.zip no console do Amazon MWAA sempre que você fizer o upload de uma nova versão do seu plugins.zip no bucket do Amazon S3.

  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Selecione a opção Editar.

  4. No painel Código DAG no Amazon S3 , escolha uma versão do plugins.zip na lista suspensa.

  5. Escolha Próximo.

Exemplos de casos de uso para plugins.zip

Próximas etapas

  • Teste seus DAGs, plug-ins personalizados e dependências do Python localmente usando aws-mwaa-local-runner no GitHub.