本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 建立 SSH 連線 SSHOperator
下列範例說明如何在導向非循環圖表 (DAG) SSHOperator
中使用 ,從 Amazon Managed Workflows for Apache Airflow 環境連線至遠端 Amazon EC2 執行個體。您可以使用類似的方法來連線至具有 SSH 存取的任何遠端執行個體。
在下列範例中,您將 SSH 私密金鑰 (.pem
) 上傳至 Amazon S3 上的環境dags
目錄。然後,您可以使用 安裝必要的相依性,requirements.txt
並在 UI 中建立新的 Apache Airflow 連線。最後,您撰寫的 DAG 會建立與遠端執行個體的 SSH 連線。
版本
-
您可以在 Python 3.10
中使用此頁面上的程式碼範例搭配 Apache Airflow v2。
先決條件
若要使用此頁面上的範例程式碼,您需要下列項目:
-
SSH 私密金鑰。程式碼範例假設您的 Amazon EC2 執行個體和
.pem
位於與 Amazon MWAA 環境相同的區域。如果您沒有金鑰,請參閱《Amazon EC2 使用者指南》中的建立或匯入金鑰對。
許可
-
使用此頁面上的程式碼範例不需要額外的許可。
要求
將下列參數新增至 requirements.txt
,以在 Web 伺服器上安裝apache-airflow-providers-ssh
套件。一旦您的環境更新且 Amazon MWAA 成功安裝相依性,您將在 UI 中看到新的 SSH 連線類型。
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
注意
-c
定義 中的限制條件 URLrequirements.txt
。這可確保 Amazon MWAA 為您的環境安裝正確的套件版本。
將您的私密金鑰複製到 Amazon S3
使用下列 AWS Command Line Interface 命令將您的.pem
金鑰複製到 Amazon S3 中的環境dags
目錄。
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
Amazon MWAA 會將 中的內容dags
,包括.pem
金鑰,複製到本機/usr/local/airflow/dags/
目錄。透過執行此操作,Apache Airflow 可以存取金鑰。
建立新的 Apache Airflow 連線
使用 Apache Airflow UI 建立新的 SSH 連線
-
在 Amazon MWAA 主控台上開啟環境頁面
。 -
從環境清單中,為您的環境選擇開放氣流 UI。
-
在 Apache Airflow UI 頁面上,從頂端導覽列選擇管理以展開下拉式清單,然後選擇連線。
-
在列出連線頁面上,選擇 +,或新增記錄按鈕以新增連線。
-
在新增連線頁面上,新增下列資訊:
-
針對連線 ID,輸入
ssh_new
。 -
對於連線類型,從下拉式清單中選擇 SSH。
注意
如果清單中沒有 SSH 連線類型,Amazon MWAA 尚未安裝所需的
apache-airflow-providers-ssh
套件。更新您的requirements.txt
檔案以包含此套件,然後再試一次。 -
針對主機,輸入您要連線之 Amazon EC2 執行個體的 IP 地址。例如:
12.345.67.89
。 -
針對使用者名稱,
ec2-user
如果您要連線至 Amazon EC2 執行個體,請輸入 。您的使用者名稱可能不同,取決於您希望 Apache Airflow 連接的遠端執行個體類型。 -
對於 Extra,請以 JSON 格式輸入下列鍵/值對:
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }此鍵/值對會指示 Apache Airflow 在本機
/dags
目錄中尋找私密金鑰。
-
範例程式碼
下列 DAG 使用 SSHOperator
連線到您的目標 Amazon EC2 執行個體,然後執行 hostname
Linux 命令來列印執行個體的名稱。您可以修改 DAG,在遠端執行個體上執行任何命令或指令碼。
-
開啟終端機,然後導覽至存放 DAG 程式碼的目錄。例如:
cd dags
-
複製下列程式碼範例的內容,並在本機儲存為
ssh.py
。from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache Airflow UI 觸發 DAG。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果成功,您會在
ssh_operator_example
DAG 中的 任務日誌ssh_task
中看到類似下列的輸出:[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916