創建一個自定義插件與阿帕奇蜂巢和 Hadoop - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

創建一個自定義插件與阿帕奇蜂巢和 Hadoop

Amazon MWAA 提取的內容plugins.zip/usr/local/airflow/plugins. 這可以用來將二進製文件添加到容器中。此外,Apache 氣流會在啟動時執行plugins資料夾中的 Python 檔案內容,讓您能夠設定和修改環境變數。以下範例將引導您完成在適用於 Apache Airflow 環境的 Amazon 受管工作流程上使用 Apache Hive 和 Hadoop 建立自訂外掛程式的步驟,並可與其他自訂外掛程式和二進位檔案結合使用。

版本

  • 此頁面上的示例代碼可以與 Python 3.7 中的阿帕奇氣流 V1 一起使用。

  • 您可以使用此頁面上的代碼示例與 Python 3.10 中的阿帕奇氣流 V2

必要條件

若要使用此頁面上的範例程式碼,您需要下列項目:

許可

  • 使用此頁面上的程式碼範例不需要其他權限。

要求

若要使用此頁面上的範例程式碼,請將下列相依性新增至requirements.txt. 如需進一步了解,請參閱 安裝 Python 相依性

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt apache-airflow-providers-amazon[apache.hive]
Apache Airflow v1
apache-airflow[hive]==1.10.12

下載相依性

Amazon MWAA 將/usr/local/airflow/plugins在每個 Amazon MWAA 調度程序和工作容器上提取 plugins.zip 的內容。這是用來將二進製文件添加到您的環境中。下列步驟說明如何組合自訂外掛程式所需的檔案。

  1. 在命令提示符中,導航到要創建插件的目錄。例如:

    cd plugins
  2. 鏡像下載 Hadoop 的,例如:

    wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
  3. 鏡像下載 Hive,例如:

    wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
  4. 建立目錄。例如:

    mkdir hive_plugin
  5. 提取 Hadoop 的。

    tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
  6. 提取蜂巢。

    tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin

自定義插件

阿帕奇氣流將在啟動時執行插件文件夾中的 Python 文件的內容。這是用來設置和修改環境變量。下列步驟說明自訂外掛程式的範例程式碼。

  1. 在命令提示字元中,導覽至目hive_plugin錄。例如:

    cd hive_plugin
  2. 複製下列程式碼範例的內容,並在本機儲存為目hive_pluginhive_plugin.py中。

    from airflow.plugins_manager import AirflowPlugin import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jre" os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0' os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop' os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin' os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" class EnvVarPlugin(AirflowPlugin): name = 'hive_plugin'
  3. 應付以下文本的內容,並在本地保存為目hive_plugin.airflowignore中。

    hadoop-3.3.0 apache-hive-3.1.2-bin

Plugins.zip

以下步驟顯示如何創建plugins.zip。這個例子的內容可以與其他插件和二進製文件組合成一個plugins.zip文件。

  1. 在命令提示符中,導航到上一步中的hive_plugin目錄。例如:

    cd hive_plugin
  2. 壓縮文plugins件夾中的內容。

    zip -r ../hive_plugin.zip ./

範例程式碼

下列步驟說明如何建立將測試自訂外掛程式的程式DAG碼。

  1. 在命令提示符中,導航到存儲DAG代碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並在本機儲存為hive.py

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hive_test = BashOperator( task_id="hive_test", bash_command='hive --help' )

氣流組態選項

如果您使用的是 Apache 氣流 v2,請添加core.lazy_load_plugins : False為 Apache 氣流配置選項。若要深入瞭解,請參閱使用設定選項載入外掛程式 2

後續步驟?

  • 在中了解如何將此範例中的requirements.txt檔案上傳到您的 Amazon S3 儲存貯體安裝 Python 相依性

  • 了解如何將此範例中的DAG程式碼上傳到的 Amazon S3 儲存貯體中的dags資料夾新增或更新 DAG

  • 在本範例中進一步了解如何將plugins.zip檔案上傳到中的 Amazon S3 儲存貯體安裝自定義插件