Oracle でのカスタムプラグインの作成 - Amazon Managed Workflows for Apache Airflow

Oracle でのカスタムプラグインの作成

次のサンプルでは、Oracle for Amazon MWAA を使用してカスタムプラグインを作成する手順を詳しく説明しており、これを plugins.zip ファイル内の他のカスタムプラグインやバイナリと組み合わせることができます。

Version

  • このページのサンプルコードは、Python 3.7Apache Airflow v1 で使用できます。

  • このページのコード例は、Python 3.10Apache Airflow v2 と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

アクセス許可

  • このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

要件

このページのサンプルコードを使用するには、次の依存関係を requirements.txt に追加してください。詳細については、Python 依存関係のインストール を参照してください。

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt cx_Oracle apache-airflow-providers-oracle
Apache Airflow v1
cx_Oracle==8.1.0 apache-airflow[oracle]==1.10.12

コードサンプル

次のステップでは、カスタムプラグインをテストする DAG コードを作成する方法について説明します。

  1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 以下のコードサンプルの内容をコピーし、ローカルに oracle.py として保存します。

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )

カスタムプラグインを作成する

このセクションでは、依存関係のダウンロード、カスタムプラグインと plugins.zip の作成方法について説明します。

依存関係のダウンロード

Amazon MWAA は plugins.zip のコンテンツを各 Amazon MWAA スケジューラーとワーカーコンテナの /usr/local/airflow/plugins に抽出します。これはバイナリを環境に追加するために使用されます。次のステップでは、カスタムプラグインに必要なファイルを組み立てる方法について説明します。

Amazon Linux コンテナイメージをプルする
  1. コマンドプロンプトで Amazon Linux コンテナイメージを取得し、コンテナをローカルで実行します。例:

    docker pull amazonlinux docker run -it amazonlinux:latest /bin/bash

    コマンドプロンプトで bash コマンドラインを呼び出す必要があります。例:

    bash-4.2#
  2. Linux ネイティブの非同期 I/O 機能 (libaio) をインストールします。

    yum -y install libaio
  3. 以降の手順では、このウィンドウを開いたままにしておいてください。次のファイルをローカルにコピーします: lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1

クライアントフォルダをダウンロード
  1. unzip パッケージをローカルにインストールします。例:

    sudo yum install unzip
  2. oracle_plugin ディレクトリの作成 例:

    mkdir oracle_plugin cd oracle_plugin
  3. 次の curl コマンドを使用して、Linux x86-64 (64 ビット) 用 Oracle インスタントクライアントダウンロードから instantclient-basic-linux.x64-18.5.0.0.0dbru.zip をダウンロードします。

    curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
  4. client.zip ファイルを解凍します。例:

    unzip *.zip
Docker からファイルを抽出します。
  1. 新しいコマンドプロンプトで、Docker コンテナ ID を表示して書き留めます。例:

    docker container ls

    コマンドプロンプトでは、すべてのコンテナとその ID が返されるはずです。例:

    debc16fd6970
  2. oracle_plugin ディレクトリで、lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1 ファイルをローカル instantclient_18_5 フォルダに抽出してください。例:

    docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/

カスタムプラグイン

Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。これは環境変数の設定と変更に使用されます。次のステップでは、カスタムプラグインのサンプルコードを説明します。

  • 以下のコードサンプルの内容をコピーし、ローカルに env_var_plugin_oracle.py として保存します。

    from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'

Plugins.zip

以下のステップは、plugins.zip を作成する方法を示しています。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの plugins.zip ファイルにすることができます。

プラグインディレクトリの中身を Zip 圧縮する
  1. コマンドプロンプトで、oracle_plugin ディレクトリに移動します。例:

    cd oracle_plugin
  2. instantclient_18_5 ディレクトリを plugins.zip に圧縮します。例:

    zip -r ../plugins.zip ./
  3. コマンドプロンプトに次のように表示されるはずです。

    oracle_plugin$ ls client.zip instantclient_18_5
  4. client.zip ファイルを削除します。例:

    rm client.zip
env_var_plugin_oracle.py ファイルを圧縮します。
  1. plugins.zip のルートに env_var_plugin_oracle.py ファイルを追加します。例:

    zip plugins.zip env_var_plugin_oracle.py
  2. これで、plugins.zip に次のものが含まれている必要があります。

    env_var_plugin_oracle.py instantclient_18_5/

Airflow 設定オプション

Apache Airflow v2 を使用している場合、core.lazy_load_plugins : False を Apache Airflow の構成オプションとして追加してください。詳細については、「2 の設定オプションによるプラグインの読み込み」を参照してください。

次のステップ

  • この例の requirements.txt ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、Python 依存関係のインストール をご覧ください。

  • この例の DAG コードを Amazon S3 バケットの dags フォルダにアップロードする方法については、DAG の追加と更新 を参照してください。

  • この例の plugins.zip ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、カスタムプラグインのインストール をご覧ください。