DAG の追加と更新 - Amazon Managed Workflows for Apache Airflow

DAG の追加と更新

有向非巡回グラフ (DAG) は、DAG の構造をコードとして定義する Python ファイル内で定義されます。AWS CLI および Amazon S3 コンソールを使用して、DAG をご利用の環境にアップロードできます。このページでは、Amazon S3 バケット内の dags フォルダを使用して、Amazon Managed Workflows for Apache Airflow 環境に Apache Airflow DAG を追加または更新する手順について説明します。

前提条件

このページのステップを完了するには、以下のものが必要です。

  • 権限 — AWS アカウントには、管理者から、ご使用の環境の AmazonMWAAFullConsoleAccess アクセスコントロールポリシーへのアクセス権限が付与されている必要があります。さらに、Amazon MWAA 環境には、その環境で使用される AWS のリソースへのアクセスを実行ロールで許可されている必要があります。

  • アクセス — 依存関係をウェブサーバーに直接インストールするためにパブリックリポジトリにアクセスする必要がある場合は、パブリックネットワークのウェブサーバーアクセスが環境に設定されている必要があります。詳細については、「Apache Airflow のアクセスモード」を参照してください。

  • Amazon S3 設定plugins.zip で DAG、カスタムプラグイン、および requirements.txt で Python の依存関係を保存するために使用される Amazon S3 バケットは、Public Access Blocked と Versioning Enabledで構成する必要があります。

仕組み

有向非巡回グラフ (DAG) は、DAG の構造をコードとして定義する単一の Python ファイル内で定義されます。その構成は以下の通りである:

Amazon MWAA 環境で Apache Airflow プラットフォームを実行するには、DAG 定義をストレージバケット内の dags フォルダにコピーする必要があります。たとえば、ストレージバケット内の DAG フォルダは以下のようになっている場合があります。

例 DAG フォルダー
dags/ └ dag_def.py

Amazon MWAA は、Amazon S3 バケットの新規オブジェクトと変更されたオブジェクトを 30 秒ごとに Amazon MWAA スケジューラとワーカーコンテナの /usr/local/airflow/dags フォルダに自動的に同期し、ファイルタイプに関係なく Amazon S3 ソースのファイル階層を維持します。新しい DAG が Apache Airflow UI に表示されるまでの時間は、scheduler.dag_dir_list_interval によって制御されます。既存の DAG への変更は、次の DAG 処理ループ で取り込まれます。

注記

DAG フォルダーには airflow.cfg 設定ファイルを含める必要はありません。Amazon MWAA コンソールからデフォルトの Apache Airflow 設定を上書きできます。詳細については、「Amazon MWAA での Apache Airflow 構成オプションの使用」を参照してください。

v2 での変更点

  • 新規:オペレータ、フック、エグゼキューター。Apache Airflow v1と Apache Airflow v2 の間で、Amazon MWAA の plugins.zip に指定する DAG 内のインポート文やカスタムプラグインが変更されています。例えば、Apache Airflow v1 の from airflow.contrib.hooks.aws_hook import AwsHook は、Apache Airflow v2 では from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook に変更されています。詳細については、Apache Airflow リファレンスガイドの「Python API リファレンス」を参照してください。

Amazon MWAA CLI ユーティリティを使用した DAG のテスト

  • コマンドラインインターフェイス (CLI) ユーティリティは、Amazon Managed Workflows for Apache Airflow 環境をローカルに複製します。

  • CLI は、Amazon MWAA のプロダクションイメージに似た Docker コンテナイメージをローカルでビルドします。これにより、Amazon MWAA にデプロイする前に、ローカルの Apache Airflow 環境を実行して DAG、カスタムプラグイン、依存関係を開発およびテストできます。

  • CLI を実行するには、GitHub の「aws-mwaa-local-runner」を参照してください。

Amazon S3 への DAG コードのアップロード

Amazon S3 コンソールまたは AWS Command Line Interface (AWS CLI) を使用して、Amazon S3 バケットに DAG コードをアップロードできます。以下の手順では、コード(.py)をAmazon S3バケット内の dags という名前のフォルダーにアップロードすることを前提としています。

AWS CLI の使用

AWS Command Line Interface (AWS CLI) は、コマンドラインシェルでコマンドを使用して AWS サービスとやり取りするためのオープンソースツールです。このページのステップを完了するには、以下のものが必要です。

AWS CLI を使用してアップロードするには
  1. 以下のコマンドを使って、Amazon S3 バケットをすべてリストアップします

    aws s3 ls
  2. 以下のコマンドを使用して、ご使用の環境の Amazon S3 バケット内のファイルとフォルダを一覧表示します。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  3. 以下のコマンドは、dag_def.py ファイルを dags フォルダにアップロードします。

    aws s3 cp dag_def.py s3://YOUR_S3_BUCKET_NAME/dags/

    Amazon S3 バケットに dags という名前のフォルダがまだ存在しない場合、このコマンドは dags フォルダを作成し、新しいフォルダに名前が dag_def.py のファイルをアップロードします。

Amazon S3 コンソールの使用

Amazon S3 コンソールは、Amazon S3 バケット内のリソースを作成および管理できるウェブベースのユーザーインターフェイスです。以下の手順では、DAGs フォルダーが dags という名前であると仮定しています。

Amazon S3 コンソールを使ってアップロードするには
  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. Amazon S3コンソールの [DAG コード in S3] ペインでDAG コード内のS3バケット リンクを選択して、ストレージバケットを開きます。

  4. dags フォルダを選択します。

  5. [アップロード] を選択します。

  6. [ファイルの追加] を選択します。

  7. dag_def.py のローカルコピーを選択し、[アップロード] を選択します。

Amazon MWAA コンソールで DAG フォルダへのパスを指定する (初回)

以下の手順では、dags という名前の Amazon S3 バケット上のフォルダへのパスを指定していると仮定します。

  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. DAG を実行する環境を選択します。

  3. [編集] を選択します。

  4. [Amazon S3 ペインの DAG コード] で、[DAG フォルダ] のフィールドの横にある [S3 を参照] を選択します。

  5. dags フォルダを選択します。

  6. [選択] を選択します。

  7. [次へ][環境の更新] を選択します。

Apache Airflow UI での変更の表示

Apache Airflow へのログイン

Apache Airflow UI を表示するためには、AWS Identity and Access Management (IAM) で AWS アカウントに対して Apache Airflow UI アクセスポリシー: AmazonMWAAWebServerAccess のアクセス許可が必要です。

Apache Airflow UI にアクセスするには
  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. [Airflow UI を開く] を選択します。

次のステップ

  • GitHub の aws-mwaa-local-runner を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。