Amazon MWAA でのスタートアップスクリプトの使用 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MWAA でのスタートアップスクリプトの使用

スタートアップスクリプトは、DAG、要件、プラグインと同様に、環境の Amazon S3 バケットでホストするシェル (.sh) スクリプトです。Amazon MWAA は、要件のインストールと Apache Airflow プロセスの初期化の前に、このスクリプトをすべての個々の Apache Airflow コンポーネント (ワーカー、スケジューラ、ウェブサーバー) で起動中に実行します。起動スクリプトを使用して次のことができます。

  • ランタイムのインストール — ワークフローと接続に必要な Linux ランタイムをインストールします。

  • 環境変数の設定 — Apache Airflow コンポーネントごとに環境変数を設定します。PATHPYTHONPATHLD_LIBRARY_PATH などの一般的な変数を上書きします。

  • キーとトークンの管理 – カスタムリポジトリへのアクセストークンを requirements.txt に渡し、セキュリティキーを構成します。

以下のトピックでは、Linux ランタイムをインストールし、環境変数を設定し、CloudWatch Logs を使用して関連する問題をトラブルシューティングするための起動スクリプトを設定する方法について説明します。

スタートアップスクリプトを設定します。

既存の Amazon MWAA 環境で起動スクリプトを使用するには、環境の Amazon S3 バケットに .sh ファイルをアップロードします。次に、スクリプトを環境に関連付けるには、環境の詳細で以下を指定します。

  • スクリプトへの Amazon S3 URL パス — バケットでホストされているスクリプトへの相対パス。例: s3://mwaa-environment/startup.sh

  • スクリプトの Amazon S3 バージョン ID — Amazon S3 バケット内の起動シェルスクリプトのバージョン。スクリプトを更新するたびに、Amazon S3 がファイルに割り当てるバージョン ID を指定する必要があります。バージョン ID は、ユニコード、UTF-8 エンコード、URL レディ、不透明文字列で、長さは 1,024 バイト以下で、例えば 3sL4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo です。

このセクションの手順を完了するには、次のサンプルスクリプトを使用します。このスクリプトは、MWAA_AIRFLOW_COMPONENT に割り当てられた値を出力します。この環境変数は、スクリプトが実行される各 Apache Airflow コンポーネントを識別します。

コードをコピーし、startup.sh としてローカルに保存します。

#!/bin/sh ​ echo "Printing Apache Airflow component" echo $MWAA_AIRFLOW_COMPONENT

次に、スクリプトを Amazon S3 バケットにアップロードします。

AWS Management Console
シェルスクリプトをアップロードするには (コンソール)
  1. AWS Management Console にサインインし、Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. [バケット] リストで、環境に関連付けられているバケットの名前を選択します。

  3. [オブジェクト] タブで、[アップロード] を選択します。

  4. [アップロード] ページで、作成したシェルスクリプトをドラッグアンドドロップします。

  5. [アップロード] を選択します。

スクリプトは [オブジェクト] のリストに表示されます。Amazon S3 によって、ファイルの新しいバージョン ID が作成されます。スクリプトを更新し、同じファイル名を使用して再度アップロードすると、新しいバージョン ID がファイルに割り当てられます。

AWS CLI
シェルスクリプト (CLI) を作成してアップロードするには
  1. 新しいコマンドプロンプトを開き、Amazon S3 ls コマンドを実行して、環境に関連付けられているバケットを一覧表示して識別します。

    $ aws s3 ls
  2. シェルスクリプトを保存したフォルダに移動します。新しいプロンプトウィンドウで cp を使用して、スクリプトをバケットにアップロードします。your-s3-bucket を情報に置き換えてください。

    $ aws s3 cp startup.sh s3://your-s3-bucket/startup.sh

    成功すると、Amazon S3 はオブジェクトへの URL パスを出力します。

    upload: ./startup.sh to s3://your-s3-bucket/startup.sh
  3. 次のコマンドを使って、スクリプトの最新バージョン ID を取得します。

    $ aws s3api list-object-versions --bucket your-s3-bucket --prefix startup --query 'Versions[?IsLatest].[VersionId]' --output text
    BbdVMmBRjtestta1EsVnbybZp1Wqh1J4

