本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用创建SSH连接 SSHOperator
以下示例介绍如何使用定向无环图 (DAG) 从适用SSHOperator
于 Apache Airflow 的亚马逊托管工作流环境中连接到远程亚马逊EC2实例。您可以使用类似的方法连接到任何具有SSH访问权限的远程实例。
在以下示例中,您将SSH密钥 (.pem
) 上传到 Amazon S3 上的环境dags
目录。然后,您可以使用 requirements.txt
安装必要的依赖项,并在 UI 中创建新的 Apache Airflow 连接。最后,您编写DAG一个用于创建与远程实例的SSH连接。
版本
先决条件
要使用本页上的示例代码,您需要以下内容:
权限
-
无需其他权限即可使用本页上的代码示例。
要求
将以下参数添加到 requirements.txt
,以将 apache-airflow-providers-ssh
程序包安装到 Web 服务器上。环境更新且 Amazon MWAA 成功安装依赖项后,您将在用户界面中看到新的SSH连接类型。
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
注意
-c
定义了URL中的约束条件requirements.txt
。这可确保 Amazon 为您的环境MWAA安装正确的软件包版本。
将密钥复制到 Amazon S3
使用以下 AWS Command Line Interface 命令将您的.pem
密钥复制到 Amazon S3 中的环境dags
目录中。
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
亚马逊MWAA将中的dags
内容(包括.pem
密钥)复制到本地/usr/local/airflow/dags/
目录,通过这样做,Apache Airflow 就可以访问密钥。
创建新的 Apache Airflow 连接
使用 Apache Airflow 用户界面创建新SSH连接
-
在 Amazon MWAA 控制台上打开 “环境” 页面
。 -
从环境列表中,为环境选择打开 Airflow UI。
-
在 Apache Airflow UI 页面上,从顶部导航栏中选择管理员以展开下拉列表,然后选择连接。
-
在列出连接页面上,选择 + 或添加新记录按钮以添加新连接。
-
在连接到 AD 页面上,提供以下信息:
-
在连接名称中,输入
ssh_new
。 -
对于 “连接类型”,请SSH从下拉列表中进行选择。
注意
如果列表中没有该SSH连接类型,则表示 Amazon MWAA 尚未安装所需的
apache-airflow-providers-ssh
软件包。请更新requirements.txt
文件以包含此程序包,然后重试。 -
对于主机,输入您要连接的 Amazon EC2 实例的 IP 地址。例如,
12.345.67.89
。 -
在 “用户名” 中,输入您
ec2-user
是否要连接到 Amazon EC2 实例。用户名可能会有所不同,具体取决于您希望 Apache Airflow 连接到的远程实例的类型。 -
对于 E x tra,按格式输入以下键值对:JSON
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }此键值对指示 Apache Airflow 在本地
/dags
目录中查找密钥。
-
代码示例
以下内容DAG使用SSHOperator
连接到您的目标 Amazon EC2 实例,然后运行 hostname
Linux 命令来打印该实例的名称。您可以修改DAG以在远程实例上运行任何命令或脚本。
-
打开终端,然后导航到存储DAG代码的目录。例如:
cd dags
-
复制以下代码示例的内容,并在本地另存为
ssh.py
。from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
运行以下 AWS CLI 命令将复制到环境的DAG存储桶,然后DAG使用 Apache Airflow 用户界面触发该存储桶。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果成功,您将在任务日志中看到类似于以下内容
ssh_task
的输出ssh_operator_example
DAG:[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916