为 Apache Airflow 变量使用 AWS Secrets Manager 中的密钥 - Amazon Managed Workflows for Apache Airflow

为 Apache Airflow 变量使用 AWS Secrets Manager 中的密钥

以下示例调用 AWS Secrets Manager 来获取 Amazon MWAA 上的 Apache Airflow 变量的密钥。它假设您已完成 使用 AWS Secrets Manager 密钥配置 Apache Airflow 连接 中的步骤。

版本

  • 本页上的示例代码可与 Python 3.7 中的 Apache Airflow v1 一起使用。

  • 您可以在 Python 3.10 中将本页上的代码示例与 Apache Airflow v2 一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

要求

代码示例

以下步骤描述了如何创建 DAG 代码,以便调用 Secrets Manager 来获取密钥。

  1. 在命令提示符下,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 secrets-manager-var.py

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.models import Variable from airflow.utils.dates import days_ago from datetime import timedelta import os DAG_ID = os.path.basename(__file__).replace(".py", "") DEFAULT_ARGS = { 'owner': 'airflow', 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, } def get_variable_fn(**kwargs): my_variable_name = Variable.get("test-variable", default_var="undefined") print("my_variable_name: ", my_variable_name) return my_variable_name with DAG( dag_id=DAG_ID, default_args=DEFAULT_ARGS, dagrun_timeout=timedelta(hours=2), start_date=days_ago(1), schedule_interval='@once', tags=['variable'] ) as dag: get_variable = PythonOperator( task_id="get_variable", python_callable=get_variable_fn, provide_context=True )

接下来做什么?

  • 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 dags 文件夹,请参阅 添加或更新 DAG