Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Crear una SSH conexión mediante el SSHOperator
El siguiente ejemplo describe cómo puede utilizar el SSHOperator
gráfico acíclico dirigido (DAG) para conectarse a una EC2 instancia remota de Amazon desde su entorno Amazon Managed Workflows for Apache Airflow. Puede utilizar un enfoque similar para conectarse a cualquier instancia remota con SSH acceso.
En el siguiente ejemplo, carga una clave SSH secreta (.pem
) en el dags
directorio de su entorno en Amazon S3. A continuación, instale las dependencias necesarias mediante requirements.txt
y cree una nueva conexión de Apache Airflow en la interfaz de usuario. Por último, escribe una DAG que cree una SSH conexión con la instancia remota.
Temas
Versión
-
Puede usar el ejemplo de código de esta página con Apache Airflow v2 en Python 3.10
.
Requisitos previos
Para usar el código de muestra de esta página, necesitará lo siguiente:
-
Una clave SSH secreta. El ejemplo de código supone que tienes una EC2 instancia de Amazon y una
.pem
en la misma región que tu MWAA entorno de Amazon. Si no tienes una clave, consulta Crear o importar un par de claves en la Guía del EC2 usuario de Amazon.
Permisos
-
No se necesitan permisos adicionales para usar el código de ejemplo de esta página.
Requisitos
Añada el siguiente parámetro a requirements.txt
para instalar el paquete apache-airflow-providers-ssh
en el servidor web. Cuando tu entorno se actualice y Amazon instale MWAA correctamente la dependencia, verás un nuevo tipo de SSHconexión en la interfaz de usuario.
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
nota
-c
define las restricciones URL enrequirements.txt
. Esto garantiza que Amazon MWAA instale la versión de paquete correcta para su entorno.
Cómo copiar su clave secreta en Amazon S3
Utilice el siguiente AWS Command Line Interface comando para copiar la .pem
clave en el dags
directorio de su entorno en Amazon S3.
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
Amazon MWAA copia el contenidodags
, incluida la .pem
clave, en el /usr/local/airflow/dags/
directorio local. De este modo, Apache Airflow puede acceder a la clave.
Creación de una nueva conexión con Apache Airflow
Para crear una nueva SSH conexión mediante la interfaz de usuario de Apache Airflow
-
Abre la página Entornos
en la MWAA consola de Amazon. -
En la lista de entornos, elija Abrir la interfaz de usuario de Airflow para su entorno.
-
En la página de interfaz de usuario de Apache Airflow, seleccione Administrador en la barra de navegación superior para ampliar la lista desplegable y, a continuación, seleccione Conexiones.
-
En la página Listar conexiones, seleccione + o el botón Añadir un nuevo registro para añadir una nueva conexión.
-
En la página Conectar a AD, proporcione la siguiente información:
-
En Nombre de conexión introduzca
ssh_new
. -
Para el tipo de conexión, elige una opción SSHde la lista desplegable.
nota
Si el tipo de SSHconexión no está disponible en la lista, Amazon no MWAA ha instalado el
apache-airflow-providers-ssh
paquete necesario. Actualice el archivorequirements.txt
para incluir este paquete e inténtelo de nuevo. -
En Host, introduce la dirección IP de la EC2 instancia de Amazon a la que quieres conectarte. Por ejemplo,
12.345.67.89
. -
En Nombre de usuario, introduce
ec2-user
si te estás conectando a una EC2 instancia de Amazon. Su nombre de usuario puede ser diferente, según el tipo de instancia remota a la que desee que se conecte Apache Airflow. -
Para Extra, introduce el siguiente par clave-valor en JSON formato:
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }Este par clave-valor indica a Apache Airflow que busque la clave secreta en el directorio local
/dags
.
-
Código de ejemplo
A continuación, se DAG utiliza SSHOperator
para conectarse a la EC2 instancia de Amazon de destino y, a continuación, se ejecuta el comando de hostname
Linux para imprimir el nombre de la instancia. Puedes modificarlo DAG para ejecutar cualquier comando o script en la instancia remota.
-
Abre una terminal y navega hasta el directorio donde está almacenado el DAG código. Por ejemplo:
cd dags
-
Copie el contenido del código de ejemplo siguiente y guárdelo localmente como
ssh.py
.from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
Ejecute el siguiente AWS CLI comando para copiarlo en el bucket de su entorno y, DAG a continuación, actívelo DAG mediante la interfaz de usuario de Apache Airflow.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
Si se ejecuta correctamente, verás un resultado similar al siguiente en los registros de
ssh_task
tareas dessh_operator_example
DAG:[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916