Creación de un complemento personalizado con Oracle - Amazon Managed Workflows para Apache Airflow

Creación de un complemento personalizado con Oracle

En el siguiente ejemplo, se explican los pasos necesarios para crear un complemento personalizado con Oracle para Amazon MWAA y se puede combinar con otros complementos y binarios personalizados en el archivo plugins.zip.

Versión

  • El código de ejemplo de esta página se puede utilizar con Apache Airflow v1 en Python 3.7.

  • Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10.

Requisitos previos

Para usar el código de muestra de esta página, necesitará lo siguiente:

  • Un entorno de Amazon MWAA.

  • El registro de procesos de trabajo debe estar habilitado en cualquier nivel de registro, CRITICAL o superior, para su entorno. Para obtener más información sobre los tipos de registros de Amazon MWAA y sobre cómo administrar sus grupos de registros, consulte Visualización registros en Amazon CloudWatch

Permisos

  • No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

Requisitos

Para usar el código de ejemplo de esta página, agregue las siguientes dependencias a su requirements.txt. Para obtener más información, consulte Instalación de dependencias de Python.

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt cx_Oracle apache-airflow-providers-oracle
Apache Airflow v1
cx_Oracle==8.1.0 apache-airflow[oracle]==1.10.12

Código de ejemplo

En los siguientes pasos se explica cómo crear el código DAG que probará el complemento personalizado.

  1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

    cd dags
  2. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como oracle.py.

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )

Creación del complemento personalizado

En esta sección se describe cómo descargar las dependencias, crear el complemento personalizado y el archivo plugins.zip.

Descarga de dependencias

Amazon MWAA extraerá el contenido del archivo plugins.zip en /usr/local/airflow/plugins en cada contenedor de procesos de trabajo y programador de Amazon MWAA. Se utiliza para añadir binarios a su entorno. En los siguientes pasos se describe cómo ensamblar los archivos necesarios para el complemento personalizado.

Extracción de la imagen del contenedor Linux de Amazon
  1. En el símbolo del sistema, extraiga la imagen del contenedor Linux de Amazon y ejecútelo localmente. Por ejemplo:

    docker pull amazonlinux docker run -it amazonlinux:latest /bin/bash

    El símbolo del sistema debe invocar un símbolo del sistema bash. Por ejemplo:

    bash-4.2#
  2. Instale la característica de E/S asíncrona nativa de Linux (libaio).

    yum -y install libaio
  3. Mantenga esta ventana abierta para los pasos siguientes. Copiaremos los siguientes archivos en el sistema local: lib64/libaio.so.1, lib64/libaio.so.1.0.0, lib64/libaio.so.1.0.1.

Descarga de la carpeta del cliente
  1. Instale el paquete de descompresión localmente. Por ejemplo:

    sudo yum install unzip
  2. Cree un directorio de oracle_plugin. Por ejemplo:

    mkdir oracle_plugin cd oracle_plugin
  3. Utilice el siguiente comando curl para descargar el archivo instantclient-basic-linux.x64-18.5.0.0.0dbru.zip de Oracle Instant Client Downloads para Linux x86-64 (64 bits).

    curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
  4. Descomprima el archivo client.zip. Por ejemplo:

    unzip *.zip
Extracción de archivos de Docker
  1. En un nuevo símbolo del sistema, muestre y anote su ID de contenedor de Docker. Por ejemplo:

    docker container ls

    El símbolo del sistema debería mostrar todos los contenedores y sus ID. Por ejemplo:

    debc16fd6970
  2. En su directorio oracle_plugin, extraiga los archivos lib64/libaio.so.1, lib64/libaio.so.1.0.0, lib64/libaio.so.1.0.1 en la carpeta instantclient_18_5 local. Por ejemplo:

    docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/

Complemento personalizado

Apache Airflow ejecutará el contenido de los archivos de Python en la carpeta de complementos durante el arranque. Esto se usa para establecer y modificar variables de entorno. En los siguientes pasos se describe el código de muestra del complemento personalizado.

  • Copie el contenido del código de ejemplo siguiente y guárdelo localmente como env_var_plugin_oracle.py.

    from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'

Plugins.zip

Los siguientes pasos muestran cómo crear el plugins.zip. El contenido de este ejemplo se puede combinar con sus otros complementos y binarios en un solo archivo plugins.zip.

Compresión del contenido del directorio de complementos
  1. En una línea de comando, vaya al directorio oracle_plugin. Por ejemplo:

    cd oracle_plugin
  2. Comprima el directorio instantclient_18_5 en plugins.zip. Por ejemplo:

    zip -r ../plugins.zip ./
  3. Debería ver lo siguiente en su símbolo del sistema:

    oracle_plugin$ ls client.zip instantclient_18_5
  4. Elimine el archivo client.zip. Por ejemplo:

    rm client.zip
Comprima el archivo env_var_plugin_oracle.py
  1. Agregue el archivo env_var_plugin_oracle.py a la raíz de plugins.zip. Por ejemplo:

    zip plugins.zip env_var_plugin_oracle.py
  2. Ahora su archivo plugins.zip debería incluir lo siguiente:

    env_var_plugin_oracle.py instantclient_18_5/

Opciones de configuración de Airflow

Si utiliza Apache Airflow v2, agregue core.lazy_load_plugins : False como opción de configuración de Apache Airflow. Para obtener más información, consulte Uso de las opciones de configuración para cargar complementos en la versión 2.

Siguientes pasos