Creación de una conexión SSH mediante SSHOperator - Amazon Managed Workflows para Apache Airflow

Creación de una conexión SSH mediante SSHOperator

El siguiente ejemplo describe cómo puede utilizar el SSHOperator en un gráfico acíclico dirigido (DAG) para conectarse a una instancia remota de Amazon EC2 desde su entorno Amazon Managed Workflows para Apache Airflow. Puede utilizar un enfoque similar para conectarse a cualquier instancia remota con acceso SSH.

En el siguiente ejemplo, carga una clave secreta SSH (.pem) en el directorio dags de su entorno en Amazon S3. A continuación, instale las dependencias necesarias mediante requirements.txt y cree una nueva conexión de Apache Airflow en la interfaz de usuario. Por último, escribe un DAG que crea una conexión SSH con la instancia remota.

Versión

  • Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10.

Requisitos previos

Para usar el código de muestra de esta página, necesitará lo siguiente:

  • Un entorno de Amazon MWAA.

  • Una clave secreta de SSH. En el código de muestra se supone que tiene una instancia de Amazon EC2 y una .pem en la misma región que su entorno de Amazon MWAA. Si no tiene una clave, consulte Crear o importar un par de claves en la Guía del usuario de Amazon EC2.

Permisos

  • No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

Requisitos

Añada el siguiente parámetro a requirements.txt para instalar el paquete apache-airflow-providers-ssh en el servidor web. Una vez que su entorno se actualice y Amazon MWAA instale correctamente la dependencia, verá un nuevo tipo de conexión SSH en la interfaz de usuario.

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
nota

-c define la URL de las restricciones en requirements.txt. Esto garantiza que Amazon MWAA instale la versión de paquete correcta para su entorno.

Cómo copiar su clave secreta en Amazon S3

Utilice el siguiente comando de la AWS Command Line Interface para copiar la clave .pem en el directorio dags de su entorno en Amazon S3.

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA copia el contenido en dags, incluida la clave .pem, en el directorio local /usr/local/airflow/dags/. De este modo, Apache Airflow puede acceder a la clave.

Creación de una nueva conexión con Apache Airflow

Creación de una nueva conexión SSH mediante la interfaz de usuario de Apache Airflow
  1. Abra la página Entornos en la consola de Amazon MWAA.

  2. En la lista de entornos, elija Abrir la interfaz de usuario de Airflow para su entorno.

  3. En la página de interfaz de usuario de Apache Airflow, seleccione Administrador en la barra de navegación superior para ampliar la lista desplegable y, a continuación, seleccione Conexiones.

  4. En la página Listar conexiones, seleccione + o el botón Añadir un nuevo registro para añadir una nueva conexión.

  5. En la página Conectar a AD, proporcione la siguiente información:

    1. En Nombre de conexión introduzca ssh_new.

    2. Para el tipo de conexión, seleccione SSH en la lista desplegable.

      nota

      Si el tipo de conexión SSH no está disponible en la lista, Amazon MWAA no ha instalado el paquete apache-airflow-providers-ssh necesario. Actualice el archivo requirements.txt para incluir este paquete e inténtelo de nuevo.

    3. Para Host, introduzca la dirección IP de la instancia de Amazon EC2 a la que desee conectarse. Por ejemplo, 12.345.67.89.

    4. En Nombre de usuario, introduzca ec2-user si se está conectando a una instancia de Amazon EC2. Su nombre de usuario puede ser diferente, según el tipo de instancia remota a la que desee que se conecte Apache Airflow.

    5. Para Extra, introduzca el siguiente par clave-valor en formato JSON:

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      Este par clave-valor indica a Apache Airflow que busque la clave secreta en el directorio local /dags.

Código de ejemplo

El siguiente DAG utiliza SSHOperator para conectarse a la instancia de Amazon EC2 de destino y, a continuación, ejecuta el comando de Linux hostname para imprimir el nombre de la instancia. Puede modificar el DAG para ejecutar cualquier comando o script en la instancia remota.

  1. Abra una terminal y navegue hasta el directorio en el que está almacenado el código del DAG. Por ejemplo:

    cd dags
  2. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como ssh.py.

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. Ejecute el siguiente comando de la AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas del ssh_task en el ssh_operator_example DAG:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916