本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用建立SSH連線 SSHOperator
下列範例說明如何使用定向非循環圖 (DAG) SSHOperator
中的,從適用於 Apache Airflow 環境的 Amazon 受管工作流程連接到遠端 Amazon EC2 執行個體。您可以使用類似的方法連接到任何具有SSH訪問權限的遠程實例。
在下列範例中,您將SSH秘密金鑰 (.pem
) 上傳到 Amazon S3 上環境的dags
目錄。然後,您可以使用安裝必要的依賴關係,requirements.txt
並在 UI 中創建一個新的 Apache 氣流連接。最後,你寫一DAG個創建到遠程實例的SSH連接。
版本
-
您可以使用此頁面上的代碼示例與 Python 3.10
中的阿帕奇氣流 V2。
必要條件
若要使用此頁面上的範例程式碼,您需要下列項目:
-
一個SSH秘密金鑰 程式碼範例假設您有一個 Amazon EC2 執行個體,以及與 Amazon MWAA 環境
.pem
位於相同的區域。如果您沒有金鑰,請參閱 Amazon EC2 使用者指南中的建立或匯入 key pair。
許可
-
使用此頁面上的程式碼範例不需要其他權限。
要求
添加以下參數requirements.txt
以在 Web 服務器上安裝apache-airflow-providers-ssh
軟件包。一旦您的環境更新且 Amazon MWAA 成功安裝相依性,您就會在 UI 中看到新的SSH連線類型。
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
注意
-c
定義中的約束URLrequirements.txt
。這可確保 Amazon 為您的環境MWAA安裝正確的套件版本。
將您的密鑰複製到 Amazon S3
使用下列 AWS Command Line Interface 命令將.pem
金鑰複製到 Amazon S3 中環境的dags
目錄。
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
Amazon MWAA 複製的內容dags
, 包括.pem
密鑰, 到本地目/usr/local/airflow/dags/
錄, 通過這樣做, Apache 氣流可以訪問密鑰.
創建一個新的 Apache 氣流連接
若要使用 Apache 氣流使用者介面建立新SSH連線
-
在 Amazon MWAA 控制台上打開「環境」頁面
。 -
從環境清單中,選擇適用於您環境的「開啟氣流使用者介面」。
-
在 Apache Airflow UI 頁面上,從頂端導覽列選擇 [管理員] 以展開下拉式清單,然後選擇 [連線]。
-
在 [列出連線] 頁面上,選擇 [+] 或 [新增記錄] 按鈕以新增連線。
-
在 [新增連線] 頁面上,新增下列資訊:
-
對於「連線 ID」,輸入
ssh_new
。 -
對於「連線類型」,請SSH從下拉式清單中選擇。
注意
如果清單中沒有SSH連線類型,表示 Amazon MWAA 尚未安裝所需的
apache-airflow-providers-ssh
套件。請更新您的requirements.txt
檔案以包含此套件,然後再試一次。 -
對於主機,請輸入要連接到的 Amazon EC2 執行個體的 IP 地址。例如:
12.345.67.89
。 -
對於使用者名稱,請輸入是
ec2-user
否要連線到 Amazon EC2 執行個體。您的使用者名稱可能會有所不同,視您希望 Apache Airflow 連線的遠端執行個體類型而定。 -
在「額外」中,以JSON格式輸入下列機碼-值配對:
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }此機碼值配對會指示 Apache 氣流尋找本機
/dags
目錄中的秘密金鑰。
-
範例程式碼
以下DAG使用連接SSHOperator
到目標 Amazon EC2 實例,然後運行 hostname
Linux 命令以打印實例的名稱。您可以修改以在DAG遠端執行個體上執行任何命令或指令碼。
-
開啟終端機,然後瀏覽至儲存DAG程式碼的目錄。例如:
cd dags
-
複製下列程式碼範例的內容,並在本機儲存為
ssh.py
。from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
執行下列 AWS CLI 命令以複製DAG到您環境的值區,然後DAG使用 Apache Airflow UI 觸發。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果成功,您會在中的工作記錄中看到類似下列
ssh_task
內容的輸出ssh_operator_example
DAG:[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916