安装自定义插件
Amazon MWAA 支持 Apache Airflow 的内置插件管理器,允许您使用自定义 Apache Airflow 运算符、挂钩、传感器或接口。本页介绍使用 plugins.zip
文件在 Amazon MWAA 环境中安装 Apache Airflow 自定义插件的步骤。
先决条件
在完成本页上的步骤之前,您需要具备以下条件。
-
权限 — AWS 账户必须已获得管理员授权,访问适用于环境的 AmazonmwaaFullConsoleAccess 访问控制策略。此外,执行角色必须允许 Amazon MWAA 环境访问环境所使用的 AWS 资源。
-
访问权限-如果您需要访问公共存储库才能直接在 Web 服务器上安装依赖项,则必须将环境配置为具有公共网络 Web 服务器访问权限。有关更多信息,请参阅 Apache Airflow 访问模式。
-
Amazon S3 配置 — 用于存储 DAG 的 Amazon S3 存储桶、在 plugins.zip
中的自定义插件和在 requirements.txt
中的 Python 依赖项必须配置为已阻止公共访问和已启用版本控制。
工作方式
要在环境中运行自定义插件,您必须做三件事:
-
在本地创建 plugins.zip
文件。
-
将 plugins.zip
文件上传到 Amazon S3 中的存储桶。
-
在 Amazon MWAA 控制台的插件文件字段中指定此文件的版本。
如果这是您首次将 plugins.zip
上传到 Amazon S3 存储桶,则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。
何时使用插件
正如 Apache Airflow 文档中所述,仅在扩展 Apache Airflow 用户界面时才需要插件。可以直接将自定义操作符与 DAG
代码一起放入 /dags
文件夹中。
如果需要自行创建与外部系统的集成,请将其放在 /dags
文件夹或其中的子文件夹中,而不是放在 plugins.zip
文件夹中。在 Apache Airflow 2.x 中,插件主要用于扩展 UI。
同样,不应将其他依赖项放入 plugins.zip
中。而可以将其存储在 Amazon S3 /dags
文件夹下的某个位置,在 Apache Airflow 启动之前,这些依赖项将在该位置同步到每个 Amazon MWAA 容器。
/dags
文件夹或 plugins.zip
中任何未显式定义 Apache Airflow DAG 对象的文件都必须在 .airflowignore
文件中列出。
自定义插件概述
Apache Airflow 的内置插件管理器只需将文件拖放到 $AIRFLOW_HOME/plugins
文件夹中即可将外部功能集成到其核心中。它允许您使用自定义 Apache Airflow 操作符、挂钩、传感器或接口。下一节提供了本地开发环境中平面和嵌套目录结构的示例,以及生成的 import 语句,这些语句决定了 plugins.zip 中的目录结构。
自定义插件目录和大小限制
在启动期间,Apache Airflow 计划程序和工作线程在 AWS 托管的 Fargate 容器启动时,在 /usr/local/airflow/plugins/*
中为环境查找自定义插件。
-
目录结构。目录结构(在 /*
中)基于您 plugins.zip
文件的内容。例如,如果 plugins.zip
包含 operators
目录作为顶级目录,则该目录将被解压缩到环境的 /usr/local/airflow/plugins/operators
中。
-
大小限制。我们建议使用小于 1 GB 的 plugins.zip
文件。plugins.zip
文件大小越大,环境的启动时间就越长。尽管 Amazon MWAA 没有明确限制 plugins.zip
文件的大小,但如果无法在十分钟内安装依赖项,Fargate 服务将超时并尝试将环境回滚到稳定状态。
对于使用 Apache Airflow v1.10.12 或 Apache Airflow v2.0.2 的环境,Amazon MWAA 会限制 Apache Airflow Web 服务器上的出站流量,并且不允许您直接在 Web 服务器上安装插件或 Python 依赖项。从 Apache Airflow v2.2.2 开始,Amazon MWAA 可以直接在 Web 服务器上安装插件和依赖项。
自定义插件示例
下一节使用《Apache Airflow 参考指南》中的示例代码来展示如何构建本地开发环境。
在 plugins.zip 中使用平面目录结构的示例
- Apache Airflow v2
-
以下示例显示了 Apache Airflow v2 中一个采用扁平目录结构的 plugins.zip
文件。
例 plugins/virtual_python_plugin.py
以下示例显示了 PythonVirtualenvOperator 自定义插件。
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
- Apache Airflow v1
-
以下示例显示了 Apache Airflow v1 中一个采用扁平目录结构的 plugins.zip
文件。
例 plugins/virtual_python_plugin.py
以下示例显示了 PythonVirtualenvOperator 自定义插件。
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.python_operator import PythonVirtualenvOperator
def _generate_virtualenv_cmd(self, tmp_dir):
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if self.system_site_packages:
cmd.append('--system-site-packages')
if self.python_version is not None:
cmd.append('--python=python{}'.format(self.python_version))
return cmd
PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd
class EnvVarPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
在 plugins.zip 中使用平面目录结构的示例
- Apache Airflow v2
-
以下示例显示了一个 plugins.zip
文件,其中包含 hooks
、operators
的单独目录和 Apache Airflow v2 的 sensors
目录。
例 Plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
以下示例显示了使用自定义插件的 DAG(DAG 文件夹)中的导入语句。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
以下示例显示了自定义插件文件中所需的每条导入语句。
例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
按照使用 Amazon MWAA CLI 实用工具测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件来将内容压缩到 plugins
目录之内。例如,cd plugins
。
- Apache Airflow v1
-
以下示例显示了一个 plugins.zip
文件,其中包含 hooks
、operators
的单独目录和 Apache Airflow v1.10.12 的 sensors
目录。
例 Plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
以下示例显示了使用自定义插件的 DAG(DAG 文件夹)中的导入语句。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_operator import MyOperator
from sensors.my_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
from utils.my_utils import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
以下示例显示了自定义插件文件中所需的每条导入语句。
例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
按照使用 Amazon MWAA CLI 实用工具测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件来将内容压缩到 plugins
目录之内。例如,cd plugins
。
创建 plugins.zip 文件
以下步骤描述了我们建议在本地创建 plugins.zip 文件的步骤。
步骤 1:使用 Amazon MWAA CLI 实用工具测试自定义插件
-
命令行界面 (CLI) 实用工具可在本地复制 Amazon MWAA 环境。
-
CLI 在本地构建 Docker 容器镜像,类似于 Amazon MWAA 生产镜像。这允许您在部署到 Amazon MWAA 之前运行本地 Apache Airflow 环境来开发和测试 DAG、自定义插件和依赖项。
-
要运行 CLI,请参阅 GitHub 上的 aws-mwaa-local-runner。
步骤 2:创建 plugins.zip 文件
您可以使用内置的 ZIP 存档实用工具或任何其他 ZIP 实用工具(例如 7zip)来创建.zip 文件。
当您创建.zip 文件时,Windows 操作系统的内置 zip 实用工具可能会添加子文件夹。我们建议您验证 plugins.zip 文件的内容,然后再上传到 Amazon S3 存储桶,以确保没有添加其他目录。
-
将目录更改为本地 Airflow 插件目录。例如:
myproject$ cd plugins
-
运行以下命令以确保内容具有可执行权限(仅限 macOS 和 Linux)。
plugins$ chmod -R 755 .
-
将内容压缩到 plugins
文件夹中。
plugins$ zip -r plugins.zip .
上传 plugins.zip
到 Amazon S3
您可以使用 Amazon S3 控制台或 AWS Command Line Interface (AWS CLI) 将 plugins.zip
文件上传到 Amazon S3 存储桶中。
使用 AWS CLI
AWS Command Line Interface (AWS CLI) 是一种开源工具,让您能够在命令行 Shell 中使用命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:
要使用 AWS CLI 上传,请执行以下操作
-
在命令提示符下,导航到存储 plugins.zip
文件的目录。例如:
cd plugins
-
以下示例列出所有 Amazon S3 存储桶。
aws s3 ls
-
使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。
aws s3 ls s3://YOUR_S3_BUCKET_NAME
-
使用以下命令将 plugins.zip
文件上传到环境的 Amazon S3 存储桶。
aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME
/plugins.zip
使用 Amazon S3 控制台
Amazon S3 控制台是一个基于 Web 的UI ,允许您创建和管理 Amazon S3 桶中的资源。
要使用 Amazon S3 控制台上传,请执行以下操作
-
在 Amazon MWAA 控制台上打开环境页面。
-
选择环境。
-
在 S3 中的 DAG 代码窗格中选择 S3 存储桶链接,在 Amazon S3 控制台上打开存储桶。
-
选择上传。
-
选择 添加文件。
-
选择 plugins.zip
的本地副本,选择上传。
在环境中安装自定义插件
本节介绍如何安装您上传到 Amazon S3 存储桶的自定义插件,方法是指定 plugins.zip 文件的路径,并在每次更新 zip 文件时指定 plugins.zip 文件的版本。
在 Amazon MWAA 控制台上指定 plugins.zip
的路径(第一次)
如果这是您首次将 plugins.zip
上传到 Amazon S3 存储桶,则还需要在 Amazon MWAA 控制台上指定文件路径。您只需要完成此步骤一次。
-
在 Amazon MWAA 控制台上打开环境页面。
-
选择环境。
-
选择编辑。
-
在 Amazon S3 中的 DAG 代码窗格上,选择插件文件-可选字段旁边的浏览 S3。
-
选择 Amazon S3 存储桶中的 plugins.zip
文件。
-
选择选择。
-
选择下一步、更新环境。
在 Amazon MWAA 控制台上指定 plugins.zip
的版本
每次在 Amazon S3 存储桶中上传 plugins.zip
的新版本时,都需要在 Amazon MWAA 控制台上指定 plugins.zip
文件的版本。
-
在 Amazon MWAA 控制台上打开环境页面。
-
选择环境。
-
选择编辑。
-
在 Amazon S3 中的 DAG 代码窗格中,从下拉列表中选择 plugins.zip
的版本。
-
选择下一步。
plugins.zip 的用例示例
接下来做什么?