カスタムプラグインのインストール - Amazon Managed Workflows for Apache Airflow

カスタムプラグインのインストール

Amazon Managed Workflows for Apache Airflow は Apache Airflow の組み込みプラグインマネージャーをサポートしているため、カスタム Apache Airflow オペレータ、フック、センサー、またはインターフェイスを使用できます。このページでは、plugins.zip ファイルを使用して、Amazon MWAA 環境に Apache Airflow カスタムプラグインをインストールする手順について説明します。

前提条件

このページのステップを完了するには、以下のものが必要です。

  • 権限 — AWS アカウントには、管理者から、ご使用の環境の AmazonMWAAFullConsoleAccess アクセスコントロールポリシーへのアクセス権限が付与されている必要があります。さらに、Amazon MWAA 環境には、その環境で使用される AWS のリソースへのアクセスを実行ロールで許可されている必要があります。

  • アクセス — 依存関係をウェブサーバーに直接インストールするためにパブリックリポジトリにアクセスする必要がある場合は、パブリックネットワークのウェブサーバーアクセスが環境に設定されている必要があります。詳細については、「Apache Airflow のアクセスモード」を参照してください。

  • Amazon S3 設定plugins.zip で DAG、カスタムプラグイン、および requirements.txt で Python の依存関係を保存するために使用される Amazon S3 バケットは、Public Access Blocked と Versioning Enabledで構成する必要があります。

仕組み

カスタムプラグインを環境で実行するには、次の 3 つのことを行う必要があります。

  1. plugins.zip ファイルをローカルに作成します。

  2. plugins.zip のファイルを Amazon S3 のバケットにアップロードしてください。

  3. Amazon MWAA コンソールの [プラグインファイル] フィールドに、このファイルのバージョンを指定します。

注記

これが初めて plugins.zip を Amazon S3 バケットにアップロードする場合、Amazon MWAA コンソールでファイルへのパスも指定する必要があります。1 回だけこのステップを行ってください。

プラグインを使用するタイミング

プラグインは、「Apache Airflow ドキュメント」で説明されているように、Apache Airflow ユーザーインターフェイスを拡張する場合にのみ必要です。カスタム演算子は、DAG コードとともに /dags フォルダに直接配置できます。

外部システムと独自の統合を作成する必要がある場合は、/dags フォルダまたはサブフォルダ内に配置しますが、plugins.zip フォルダには配置しません。Apache Airflow 2.x では、プラグインは主に UI を拡張するために使用されます。

同様に、他の依存関係を plugins.zip に配置しないでください。代わりに、Amazon S3 /dags フォルダの下の場所に保存できます。このフォルダは、Apache Airflow が起動する前に各 Amazon MWAA コンテナに同期されます。

注記

Apache Airflow DAG オブジェクトを明示的に定義していない /dags フォルダ内または plugins.zip 内のファイルは、.airflowignore ファイルにリストする必要があります。

カスタムプラグイン数

Apache Airflow の組み込みプラグインマネージャは、単にファイルを$AIRFLOW_HOME/pluginsフォルダにドロップすることで外部の機能をコアに統合できます。これにより、カスタムの Apache Airflow オペレータ、フック、センサー、またはインターフェースを使用できます。次のセクションでは、ローカル開発環境におけるフラットでネストされたディレクトリ構造の例と、plugins.zip 内のディレクトリ構造を決定する import ステートメントの例を示します。

カスタムプラグインのディレクトリとサイズの制限

