Apache Hive と Hadoop を使ったカスタムプラグインの作成 - Amazon Managed Workflows for Apache Airflow

Apache Hive と Hadoop を使ったカスタムプラグインの作成

Amazon MWAA は plugins.zip から /usr/local/airflow/plugins にコンテンツを抽出します。これを使用して、コンテナにバイナリを追加できます。さらに、Apache Airflow は起動時に plugins フォルダ内の Python ファイルの内容を実行するため、環境変数を設定および変更できます。以下のサンプルでは、Amazon Managed Workflows for Apache Airflow 環境で Apache Hive と Hadoop を使用してカスタムプラグインを作成する手順を説明し、他のカスタムプラグインやバイナリと組み合わせることができます。

Version

  • このページのサンプルコードは、Python 3.7Apache Airflow v1 で使用できます。

  • このページのコード例は、Python 3.10Apache Airflow v2 と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

アクセス許可

  • このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

要件

このページのサンプルコードを使用するには、次の依存関係を requirements.txt に追加してください。詳細については、「Python 依存関係のインストール」を参照してください。

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt apache-airflow-providers-amazon[apache.hive]
Apache Airflow v1
apache-airflow[hive]==1.10.12

依存関係のダウンロード

Amazon MWAA は plugins.zip のコンテンツを各 Amazon MWAA スケジューラーとワーカーコンテナの /usr/local/airflow/plugins に抽出します。これはバイナリを環境に追加するために使用されます。次のステップでは、カスタムプラグインに必要なファイルを組み立てる方法について説明します。

  1. コマンドプロンプトで、プラグインを作成したいディレクトリに移動します。例:

    cd plugins
  2. ミラーから Hadoop をダウンロードします。例えば:

    wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
  3. ミラーから Hive をダウンロードします。例えば:

    wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
  4. ディレクトリを作成します。例:

    mkdir hive_plugin
  5. Hadoop を抽出します。

    tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
  6. Hive を抽出します。

    tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin

カスタムプラグイン

Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。これは環境変数の設定と変更に使用されます。次のステップでは、カスタムプラグインのサンプルコードを説明します。

  1. コマンドプロンプトで、hive_plugin ディレクトリに移動します。例:

    cd hive_plugin
  2. 次のコードサンプルの内容をコピーし、同じ hive_plugin ディレクトリで、hive_plugin.py と名前を付けてローカルに保存します。

    from airflow.plugins_manager import AirflowPlugin import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jre" os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0' os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop' os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin' os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" class EnvVarPlugin(AirflowPlugin): name = 'hive_plugin'
  3. 次のテキストの内容をコピーし、hive_plugin ディレクトリに .airflowignore としてローカルに保存します。

    hadoop-3.3.0 apache-hive-3.1.2-bin

Plugins.zip

以下のステップは、plugins.zip を作成する方法を示しています。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの plugins.zip のファイルにすることができます。

  1. コマンドプロンプトで、前のステップの hive_plugin ディレクトリに移動します。例:

    cd hive_plugin
  2. plugins フォルダ内のコンテンツを圧縮します。

    zip -r ../hive_plugin.zip ./

コードサンプル

次のステップでは、カスタムプラグインをテストする DAG コードを作成する方法について説明します。

  1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 以下のコードサンプルの内容をコピーし、ローカルに hive.py として保存します。

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hive_test = BashOperator( task_id="hive_test", bash_command='hive --help' )

Airflow 設定オプション

Apache Airflow v2 を使用している場合、core.lazy_load_plugins : False を Apache Airflow の構成オプションとして追加してください。詳細については、「2 の設定オプションによるプラグインの読み込み」を参照してください。

次のステップ

  • この例の requirements.txt ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、Python 依存関係のインストール をご覧ください。

  • この例の DAG コードを Amazon S3 バケットの dags フォルダにアップロードする方法については、DAG の追加と更新 を参照してください。

  • この例の plugins.zip ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、カスタムプラグインのインストール をご覧ください。