使用 建立 SSH 連線 SSHOperator - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 建立 SSH 連線 SSHOperator

下列範例說明如何在導向非循環圖表 (DAG) SSHOperator中使用 ,從 Amazon Managed Workflows for Apache Airflow 環境連線至遠端 Amazon EC2 執行個體。您可以使用類似的方法來連線至具有 SSH 存取的任何遠端執行個體。

在下列範例中,您將 SSH 私密金鑰 (.pem) 上傳至 Amazon S3 上的環境dags目錄。然後,您可以使用 安裝必要的相依性,requirements.txt並在 UI 中建立新的 Apache Airflow 連線。最後,您撰寫的 DAG 會建立與遠端執行個體的 SSH 連線。

版本

  • 您可以在 Python 3.10 中使用此頁面上的程式碼範例搭配 Apache Airflow v2

先決條件

若要使用此頁面上的範例程式碼,您需要下列項目:

  • Amazon MWAA 環境

  • SSH 私密金鑰。程式碼範例假設您的 Amazon EC2 執行個體和 .pem位於與 Amazon MWAA 環境相同的區域。如果您沒有金鑰,請參閱《Amazon EC2 使用者指南》中的建立或匯入金鑰對

許可

  • 使用此頁面上的程式碼範例不需要額外的許可。

要求

將下列參數新增至 requirements.txt,以在 Web 伺服器上安裝apache-airflow-providers-ssh套件。一旦您的環境更新且 Amazon MWAA 成功安裝相依性,您將在 UI 中看到新的 SSH 連線類型。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注意

-c 定義 中的限制條件 URLrequirements.txt。這可確保 Amazon MWAA 為您的環境安裝正確的套件版本。

將您的私密金鑰複製到 Amazon S3

使用下列 AWS Command Line Interface 命令將您的.pem金鑰複製到 Amazon S3 中的環境dags目錄。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA 會將 中的內容dags,包括.pem金鑰,複製到本機/usr/local/airflow/dags/目錄。透過執行此操作,Apache Airflow 可以存取金鑰。

建立新的 Apache Airflow 連線

使用 Apache Airflow UI 建立新的 SSH 連線
  1. 在 Amazon MWAA 主控台上開啟環境頁面

  2. 從環境清單中,為您的環境選擇開放氣流 UI

  3. 在 Apache Airflow UI 頁面上,從頂端導覽列選擇管理以展開下拉式清單,然後選擇連線

  4. 列出連線頁面上,選擇 ,或新增記錄按鈕以新增連線。

  5. 新增連線頁面上,新增下列資訊:

    1. 針對連線 ID,輸入 ssh_new

    2. 對於連線類型,從下拉式清單中選擇 SSH

      注意

      如果清單中沒有 SSH 連線類型,Amazon MWAA 尚未安裝所需的apache-airflow-providers-ssh套件。更新您的requirements.txt檔案以包含此套件,然後再試一次。

    3. 針對主機,輸入您要連線之 Amazon EC2 執行個體的 IP 地址。例如:12.345.67.89

    4. 針對使用者名稱ec2-user如果您要連線至 Amazon EC2 執行個體,請輸入 。您的使用者名稱可能不同,取決於您希望 Apache Airflow 連接的遠端執行個體類型。

    5. 對於 Extra,請以 JSON 格式輸入下列鍵/值對:

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      此鍵/值對會指示 Apache Airflow 在本機/dags目錄中尋找私密金鑰。

範例程式碼

下列 DAG 使用 SSHOperator 連線到您的目標 Amazon EC2 執行個體,然後執行 hostname Linux 命令來列印執行個體的名稱。您可以修改 DAG,在遠端執行個體上執行任何命令或指令碼。

  1. 開啟終端機,然後導覽至存放 DAG 程式碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並在本機儲存為 ssh.py

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache Airflow UI 觸發 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您會在 ssh_operator_example DAG 中的 任務日誌ssh_task中看到類似下列的輸出:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916