このバージョン ID は、スクリプトを環境に関連付けるときに指定します。

次に、スクリプトを環境に関連付けます。

AWS Management Console
スクリプトを環境に関連付けるには (コンソール)
  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 更新する環境の行を選択し、[編集] を選択します。

  3. [詳細の指定] ページの [スタートアップスクリプトファイル — オプション] に、スクリプトの Amazon S3 URL を入力します 例: s3://your-mwaa-bucket/startup-sh.

  4. ドロップダウンリストから最新バージョンを選択するか、[ブラウザ S3] をクリックしてスクリプトを検索します。

  5. [次] を選択して、[レビュー] ページを開きます。

  6. 変更を確認し、[保存] を選択します。

環境の更新には 10 分から 30 分かかることがあります。Amazon MWAA は、環境内の各コンポーネントの再起動時に起動スクリプトを実行します。

AWS CLI
スクリプトを環境 (CLI) に関連付けるには
  • コマンドプロンプトを開き、update-environment を使用して、スクリプトの Amazon S3 URL とバージョン ID を指定します。

    $ aws mwaa update-environment \ --name your-mwaa-environment \ --startup-script-s3-path startup.sh \ --startup-script-s3-object-version BbdVMmBRjtestta1EsVnbybZp1Wqh1J4

    成功すると、Amazon MWAA から環境の Amazon リソースネーム (ARN) が返されます。

    arn:aws::airflow:us-west-2:123456789012:environment/your-mwaa-environment 

環境の更新には 10 分から 30 分かかることがあります。Amazon MWAA は、環境内の各コンポーネントの再起動時に起動スクリプトを実行します。

最後に、ログイベントを取得して、スクリプトが期待どおりに動作していることを確認します。各 Apache Airflow コンポーネントのロギングを有効にすると、Amazon MWAA は新しいロググループとログストリームを作成します。詳細については、Apache Airflow ログタイプを参照してください。

AWS Management Console
Apache Airflow ログストリームを確認するには (コンソール)
  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. [モニタリング] ペインで、ログを表示したいロググループ ([Airflow スケジューラロググループ] など) を選択します。

  4. CloudWatch コンソールで、[ログストリーム] リストから以下のプレフィックスを持つストリームを選択します: startup_script_exection_ip.

  5. [ログイベント] ペインには、MWAA_AIRFLOW_COMPONENT の値を表示するコマンドの出力が表示されます。たとえば、スケジューラーログの場合、次のようになります。

    Printing Apache Airflow component
    scheduler
    Finished running startup script. Execution time: 0.004s.
    Running verification
    Verification completed

前の手順を繰り返して、ワーカーとウェブサーバーのログを表示できます。

スタートアップスクリプトを使用して Linux ランタイムをインストールします。

起動スクリプトを使用して Apache Airflow コンポーネントのオペレーティングシステムを更新し、ワークフローで使用する追加のランタイムライブラリをインストールします。例えば、次のスクリプトは yum update を実行してオペレーティングシステムを更新します。

スタートアップスクリプトで yum update を実行する際は、例に示すように --exclude=python* を使用して Python を除外する必要があります。環境を実行するために、Amazon MWAA は環境と互換性のある特定のバージョンの Python をインストールします。そのため、起動スクリプトを使用して環境の Python バージョンを更新することはできません。

#!/bin/sh echo "Updating operating system" sudo yum update -y --exclude=python*

特定の Apache Airflow コンポーネントにランタイムをインストールするには、MWAA_AIRFLOW_COMPONENTiffi の条件文を使用します。この例では、libaio ライブラリをスケジューラとワーカーにインストールする単一のコマンドを実行しますが、ウェブサーバーにはインストールしません。

重要
  • プライベートウェブサーバーを設定した場合は、インストールタイムアウトを回避するために、次の条件を使用するか、すべてのインストールファイルをローカルで提供する必要があります。

  • 管理特権が必要な操作を実行するには、sudo を使用します。

