Apache Hive と Hadoop を使ったカスタムプラグインの作成
Amazon MWAA は plugins.zip
から /usr/local/airflow/plugins
にコンテンツを抽出します。これを使用して、コンテナにバイナリを追加できます。さらに、Apache Airflow は起動時に plugins
フォルダ内の Python ファイルの内容を実行するため、環境変数を設定および変更できます。以下のサンプルでは、Amazon Managed Workflows for Apache Airflow 環境で Apache Hive と Hadoop を使用してカスタムプラグインを作成する手順を説明し、他のカスタムプラグインやバイナリと組み合わせることができます。
Version
-
このページのサンプルコードは、Python 3.7
の Apache Airflow v1 で使用できます。
-
このページのコード例は、Python 3.10
の Apache Airflow v2 と共に使用可能です。
前提条件
このページのサンプルコードを使用するには、以下が必要です。
アクセス許可
-
このページのコード例を使用する場合、追加のアクセス許可は必要ありません。
要件
このページのサンプルコードを使用するには、次の依存関係を requirements.txt
に追加してください。詳細については、「Python 依存関係のインストール」を参照してください。
依存関係のダウンロード
Amazon MWAA は plugins.zip のコンテンツを各 Amazon MWAA スケジューラーとワーカーコンテナの /usr/local/airflow/plugins
に抽出します。これはバイナリを環境に追加するために使用されます。次のステップでは、カスタムプラグインに必要なファイルを組み立てる方法について説明します。
-
コマンドプロンプトで、プラグインを作成したいディレクトリに移動します。例:
cd plugins
-
wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
-
wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
-
ディレクトリを作成します。例:
mkdir hive_plugin
-
Hadoop を抽出します。
tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
-
Hive を抽出します。
tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
カスタムプラグイン
Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。これは環境変数の設定と変更に使用されます。次のステップでは、カスタムプラグインのサンプルコードを説明します。
-
コマンドプロンプトで、
hive_plugin
ディレクトリに移動します。例:cd hive_plugin
-
次のコードサンプルの内容をコピーし、同じ
hive_plugin
ディレクトリで、hive_plugin.py
と名前を付けてローカルに保存します。from airflow.plugins_manager import AirflowPlugin import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jre" os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0' os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop' os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin' os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" class EnvVarPlugin(AirflowPlugin): name = 'hive_plugin'
-
次のテキストの内容をコピーし、
hive_plugin
ディレクトリに.airflowignore
としてローカルに保存します。hadoop-3.3.0 apache-hive-3.1.2-bin
Plugins.zip
以下のステップは、plugins.zip
を作成する方法を示しています。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの plugins.zip
のファイルにすることができます。
-
コマンドプロンプトで、前のステップの
hive_plugin
ディレクトリに移動します。例:cd hive_plugin
-
plugins
フォルダ内のコンテンツを圧縮します。zip -r ../hive_plugin.zip ./
コードサンプル
次のステップでは、カスタムプラグインをテストする DAG コードを作成する方法について説明します。
-
コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:
cd dags
-
以下のコードサンプルの内容をコピーし、ローカルに
hive.py
として保存します。from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hive_test = BashOperator( task_id="hive_test", bash_command='hive --help' )
Airflow 設定オプション
Apache Airflow v2 を使用している場合、core.lazy_load_plugins : False
を Apache Airflow の構成オプションとして追加してください。詳細については、「2 の設定オプションによるプラグインの読み込み」を参照してください。
次のステップ
-
この例の
requirements.txt
ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、Python 依存関係のインストール をご覧ください。 -
この例の DAG コードを Amazon S3 バケットの
dags
フォルダにアップロードする方法については、DAG の追加と更新 を参照してください。 -
この例の
plugins.zip
ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、カスタムプラグインのインストール をご覧ください。