在 Amazon MWAA 中使用启动脚本 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon MWAA 中使用启动脚本

启动脚本是您托管在环境的 Amazon S3 存储桶中的 shell(.sh)脚本,类似于 DAG、要求和插件。在安装要求和初始化 Apache Airflow 流程之前,Amazon MWAA 会在每个单独的 Apache Airflow 组件(工作线程、计划程序和 Web 服务器)上启动时运行此脚本。使用启动脚本执行以下操作:

  • 安装运行时 - 安装工作流程和连接所需的 Linux 运行时。

  • 配置环境变量 - 为每个 Apache Airflow 组件设置环境变量。覆盖常用变量,例如 PATHPYTHONPATHLD_LIBRARY_PATH

  • 管理密钥和令牌 - 将自定义存储库的访问令牌传递给 requirements.txt 并配置安全密钥。

以下主题介绍如何配置启动脚本以安装 Linux 运行时、设置环境变量以及使用 CloudWatch Logs 排查相关问题。

配置启动脚本

要在现有 Amazon MWAA 环境中使用启动脚本,请将 .sh 文件上传到环境的 Amazon S3 存储桶。然后,要将脚本与环境关联,请在环境详细信息中指定以下内容:

  • 脚本的 Amazon S3 URL 路径 — 存储桶中托管的脚本的相对路径,例如 s3://mwaa-environment/startup.sh

  • 脚本的 Amazon S3 版本 ID — Amazon S3 存储桶中启动 shell 脚本的版本。每次更新脚本时,您都必须指定 Amazon S3 为文件分配的版本 ID。版本 ID 是 Unicode、以 UTF-8 编码、URL 就绪、不透明的字符串,长度不超过 1,024 字节,例如 3sL4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo

要完成本节中的步骤,请使用以下示例脚本。该脚本输出分配给 MWAA_AIRFLOW_COMPONENT 的值。此环境变量标识在其上运行脚本的每个 Apache Airflow 组件。

复制代码并将其本地另存为 startup.sh

#!/bin/sh ​ echo "Printing Apache Airflow component" echo $MWAA_AIRFLOW_COMPONENT

接下来,将脚本上传到 Amazon S3 存储桶。

AWS Management Console
要上传 shell 脚本(控制台),请执行以下操作
  1. 登录到 AWS Management Console,然后通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  2. 存储桶列表中,选择与环境关联的存储桶的名称。

  3. Objects(对象)选项卡上,选择 Upload(上载)。

  4. 上传页面上,拖放您创建的 shell 脚本。

  5. 请选择 Upload(上传)。

该脚本出现在对象列表中。Amazon S3 将为文件创建新的版本 ID。如果您更新脚本并使用相同的文件名将其再次上传,则会为该文件分配一个新的版本 ID。

AWS CLI
要创建和上传 shell 脚本(CLI),请执行以下操作
  1. 打开新的命令提示符,然后运行 Amazon S3 ls 命令以列出和识别与环境关联的存储桶。

    $ aws s3 ls
  2. 导航到保存 shell 脚本的文件夹。在新提示窗口中使用 cp 将脚本上传到存储桶。用您的信息替换 your-s3-bucket

    $ aws s3 cp startup.sh s3://your-s3-bucket/startup.sh

    如果成功,Amazon S3 会输出该对象的 URL 路径:

    upload: ./startup.sh to s3://your-s3-bucket/startup.sh
  3. 使用以下命令检索脚本的最新版本 ID。

    $ aws s3api list-object-versions --bucket your-s3-bucket --prefix startup --query 'Versions[?IsLatest].[VersionId]' --output text
    BbdVMmBRjtestta1EsVnbybZp1Wqh1J4

当您将脚本与环境关联时,可以指定此版本 ID。

现在,将脚本与环境相关联。

