本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 SSHOperator
创建 SSH 连接
以下示例介绍如何使用有向无环图 (DAG) SSHOperator
中的从适用于 Apache Airflow 的亚马逊托管工作流程环境连接到远程亚马逊 EC2 实例。您可以使用类似的方法连接到任何具有 SSH 访问权限的远程实例。
在以下示例中,您将 SSH 密钥 (.pem
) 上传到在 Amazon S3 上的环境的 dags
目录中。然后,您可以使用 requirements.txt
安装必要的依赖项,并在 UI 中创建新的 Apache Airflow 连接。最后,您编写一个 DAG 来创建与远程实例的 SSH 连接。
版本
-
您可以在 Python 3.10
中将本页上的代码示例与 Apache Airflow v2 一起使用。
先决条件
要使用本页上的示例代码,您需要以下内容:
-
SSH 密钥。该代码示例假设您有一个 Amazon EC2 实例,并且与您的 Amazon MWAA 环境
.pem
位于同一区域。如果您没有密钥,请参阅《亚马逊 EC2 用户指南》中的创建或导入密钥对。
权限
-
无需其他权限即可使用本页上的代码示例。
要求
将以下参数添加到 requirements.txt
,以将 apache-airflow-providers-ssh
程序包安装到 Web 服务器上。当环境更新并且 Amazon MWAA 成功安装依赖项后,您将在 UI 中看到新的 SSH 连接类型。
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
注意
-c
定义了 requirements.txt
中的约束 URL。这样可以确保 Amazon MWAA 为环境安装了正确的程序包版本。
将密钥复制到 Amazon S3
使用以下 AWS Command Line Interface 命令将您的.pem
密钥复制到 Amazon S3 中的环境dags
目录中。
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
Amazon MWAA 将包括 .pem
密钥在内的 dags
中的内容复制到本地 /usr/local/airflow/dags/
目录,这样,Apache Airflow 就可以访问密钥了。
创建新的 Apache Airflow 连接
使用 Apache Airflow UI 创建新的 SSH 连接
-
在 Amazon MWAA 控制台上打开环境页面
。 -
从环境列表中,为环境选择打开 Airflow UI。
-
在 Apache Airflow UI 页面上,从顶部导航栏中选择管理员以展开下拉列表,然后选择连接。
-
在列出连接页面上,选择 + 或添加新记录按钮以添加新连接。
-
在连接到 AD 页面上,提供以下信息:
-
在连接名称中,输入
ssh_new
。 -
在连接类型中,从下拉列表中选择 SSH。
注意
如果列表中没有 SSH 连接类型,则表示 Amazon MWAA 尚未安装所需的
apache-airflow-providers-ssh
程序包。请更新requirements.txt
文件以包含此程序包,然后重试。 -
对于主机,输入您要连接的 Amazon EC2 实例的 IP 地址。例如,
12.345.67.89
。 -
在 “用户名” 中,输入您
ec2-user
是否要连接到 Amazon EC2 实例。用户名可能会有所不同,具体取决于您希望 Apache Airflow 连接到的远程实例的类型。 -
在附加中,请以 JSON 格式输入以下键/值对:
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }此键值对指示 Apache Airflow 在本地
/dags
目录中查找密钥。
-
代码示例
以下 DAG 使用SSHOperator
连接到您的目标 Amazon EC2 实例,然后运行 hostname
Linux 命令来打印该实例的名称。您可以修改 DAG 以在远程实例上运行任何命令或脚本。
-
打开终端,导航到存储 DAG 代码的目录。例如:
cd dags
-
复制以下代码示例的内容,并在本地另存为
ssh.py
。from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow 用户界面触发 DAG。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果成功,您将在
ssh_operator_example
DAG 的任务日志中看到输出,类似于ssh_task
的任务日志中的以下内容:[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916