SSHOperator
を使用して SSH 接続の作成
次の例では、Amazon Managed Workflows for Apache Airflow 環境からリモートの Amazon EC2 インスタンスに接続するために SSHOperator
をどのように使用できるかを示しています。同様の方法で、SSH アクセスを持つ任意のリモートインスタンスに接続できます。
次の例では、SSH シークレットキー (.pem
)を Amazon S3 の環境の dags
ディレクトリにアップロードします。次に、requirements.txt
を使用して必要な依存関係をインストールし、UI で新しい Apache Airflow 接続を作成します。最後に、リモートインスタンスへの SSH 接続を作成する DAG を作成します。
Version
-
このページのコード例は、Python 3.10
の Apache Airflow v2 と共に使用可能です。
前提条件
このページのサンプルコードを使用するには、以下が必要です。
-
SSH シークレットキー。このコードサンプルは、Amazon MWAA 環境と同じリージョンに Amazon EC2 インスタンスと
.pem
があることを前提としています。キーがない場合は、「Amazon EC2 ユーザーガイド」の「key pair の作成またはインポート」を参照してください。
アクセス許可
-
このページのコード例を使用する場合、追加のアクセス許可は必要ありません。
要件
次のパラメータを requirements.txt
に追加して、ウェブサーバーに apache-airflow-providers-ssh
パッケージをインストールしてください。環境が更新され、Amazon MWAA が依存関係を正常にインストールすると、UI に新しい [SSH] 接続タイプが表示されます。
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
注記
-c
は requirements.txt
内での制約 URL を定義します。これにより、Amazon MAA はお客様の環境に合った正しいパッケージバージョンをインストールできます。
シークレットキーを Amazon S3 にコピーする
次の AWS Command Line Interface コマンドを使用して、.pem
キーを Amazon S3 の環境の dags
ディレクトリにコピーします。
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
Amazon MWAA は、.pem
キーを含む dags
のコンテンツをローカルの /usr/local/airflow/dags/
ディレクトリにコピーすることで、Apache Airflow はキーにアクセスできます。
新しい Apache Airflow 接続の作成
Apache Airflow UI を使用して新しい SSH 接続を作成するには
-
Amazon MWAA コンソールで、環境ページ
を開きます。 -
環境のリストで、ご使用の環境に合った [Open Airflow UI] を選択します。
-
Apache Airflow UI ページで、上部のナビゲーションバーから [管理] を選択してドロップダウンリストを展開し、[接続] を選択します。
-
[接続リスト] ページで [+] を選択するか、[新規レコードを追加] ボタンをクリックして新しい接続を追加します。
-
[接続の追加] ページで、以下の情報を追加します。
-
[接続 ID] には
ssh_new
と入力します。 -
[接続タイプ] では、ドロップダウンリストから [SSH] を選択します。
注記
[SSH] 接続タイプがリストに表示されない場合、Amazon MWAA は必要な
apache-airflow-providers-ssh
パッケージをインストールしていない可能性があります。このパッケージを含むようにrequirements.txt
ファイルを更新してから、もう一度試してください。 -
[ホスト] には、接続する Amazon EC2 インスタンスの IP アドレスを入力します。例えば、
12.345.67.89
と指定します。 -
[ユーザー名] に、Amazon EC2 インスタンスに接続する場合は
ec2-user
を入力します。Apache Airflow に接続させたいリモートインスタンスのタイプによって、ユーザー名は異なる場合があります。 -
[抽出] には、以下のキーと値のペアを JSON 形式で入力します。
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }このキーと値のペアは、Apache Airflow に対してシークレットキーをローカルの
/dags
ディレクトリから探すように指示します。
-
コードサンプル
次の DAG は SSHOperator
を使用してターゲットの Amazon EC2 インスタンスに接続し、その後 hostname
Linux コマンドを実行してインスタンスの名前を表示します。DAG を変更して、リモートインスタンスで任意のコマンドまたはスクリプトを実行できます。
-
ターミナルを開き、DAG コードが保存されているディレクトリに移動します。例:
cd dags
-
以下のコードサンプルの内容をコピーし、ローカルに
ssh.py
として保存します。from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
成功した場合、
ssh_operator_example
DAG 内のssh_task
タスクログで次のような出力が表示されます。[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916