使用创建SSH连接 SSHOperator - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用创建SSH连接 SSHOperator

以下示例介绍如何使用定向无环图 (DAG) 从适用SSHOperator于 Apache Airflow 的亚马逊托管工作流环境中连接到远程亚马逊EC2实例。您可以使用类似的方法连接到任何具有SSH访问权限的远程实例。

在以下示例中,您将SSH密钥 (.pem) 上传到 Amazon S3 上的环境dags目录。然后,您可以使用 requirements.txt 安装必要的依赖项,并在 UI 中创建新的 Apache Airflow 连接。最后,您编写DAG一个用于创建与远程实例的SSH连接。

版本

先决条件

要使用本页上的示例代码,您需要以下内容:

  • 亚马逊MWAA环境

  • 一SSH把密钥。该代码示例假设您有一个 Amazon EC2 实例,并且与您的亚马逊MWAA环境.pem位于同一区域。如果您没有密钥,请参阅《亚马逊EC2用户指南》中的创建或导入密钥对

权限

  • 无需其他权限即可使用本页上的代码示例。

要求

将以下参数添加到 requirements.txt,以将 apache-airflow-providers-ssh 程序包安装到 Web 服务器上。环境更新且 Amazon MWAA 成功安装依赖项后,您将在用户界面中看到新的SSH连接类型。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注意

-c定义了URL中的约束条件requirements.txt。这可确保 Amazon 为您的环境MWAA安装正确的软件包版本。

将密钥复制到 Amazon S3

使用以下 AWS Command Line Interface 命令将您的.pem密钥复制到 Amazon S3 中的环境dags目录中。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

亚马逊MWAA将中的dags内容(包括.pem密钥)复制到本地/usr/local/airflow/dags/目录,通过这样做,Apache Airflow 就可以访问密钥。

创建新的 Apache Airflow 连接

使用 Apache Airflow 用户界面创建新SSH连接
  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 从环境列表中,为环境选择打开 Airflow UI

  3. 在 Apache Airflow UI 页面上,从顶部导航栏中选择管理员以展开下拉列表,然后选择连接

  4. 列出连接页面上,选择 +添加新记录按钮以添加新连接。

  5. 连接到 AD 页面上,提供以下信息:

    1. 连接名称中,输入 ssh_new

    2. 对于 “连接类型”,请SSH从下拉列表中进行选择。

      注意

      如果列表中没有该SSH连接类型,则表示 Amazon MWAA 尚未安装所需的apache-airflow-providers-ssh软件包。请更新 requirements.txt 文件以包含此程序包,然后重试。

    3. 对于主机,输入您要连接的 Amazon EC2 实例的 IP 地址。例如,12.345.67.89

    4. 在 “用户名” 中,输入您ec2-user是否要连接到 Amazon EC2 实例。用户名可能会有所不同,具体取决于您希望 Apache Airflow 连接到的远程实例的类型。

    5. 对于 E x tra,按格式输入以下键值对:JSON

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      此键值对指示 Apache Airflow 在本地 /dags 目录中查找密钥。

代码示例

以下内容DAG使用SSHOperator连接到您的目标 Amazon EC2 实例,然后运行 hostname Linux 命令来打印该实例的名称。您可以修改DAG以在远程实例上运行任何命令或脚本。

  1. 打开终端,然后导航到存储DAG代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 ssh.py

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 运行以下 AWS CLI 命令将复制到环境的DAG存储桶,然后DAG使用 Apache Airflow 用户界面触发该存储桶。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您将在任务日志中看到类似于以下内容ssh_task的输出 ssh_operator_exampleDAG:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916