使用建立SSH連線 SSHOperator - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用建立SSH連線 SSHOperator

下列範例說明如何使用定向非循環圖 (DAG) SSHOperator 中的,從適用於 Apache Airflow 環境的 Amazon 受管工作流程連接到遠端 Amazon EC2 執行個體。您可以使用類似的方法連接到任何具有SSH訪問權限的遠程實例。

在下列範例中,您將SSH秘密金鑰 (.pem) 上傳到 Amazon S3 上環境的dags目錄。然後,您可以使用安裝必要的依賴關係,requirements.txt並在 UI 中創建一個新的 Apache 氣流連接。最後,你寫一DAG個創建到遠程實例的SSH連接。

版本

  • 您可以使用此頁面上的代碼示例與 Python 3.10 中的阿帕奇氣流 V2

必要條件

若要使用此頁面上的範例程式碼,您需要下列項目:

  • Amazon 的MWAA環境

  • 一個SSH秘密金鑰 程式碼範例假設您有一個 Amazon EC2 執行個體,以及與 Amazon MWAA 環境.pem位於相同的區域。如果您沒有金鑰,請參閱 Amazon EC2 使用者指南中的建立或匯入 key pair

許可

  • 使用此頁面上的程式碼範例不需要其他權限。

要求

添加以下參數requirements.txt以在 Web 服務器上安裝apache-airflow-providers-ssh軟件包。一旦您的環境更新且 Amazon MWAA 成功安裝相依性,您就會在 UI 中看到新的SSH連線類型。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注意

-c定義中的約束URLrequirements.txt。這可確保 Amazon 為您的環境MWAA安裝正確的套件版本。

將您的密鑰複製到 Amazon S3

使用下列 AWS Command Line Interface 命令將.pem金鑰複製到 Amazon S3 中環境的dags目錄。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA 複製的內容dags, 包括.pem密鑰, 到本地目/usr/local/airflow/dags/錄, 通過這樣做, Apache 氣流可以訪問密鑰.

創建一個新的 Apache 氣流連接

若要使用 Apache 氣流使用者介面建立新SSH連線
  1. 在 Amazon MWAA 控制台上打開「環境」頁面

  2. 從環境清單中,選擇適用於您環境的「開啟氣流使用者介面」。

  3. 在 Apache Airflow UI 頁面上,從頂端導覽列選擇 [管理員] 以展開下拉式清單,然後選擇 [連線]。

  4. 在 [列出連線] 頁面上,選擇 [+] 或 [新增記錄] 按鈕以新增連線。

  5. 在 [新增連線] 頁面上,新增下列資訊:

    1. 對於「連線 ID」,輸入ssh_new

    2. 對於「連線類型」,請SSH從下拉式清單中選擇。

      注意

      如果清單中沒有SSH連線類型,表示 Amazon MWAA 尚未安裝所需的apache-airflow-providers-ssh套件。請更新您的requirements.txt檔案以包含此套件,然後再試一次。

    3. 對於主機,請輸入要連接到的 Amazon EC2 執行個體的 IP 地址。例如:12.345.67.89

    4. 對於使用者名稱,請輸入是ec2-user否要連線到 Amazon EC2 執行個體。您的使用者名稱可能會有所不同,視您希望 Apache Airflow 連線的遠端執行個體類型而定。

    5. 在「額外」中,以JSON格式輸入下列機碼-值配對:

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      此機碼值配對會指示 Apache 氣流尋找本機/dags目錄中的秘密金鑰。

範例程式碼

以下DAG使用連接SSHOperator到目標 Amazon EC2 實例,然後運行 hostname Linux 命令以打印實例的名稱。您可以修改以在DAG遠端執行個體上執行任何命令或指令碼。

  1. 開啟終端機,然後瀏覽至儲存DAG程式碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並在本機儲存為ssh.py

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 執行下列 AWS CLI 命令以複製DAG到您環境的值區,然後DAG使用 Apache Airflow UI 觸發。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您會在中的工作記錄中看到類似下列ssh_task內容的輸出 ssh_operator_exampleDAG:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916