使用 SSHOperator 创建 SSH 连接 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 SSHOperator 创建 SSH 连接

以下示例介绍如何使用有向无环图 (DAG) SSHOperator 中的从适用于 Apache Airflow 的亚马逊托管工作流程环境连接到远程亚马逊 EC2 实例。您可以使用类似的方法连接到任何具有 SSH 访问权限的远程实例。

在以下示例中,您将 SSH 密钥 (.pem) 上传到在 Amazon S3 上的环境的 dags 目录中。然后,您可以使用 requirements.txt 安装必要的依赖项,并在 UI 中创建新的 Apache Airflow 连接。最后,您编写一个 DAG 来创建与远程实例的 SSH 连接。

版本

  • 您可以在 Python 3.10 中将本页上的代码示例与 Apache Airflow v2 一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

  • Amazon MWAA 环境

  • SSH 密钥。该代码示例假设您有一个 Amazon EC2 实例,并且与您的 Amazon MWAA 环境.pem位于同一区域。如果您没有密钥,请参阅《亚马逊 EC2 用户指南》中的创建或导入密钥对

权限

  • 无需其他权限即可使用本页上的代码示例。

要求

将以下参数添加到 requirements.txt,以将 apache-airflow-providers-ssh 程序包安装到 Web 服务器上。当环境更新并且 Amazon MWAA 成功安装依赖项后,您将在 UI 中看到新的 SSH 连接类型。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注意

-c 定义了 requirements.txt 中的约束 URL。这样可以确保 Amazon MWAA 为环境安装了正确的程序包版本。

将密钥复制到 Amazon S3

使用以下 AWS Command Line Interface 命令将您的.pem密钥复制到 Amazon S3 中的环境dags目录中。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA 将包括 .pem 密钥在内的 dags 中的内容复制到本地 /usr/local/airflow/dags/ 目录,这样,Apache Airflow 就可以访问密钥了。

创建新的 Apache Airflow 连接

使用 Apache Airflow UI 创建新的 SSH 连接
  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 从环境列表中,为环境选择打开 Airflow UI

  3. 在 Apache Airflow UI 页面上,从顶部导航栏中选择管理员以展开下拉列表,然后选择连接

  4. 列出连接页面上,选择 +添加新记录按钮以添加新连接。

  5. 连接到 AD 页面上,提供以下信息:

    1. 连接名称中,输入 ssh_new

    2. 连接类型中,从下拉列表中选择 SSH

      注意

      如果列表中没有 SSH 连接类型,则表示 Amazon MWAA 尚未安装所需的 apache-airflow-providers-ssh 程序包。请更新 requirements.txt 文件以包含此程序包,然后重试。

    3. 对于主机,输入您要连接的 Amazon EC2 实例的 IP 地址。例如,12.345.67.89

    4. 在 “用户名” 中,输入您ec2-user是否要连接到 Amazon EC2 实例。用户名可能会有所不同,具体取决于您希望 Apache Airflow 连接到的远程实例的类型。

    5. 附加中,请以 JSON 格式输入以下键/值对:

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      此键值对指示 Apache Airflow 在本地 /dags 目录中查找密钥。

代码示例

以下 DAG 使用SSHOperator连接到您的目标 Amazon EC2 实例,然后运行 hostname Linux 命令来打印该实例的名称。您可以修改 DAG 以在远程实例上运行任何命令或脚本。

  1. 打开终端,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 ssh.py

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow 用户界面触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您将在 ssh_operator_example DAG 的任务日志中看到输出,类似于 ssh_task 的任务日志中的以下内容:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916