Apache Airflow スケジューラとワーカーは、AWS で管理される Fargate コンテナが /usr/local/airflow/plugins/* での環境でスタートアップする際に、カスタムプラグインを探します。

  • ディレクトリ構造。(/*での) ディレクトリ構造は、plugins.zip ファイルの内容に基づいています。例えば、plugins.zipoperators ディレクトリがトップレベルディレクトリとして含まれている場合、そのディレクトリは環境の /usr/local/airflow/plugins/operators に抽出されます。

  • サイズ制限。1 GB 未満の plugins.zip ファイルをお勧めします。plugins.zip ファイルのサイズが大きいほど、環境でのスタートアップ時間が長くなります。Amazon MWAA は plugins.zip ファイルのサイズを明示的に制限していませんが、10 分以内に依存関係をインストールできない場合、Fargate サービスはタイムアウトし、環境を安定した状態にロールバックしようとします。

注記

Apache Airflow v1.10.12 または Apache Airflow v2.0.2 を使用する環境では、Amazon MWAA は Apache Airflow ウェブサーバー上のアウトバウンドトラフィックを制限し、プラグインや Python の依存関係をウェブサーバーに直接インストールすることを許可していません。Apache Airflow v2.2.2 以降、Amazon MWAA はプラグインと依存関係をウェブサーバーに直接インストールできるようになりました。

カスタムプラグイン数

次のセクションでは、Apache Airflow リファレンスガイドのサンプルコードが 使用して、ローカル開発環境を構築する方法を示します。

plugins.zip でフラットなディレクトリ構造を使用する例

Apache Airflow v2

次の例は、Apache Airflow v2 向けのフラットなディレクトリ構造を持つ plugins.zip ファイルを示しています。

例 Python VirtualEnvOperator plugins.zip を含むフラットディレクトリ

次の例では、Apache Airflow Python VirtualEnv オペレータ用のカスタムプラグインを作成する の PythonVirtualEnvOperator カスタムプラグインの plugins.zip ファイルの最上位ツリーを示します。

├── virtual_python_plugin.py
例 plugins/virtual_python_plugin.py

以下の例は、Python VirtualEnvOperator カスタムプラグインを示しています。

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

次の例は、Apache Airflow v1 用のフラットなディレクトリ構造の plugins.zip ファイルを示しています。

例 Python VirtualEnvOperator plugins.zip を含むフラットディレクトリ

次の例では、Apache Airflow Python VirtualEnv オペレータ用のカスタムプラグインを作成する の PythonVirtualEnvOperator カスタムプラグインの plugins.zip ファイルの最上位ツリーを示します。

├── virtual_python_plugin.py
例 plugins/virtual_python_plugin.py

以下の例は、Python VirtualEnvOperator カスタムプラグインを示しています。

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

plugins.zip のネストされたディレクトリ構造を使用する例

Apache Airflow v2

次の例は、Apache Airflow v2 向けの hooksoperators、および sensors ディレクトリのためにそれぞれ異なるディレクトリを持つ plugins.zip ファイルを示しています。

例 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

次の例は、カスタムプラグインを使用する DAG (DAGs フォルダー) のインポートステートメントを示しています。

例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

次の例は、カスタムプラグインファイルに必要な各インポートステートメントを示しています。

例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Amazon MWAA CLI ユーティリティを使用したカスタムプラグインのテスト」の手順に従い、次に「plugins.zip ファイルの作成」で plugins ディレクトリ内のコンテンツを圧縮してください。例えば、cd plugins と指定します。

Apache Airflow v1

次の例は、Apache Airflow v1.10.12 向けの hooksoperators、および sensors ディレクトリのためにそれぞれ異なるディレクトリを持つ plugins.zip ファイルを示しています。

例 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

次の例は、カスタムプラグインを使用する DAG (DAGs フォルダー) のインポートステートメントを示しています。

例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

次の例は、カスタムプラグインファイルに必要な各インポートステートメントを示しています。

例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Amazon MWAA CLI ユーティリティを使用したカスタムプラグインのテスト」の手順に従い、次に「plugins.zip ファイルの作成」で plugins ディレクトリ内のコンテンツを圧縮してください。例えば、cd plugins と指定します。

plugins.zip ファイルの作成

以下の手順では、plugins.zip ファイルをローカルで作成する場合に推奨される手順について説明します。

ステップ 1: Amazon MWAA CLI ユーティリティを使用してカスタムプラグインをテストする

  • コマンドラインインターフェイス (CLI) ユーティリティは、Amazon Managed Workflows for Apache Airflow 環境をローカルに複製します。

  • CLI は、Amazon MWAA のプロダクションイメージに似た Docker コンテナイメージをローカルでビルドします。これにより、Amazon MWAA にデプロイする前に、ローカルの Apache Airflow 環境を実行して DAG、カスタムプラグイン、依存関係を開発およびテストできます。

  • CLI を実行するには、GitHub の「aws-mwaa-local-runner」を参照してください。

ステップ 2: plugins.zip ファイルを作成する

ビルトインの ZIP アーカイブユーティリティやその他の ZIP ユーティリティ (7zip など) を使用して.zip ファイルを作成できます。

注記

Windows OS 用のビルトイン ZIP ユーティリティは、.zip ファイルの作成時にサブフォルダを追加する場合があります。Amazon S3 バケットにアップロードする前に plugins.zip ファイルの内容を確認して、ディレクトリが追加されていないことを確認することをお勧めします。

  1. ディレクトリをローカルの Airflow プラグインディレクトリに変更します。例:

    myproject$ cd plugins
  2. 次のコマンドを実行して、コンテンツに実行権限があることを確認します (macOS と Linux のみ)。

    plugins$ chmod -R 755 .
  3. plugins フォルダ内のコンテンツを圧縮します。

    plugins$ zip -r plugins.zip .

plugins.zip を Amazon S3 にアップロードします。

Amazon S3 コンソールまたは AWS Command Line Interface (AWS CLI) を使用して、Amazon S3 バケットに plugins.zip ファイルをアップロードできます。

AWS CLI の使用

AWS Command Line Interface (AWS CLI) は、コマンドラインシェルでコマンドを使用して AWS サービスとやり取りするためのオープンソースツールです。このページのステップを完了するには、以下のものが必要です。

AWS CLI を使用してアップロードするには
  1. コマンドプロンプトで、plugins.zip ファイルが保存されているディレクトリに移動します。例:

    cd plugins
  2. 以下のコマンドを使って、Amazon S3 バケットをすべてリストアップします

    aws s3 ls
  3. 以下のコマンドを使用して、ご使用の環境の Amazon S3 バケット内のファイルとフォルダを一覧表示します。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. 環境の Amazon S3 バケットに plugins.zip ファイルをアップロードするには、次のコマンドを使用します。

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Amazon S3 コンソールの使用

Amazon S3 コンソールは、Amazon S3 バケット内のリソースを作成および管理できるウェブベースのユーザーインターフェイスです。

Amazon S3 コンソールを使ってアップロードするには
  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. Amazon S3 コンソールの [DAG コード in S3] ペインで DAG コード内の [S3バケット] リンクを選択して、ストレージバケットを開きます。

  4. [アップロード] を選択します。

  5. [ファイルの追加] を選択します。

  6. plugins.zip のローカルコピーを選択し、[アップロード] を選択します。

環境へのカスタムプラグインのインストール

このセクションでは、plugins.zip ファイルへのパスを指定し、zip ファイルが更新されるたびに plugins.zip ファイルのバージョンを指定することで、Amazon S3 バケットにアップロードしたカスタムプラグインをインストールする方法について説明します。

Amazon MWAA コンソールで plugins.zip へのパスを指定する(初回)

これが初めて plugins.zip を Amazon S3 バケットにアップロードする場合、Amazon MWAA コンソールでファイルへのパスも指定する必要があります。1 回だけこのステップを行ってください。

  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. [編集] を選択します。

  4. [Amazon S3 の DAG コード] ペインで、[プラグインファイル - オプション] フィールドの横にある [S3 を参照] を選択します。

  5. Amazon S3 バケットの plugins.zip ファイルを選択します。

  6. [選択] を選択します。

  7. [次へ][環境の更新] を選択します。

Amazon MWAA コンソールで plugins.zip のバージョンを指定する。

新しいバージョンの plugins.zip を Amazon S3 バケットにアップロードするたびに、Amazon MWAA コンソールで plugins.zip ファイルのバージョンを指定する必要があります。

  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. [編集] を選択します。

  4. [Amazon S3 の DAG コードペイン] で、ドロップダウンリストから plugins.zip のバージョンを選択します。

  5. [Next] を選択します。

plugins.zip のユースケースの例

次のステップ

  • GitHub の aws-mwaa-local-runner を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。