AWS Management Console
要将脚本与环境(控制台)关联,请执行以下操作
  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择要更新的环境所在的行,然后选择编辑

  3. 指定详情页面上,在启动脚本文件(可选,输入脚本的 Amazon S3 URL,例如:s3://your-mwaa-bucket/startup-sh.

  4. 从下拉列表中选择最新版本,或者浏览 S3 以查找脚本。

  5. 选择下一步,继续进入查看页面。

  6. 查看更改,然后选择保存

环境更新可能需要 10 到 30 分钟。当环境中的每个组件重新启动时,Amazon MWAA 会运行启动脚本。

AWS CLI
要将脚本与环境(CLI)关联,请执行以下操作
  • 打开命令提示符并使用update-environment 为脚本指定 Amazon S3 URL 和版本 ID。

    $ aws mwaa update-environment \ --name your-mwaa-environment \ --startup-script-s3-path startup.sh \ --startup-script-s3-object-version BbdVMmBRjtestta1EsVnbybZp1Wqh1J4

    如果成功,Amazon MWAA 将返回环境的 Amazon 资源名称(ARN):

    arn:aws::airflow:us-west-2:123456789012:environment/your-mwaa-environment 

环境更新可能需要 10 到 30 分钟。当环境中的每个组件重新启动时,Amazon MWAA 会运行启动脚本。

最后,检索日志事件以验证脚本是否按预期运行。当您为每个 Apache Airflow 组件激活日志记录时,Amazon MWAA 会创建新的日志组和日志流。有关更多信息,请参阅 Apache Airflow 日志类型

AWS Management Console
查看 Apache Airflow 日志流(控制台)
  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. 监控窗格中,选择要查看其日志的日志组,例如 Airflow 计划程序日志组

  4. 在 CloudWatch 控制台中,从日志流列表中选择一个带有前缀 startup_script_exection_ip 的流。

  5. 日志事件窗格上,您将看到用于打印 MWAA_AIRFLOW_COMPONENT 的值的命令输出。例如,对于计划程序日志,您将执行以下操作:

    Printing Apache Airflow component
    scheduler
    Finished running startup script. Execution time: 0.004s.
    Running verification
    Verification completed

您可以重复前述步骤来查看工作线程和 Web 服务器日志。

使用启动脚本安装 Linux 运行时

使用启动脚本更新 Apache Airflow 组件的操作系统,并安装其他运行时库以与工作流程一起使用。例如,以下脚本运行 yum update 来更新操作系统。

在启动脚本yum update中运行 时,必须使用 --exclude=python* 排除 Python,如示例所示。为了让环境运行,Amazon MWAA 会安装与环境兼容的特定版本的 Python。因此,您无法使用启动脚本更新环境的 Python 版本。

#!/bin/sh echo "Updating operating system" sudo yum update -y --exclude=python*

要在特定的 Apache Airflow 组件上安装运行时,请使用 MWAA_AIRFLOW_COMPONENTiffi 条件语句。此示例运行单个命令将 libaio 库安装在计划程序和工作线程上,但不安装在 Web 服务器上。

重要
  • 如果您配置了私有 Web 服务器,则必须使用以下条件或在本地提供所有安装文件,以避免安装超时。

  • 使用 sudo 运行需要管理权限的操作。

#!/bin/sh if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]] then sudo yum -y install libaio fi

您可以使用启动脚本来检查 Python 版本。

#!/bin/sh export PYTHON_VERSION_CHECK=`python -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}.{2}".format(*version))'` echo "Python version is $PYTHON_VERSION_CHECK"

Amazon MWAA 不支持覆盖默认 Python 版本,因为这可能会导致与已安装的 Apache Airflow 库不兼容。

使用启动脚本设置环境变量

使用启动脚本来设置环境变量和修改 Apache Airflow 配置。以下示例定义了新变量 ENVIRONMENT_STAGE。您可以在 DAG 或自定义模块中引用此变量。

#!/bin/sh export ENVIRONMENT_STAGE="development" echo "$ENVIRONMENT_STAGE"

使用启动脚本覆盖常见的 Apache Airflow 或系统变量。例如,您设置 LD_LIBRARY_PATH 来指示 Python,以在您指定的路径中查找二进制文件。这允许您使用插件为工作流程提供自定义二进制文件:

#!/bin/sh export LD_LIBRARY_PATH=/usr/local/airflow/plugins/your-custom-binary

预留环境变量

Amazon MWAA 保留了一组关键的环境变量。如果您覆盖保留变量,Amazon MWAA 会将其恢复为默认值。以下列出了保留变量:

  • MWAA__AIRFLOW__COMPONENT— 用于使用以下值之一标识 Apache Airflow 组件:schedulerworkerwebserver

  • AIRFLOW__WEBSERVER__SECRET_KEY— 用于在 Apache Airflow Web 服务器中安全签署会话 cookie 的密钥。

  • AIRFLOW__CORE__FERNET_KEY— 用于加密和解密存储在元数据数据库中的敏感数据的密钥,例如连接密码。

  • AIRFLOW_HOME— Apache Airflow 主目录的路径,配置文件和 DAG 文件存储在本地。

  • AIRFLOW__CELERY__BROKER_URL— 用于 Apache Airflow 计划程序和 Celery Worker 节点之间通信的消息代理的 URL。

  • AIRFLOW__CELERY__RESULT_BACKEND— 用于存储 Celery 任务结果的数据库的 URL。

  • AIRFLOW__CORE__EXECUTOR— Apache Airflow 应该使用的执行程序类。在 Amazon MWAA 中,这是 CeleryExecutor

  • AIRFLOW__CORE__LOAD_EXAMPLES— 用于激活或停用示例 DAG 的加载。

  • AIRFLOW__METRICS__METRICS_BLOCK_LIST— 用于管理 Amazon MWAA 在 CloudWatch 中发出和捕获的 Apache Airflow 指标。

  • SQL_ALCHEMY_CONN— 用于在 Amazon MWAA 中存储 Apache Airflow 元数据的 RDS for PostgreSQL 数据库的连接字符串。

  • AIRFLOW__CORE__SQL_ALCHEMY_CONN— 用于与 SQL_ALCHEMY_CONN 相同的目的,但遵循新的 Apache Airflow 命名约定。

  • AIRFLOW__CELERY__DEFAULT_QUEUE— Apache Airflow 中 Celery 任务的默认队列。

  • AIRFLOW__OPERATORS__DEFAULT_QUEUE— 使用特定 Apache Airflow 运算符执行任务的默认队列。

  • AIRFLOW_VERSION— 安装在 Amazon MWAA 环境中的 Apache Airflow 版本。

  • AIRFLOW_CONN_AWS_DEFAULT— 用于与其他 AWS 服务集成的默认 AWS 凭证。

  • AWS_DEFAULT_REGION— 设置默认 AWS 区域,与其他 AWS 服务集成的默认凭证一起使用。

  • AWS_REGION — 如果已定义此环境变量,它将覆盖环境变量 AWS_DEFAULT_REGION 和配置文件设置区域中的值。

  • PYTHONUNBUFFERED— 用于向容器日志发送 stdoutstderr 流。

  • AIRFLOW__METRICS__STATSD_ALLOW_LIST— 用于配置以逗号分隔前缀的允许列表,以发送以列表元素开头的指标。

  • AIRFLOW__METRICS__STATSD_ON— 激活向 StatsD 发送指标。

  • AIRFLOW__METRICS__STATSD_HOST— 用于连接到 StatSD 进程守护程序。

  • AIRFLOW__METRICS__STATSD_PORT— 用于连接到 StatSD 进程守护程序。

  • AIRFLOW__METRICS__STATSD_PREFIX— 用于连接到 StatSD 进程守护程序。

  • AIRFLOW__CELERY__WORKER_AUTOSCALE— 设置最大和最小并发度。

  • AIRFLOW__CORE__DAG_CONCURRENCY— 设置计划程序可以在一个 DAG 中并行运行的任务实例数。

  • AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG— 设置每个 DAG 的最大活动任务数。

  • AIRFLOW__CORE__PARALLELISM— 定义可以同时运行的最大任务实例数。

  • AIRFLOW__SCHEDULER__PARSING_PROCESSES— 设置计划程序为调度 DAG 而解析的最大进程数。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT— 定义在将消息重新传送给其他工作线程之前,工作线程等待确认任务的秒数。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION— 为底层 Celery 传输设置 AWS 区域。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__PREDEFINED_QUEUES— 为底层 Celery 传输设置队列。

  • AIRFLOW_SCHEDULER_ALLOWED_RUN_ID_PATTERN— 用于在触发 DAG 时验证 run_id 参数输入的有效性。

  • AIRFLOW__WEBSERVER__BASE_URL— 用于托管 Apache Airflow UI 的 Web 服务器的 URL。

非预留环境变量

您可以使用启动脚本来覆盖未保留的环境变量。以下列表提供了一些常见变量:

  • PATH— 指定操作系统在其中搜索可执行文件和脚本的目录列表。在命令行中运行命令时,系统会检查 PATH 中的目录以查找并执行该命令。在 Apache Airflow 中创建自定义运算符或任务时,可能需要依赖外部脚本或可执行文件。如果包含这些文件的目录不在 PATH 变量中指定的目录中,则当系统无法找到它们时,任务将无法运行。通过将相应的目录添加到 PATH,Apache Airflow 任务可以找到并运行所需的可执行文件。

  • PYTHONPATH— Python 解释器使用它来确定要在哪些目录中搜索导入的模块和程序包。它是您可以添加到默认搜索路径的目录列表。这使解释器可以查找和加载未包含在标准库中或未安装在系统目录中的 Python 库。使用此变量添加模块和自定义 Python 程序包,并将其与 DAG 一起使用。

  • LD_LIBRARY_PATH— Linux 中的动态链接器和加载器使用的环境变量,用于查找和加载共享库。它指定了包含共享库的目录列表,这些共享库在默认系统库目录之前接受搜索。使用此变量来指定自定义二进制文件。

  • CLASSPATH— 由 Java 运行时环境(JRE)和 Java 开发套件(JDK)用于在运行时查找和加载 Java 类、库和资源。它是一个包含已编译的 Java 代码的目录、JAR 文件和 ZIP 档案的列表。