カスタムプラグインのインストール
Amazon Managed Workflows for Apache Airflow は Apache Airflow の組み込みプラグインマネージャーをサポートしているため、カスタム Apache Airflow オペレータ、フック、センサー、またはインターフェイスを使用できます。このページでは、plugins.zip
ファイルを使用して、Amazon MWAA 環境に Apache Airflow カスタムプラグインをインストールする手順について説明します。
前提条件
このページのステップを完了するには、以下のものが必要です。
-
権限 — AWS アカウントには、管理者から、ご使用の環境の AmazonMWAAFullConsoleAccess アクセスコントロールポリシーへのアクセス権限が付与されている必要があります。さらに、Amazon MWAA 環境には、その環境で使用される AWS のリソースへのアクセスを実行ロールで許可されている必要があります。
-
アクセス — 依存関係をウェブサーバーに直接インストールするためにパブリックリポジトリにアクセスする必要がある場合は、パブリックネットワークのウェブサーバーアクセスが環境に設定されている必要があります。詳細については、「Apache Airflow のアクセスモード」を参照してください。
-
Amazon S3 設定 — plugins.zip
で DAG、カスタムプラグイン、および requirements.txt
で Python の依存関係を保存するために使用される Amazon S3 バケットは、Public Access Blocked と Versioning Enabledで構成する必要があります。
仕組み
カスタムプラグインを環境で実行するには、次の 3 つのことを行う必要があります。
-
plugins.zip
ファイルをローカルに作成します。
-
plugins.zip
のファイルを Amazon S3 のバケットにアップロードしてください。
-
Amazon MWAA コンソールの [プラグインファイル] フィールドに、このファイルのバージョンを指定します。
これが初めて plugins.zip
を Amazon S3 バケットにアップロードする場合、Amazon MWAA コンソールでファイルへのパスも指定する必要があります。1 回だけこのステップを行ってください。
プラグインを使用するタイミング
プラグインは、「Apache Airflow ドキュメント」で説明されているように、Apache Airflow ユーザーインターフェイスを拡張する場合にのみ必要です。カスタム演算子は、DAG
コードとともに /dags
フォルダに直接配置できます。
外部システムと独自の統合を作成する必要がある場合は、/dags
フォルダまたはサブフォルダ内に配置しますが、plugins.zip
フォルダには配置しません。Apache Airflow 2.x では、プラグインは主に UI を拡張するために使用されます。
同様に、他の依存関係を plugins.zip
に配置しないでください。代わりに、Amazon S3 /dags
フォルダの下の場所に保存できます。このフォルダは、Apache Airflow が起動する前に各 Amazon MWAA コンテナに同期されます。
Apache Airflow DAG オブジェクトを明示的に定義していない /dags
フォルダ内または plugins.zip
内のファイルは、.airflowignore
ファイルにリストする必要があります。
カスタムプラグイン数
Apache Airflow の組み込みプラグインマネージャは、単にファイルを$AIRFLOW_HOME/plugins
フォルダにドロップすることで外部の機能をコアに統合できます。これにより、カスタムの Apache Airflow オペレータ、フック、センサー、またはインターフェースを使用できます。次のセクションでは、ローカル開発環境におけるフラットでネストされたディレクトリ構造の例と、plugins.zip 内のディレクトリ構造を決定する import ステートメントの例を示します。
カスタムプラグインのディレクトリとサイズの制限
Apache Airflow スケジューラとワーカーは、AWS で管理される Fargate コンテナが /usr/local/airflow/plugins/*
での環境でスタートアップする際に、カスタムプラグインを探します。
-
ディレクトリ構造。(/*
での) ディレクトリ構造は、plugins.zip
ファイルの内容に基づいています。例えば、plugins.zip
に operators
ディレクトリがトップレベルディレクトリとして含まれている場合、そのディレクトリは環境の /usr/local/airflow/plugins/operators
に抽出されます。
-
サイズ制限。1 GB 未満の plugins.zip
ファイルをお勧めします。plugins.zip
ファイルのサイズが大きいほど、環境でのスタートアップ時間が長くなります。Amazon MWAA は plugins.zip
ファイルのサイズを明示的に制限していませんが、10 分以内に依存関係をインストールできない場合、Fargate サービスはタイムアウトし、環境を安定した状態にロールバックしようとします。
Apache Airflow v1.10.12 または Apache Airflow v2.0.2 を使用する環境では、Amazon MWAA は Apache Airflow ウェブサーバー上のアウトバウンドトラフィックを制限し、プラグインや Python の依存関係をウェブサーバーに直接インストールすることを許可していません。Apache Airflow v2.2.2 以降、Amazon MWAA はプラグインと依存関係をウェブサーバーに直接インストールできるようになりました。
カスタムプラグイン数
次のセクションでは、Apache Airflow リファレンスガイドのサンプルコードが 使用して、ローカル開発環境を構築する方法を示します。
plugins.zip でフラットなディレクトリ構造を使用する例
- Apache Airflow v2
-
次の例は、Apache Airflow v2 向けのフラットなディレクトリ構造を持つ plugins.zip
ファイルを示しています。
例 plugins/virtual_python_plugin.py
以下の例は、Python VirtualEnvOperator カスタムプラグインを示しています。
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
- Apache Airflow v1
-
次の例は、Apache Airflow v1 用のフラットなディレクトリ構造の plugins.zip
ファイルを示しています。
例 plugins/virtual_python_plugin.py
以下の例は、Python VirtualEnvOperator カスタムプラグインを示しています。
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.python_operator import PythonVirtualenvOperator
def _generate_virtualenv_cmd(self, tmp_dir):
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if self.system_site_packages:
cmd.append('--system-site-packages')
if self.python_version is not None:
cmd.append('--python=python{}'.format(self.python_version))
return cmd
PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd
class EnvVarPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
plugins.zip のネストされたディレクトリ構造を使用する例
- Apache Airflow v2
-
次の例は、Apache Airflow v2 向けの hooks
、operators
、および sensors
ディレクトリのためにそれぞれ異なるディレクトリを持つ plugins.zip
ファイルを示しています。
例 plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
次の例は、カスタムプラグインを使用する DAG (DAGs フォルダー) のインポートステートメントを示しています。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
次の例は、カスタムプラグインファイルに必要な各インポートステートメントを示しています。
例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
「Amazon MWAA CLI ユーティリティを使用したカスタムプラグインのテスト」の手順に従い、次に「plugins.zip ファイルの作成」で plugins
ディレクトリ内のコンテンツを圧縮してください。例えば、cd plugins
と指定します。
- Apache Airflow v1
-
次の例は、Apache Airflow v1.10.12 向けの hooks
、operators
、および sensors
ディレクトリのためにそれぞれ異なるディレクトリを持つ plugins.zip
ファイルを示しています。
例 plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
次の例は、カスタムプラグインを使用する DAG (DAGs フォルダー) のインポートステートメントを示しています。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_operator import MyOperator
from sensors.my_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
from utils.my_utils import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
次の例は、カスタムプラグインファイルに必要な各インポートステートメントを示しています。
例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
「Amazon MWAA CLI ユーティリティを使用したカスタムプラグインのテスト」の手順に従い、次に「plugins.zip ファイルの作成」で plugins
ディレクトリ内のコンテンツを圧縮してください。例えば、cd plugins
と指定します。
plugins.zip ファイルの作成
以下の手順では、plugins.zip ファイルをローカルで作成する場合に推奨される手順について説明します。
ステップ 1: Amazon MWAA CLI ユーティリティを使用してカスタムプラグインをテストする
-
コマンドラインインターフェイス (CLI) ユーティリティは、Amazon Managed Workflows for Apache Airflow 環境をローカルに複製します。
-
CLI は、Amazon MWAA のプロダクションイメージに似た Docker コンテナイメージをローカルでビルドします。これにより、Amazon MWAA にデプロイする前に、ローカルの Apache Airflow 環境を実行して DAG、カスタムプラグイン、依存関係を開発およびテストできます。
-
CLI を実行するには、GitHub の「aws-mwaa-local-runner」を参照してください。
ステップ 2: plugins.zip ファイルを作成する
ビルトインの ZIP アーカイブユーティリティやその他の ZIP ユーティリティ (7zip など) を使用して.zip ファイルを作成できます。
Windows OS 用のビルトイン ZIP ユーティリティは、.zip ファイルの作成時にサブフォルダを追加する場合があります。Amazon S3 バケットにアップロードする前に plugins.zip ファイルの内容を確認して、ディレクトリが追加されていないことを確認することをお勧めします。
-
ディレクトリをローカルの Airflow プラグインディレクトリに変更します。例:
myproject$ cd plugins
-
次のコマンドを実行して、コンテンツに実行権限があることを確認します (macOS と Linux のみ)。
plugins$ chmod -R 755 .
-
plugins
フォルダ内のコンテンツを圧縮します。
plugins$ zip -r plugins.zip .
plugins.zip
を Amazon S3 にアップロードします。
Amazon S3 コンソールまたは AWS Command Line Interface (AWS CLI) を使用して、Amazon S3 バケットに plugins.zip
ファイルをアップロードできます。
AWS CLI の使用
AWS Command Line Interface (AWS CLI) は、コマンドラインシェルでコマンドを使用して AWS サービスとやり取りするためのオープンソースツールです。このページのステップを完了するには、以下のものが必要です。
AWS CLI を使用してアップロードするには
-
コマンドプロンプトで、plugins.zip
ファイルが保存されているディレクトリに移動します。例:
cd plugins
-
以下のコマンドを使って、Amazon S3 バケットをすべてリストアップします
aws s3 ls
-
以下のコマンドを使用して、ご使用の環境の Amazon S3 バケット内のファイルとフォルダを一覧表示します。
aws s3 ls s3://YOUR_S3_BUCKET_NAME
-
環境の Amazon S3 バケットに plugins.zip
ファイルをアップロードするには、次のコマンドを使用します。
aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME
/plugins.zip
Amazon S3 コンソールの使用
Amazon S3 コンソールは、Amazon S3 バケット内のリソースを作成および管理できるウェブベースのユーザーインターフェイスです。
Amazon S3 コンソールを使ってアップロードするには
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
Amazon S3 コンソールの [DAG コード in S3] ペインで DAG コード内の [S3バケット] リンクを選択して、ストレージバケットを開きます。
-
[アップロード] を選択します。
-
[ファイルの追加] を選択します。
-
plugins.zip
のローカルコピーを選択し、[アップロード] を選択します。
環境へのカスタムプラグインのインストール
このセクションでは、plugins.zip ファイルへのパスを指定し、zip ファイルが更新されるたびに plugins.zip ファイルのバージョンを指定することで、Amazon S3 バケットにアップロードしたカスタムプラグインをインストールする方法について説明します。
Amazon MWAA コンソールで plugins.zip
へのパスを指定する(初回)
これが初めて plugins.zip
を Amazon S3 バケットにアップロードする場合、Amazon MWAA コンソールでファイルへのパスも指定する必要があります。1 回だけこのステップを行ってください。
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
[編集] を選択します。
-
[Amazon S3 の DAG コード] ペインで、[プラグインファイル - オプション] フィールドの横にある [S3 を参照] を選択します。
-
Amazon S3 バケットの plugins.zip
ファイルを選択します。
-
[選択] を選択します。
-
[次へ] → [環境の更新] を選択します。
Amazon MWAA コンソールで plugins.zip
のバージョンを指定する。
新しいバージョンの plugins.zip
を Amazon S3 バケットにアップロードするたびに、Amazon MWAA コンソールで plugins.zip
ファイルのバージョンを指定する必要があります。
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
[編集] を選択します。
-
[Amazon S3 の DAG コードペイン] で、ドロップダウンリストから plugins.zip
のバージョンを選択します。
-
[Next] を選択します。
plugins.zip のユースケースの例
次のステップ