SSHOperator を使用して SSH 接続の作成 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SSHOperator を使用して SSH 接続の作成

次の例では、Amazon Managed Workflows for Apache Airflow 環境からリモートの Amazon EC2 インスタンスに接続するために SSHOperator をどのように使用できるかを示しています。同様の方法で、SSH アクセスを持つ任意のリモートインスタンスに接続できます。

次の例では、SSH シークレットキー (.pem) を Amazon S3 の環境のdagsディレクトリにアップロードします。次に、requirements.txt を使用して必要な依存関係をインストールし、UI で新しい Apache Airflow 接続を作成します。最後に、リモートインスタンスへの SSH 接続を作成する DAG を作成します。

バージョン

このページのコード例は、Python 3.10 の Apache Airflow v2 と Python 3.11 の Apache Airflow v3 で使用できます。 https://peps.python.org/pep-0619/ https://peps.python.org/pep-0664/

前提条件

このページのサンプルコードを使用するには、以下が必要です。

  • Amazon MWAA 環境

  • SSH シークレットキー。このコードサンプルは、Amazon MWAA 環境と同じリージョンに Amazon EC2 インスタンスと .pem があることを前提としています。キーがない場合は、Amazon EC2 ユーザーガイド」の「キーペアの作成またはインポート」を参照してください。

アクセス許可

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

要件

次のパラメータを に追加requirements.txtして、ウェブサーバーにapache-airflow-providers-sshパッケージをインストールします。環境が更新され、Amazon MWAA が依存関係を正常にインストールすると、UI に新しい SSH 接続タイプが表示されます。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注記

-crequirements.txt 内での制約 URL を定義します。これにより、Amazon MAA はお客様の環境に合った正しいパッケージバージョンをインストールできます。

シークレットキーを Amazon S3 にコピーする

次の AWS Command Line Interface コマンドを使用して、Amazon S3 の環境のdagsディレクトリに.pemキーをコピーします。

aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/

Amazon MWAA は、.pem キーを含む dags のコンテンツをローカルの /usr/local/airflow/dags/ ディレクトリにコピーすることで、Apache Airflow はキーにアクセスできます。

新しい Apache Airflow 接続の作成

Apache Airflow UI を使用して新しい SSH 接続を作成するには
  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境のリストで、ご使用の環境に合った [Open Airflow UI] を選択します。

  3. Apache Airflow UI ページで、メインナビゲーションバーから管理者を選択してドロップダウンリストを展開し、接続を選択します。

  4. [接続リスト] ページで [+] を選択するか、[新規レコードを追加] ボタンをクリックして新しい接続を追加します。

  5. [接続の追加] ページで、以下の情報を追加します。

    1. [接続 ID] には ssh_new と入力します。

    2. [接続タイプ] では、ドロップダウンリストから [SSH] を選択します。

      注記

      [SSH] 接続タイプがリストに表示されない場合、Amazon MWAA は必要な apache-airflow-providers-ssh パッケージをインストールしていない可能性があります。このパッケージを含むように requirements.txt ファイルを更新してから、もう一度試してください。

    3. [ホスト] には、接続する Amazon EC2 インスタンスの IP アドレスを入力します。例えば、12.345.67.89

    4. [ユーザー名] に、Amazon EC2 インスタンスに接続する場合は ec2-user を入力します。Apache Airflow に接続させたいリモートインスタンスのタイプによって、ユーザー名は異なる場合があります。

    5. [抽出] には、以下のキーと値のペアを JSON 形式で入力します。

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      このキーと値のペアは、ローカル/dagsディレクトリでシークレットキーを検索するように Apache Airflow に指示します。

コード例

次の DAG は SSHOperator を使用してターゲットの Amazon EC2 インスタンスに接続し、その後 hostname Linux コマンドを実行してインスタンスの名前を表示します。DAG を変更して、リモートインスタンスで任意のコマンドまたはスクリプトを実行できます。

  1. ターミナルを開き、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 以下のコードサンプルの内容をコピーし、ローカルに ssh.py として保存します。

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 次の AWS CLI コマンドを実行して DAG を環境のバケットにコピーし、Apache Airflow UI を使用して DAG をトリガーします。

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 成功すると、DAG の ssh_operator_example のタスクログssh_taskに次のような出力が表示されます。

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916