

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Managed Workflows for Apache Airflow
<a name="sample-code"></a>

このガイドには、Amazon Managed Workflows for Apache Airflow 環境で使用できる DAG やカスタムプラグインなどのコードサンプルが含まれています。 AWS サービスで Apache Airflow を使用するその他の例については、Apache Airflow GitHub リポジトリの [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags) ディレクトリを参照してください。

**Topics**
+ [DAG を使用して CLI に変数をインポートする](samples-variables-import.md)
+ [`SSHOperator` を使用して SSH 接続の作成](samples-ssh.md)
+ [Apache Airflow Snowflake 接続において、AWS Secrets Manager でのシークレットキーを使用する](samples-sm-snowflake.md)
+ [DAG を使用して CloudWatch にカスタムメトリクスを書き込む](samples-custom-metrics.md)
+ [Amazon MWAA 環境での Aurora PostgreSQL データベースのクリーンアップ](samples-database-cleanup.md)
+ [Amazon S3 の CSV ファイルへの環境メタデータのエクスポート](samples-dag-run-info-to-csv.md)
+ [Apache Airflow 変数の AWS Secrets Manager におけるシークレットキーの使用](samples-secrets-manager-var.md)
+ [AWS Secrets Manager の Apache Airflow 接続でのシークレットキーの使用](samples-secrets-manager.md)
+ [Oracle でのカスタムプラグインの作成](samples-oracle.md)
+ [Amazon MWAA での DAG のタイムゾーンの変更](samples-plugins-timezone.md)
+ [CodeArtifact トークンのリフレッシュ](samples-code-artifact.md)
+ [Apache Hive と Hadoop を使ったカスタムプラグインの作成](samples-hive.md)
+ [Apache Airflow Python VirtualEnv オペレータ用のカスタムプラグインを作成する](samples-virtualenv.md)
+ [Lambda 関数を使用して DAG を呼び出す](samples-lambda.md)
+ [さまざまな Amazon MWAA 環境での DAG の呼び出し](samples-invoke-dag.md)
+ [Amazon RDS for Microsoft SQL Server で Amazon MWAA を使用する](samples-sql-server.md)
+ [Amazon EKS での Amazon MWAA の使用](mwaa-eks-example.md)
+ [`ECSOperator` を使用して Amazon ECS に接続します。](samples-ecs-operator.md)
+ [Amazon MWAA での dbt の使用](samples-dbt.md)
+ [AWS ブログとチュートリアル](#samples-blogs-tutorials)

# DAG を使用して CLI に変数をインポートする
<a name="samples-variables-import"></a>

次のサンプルコードでは、Amazon Managed Workflows for Apache Airflow の CLI を使用して変数をインポートしています。

**Topics**
+ [バージョン](#samples-variables-import-version)
+ [前提条件](#samples-variables-import-prereqs)
+ [アクセス許可](#samples-variables-import-permissions)
+ [依存関係](#samples-variables-import-dependencies)
+ [コードサンプル](#samples-variables-import-code)
+ [次のステップ](#samples-variables-import-next-up)

## バージョン
<a name="samples-variables-import-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-variables-import-prereqs"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## アクセス許可
<a name="samples-variables-import-permissions"></a>

AWS アカウント には `AmazonMWAAAirflowCliAccess` ポリシーへのアクセス権が必要です。詳細については、[Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess](access-policies.md) を参照してください。

## 依存関係
<a name="samples-variables-import-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して Apache Airflow をインストールします。

## コードサンプル
<a name="samples-variables-import-code"></a>

以下のサンプルコードは、3 つの入力を受け取ります： Amazon MWAA 環境の名前 (`mwaa_env` で)、環境の AWS リージョン (`aws_region` で)、インポートしたい変数が含まれているローカルファイル (`var_file` で)。

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## 次のステップ
<a name="samples-variables-import-next-up"></a>
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。

# `SSHOperator` を使用して SSH 接続の作成
<a name="samples-ssh"></a>

次の例では、Amazon Managed Workflows for Apache Airflow 環境からリモートの Amazon EC2 インスタンスに接続するために `SSHOperator` をどのように使用できるかを示しています。同様の方法で、SSH アクセスを持つ任意のリモートインスタンスに接続できます。

次の例では、SSH シークレットキー (`.pem`) を Amazon S3 の環境の `dags` ディレクトリにアップロードします。次に、`requirements.txt` を使用して必要な依存関係をインストールし、UI で新しい Apache Airflow 接続を作成します。最後に、リモートインスタンスへの SSH 接続を作成する DAG を作成します。

**Topics**
+ [バージョン](#samples-ssh-version)
+ [前提条件](#samples-ssh-prereqs)
+ [アクセス許可](#samples-ssh-permissions)
+ [要件](#samples-ssh-dependencies)
+ [シークレットキーを Amazon S3 にコピーする](#samples-ssh-secret)
+ [新しい Apache Airflow 接続の作成](#samples-ssh-connection)
+ [コードサンプル](#samples-ssh-code)

## バージョン
<a name="samples-ssh-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-ssh-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。
+ SSH シークレットキー。このコードサンプルは、Amazon MWAA 環境と同じリージョンに Amazon EC2 インスタンスと `.pem` があることを前提としています。キーがない場合は、*Amazon EC2 ユーザーガイド* の [キーペアの作成またはインポート](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) を参照してください。

## アクセス許可
<a name="samples-ssh-permissions"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## 要件
<a name="samples-ssh-dependencies"></a>

次のパラメータを `requirements.txt` に追加して、ウェブサーバーに `apache-airflow-providers-ssh` パッケージをインストールします。環境が更新され、Amazon MWAA が依存関係を正常にインストールすると、UI に新しい **SSH** 接続タイプが表示されます。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**注記**  
`-c` は `requirements.txt` 内での制約 URL を定義します。これにより、Amazon MAA はお客様の環境に合った正しいパッケージバージョンをインストールできます。

## シークレットキーを Amazon S3 にコピーする
<a name="samples-ssh-secret"></a>

次の AWS Command Line Interface コマンドを使用して、`.pem` キーを Amazon S3 の環境の `dags` ディレクトリにコピーします。

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA は、`.pem` キーを含む `dags` のコンテンツをローカルの `/usr/local/airflow/dags/` ディレクトリにコピーすることで、Apache Airflow はキーにアクセスできます。

## 新しい Apache Airflow 接続の作成
<a name="samples-ssh-connection"></a>

**Apache Airflow UI を使用して新しい SSH 接続を作成するには**

1. Amazon MWAA コンソールで、[環境ページ](https://console.aws.amazon.com/mwaa/home#/environments) を開きます。

1. 環境のリストで、ご使用の環境に合った **Open Airflow UI** を選択します。

1. Apache Airflow UI ページで、メインのナビゲーションバーから **管理** を選択してドロップダウンリストを展開し、**接続** を選択します。

1. **接続リスト** ページで **\$1** を選択するか、**新規レコードを追加** ボタンをクリックして新しい接続を追加します。

1. **接続の追加** ページで、以下の情報を追加します。

   1. **接続 ID** には **ssh\$1new** と入力します。

   1. **接続タイプ** では、ドロップダウンリストから **SSH** を選択します。
**注記**  
**SSH** 接続タイプがリストに表示されない場合、Amazon MWAA は必要な `apache-airflow-providers-ssh` パッケージをインストールしていない可能性があります。このパッケージを含むように `requirements.txt` ファイルを更新してから、もう一度試してください。

   1. **ホスト** には、接続する Amazon EC2 インスタンスの IP アドレスを入力します。例えば、**12.345.67.89** です。

   1. **ユーザー名** に、Amazon EC2 インスタンスに接続する場合は **ec2-user** を入力します。Apache Airflow に接続させたいリモートインスタンスのタイプによって、ユーザー名は異なる場合があります。

   1. **抽出** には、以下のキーと値のペアを JSON 形式で入力します。

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      このキーと値のペアは、Apache Airflow に対してシークレットキーをローカルの `/dags` ディレクトリから検索するように指示します。

## コードサンプル
<a name="samples-ssh-code"></a>

次の DAG は `SSHOperator` を使用してターゲットの Amazon EC2 インスタンスに接続し、その後 `hostname` Linux コマンドを実行してインスタンスの名前を表示します。DAG を変更して、リモートインスタンスで任意のコマンドまたはスクリプトを実行できます。

1. ターミナルを開き、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `ssh.py` として保存します。

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 成功した場合、`ssh_operator_example` DAG の `ssh_task` のタスクログで次のような出力が表示されます。

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Apache Airflow Snowflake 接続において、AWS Secrets Manager でのシークレットキーを使用する
<a name="samples-sm-snowflake"></a>

以下は、Amazon Managed Workflows for Apache AirflowでApache Airflow Snowflake 接続のシークレットキーを取得するために AWS Secrets Manager を呼び出すサンプルです。[AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) でのステップを完了していることが前提となります。

**Topics**
+ [バージョン](#samples-sm-snowflake-version)
+ [前提条件](#samples-sm-snowflake-prereqs)
+ [アクセス許可](#samples-sm-snowflake-permissions)
+ [要件](#samples-sm-snowflake-dependencies)
+ [コードサンプル](#samples-sm-snowflake-code)
+ [次のステップ](#samples-sm-snowflake-next-up)

## バージョン
<a name="samples-sm-snowflake-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-sm-snowflake-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ Secrets Manager バックエンドを Apache Airflow 設定オプションとして使用する方法は、[AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) に記載されています。
+ [AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) に記載されている、Secrets Manager のApache Airflow 接続文字列。

## アクセス許可
<a name="samples-sm-snowflake-permissions"></a>
+ [AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) にリストされている Secrets Manager のアクセス許可。

## 要件
<a name="samples-sm-snowflake-dependencies"></a>

このページのサンプルコードを使用するには、次の依存関係を `requirements.txt` に追加してください。詳細については、[Python 依存関係のインストール](working-dags-dependencies.md) を参照してください。

```
apache-airflow-providers-snowflake==1.3.0
```

## コードサンプル
<a name="samples-sm-snowflake-code"></a>

以下の手順では、Secrets Manager を呼び出してシークレットを取得する DAG コードを作成する方法について説明します。

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `snowflake_connection.py` として保存します。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## 次のステップ
<a name="samples-sm-snowflake-next-up"></a>
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。

# DAG を使用して CloudWatch にカスタムメトリクスを書き込む
<a name="samples-custom-metrics"></a>

次のコード例を使用して、`PythonOperator` を実行して Amazon MWAA 環境の OS レベルのメトリックスを取得する有向非巡回グラフ (DAG) を作成できます。その後、DAG はカスタムメトリクスとしてデータを Amazon CloudWatch に発行します。

OS レベルのカスタムメトリクスにより、環境ワーカーが仮想メモリや CPU などのリソースをどのように利用しているかをさらに把握できます。この情報を使用して、ワークロードに最適な [環境クラス](environment-class.md) を選択できます。

**Topics**
+ [バージョン](#samples-custom-metrics-version)
+ [前提条件](#samples-custom-metrics-prereqs)
+ [アクセス許可](#samples-custom-metrics-permissions)
+ [依存関係](#samples-custom-metrics-dependencies)
+ [コード例](#samples-custom-metrics-code)

## バージョン
<a name="samples-custom-metrics-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-custom-metrics-prereqs"></a>

このページのコード例を使用するには、以下のものが必要である：
+ [Amazon MWAA 環境](get-started.md)。

## アクセス許可
<a name="samples-custom-metrics-permissions"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## 依存関係
<a name="samples-custom-metrics-dependencies"></a>
+ このページのコード例を使用するのに、追加の依存関係は必要ありません。

## コード例
<a name="samples-custom-metrics-code"></a>

1. コマンドプロンプトで、DAG コードが保存されているフォルダーに移動します。例:

   ```
   cd dags
   ```

1. 次のコード例の内容をコピーし、`dag-custom-metrics.py` としてローカルに保存します。`MWAA-ENV-NAME` をご使用の環境名に置き換えてください。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. DAG が正常に実行されると、Apache Airflow ログに次のような内容が表示されます。

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Amazon MWAA 環境での Aurora PostgreSQL データベースのクリーンアップ
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow は、Apache Airflow メタデータデータベースとして Aurora PostgreSQL データベースを使用します。このメタデータデータベースには DAG が実行され、タスクインスタンスが保存されます。次のサンプルコードは、Amazon MWAA 環境の専用の Aurora PostgreSQL データベースから定期的にエントリを消去します。

**Topics**
+ [バージョン](#samples-database-cleanup-version)
+ [前提条件](#samples-database-cleanup-prereqs)
+ [依存関係](#samples-sql-server-dependencies)
+ [コードサンプル](#samples-database-cleanup-code)

## バージョン
<a name="samples-database-cleanup-version"></a>

このページのコードサンプルは、Amazon MWAA でサポートされている Apache Airflow v2 に固有のものです。[サポートされている Apache Airflow バージョン](airflow-versions.md) を参照してください。

**ヒント**  
**Apache Airflow v3 ユーザーの場合**: データベースをクリーンアップ (メタストアテーブルから古いレコードを消去) する場合は、 `db clean` CLI コマンドを実行します。

## 前提条件
<a name="samples-database-cleanup-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。

## 依存関係
<a name="samples-sql-server-dependencies"></a>

このコード例を Apache Airflow v2 で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コードサンプル
<a name="samples-database-cleanup-code"></a>

次の DAG は、`TABLES_TO_CLEAN` で指定されたテーブルのメタデータデータベースをクリーンアップします。この例では、30 日以上経過した指定されたテーブルからデータを削除します。エントリが削除される距離を調整するには、`MAX_AGE_IN_DAYS` を別の値に設定します。

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Amazon S3 の CSV ファイルへの環境メタデータのエクスポート
<a name="samples-dag-run-info-to-csv"></a>

次のコード例は、データベースに対して一連の DAG 実行情報を照会し、Amazon S3 に保存されている `.csv` ファイルにデータを書き込む有向非循環グラフ (DAG) を作成する方法を示しています。

お使いの環境の Aurora PostgreSQL データベースから情報をエクスポートして、データをローカルで検査したり、オブジェクトストレージにアーカイブしたり、[Amazon S3 to Amazon Redshift オペレータ](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) や [データベースクリーンアップ](samples-database-cleanup.md) などのツールと組み合わせたりして Amazon MWAA メタデータを環境外に移動し、future 分析のために保存しておきたい場合があります。

[Apache Airflow のモデル](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models) にリストされているオブジェクトのいずれかに対してデータベースをクエリできます。このコードサンプルでは、3つのモデル `DagRun`、`TaskFail`、および `TaskInstance` を使用しており、DAG実行に関連する情報を提供します。

**Topics**
+ [バージョン](#samples-dag-run-info-to-csv-version)
+ [前提条件](#samples-dag-run-info-to-csv-prereqs)
+ [アクセス許可](#samples-dag-run-info-to-csv-permissions)
+ [要件](#samples-dag-run-info-to-csv-dependencies)
+ [コードサンプル](#samples-dag-run-info-to-csv-code)

## バージョン
<a name="samples-dag-run-info-to-csv-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-dag-run-info-to-csv-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ ‭[Amazon MWAA 環境](get-started.md)。
+ メタデータ情報をエクスポートする[新しい Amazon S3 バケット](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。

## アクセス許可
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA は、クエリされたメタデータ情報を Amazon S3 に書き込むためのアクション`s3:PutObject` の許可が必要です。次のポリシーステートメントを、環境の実行ロールに追加します。

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

このポリシーは、書き込みアクセスを *amzn-s3-demo-bucket* のみに制限します。

## 要件
<a name="samples-dag-run-info-to-csv-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して Apache Airflow をインストールします。

## コードサンプル
<a name="samples-dag-run-info-to-csv-code"></a>

次のステップでは、Aurora PostgreSQL にクエリを実行し、その結果を新しい Amazon S3 バケットに書き込む DAG を作成する方法について説明します。

1. ターミナルで、DAG コードが保存されているディレクトリに移動します。例えば、次のようになります。

   ```
   cd dags
   ```

1. 次のコード例の内容をコピーし、`metadata_to_csv.py` としてローカルに保存します。DAG がメタデータデータベースからクエリする最古のレコードの年齢を制御するために、`MAX_AGE_IN_DAYS` に割り当てられた値を変更することができます。

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  次の AWS CLI コマンドを実行して DAG を環境のバケットにコピーし、Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 成功した場合は、`export_db`タスクのタスクログに以下のような出力が表示されます。

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   今、`/files/export/` の新しい Amazon S3 バケット内の`.csv` ファイルにアクセスしてダウンロードできます。

# Apache Airflow 変数の AWS Secrets Manager におけるシークレットキーの使用
<a name="samples-secrets-manager-var"></a>

以下のサンプルは、Amazon Managed Workflows for Apache Airflow の Apache Airflow 変数のシークレットキーを取得するために AWS Secrets Manager を呼び出します。[AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) でのステップを完了していることが前提となります。

**Topics**
+ [バージョン](#samples-secrets-manager-var-version)
+ [前提条件](#samples-secrets-manager-var-prereqs)
+ [アクセス許可](#samples-secrets-manager-var-permissions)
+ [要件](#samples-hive-dependencies)
+ [コードサンプル](#samples-secrets-manager-var-code)
+ [次のステップ](#samples-secrets-manager-var-next-up)

## バージョン
<a name="samples-secrets-manager-var-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-secrets-manager-var-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ Secrets Manager バックエンドを Apache Airflow 設定オプションとして使用する方法は、[AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) に記載されています。
+ [AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) にリストされている、Secrets Manager の Apache Airflow 変数文字列。

## アクセス許可
<a name="samples-secrets-manager-var-permissions"></a>
+ [AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) にリストされている Secrets Manager のアクセス許可。

## 要件
<a name="samples-hive-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コードサンプル
<a name="samples-secrets-manager-var-code"></a>

以下の手順では、Secrets Manager を呼び出してシークレットを取得する DAG コードを作成する方法について説明します。

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `secrets-manager-var.py` として保存します。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## 次のステップ
<a name="samples-secrets-manager-var-next-up"></a>
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。

# AWS Secrets Manager の Apache Airflow 接続でのシークレットキーの使用
<a name="samples-secrets-manager"></a>

以下のサンプルでは、Amazon Managed Workflows for Apache Airflow で Apache Airflow 接続のシークレットキーを取得するために AWS Secrets Manager を呼び出しています。[AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) でのステップを完了していることが前提となります。

**Topics**
+ [バージョン](#samples-secrets-manager-version)
+ [前提条件](#samples-secrets-manager-prereqs)
+ [アクセス許可](#samples-secrets-manager-permissions)
+ [要件](#samples-hive-dependencies)
+ [コードサンプル](#samples-secrets-manager-code)
+ [次のステップ](#samples-secrets-manager-next-up)

## バージョン
<a name="samples-secrets-manager-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-secrets-manager-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ Secrets Manager バックエンドを Apache Airflow 設定オプションとして使用する方法は、[AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) に記載されています。
+ [AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) に記載されている、Secrets Manager のApache Airflow 接続文字列。

## アクセス許可
<a name="samples-secrets-manager-permissions"></a>
+ [AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定](connections-secrets-manager.md) にリストされている Secrets Manager のアクセス許可。

## 要件
<a name="samples-hive-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コードサンプル
<a name="samples-secrets-manager-code"></a>

以下の手順では、Secrets Manager を呼び出してシークレットを取得する DAG コードを作成する方法について説明します。

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `secrets-manager.py` として保存します。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## 次のステップ
<a name="samples-secrets-manager-next-up"></a>
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。

# Oracle でのカスタムプラグインの作成
<a name="samples-oracle"></a>

次のサンプルでは、Oracle for Amazon MWAA を使用してカスタムプラグインを作成する手順を詳しく説明しており、これを plugins.zip ファイル内の他のカスタムプラグインやバイナリと組み合わせることができます。

**Contents**
+ [バージョン](#samples-oracle-version)
+ [前提条件](#samples-oracle-prereqs)
+ [アクセス許可](#samples-oracle-permissions)
+ [要件](#samples-oracle-dependencies)
+ [コードサンプル](#samples-oracle-code)
+ [カスタムプラグインを作成する](#samples-oracle-create-pluginszip-steps)
  + [依存関係のダウンロード](#samples-oracle-install)
  + [カスタムプラグイン](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Airflow 設定オプション](#samples-oracle-airflow-config)
+ [次のステップ](#samples-oracle-next-up)

## バージョン
<a name="samples-oracle-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-oracle-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。
+ ワーカーのログ記録は、環境に応じて、任意のログレベル (`CRITICAL` または前のセクション) で有効になります。Amazon MWAA ログタイプとロググループの管理方法の詳細については、[Amazon CloudWatch の Airflow ログへのアクセス](monitoring-airflow.md) を参照してください

## アクセス許可
<a name="samples-oracle-permissions"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## 要件
<a name="samples-oracle-dependencies"></a>

このページのサンプルコードを使用するには、次の依存関係を `requirements.txt` に追加してください。詳細については、[Python 依存関係のインストール](working-dags-dependencies.md) を参照してください。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## コードサンプル
<a name="samples-oracle-code"></a>

次のステップでは、カスタムプラグインをテストする DAG コードを作成する方法について説明します。

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `oracle.py` として保存します。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## カスタムプラグインを作成する
<a name="samples-oracle-create-pluginszip-steps"></a>

このセクションでは、依存関係のダウンロード、カスタムプラグインと plugins.zip の作成方法について説明します。

### 依存関係のダウンロード
<a name="samples-oracle-install"></a>

Amazon MWAA は plugins.zip のコンテンツを各 Amazon MWAA スケジューラーとワーカーコンテナの `/usr/local/airflow/plugins` に抽出します。これはバイナリを環境に追加するために使用されます。次のステップでは、カスタムプラグインに必要なファイルを組み立てる方法について説明します。

**Amazon Linux コンテナイメージをプルする**

1. コマンドプロンプトで Amazon Linux コンテナイメージを取得し、コンテナをローカルで実行します。例:

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   コマンドプロンプトで bash コマンドラインを呼び出すことができます。例:

   ```
   bash-4.2#
   ```

1. Linux ネイティブの非同期 I/O 機能 (libaio) をインストールします。

   ```
   yum -y install libaio
   ```

1. 以降の手順では、このウィンドウを開いたままにしておいてください。次のファイルをローカルにコピーします: `lib64/libaio.so.1`、`lib64/libaio.so.1.0.0`、`lib64/libaio.so.1.0.1`。

**クライアントフォルダをダウンロード**

1. unzip パッケージをローカルにインストールします。例:

   ```
   sudo yum install unzip
   ```

1. `oracle_plugin` ディレクトリを作成します。例:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. 次の curl コマンドを使用して、[Linux x86-64 (64 ビット) 用 Oracle インスタントクライアントダウンロード](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html)から [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) をダウンロードします。

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. `client.zip` ファイルを解凍します。例:

   ```
   unzip *.zip
   ```

**Docker からファイルを抽出します。**

1. 新しいコマンドプロンプトで、Docker コンテナ ID を表示して書き留めます。例:

   ```
   docker container ls
   ```

   コマンドプロンプトでは、すべてのコンテナとその ID を返すことができます。例:

   ```
   debc16fd6970
   ```

1. `oracle_plugin` ディレクトリで、`lib64/libaio.so.1`、`lib64/libaio.so.1.0.0`、`lib64/libaio.so.1.0.1` ファイルをローカル `instantclient_18_5` フォルダに抽出してください。例:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### カスタムプラグイン
<a name="samples-oracle-plugins-code"></a>

Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。これは環境変数の設定と変更に使用されます。次のステップでは、カスタムプラグインのサンプルコードを説明します。
+ 以下のコードサンプルの内容をコピーし、ローカルに `env_var_plugin_oracle.py` として保存します。

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

以下のステップは、`plugins.zip` を作成する方法について説明します。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの `plugins.zip` ファイルにすることができます。

**プラグインディレクトリの中身を Zip 圧縮する**

1. コマンドプロンプトで、`oracle_plugin` ディレクトリに移動します。例:

   ```
   cd oracle_plugin
   ```

1. `instantclient_18_5` ディレクトリを plugins.zip に圧縮します。例:

   ```
   zip -r ../plugins.zip ./
   ```

   コマンドプロンプトに次のように表示されます。

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. `client.zip` ファイルを削除します。例:

   ```
   rm client.zip
   ```

**env\$1var\$1plugin\$1oracle.py ファイルを圧縮します。**

1. plugins.zip のルートに `env_var_plugin_oracle.py` ファイルを追加します。例:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. これで、plugins.zip には次のものが含まれるようになりました。

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Airflow 設定オプション
<a name="samples-oracle-airflow-config"></a>

Apache Airflow v2 を使用している場合、`core.lazy_load_plugins : False` を Apache Airflow の設定オプションとして追加してください。詳細については、[2 の設定オプションによるプラグインの読み込み](configuring-env-variables.md#configuring-2.0-airflow-override) を参照してください。

## 次のステップ
<a name="samples-oracle-next-up"></a>
+ この例の `requirements.txt` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[Python 依存関係のインストール](working-dags-dependencies.md) をご覧ください。
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。
+ この例の `plugins.zip` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[カスタムプラグインのインストール](configuring-dag-import-plugins.md) をご覧ください。

# Amazon MWAA での DAG のタイムゾーンの変更
<a name="samples-plugins-timezone"></a>

Apache Airflow は、有向非巡回グラフ (DAG) をデフォルトで UTC\$10 でスケジュールします。次のステップは、Amazon MWAA が [Pendulum](https://pypi.org/project/pendulum/) を使用して DAG を実行するタイムゾーンを変更する方法を示しています。このトピックでは、任意で、環境の Apache Airflow ログのタイムゾーンを変更するカスタムプラグインを作成する方法を示します。

**Topics**
+ [バージョン](#samples-plugins-timezone-version)
+ [前提条件](#samples-plugins-timezone-prerequisites)
+ [アクセス許可](#samples-plugins-timezone-permissions)
+ [Airflow ログのタイムゾーンを変更するプラグインを作成する](#samples-plugins-timezone-custom-plugin)
+ [`plugins.zip` を作成する](#samples-plugins-timezone-plugins-zip)
+ [コードサンプル](#samples-plugins-timezone-dag)
+ [次のステップ](#samples-plugins-timezone-plugins-next-up)

## バージョン
<a name="samples-plugins-timezone-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-plugins-timezone-prerequisites"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。

## アクセス許可
<a name="samples-plugins-timezone-permissions"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## Airflow ログのタイムゾーンを変更するプラグインを作成する
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow は、起動時に `plugins` ディレクトリ内の Python ファイルを実行します。次のプラグインを使用すると、エグゼキューターのタイムゾーンをオーバーライドできます。これにより、Apache Airflow がログを書き込むタイムゾーンが変更されます。

1. カスタムプラグイン用に `plugins` という名前のディレクトリを作成し、そのディレクトリに移動します。例:

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `dag-timezone-plugin.py` として `plugins` フォルダに保存します。

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. `plugins` ディレクトリに、`__init__.py` という名前の空の Python ファイルを作成します。`plugins` ディレクトリは以下のようになっているはずです。

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## `plugins.zip` を作成する
<a name="samples-plugins-timezone-plugins-zip"></a>

以下のステップは、`plugins.zip` を作成する方法について説明します。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの `plugins.zip` ファイルにすることができます。

1. コマンドプロンプトで、前の手順で作成した `plugins` ディレクトリに移動してください。例:

   ```
   cd plugins
   ```

1. `plugins` ディレクトリ内のコンテンツを圧縮します。

   ```
   zip -r ../plugins.zip ./
   ```

1. `plugins.zip` を S3 バケットにアップロードします

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## コードサンプル
<a name="samples-plugins-timezone-dag"></a>

DAG が実行されるデフォルトのタイムゾーン (UTC\$10) を変更するには、[Pendulum](https://pypi.org/project/pendulum/) というライブラリを使用します。これは、タイムゾーン対応の日時を処理するための Python ライブラリです。

1. コマンドプロンプトで、DAG が保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下の例の内容をコピーして、`tz-aware-dag.py` として保存してください。

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 成功した場合、`tz_test` DAG で `tz_aware_task` のタスクログに以下のような出力が表示されます。

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## 次のステップ
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ この例の `plugins.zip` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[カスタムプラグインのインストール](configuring-dag-import-plugins.md) をご覧ください。

# CodeArtifact トークンのリフレッシュ
<a name="samples-code-artifact"></a>

CodeArtifact を使用して Python の依存関係をインストールする場合、Amazon MWAA にはアクティブなトークンが必要です。Amazon MWAA が実行時に CodeArtifact リポジトリにアクセスできるようにするには、[起動スクリプト](using-startup-script.md) を使用して [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url) をトークンで設定できます。

次のトピックでは、[https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) CodeArtifact API オペレーションを使用して環境が起動または更新されるたびに新しいトークンを取得する起動スクリプトを作成する方法について説明します。

**Topics**
+ [バージョン](#samples-code-artifact-version)
+ [前提条件](#samples-code-artifact-prereqs)
+ [アクセス許可](#samples-code-artifact-permissions)
+ [コードサンプル](#samples-code-artifact-code)
+ [次のステップ](#samples-code-artifact-next-up)

## バージョン
<a name="samples-code-artifact-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-code-artifact-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。
+ 環境の依存関係を保存する [CodeArtifact リポジトリ](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html)です。

## アクセス許可
<a name="samples-code-artifact-permissions"></a>

CodeArtifact トークンを更新して結果を Amazon S3 に書き込むには、Amazon MWAA が実行ロールに以下のアクセス許可を持っている必要があります。
+ `codeartifact:GetAuthorizationToken` このアクションにより、Amazon MWAA は CodeArtifact から新しいトークンを取得することができます。次のポリシーは、作成したすべての CodeArtifact ドメインにアクセス許可を付与します。ステートメントのリソース値を変更し、環境からアクセスさせたいドメインのみを指定することで、ドメインへのアクセスをさらに制限できます。

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ `sts:GetServiceBearerToken` アクションは、CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) APIオペレーションを呼び出すために必要です。この操作は、`pip` などのパッケージ・マネージャーを CodeArtifact とともに使用するときに使用しなければならないトークンを返します。CodeArtifact リポジトリでパッケージマネージャーを使用するには、環境の実行ロールロールが、以下のポリシーステートメントにリストするように `sts:GetServiceBearerToken` を許可する必要があります。

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## コードサンプル
<a name="samples-code-artifact-code"></a>

以下の手順では、CodeArtifact トークンを更新する起動スクリプトを作成する方法について説明します。

1. 以下のコードサンプルの内容をコピーし、ローカルに `code_artifact_startup_script.sh` として保存します。

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. スクリプトを保存したフォルダに移動します。新しいプロンプトウィンドウで `cp` を使用して、スクリプトをバケットにアップロードします。*amzn-s3-demo-bucket* を情報に置き換えてください。

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   成功すると、Amazon S3 はオブジェクトへの URL パスを出力します。

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   スクリプトをアップロードすると、環境が更新され、起動時にスクリプトが実行されます。

## 次のステップ
<a name="samples-code-artifact-next-up"></a>
+ スタートアップスクリプトを使用して環境をカスタマイズする方法については、[Amazon MWAA でのスタートアップスクリプトの使用](using-startup-script.md) を参照してください。
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。
+ この例の `plugins.zip` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[カスタムプラグインのインストール](configuring-dag-import-plugins.md) をご覧ください。

# Apache Hive と Hadoop を使ったカスタムプラグインの作成
<a name="samples-hive"></a>

Amazon MWAA は `plugins.zip` から `/usr/local/airflow/plugins` にコンテンツを抽出します。これを使用して、コンテナにバイナリを追加できます。さらに、Apache Airflow は*起動*時に `plugins` フォルダ内の Python ファイルの内容を実行するため、環境変数を設定および変更できます。以下のサンプルでは、Amazon Managed Workflows for Apache Airflow 環境で Apache Hive と Hadoop を使用してカスタムプラグインを作成する手順を説明し、他のカスタムプラグインやバイナリと組み合わせることができます。

**Topics**
+ [バージョン](#samples-hive-version)
+ [前提条件](#samples-hive-prereqs)
+ [アクセス許可](#samples-hive-permissions)
+ [要件](#samples-hive-dependencies)
+ [依存関係のダウンロード](#samples-hive-install)
+ [カスタムプラグイン](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [コードサンプル](#samples-hive-code)
+ [Airflow 設定オプション](#samples-hive-airflow-config)
+ [次のステップ](#samples-hive-next-up)

## バージョン
<a name="samples-hive-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-hive-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。

## アクセス許可
<a name="samples-hive-permissions"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## 要件
<a name="samples-hive-dependencies"></a>

このページのサンプルコードを使用するには、次の依存関係を `requirements.txt` に追加してください。詳細については、[Python 依存関係のインストール](working-dags-dependencies.md) を参照してください。

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## 依存関係のダウンロード
<a name="samples-hive-install"></a>

Amazon MWAA は plugins.zip のコンテンツを各 Amazon MWAA スケジューラーとワーカーコンテナの `/usr/local/airflow/plugins` に抽出します。これはバイナリを環境に追加するために使用されます。次のステップでは、カスタムプラグインに必要なファイルを組み立てる方法について説明します。

1. コマンドプロンプトで、プラグインを作成したいディレクトリに移動します。例:

   ```
   cd plugins
   ```

1. [ミラー](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz)から [Hadoop](https://hadoop.apache.org/) をダウンロードします。例:

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. [ミラー](https://www.apache.org/dyn/closer.cgi/hive/)から [Hive](https://hive.apache.org/) をダウンロードします。例:

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. ディレクトリを作成します。例:

   ```
   mkdir hive_plugin
   ```

1. Hadoop を抽出します。

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Hive を抽出します。

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## カスタムプラグイン
<a name="samples-hive-plugins-code"></a>

Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。これは環境変数の設定と変更に使用されます。次のステップでは、カスタムプラグインのサンプルコードを説明します。

1. コマンドプロンプトで、`hive_plugin` ディレクトリに移動します。例:

   ```
   cd hive_plugin
   ```

1. 次のコードサンプルの内容をコピーし、同じ `hive_plugin` ディレクトリで、`hive_plugin.py` と名前を付けてローカルに保存します。

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. 次のテキストの内容をコピーし、`hive_plugin` ディレクトリに `.airflowignore` としてローカルに保存します。

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

以下のステップは、`plugins.zip` を作成する方法について説明します。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの `plugins.zip` のファイルにすることができます。

1. コマンドプロンプトで、前のステップの `hive_plugin` ディレクトリに移動します。例:

   ```
   cd hive_plugin
   ```

1. `plugins` フォルダ内のコンテンツを圧縮します。

   ```
   zip -r ../hive_plugin.zip ./
   ```

## コードサンプル
<a name="samples-hive-code"></a>

次のステップでは、カスタムプラグインをテストする DAG コードを作成する方法について説明します。

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `hive.py` として保存します。

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Airflow 設定オプション
<a name="samples-hive-airflow-config"></a>

Apache Airflow v2 を使用している場合、`core.lazy_load_plugins : False` を Apache Airflow の設定オプションとして追加してください。詳細については、[2 の設定オプションによるプラグインの読み込み](configuring-env-variables.md#configuring-2.0-airflow-override) を参照してください。

## 次のステップ
<a name="samples-hive-next-up"></a>
+ この例の `requirements.txt` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[Python 依存関係のインストール](working-dags-dependencies.md) をご覧ください。
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。
+ この例の `plugins.zip` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[カスタムプラグインのインストール](configuring-dag-import-plugins.md) をご覧ください。

# Apache Airflow Python VirtualEnv オペレータ用のカスタムプラグインを作成する
<a name="samples-virtualenv"></a>

次のサンプルは、Amazon Managed Workflows for Apache Airflow 上のカスタムプラグインを使用して Apache Airflow `PythonVirtualenvOperator` にパッチを適用する方法を示しています。

**Topics**
+ [バージョン](#samples-virtualenv-version)
+ [前提条件](#samples-virtualenv-prereqs)
+ [アクセス許可](#samples-virtualenv-permissions)
+ [要件](#samples-virtualenv-dependencies)
+ [カスタムプラグインのサンプルコード](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [コードサンプル](#samples-virtualenv-code)
+ [Airflow 設定オプション](#samples-virtualenv-airflow-config)
+ [次のステップ](#samples-virtualenv-next-up)

## バージョン
<a name="samples-virtualenv-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-virtualenv-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。

## アクセス許可
<a name="samples-virtualenv-permissions"></a>

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

## 要件
<a name="samples-virtualenv-dependencies"></a>

このページのサンプルコードを使用するには、次の依存関係を `requirements.txt` に追加してください。詳細については、[Python 依存関係のインストール](working-dags-dependencies.md) を参照してください。

```
virtualenv
```

## カスタムプラグインのサンプルコード
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow は、起動時にプラグインフォルダにある Python ファイルの内容を実行します。このプラグインは、そのスタートアッププロセス中の組み込み `PythonVirtualenvOperator` をパッチし、Amazon MWAA と互換性があるようにします。次のステップでは、カスタムプラグインのサンプルコードを表示します。

1. コマンドプロンプトで、前のセクションの `plugins` ディレクトリに移動します。例:

   ```
   cd plugins
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `virtual_python_plugin.py` として保存します。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

以下のステップは、`plugins.zip` を作成する方法について説明します。

1. コマンドプロンプトで、前のセクションの `virtual_python_plugin.py` が含まれるディレクトリに移動してください。例:

   ```
   cd plugins
   ```

1. `plugins` フォルダ内のコンテンツを圧縮します。

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## コードサンプル
<a name="samples-virtualenv-code"></a>

次の手順では、カスタムプラグインの DAG コードが 作成する方法について説明します。

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `virtualenv_test.py` として保存します。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Airflow 設定オプション
<a name="samples-virtualenv-airflow-config"></a>

Apache Airflow v2 を使用している場合、`core.lazy_load_plugins : False` を Apache Airflow の設定オプションとして追加してください。詳細については、[2 の設定オプションによるプラグインの読み込み](configuring-env-variables.md#configuring-2.0-airflow-override) を参照してください。

## 次のステップ
<a name="samples-virtualenv-next-up"></a>
+ この例の `requirements.txt` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[Python 依存関係のインストール](working-dags-dependencies.md) をご覧ください。
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。
+ この例の `plugins.zip` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[カスタムプラグインのインストール](configuring-dag-import-plugins.md) をご覧ください。

# Lambda 関数を使用して DAG を呼び出す
<a name="samples-lambda"></a>

次のコード例では、[AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) 関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。

**Topics**
+ [バージョン](#samples-lambda-version)
+ [前提条件](#samples-lambda-prereqs)
+ [アクセス許可](#samples-lambda-permissions)
+ [依存関係](#samples-lambda-dependencies)
+ [コード例](#samples-lambda-code)

## バージョン
<a name="samples-lambda-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-lambda-prereqs"></a>

コードサンプルを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md) には、[パブリックネットワークアクセスモード](configuring-networking.md#webserver-options-public-network-onconsole) を使用してください。
+ 最新の Python ランタイムを使用する [Lambda 関数](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) を用意してください。

**注記**  
Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この設定では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) **CreateNetworkInterface** API オペレーションを呼び出すアクセス許可が必要です。このアクセス許可は、 [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS管理ポリシーを使用して指定できます。

## アクセス許可
<a name="samples-lambda-permissions"></a>

このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが `airflow:CreateCliToken` アクションを実行するためのアクセス権が必要です。このアクセス許可は、 `AmazonMWAAAirflowCliAccess` AWS管理ポリシーを使用して付与できます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

詳細については、[Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) を参照してください。

## 依存関係
<a name="samples-lambda-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コード例
<a name="samples-lambda-code"></a>

1. [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/) で AWS Lambda コンソールを開きます。

1. **関数** リストから Lambda 関数を選択します。

1. 関数ページで次のコードをコピーし、以下をリソース名に置き換えます。
   + `YOUR_ENVIRONMENT_NAME` – Amazon MWAA 環境の名前。
   + `YOUR_DAG_NAME` — 呼び出したい DAG の名前。

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. **デプロイ** を選択します。

1. **テスト** を選択し、Lambda コンソールを使用して関数を呼び出します。

1. Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して、お使いの環境の Apache Airflow UI に移動し、次の操作を行います。

   1. **DAG** ページの DAG のリストから新しいターゲット DAG を見つけます。

   1. **前回の実行** で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における `invoke_dag` の最新のタイムスタンプとほぼ一致する必要があります。

   1. **[最近のタスク]** で、前回の実行が成功したことを確認します。

# さまざまな Amazon MWAA 環境での DAG の呼び出し
<a name="samples-invoke-dag"></a>

次のコード例では、Apache Airflow CLI トークンを作成します。次に、このコードは、ある Amazon MWAA 環境の有向非巡回グラフ (DAG) を使用して、別の Amazon MWAA 環境の DAG を呼び出します。

**Topics**
+ [バージョン](#samples-invoke-dag-version)
+ [前提条件](#samples-invoke-dag-prereqs)
+ [アクセス許可](#samples-invoke-dag-permissions)
+ [依存関係](#samples-invoke-dag-dependencies)
+ [コード例](#samples-invoke-dag-code)

## バージョン
<a name="samples-invoke-dag-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-invoke-dag-prereqs"></a>

このページのコード例を使用するには、次のものが必要です。
+ **パブリックネットワーク** のウェブサーバーにアクセスできる 2 つの [Amazon MWAA 環境](get-started.md) (現在の環境を含む)。
+ ターゲット環境の Amazon Simple Storage Service (Amazon S3) バケットに、サンプルの DAG。

## アクセス許可
<a name="samples-invoke-dag-permissions"></a>

このページのコード例を使用するには、環境の実行ロールに Apache Airflow CLI トークンを作成する権限が必要です。 AWS管理ポリシーをアタッチ`AmazonMWAAAirflowCliAccess`して、このアクセス許可を付与できます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

詳細については、[Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) を参照してください。

## 依存関係
<a name="samples-invoke-dag-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コード例
<a name="samples-invoke-dag-code"></a>

次のコード例は、現在の環境で DAG を使用して別の環境で DAG を呼び出していると想定しています。

1. ターミナルで、DAG コードが保存されているディレクトリに移動します。例えば、次のようになります。

   ```
   cd dags
   ```

1. 次のコード例の内容をコピーし、`invoke_dag.py` という名前でローカルに保存します。以下の値をお客様の情報に置き換えます。
   + `your-new-environment-name` — DAG を起動する他の環境の名前。
   + `your-target-dag-id` — 起動する他の環境の DAG の ID。

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  次の AWS CLI コマンドを実行して DAG を環境のバケットにコピーし、Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. DAG が正常に実行されると、`invoke_dag_task` のタスクログに以下のような出力が表示されます。

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   DAG が正常に呼び出されたことを確認するには、新しい環境の Apache Airflow UI に移動し、次の操作を行います。

   1. **DAG** ページの DAG のリストから新しいターゲット DAG を見つけます。

   1. **前回の実行** で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における `invoke_dag` の最新のタイムスタンプとほぼ一致する必要があります。

   1. **[最近のタスク]** で、前回の実行が成功したことを確認します。

# Amazon RDS for Microsoft SQL Server で Amazon MWAA を使用する
<a name="samples-sql-server"></a>

Amazon Managed Workflows for Apache Airflow を使用して [RDS for SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html) に接続できます。次のサンプルコードでは、Amazon Managed Workflows for Apache Airflow 環境の DAG を使用して、Amazon RDS for Microsoft SQL Server に接続し、クエリを実行します。

**Topics**
+ [バージョン](#samples-sql-server-version)
+ [前提条件](#samples-sql-server-prereqs)
+ [依存関係](#samples-sql-server-dependencies)
+ [Apache Airflow v2 接続](#samples-sql-server-conn)
+ [コードサンプル](#samples-sql-server-code)
+ [次のステップ](#samples-sql-server-next-up)

## バージョン
<a name="samples-sql-server-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-sql-server-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。
+ Amazon MWAA と RDS for SQL Server は同じ Amazon VPC/ で実行されています
+ Amazon MWAA とサーバーの VPC セキュリティグループは、以下の接続で構成されます。
  + Amazon MWAA のセキュリティグループにある Amazon RDS 用にポート `1433` を開くためのインバウンドルール
  + または、Amazon MWAA から RDS へ `1433` のポートのオープンに関するアウトバウンドルール
+ SQL サーバー用 RDS 用 Apache Airflow Connection には、前のプロセスで作成された Amazon RDS SQL サーバーデータベースのホスト名、ポート、ユーザー名、パスワードが反映されます。

## 依存関係
<a name="samples-sql-server-dependencies"></a>

このセクションのサンプルコードを使用するには、`requirements.txt` に次の依存関係を追加します。詳細については、[Python 依存関係のインストール](working-dags-dependencies.md) を参照してください。

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Apache Airflow v2 接続
<a name="samples-sql-server-conn"></a>

Apache Airflow v2 の接続を使用している場合は、Airflow 接続オブジェクトに次のキーと値のペアが含まれていることを確認してください。

1. **接続 ID: **mssql\$1default

1. **接続タイプ: ** Amazon Web Services

1. **ホスト:** `YOUR_DB_HOST`

1. **スキーマ：**

1. **ログイン:**管理者

1. **パスワード: **

1. **ポート**: 1433

1. **エキストラ:**

## コードサンプル
<a name="samples-sql-server-code"></a>

1. コマンドプロンプトで、DAG コードが保存されているディレクトリに移動します。例:

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーし、ローカルに `sql-server.py` として保存します。

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## 次のステップ
<a name="samples-sql-server-next-up"></a>
+ この例の `requirements.txt` ファイルを Amazon S3 バケットにアップロードする方法について詳しくは、[Python 依存関係のインストール](working-dags-dependencies.md) をご覧ください。
+ この例の DAG コードを Amazon S3 バケットの `dags` フォルダにアップロードする方法については、[DAG の追加と更新](configuring-dag-folder.md) を参照してください。
+ サンプルスクリプトやその他の [pymssql モジュールの例](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html)を参照してください。
+ [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) を使用した特定の Microsoft SQL データベースでの SQL コード実行については、*「Apache Airflow リファレンスガイド」*を参照してください。

# Amazon EKS での Amazon MWAA の使用
<a name="mwaa-eks-example"></a>

以下のサンプルは、Amazon EKS で Amazon Managed Workflows for Apache Airflow を使用する方法を示しています。

**Topics**
+ [バージョン](#mwaa-eks-example-version)
+ [前提条件](#eksctl-prereqs)
+ [Amazon EC2 用のパブリックキーを作成します](#eksctl-create-key)
+ [クラスターを作成する](#create-cluster-eksctl)
+ [`mwaa` 名前空間を作成します。](#eksctl-namespace)
+ [`mwaa` 名前空間のロールを作成します。](#eksctl-role)
+ [Amazon EKS クラスタの IAM ロールを作成してアタッチします](#eksctl-iam-role)
+ [要件.txt ファイルを作成します](#eksctl-requirements)
+ [Amazon EKS 用のアイデンティティマッピングを作成します](#eksctl-identity-map)
+ [`kubeconfig` の作成](#eksctl-kube-config)
+ [DAG を作成する](#eksctl-create-dag)
+ [DAG と `kube_config.yaml` を Amazon S3 バケットに追加します](#eksctl-dag-bucket)
+ [サンプルを有効にして、トリガーしてください。](#eksctl-trigger-pod)

## バージョン
<a name="mwaa-eks-example-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="eksctl-prereqs"></a>

このトピックの例を使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。
+ eksctl。詳細については、[eksctl のインストール](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl) を参照してください。
+ kubectl。詳細については、[kubectl のインストールとセットアップ](https://kubernetes.io/docs/tasks/tools/install-kubectl/) を参照してください。eksctl と共にインストールされる場合もあります。
+ Amazon MWAA 環境を作成したリージョンの EC2 key pair。詳細については、[キーペアの作成またはインポート](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) を参照してください。

**注記**  
`eksctl` コマンドを使用するときに、`--profile` を含めたデフォルト以外のプロファイルを指定できます。

## Amazon EC2 用のパブリックキーを作成します
<a name="eksctl-create-key"></a>

以下のコマンドを使用して、プライベートキーからパブリックキーを作成します。

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

詳しくは、[キーペアのパブリックキーを取得する](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key) を参照してください。

## クラスターを作成する
<a name="create-cluster-eksctl"></a>

次のコマンドを使用してクラスターを作成します。クラスターにカスタム名を付けたい場合や、別のリージョンで作成したい場合は、名前とリージョンの値を置き換えてください。　 クラスターは、Amazon MWAA 環境を作成したのと同じリージョン内で作成する必要があります。　 サブネットの値を、Amazon MWAA に使用している Amazon VPC ネットワークのサブネットと一致する　ように置き換えます。`ssh-public-key` の値は、使用するキーと一致するように置き換えてください。同じリージョン内にある Amazon EC2 の既存のキーを使用するか、Amazon MWAA 環境を作成したのと同じリージョンで新しいキーを作成できます。

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

クラスターの作成が完了するまで。少し時間がかかります。完了後、以下のコマンドを使用して、クラスターが正常に作成され、IAM OIDC プロバイダーが設定されていることを確認できます。

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## `mwaa` 名前空間を作成します。
<a name="eksctl-namespace"></a>

クラスターが正常に作成されたことを確認できたあと、以下のコマンドを使用してポッドの名前空間を作成します。

```
kubectl create namespace mwaa
```

## `mwaa` 名前空間のロールを作成します。
<a name="eksctl-role"></a>

名前空間を作成した後、MWAA 名前空間でポッドを実行できる EKS 上の Amazon MWAA ユーザー用のロールとロールバインディングを作成します。名前空間に別の名前を使用した場合は、`-n mwaa` の mwaa を使用した名前に置き換えます。

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

次のコマンドを実行して、新しいロールが Amazon EKS クラスターにアクセスできることを確認します。　 *mwaa* を使用していない場合は、必ず正しい名前を使用してください。

```
kubectl get pods -n mwaa --as mwaa-service
```

次のようなメッセージが返ってきます。

```
No resources found in mwaa namespace.
```

## Amazon EKS クラスタの IAM ロールを作成してアタッチします
<a name="eksctl-iam-role"></a>

IAM による認証に使用できるように、IAM ロールを作成して Amazon EKS (k8s) クラスターにバインドする必要があります。　 ロールはクラスターへのログインにのみ使用され、コンソールや API コールの権限はありません。　

[Amazon MWAA 実行ロール](mwaa-create-role.md) のステップを使用して Amazon MWAA 環境用の新しいロールを作成します。ただし、このトピックで説明されているポリシーを作成して追加せずに、次のポリシーを追加します。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

ロールを作成し、環境の実行ロールとして使用するように Amazon MWAA 環境を編集します。ロールを変更するには、使用する環境を編集します。**アクセス許可** で実行ロールを選択します。

既知の問題：
+ Amazon EKS での認証に失敗するサブパスを持つロール ARN には既知の問題があります。この問題の回避策は、Amazon MWAA 自体が作成したサービスロールを使用するのではなく、手動でサービスロールを作成することです。　 詳しくは、[aws-auth configmap の ARN にパスが含まれている場合、パスを持つロールは機能しない](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268) を参照してください。
+ Amazon MWAA サービスの一覧が IAM で利用できない場合は、Amazon EC2 などの代替サービスポリシーを選択し、ロールの信頼ポリシーを次のように更新する必要があります。　

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  詳細については、[IAM ロールで信頼ポリシーを使用する方法](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/) を参照してください。

## 要件.txt ファイルを作成します
<a name="eksctl-requirements"></a>

このセクションのサンプルコードを使用するには、以下のデータベースオプションのいずれかを `requirements.txt` に追加していることを確認してください。詳細については、[Python 依存関係のインストール](working-dags-dependencies.md) を参照してください。

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Amazon EKS 用のアイデンティティマッピングを作成します
<a name="eksctl-identity-map"></a>

次のコマンドで作成したロールの ARN を使用して、Amazon EKS の ID マッピングを作成します。リージョン *us-east-1* を、環境を作成したリージョンに変更します。ロールの ARN を置き換え、最後に *mwaa-execution-role* ご使用の環境の実行ロールに置き換えます。

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## `kubeconfig` の作成
<a name="eksctl-kube-config"></a>

`kubeconfig` を作成するには以下のコマンドを使用する：

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

実行時に特定のプロファイルを使用した場合は、`update-kubeconfig` kube\$1config.yaml `env:` ファイルに追加されたセクションを削除して Amazon MWAA で正しく動作させる必要があります。そのためには、以下をファイルから削除して保存します。

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## DAG を作成する
<a name="eksctl-create-dag"></a>

次のコード例を使用して、DAG の `mwaa_pod_example.py` などの Python ファイルを作成します。

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## DAG と `kube_config.yaml` を Amazon S3 バケットに追加します
<a name="eksctl-dag-bucket"></a>

作成した DAG と `kube_config.yaml` ファイルを Amazon MWAA 環境の Amazon S3 バケットに配置します。Amazon S3 コンソールまたは AWS Command Line Interfaceを使用して、ファイルをバケットに入れることができます。

## サンプルを有効にして、トリガーしてください。
<a name="eksctl-trigger-pod"></a>

Apache Airflow で、サンプルを有効にしてからトリガーします。　

実行して正常に完了したら、次のコマンドを使用して、ポッドを確認します。

```
kubectl get pods -n mwaa
```

以下のような出力結果が取得できます。

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

次に、以下のコマンドを使用して、ポッドの出力を確認できます。名前の値を、前のコマンドで返された値に置き換えます。

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# `ECSOperator` を使用して Amazon ECS に接続します。
<a name="samples-ecs-operator"></a>

このトピックでは、`ECSOperator` を使用して、Amazon MWAA から Amazon Elastic Container Service (Amazon ECS) コンテナに接続する方法について説明します。次の手順では、環境の実行ロールに必要なアクセス許可を追加し、 CloudFormation テンプレートを使用して Amazon ECS Fargate クラスターを作成し、最後に新しいクラスターに接続する DAG を作成してアップロードします。

**Topics**
+ [バージョン](#samples-ecs-operator-version)
+ [前提条件](#samples-ecs-operator-prereqs)
+ [アクセス許可](#samples-ecs-operator-permissions)
+ [「Amazon ECS クラスターの作成」](#create-cfn-template)
+ [コードサンプル](#samples-ecs-operator-code)

## バージョン
<a name="samples-ecs-operator-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-ecs-operator-prereqs"></a>

このページのサンプルコードを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md)。

## アクセス許可
<a name="samples-ecs-operator-permissions"></a>
+ 環境の実行ロールには、Amazon ECS でタスクを実行する権限が必要です。[AmazonECS\$1FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) AWS管理ポリシーを実行ロールにアタッチするか、次のポリシーを作成して、実行ロールにアタッチできます。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Amazon ECS でタスクを実行するために必要なプレミッションを追加することに加えて、Amazon MWAA 実行ロールの CloudWatch Logs ポリシーステートメントを変更して、Amazon ECS タスクロググループへのアクセスを許可する必要があります。Amazon ECS ロググループは、 の CloudFormation テンプレートによって作成されます[「Amazon ECS クラスターの作成」](#create-cfn-template)。

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Amazon MWAA 実行ロールとポリシーをアタッチする方法の詳細については、[実行ロール](mwaa-create-role.md) を参照してください。

## 「Amazon ECS クラスターの作成」
<a name="create-cfn-template"></a>

次の CloudFormation テンプレートを使用して、Amazon MWAA ワークフローで使用する Amazon ECS Fargate クラスターを構築します。詳細については、*Amazon Elastic Container Service デベロッパーガイド* の [タスク定義の作成](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) を参照してください。

1. 以下のコードで JSON ファイルを作成し、`ecs-mwaa-cfn.json` として保存します。

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. コマンドプロンプトで、次の AWS CLI コマンドを使用して新しいスタックを作成します。`SecurityGroups` と `SubnetIds` の値を Amazon MWAA 環境のセキュリティグループとサブネットの値に置き換える必要があります。

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   あるいは、以下のシェルスクリプトを使用できます。スクリプトは、 `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI コマンドを使用して環境のセキュリティグループとサブネットに必要な値を取得し、それに応じてスタックを作成します。スクリプトを実行するには、以下のようにします。

   1. スクリプトをコピーし、 CloudFormation テンプレートと同じディレクトリ`ecs-stack-helper.sh`に として保存します。

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. 以下のコマンドを使ってスクリプトを実行します。`environment-name` と `stack-name` をあなたの情報に置き換えます。

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   成功すると、新しい CloudFormation スタック ID を示す次の出力を参照します。

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

 CloudFormation スタックが完了し、Amazon ECS リソースを AWS プロビジョニングしたら、DAG を作成してアップロードする準備が整います。

## コードサンプル
<a name="samples-ecs-operator-code"></a>

1. コマンドプロンプトを開き、DAG コードが保存されているディレクトリに移動します。例えば、次のようになります。

   ```
   cd dags
   ```

1. 以下のコードサンプルの内容をコピーして、`mwaa-ecs-operator.py` としてローカルに保存し、新しい DAG を Amazon S3 にアップロードしてください。

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**注記**  
サンプルの DAG では、`awslogs_group` について、Amazon ECS タスクロググループの名前に合わせてロググループを変更する必要があるかもしれません。この例では、`mwaa-ecs-zero` という名前のロググループを想定しています。`awslogs_stream_prefix` には Amazon ECS タスクログストリームのプレフィックスを使用してください。この例では、ログストリームのプレフィックスが `ecs` であることを前提としています。

1.  次の AWS CLI コマンドを実行して DAG を環境のバケットにコピーし、Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 成功した場合、`ecs_fargate_dag` DAG の `ecs_operator_task` のタスクログで次のような出力が表示されます。

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Amazon MWAA での dbt の使用
<a name="samples-dbt"></a>

このトピックでは、Amazon MWAA で dbt と Postgres を使用する方法を示します。次のステップでは、必要な依存関係を `requirements.txt` に追加し、サンプルの dbt プロジェクトを環境の Amazon S3 バケットにアップロードします。次に、サンプル DAG を使用して Amazon MWAA が依存関係をインストールしたことを確認し、最後に `BashOperator` を使用して dbt プロジェクトを実行します。

**Topics**
+ [バージョン](#samples-dbt-version)
+ [前提条件](#samples-dbt-prereqs)
+ [依存関係](#samples-dbt-dependencies)
+ [DBT プロジェクトを Amazon S3 にアップロードする](#samples-dbt-upload-project)
+ [DAG を使用して dbt 依存関係のインストールを検証します。](#samples-dbt-test-dependencies)
+ [DAG を使用して dbt プロジェクトを実行します。](#samples-dbt-run-project)

## バージョン
<a name="samples-dbt-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-dbt-prereqs"></a>

次の手順を完了するには、以下のものが必要です。
+ Apache Airflow v2.2.2 を使用する [Amazon MWAA 環境](get-started.md)。このサンプルは v2.2.2 で作成され、テストされています。他の Apache Airflow バージョンで使用するためには、サンプルを変更する必要がある場合があります。
+ dbt プロジェクトのサンプル。Amazon MWAA で dbt を使い始めるには、フォークを作成し、dbt-labs GitHub リポジトリから [dbt スタータープロジェクト](https://github.com/dbt-labs/dbt-starter-project)をクローンすることができます。

## 依存関係
<a name="samples-dbt-dependencies"></a>

dbt で Amazon MWAA を使用するには、次のスタートアップスクリプトを環境に追加します。詳細については、[Amazon MWAA でのスタートアップスクリプトの使用](using-startup-script.md) を参照してください。

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

以下のセクションでは、dbt プロジェクトディレクトリを Amazon S3 にアップロードし、Amazon MWAA が必要な dbt 依存関係を正常にインストールしたかどうかを検証する DAG を実行します。

## DBT プロジェクトを Amazon S3 にアップロードする
<a name="samples-dbt-upload-project"></a>

Amazon MWAA 環境で dbt プロジェクトを使用できるようにするには、プロジェクトディレクトリ全体を環境の `dags` フォルダにアップロードできます。環境が更新されると、Amazon MWAA は dbt ディレクトリをローカル `usr/local/airflow/dags/` フォルダにダウンロードします。

**DBT プロジェクトを Amazon S3 にアップロードするには**

1. dbt スタータープロジェクトをクローンしたディレクトリに移動します。

1. 次の Amazon S3 AWS CLI コマンドを実行して、 `--recursive`パラメータを使用してプロジェクトのコンテンツを環境の `dags`フォルダに再帰的にコピーします。このコマンドは、`dbt` と呼ばれるサブディレクトリを作成し、これをすべてのdbtプロジェクトに使用できます。サブディレクトリが既に存在する場合、プロジェクトファイルは既存のディレクトリにコピーされ、新しいディレクトリは作成されません。このコマンドは、この特定のスターターのために `dbt` ディレクトリ内にサブディレクトリも作成します。

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   プロジェクトのサブディレクトリに異なる名前を使用して、親 `dbt` ディレクトリ内の複数の dbt プロジェクトを整理できます。

## DAG を使用して dbt 依存関係のインストールを検証します。
<a name="samples-dbt-test-dependencies"></a>

次の DAGは、`BashOperator` を使用し、`requirements.txt` で指定されたdbtの依存関係を正常にインストールしたかどうかを Amazon MWAA が確認します。

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

以下を実行してタスクログにアクセスし、dbt とその依存関係がインストールされていることを確認します。

1. Amazon MWAA コンソールに移動し、使用可能な環境のリストから **Airflow UI を開く** を選択します。

1. Apache Airflow UI のリストから `dbt-installation-test` DAG を探し、`Last Run` 列にある日付を選択して、最後に成功したタスクを開きます。

1. **グラフビュー** を使用してタスクを選択し、`bash_command`タスクインスタンスの詳細を開きます。

1. **[Log]** を選択してタスクログを開き、次に、`requirements.txt` で指定された dbt のバージョンをログが正常にリストしていることを確認してください。

## DAG を使用して dbt プロジェクトを実行します。
<a name="samples-dbt-run-project"></a>

次の DAG は、`BashOperator` を使用して Amazon S3 にアップロードした dbt プロジェクトをローカル `usr/local/airflow/dags/` ディレクトリから書き込み可能な`/tmp`ディレクトリにコピーし、dbt プロジェクトを実行します。bash コマンドは、`dbt-starter-project`というタイトルのスターター dbt プロジェクトを想定しています。プロジェクトディレクトリの名前に従ってディレクトリ名を変更します。

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS ブログとチュートリアル
<a name="samples-blogs-tutorials"></a>
+ [Apache Airflow v2.x 向けの Amazon EKS と Amazon MWAA との連携](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)