#!/bin/sh if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]] then sudo yum -y install libaio fi

起動スクリプトを使用して Python のバージョンを確認できます。

#!/bin/sh export PYTHON_VERSION_CHECK=`python -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}.{2}".format(*version))'` echo "Python version is $PYTHON_VERSION_CHECK"

Amazon MWAA はデフォルトの Python バージョンのオーバーライドをサポートしていません。これは、インストールされている Apache Airflow ライブラリとの互換性がなくなる可能性があるためです。

起動スクリプトを使用して環境変数を設定する

起動スクリプトを使用して環境変数を設定し、Apache Airflow 設定を変更します。以下は、新しい変数 ENVIRONMENT_STAGE を定義しています。この変数は DAG またはカスタムモジュールで参照できます。

#!/bin/sh export ENVIRONMENT_STAGE="development" echo "$ENVIRONMENT_STAGE"

起動スクリプトを使用して、一般的な Apache Airflow またはシステム変数を上書きします。例えば、LD_LIBRARY_PATH を設定して Python に対してバイナリを指定したパスで探すように指示します。これにより、プラグインを使用してワークフローにカスタムバイナリを提供できます。

#!/bin/sh export LD_LIBRARY_PATH=/usr/local/airflow/plugins/your-custom-binary

予約済み環境変数

Amazon MWAA は一連の重要な環境変数を予約します。予約された変数を上書きすると、Amazon MWAA はその変数をデフォルトに戻します。リザーブド変数は以下のとおりです。

  • MWAA__AIRFLOW__COMPONENT — 次のいずれかの値で Apache Airflow コンポーネントを識別するために使用されます: schedulerworker、または webserver

  • AIRFLOW__WEBSERVER__SECRET_KEY — Apache Airflow ウェブサーバのセッションクッキーに安全に署名するために使用されるシークレットキー。

  • AIRFLOW__CORE__FERNET_KEY — メタデータデータベースに保存されている機密データ(接続パスワードなど)の暗号化と復号化に使用されるキー。

  • AIRFLOW_HOME — 設定ファイルと DAG ファイルがローカルに保存されている Apache Airflow ホームディレクトリへのパス。

  • AIRFLOW__CELERY__BROKER_URL — Apache Airflow スケジューラーと Celery ワーカーノード間の通信に使用されるメッセージブローカーの URL。

  • AIRFLOW__CELERY__RESULT_BACKEND — Celery タスクの結果を保存するために使用されるデータベースの URL。

  • AIRFLOW__CORE__EXECUTOR — Apache Airflow が使用する必要のあるエグゼキュータークラス。Amazon MWAA では、これは CeleryExecutor です

  • AIRFLOW__CORE__LOAD_EXAMPLES — サンプル DAG のロードを有効化または無効化するために使用されます。

  • AIRFLOW__METRICS__METRICS_BLOCK_LIST — Amazon MWAA がどのような Apache Airflow メトリクスを生成し、CloudWatch でキャプチャするかを管理するために使用されます。

  • SQL_ALCHEMY_CONN — Amazon MWAA に Apache Airflow メタデータを保存するために使用される RDS for PostgreSQL データベースの接続文字列。

  • AIRFLOW__CORE__SQL_ALCHEMY_CONNSQL_ALCHEMY_CONN と同じ目的で使用されますが、新しい Apache Airflow の命名規則に従います。

  • AIRFLOW__CELERY__DEFAULT_QUEUE — Apache Airflow の Celery タスクのデフォルトキュー。

  • AIRFLOW__OPERATORS__DEFAULT_QUEUE — 特定の Apache Airflow オペレータを使用するタスクのデフォルトキュー。

  • AIRFLOW_VERSION — Amazon MWAA 環境にインストールされている Apache Airflow バージョン。

  • AIRFLOW_CONN_AWS_DEFAULT — 他の AWS サービスと統合するために使用されるデフォルトの AWS 資格情報。

  • AWS_DEFAULT_REGION — デフォルトの資格情報と統合するために使用されるデフォルトの AWS リージョンを設定します。他の AWS サービスと統合します。

  • AWS_REGION — 定義されている場合、この環境変数は環境変数 AWS_DEFAULT_REGION およびプロファイル設定のリージョンの値を上書きします。

  • PYTHONUNBUFFERED —コンテナログに stdout および stderr のストリームを送信するために使用されます。

  • AIRFLOW__METRICS__STATSD_ALLOW_LIST — リストの要素で始まるメトリクスを送信するための、コンマで区切られたプレフィックスの許可リストを設定するために使用されます。

  • AIRFLOW__METRICS__STATSD_ONStatsD へのメトリクスの送信を有効にします。

  • AIRFLOW__METRICS__STATSD_HOSTStatSD デーモンへの接続に使用されます。

  • AIRFLOW__METRICS__STATSD_PORTStatSD デーモンへの接続に使用されます。

  • AIRFLOW__METRICS__STATSD_PREFIXStatSD デーモンへの接続に使用されます。

  • AIRFLOW__CELERY__WORKER_AUTOSCALE — 同時実行数の最大値と最小値を設定します。

  • AIRFLOW__CORE__DAG_CONCURRENCY — 1 つの DAG でスケジューラーが同時に実行できるタスクインスタンスの数を設定します。

  • AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG — DAG あたりのアクティブタスクの最大数を設定します。

  • AIRFLOW__CORE__PARALLELISM — 同時に実行できるタスクインスタンスの最大数を定義します。

  • AIRFLOW__SCHEDULER__PARSING_PROCESSES — DAG をスケジュールするためにスケジューラーが解析するプロセスの最大数を設定します。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT — メッセージが別のワーカーに再配信されるまでに、ワーカーがタスクの承認を待つ秒数を定義します。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION — 基礎となる Celery トランスポートの AWS リージョンを設定します。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__PREDEFINED_QUEUES — 基盤となる Celery トランスポートのキューを設定します。

  • AIRFLOW_SCHEDULER_ALLOWED_RUN_ID_PATTERN— DAG をトリガーする際の run_id パラメータの入力の妥当性を確認するために使用されます。

  • AIRFLOW__WEBSERVER__BASE_URL — Apache Airflow UI をホストするために使用されるウェブサーバーの URL。

