为 Apache Airflow 变量使用 AWS Secrets Manager 中的密钥
以下示例调用 AWS Secrets Manager 来获取 Amazon MWAA 上的 Apache Airflow 变量的密钥。它假设您已完成 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 中的步骤。
版本
-
本页上的示例代码可与 Python 3.7
中的 Apache Airflow v1 一起使用。
-
您可以在 Python 3.10
中将本页上的代码示例与 Apache Airflow v2 一起使用。
先决条件
要使用本页上的示例代码,您需要以下内容:
-
创建 Secrets Manager 后端作为 Apache Airflow 配置选项,如 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 所示。
-
Secrets Manager 中的 Apache Airflow 变量字符串,如 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 所示。
权限
-
Secrets Manager 权限,如 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 所示。
要求
-
要在 Apache Airflow v1 中使用此代码示例,无需附加依赖项。该代码在环境中使用 Apache Airflow v1 基础版安装
。
-
要在 Apache Airflow v2 中使用此代码示例,无需附加依赖项。该代码在环境中使用 Apache Airflow v2 基础版安装
。
代码示例
以下步骤描述了如何创建 DAG 代码,以便调用 Secrets Manager 来获取密钥。
-
在命令提示符下,导航到存储 DAG 代码的目录。例如:
cd dags
-
复制以下代码示例的内容,并在本地另存为
secrets-manager-var.py
。from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.models import Variable from airflow.utils.dates import days_ago from datetime import timedelta import os DAG_ID = os.path.basename(__file__).replace(".py", "") DEFAULT_ARGS = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, } def get_variable_fn(**kwargs): my_variable_name = Variable.get("test-variable", default_var="undefined") print("my_variable_name: ", my_variable_name) return my_variable_name with DAG( dag_id=DAG_ID, default_args=DEFAULT_ARGS, dagrun_timeout=timedelta(hours=2), start_date=days_ago(1), schedule_interval='@once', tags=['variable'] ) as dag: get_variable = PythonOperator( task_id="get_variable", python_callable=get_variable_fn, provide_context=True )
接下来做什么?
-
要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的
dags
文件夹,请参阅 添加或更新 DAG。