予約されていない環境変数

起動スクリプトを使用して、予約されていない環境変数を上書きできます。以下に、これらの一般的な変数のいくつかを列挙します。

  • PATH — オペレーティングシステムが実行ファイルとスクリプトを検索するディレクトリのリストを指定します。コマンドがコマンドラインで実行されると、システムはコマンドを見つけて実行するために PATH のディレクトリを順番にチェックします。Apache Airflow でカスタムオペレータやタスクを作成する場合、外部スクリプトや実行可能ファイルに依存することがあるかもしれません。これらのファイルを含むディレクトリが PATH の変数で指定されていない場合、システムはそれらを見つけることができないため、タスクは実行に失敗します。適切なディレクトリを PATH に追加することで、Apache Airflow タスクは必要な実行可能ファイルを見つけて実行できるようになります。

  • PYTHONPATH — Python インタープリターが、インポートされたモジュールやパッケージを検索するディレクトリを決定するために使用されます。デフォルトの検索パスに追加できるディレクトリのリストです。これにより、インタプリタは標準ライブラリに含まれていない、またはシステムディレクトリにインストールされていない Python ライブラリを見つけて読み込むことができます。この変数を使用してモジュールとカスタム Python パッケージを追加し、DAG で使用します。

  • LD_LIBRARY_PATH — Linux の動的リンカーとローダーが共有ライブラリを検索してロードするために使用する環境変数。共有ライブラリを含むディレクトリのリストを指定し、デフォルトのシステムライブラリディレクトリの前に検索します。この変数を使用してカスタムバイナリを指定します。

  • CLASSPATH — Java ランタイム環境 (JRE) と Java 開発キット (JDK) によって使用され、実行時に Java クラス、ライブラリ、リソースを検索してロードします。コンパイルされた Java コードを含むディレクトリ、JAR ファイル、および ZIP アーカイブのリストです。