本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Amazon MWAA 中使用启动脚本
启动脚本是您托管在环境的 Amazon S3 存储桶中的 shell (.sh
) 脚本,类似于 DAG、要求和插件。在安装要求和初始化 Apache Airflow 流程之前,Amazon MWAA 会在每个单独的 Apache Airflow 组件(工作线程、计划程序和 Web 服务器)上启动时运行此脚本。使用启动脚本执行以下操作:
-
安装运行时 - 安装工作流程和连接所需的 Linux 运行时。
-
配置环境变量 - 为每个 Apache Airflow 组件设置环境变量。覆盖常用变量,例如
PATH
、PYTHONPATH
和LD_LIBRARY_PATH
。 -
管理密钥和令牌 - 将自定义存储库的访问令牌传递给
requirements.txt
并配置安全密钥。
以下主题介绍如何配置启动脚本以安装 Linux 运行时、设置环境变量以及使用 CloudWatch Logs 排查相关问题。
配置启动脚本
要在现有 Amazon MWAA 环境中使用启动脚本,请将 .sh
文件上传到环境的 Amazon S3 存储桶。然后,要将脚本与环境关联,请在环境详细信息中指定以下内容:
-
脚本的 Amazon S3 URL 路径 — 存储桶中托管的脚本的相对路径,例如
s3://mwaa-environment/
startup.sh
-
脚本的 Amazon S3 版本 ID — Amazon S3 存储桶中启动 shell 脚本的版本。每次更新脚本时,您都必须指定 Amazon S3 为文件分配的版本 ID。版本 ID 是 Unicode、以 UTF-8 编码、URL 就绪、不透明的字符串,长度不超过 1,024 字节,例如
3sL4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo
。
要完成本节中的步骤,请使用以下示例脚本。该脚本输出分配给 MWAA_AIRFLOW_COMPONENT
的值。此环境变量标识在其上运行脚本的每个 Apache Airflow 组件。
复制代码并将其本地另存为 startup.sh
。
#!/bin/sh echo "Printing Apache Airflow component" echo $MWAA_AIRFLOW_COMPONENT
接下来,将脚本上传到 Amazon S3 存储桶。
现在,将脚本与环境相关联。
最后,检索日志事件以验证脚本是否按预期运行。当您为每个 Apache Airflow 组件激活日志记录时,Amazon MWAA 会创建新的日志组和日志流。有关更多信息,请参阅 Apache Airflow 日志类型。
使用启动脚本安装 Linux 运行时
使用启动脚本更新 Apache Airflow 组件的操作系统,并安装其他运行时库以与工作流程一起使用。例如,以下脚本运行 yum update
来更新操作系统。
在启动脚本中运行 yum update
时,必须使用 --exclude=python*
排除 Python,如示例所示。为了让环境运行,Amazon MWAA 会安装与环境兼容的特定版本的 Python。因此,您无法使用启动脚本更新环境的 Python 版本。
#!/bin/sh echo "Updating operating system" sudo yum update -y --exclude=python*
要在特定的 Apache Airflow 组件上安装运行时,请使用 MWAA_AIRFLOW_COMPONENT
和 if
和 fi
条件语句。此示例运行单个命令将 libaio
库安装在计划程序和工作线程上,但不安装在 Web 服务器上。
重要
-
如果您配置了私有 Web 服务器,则必须使用以下条件或在本地提供所有安装文件,以避免安装超时。
-
使用
sudo
运行需要管理权限的操作。
#!/bin/sh if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]] then sudo yum -y install libaio fi
您可以使用启动脚本来检查 Python 版本。
#!/bin/sh export PYTHON_VERSION_CHECK=`python -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}.{2}".format(*version))'` echo "Python version is $PYTHON_VERSION_CHECK"
Amazon MWAA 不支持覆盖默认 Python 版本,因为这可能会导致与已安装的 Apache Airflow 库不兼容。
使用启动脚本设置环境变量
使用启动脚本来设置环境变量和修改 Apache Airflow 配置。以下示例定义了新变量 ENVIRONMENT_STAGE
。您可以在 DAG 或自定义模块中引用此变量。
#!/bin/sh export ENVIRONMENT_STAGE="development" echo "$ENVIRONMENT_STAGE"
使用启动脚本覆盖常见的 Apache Airflow 或系统变量。例如,您设置 LD_LIBRARY_PATH
来指示 Python,以在您指定的路径中查找二进制文件。这允许您使用插件
#!/bin/sh export LD_LIBRARY_PATH=/usr/local/airflow/plugins/
your-custom-binary
预留环境变量
Amazon MWAA 保留了一组关键的环境变量。如果您覆盖保留变量,Amazon MWAA 会将其恢复为默认值。以下列出了保留变量:
-
MWAA__AIRFLOW__COMPONENT
— 用于使用以下值之一标识 Apache Airflow 组件:scheduler
、worker
或webserver
。 -
AIRFLOW__WEBSERVER__SECRET_KEY
— 用于在 Apache Airflow Web 服务器中安全签署会话 cookie 的密钥。 -
AIRFLOW__CORE__FERNET_KEY
— 用于加密和解密存储在元数据数据库中的敏感数据的密钥,例如连接密码。 -
AIRFLOW_HOME
— Apache Airflow 主目录的路径,配置文件和 DAG 文件存储在本地。 -
AIRFLOW__CELERY__BROKER_URL
— 用于 Apache Airflow 计划程序和 Celery Worker 节点之间通信的消息代理的 URL。 -
AIRFLOW__CELERY__RESULT_BACKEND
— 用于存储 Celery 任务结果的数据库的 URL。 -
AIRFLOW__CORE__EXECUTOR
— Apache Airflow 应该使用的执行程序类。在 Amazon MWAA 中,这是CeleryExecutor
-
AIRFLOW__CORE__LOAD_EXAMPLES
— 用于激活或停用示例 DAG 的加载。 -
AIRFLOW__METRICS__METRICS_BLOCK_LIST
— 用于管理 Amazon MWAA 在 CloudWatch 中发出和捕获的 Apache Airflow 指标。 -
SQL_ALCHEMY_CONN
— 用于在 Amazon MWAA 中存储 Apache Airflow 元数据的 RDS for PostgreSQL 数据库的连接字符串。 -
AIRFLOW__CORE__SQL_ALCHEMY_CONN
— 用于与SQL_ALCHEMY_CONN
相同的目的,但遵循新的 Apache Airflow 命名约定。 -
AIRFLOW__CELERY__DEFAULT_QUEUE
— Apache Airflow 中 Celery 任务的默认队列。 -
AIRFLOW__OPERATORS__DEFAULT_QUEUE
— 使用特定 Apache Airflow 运算符执行任务的默认队列。 -
AIRFLOW_VERSION
— 安装在 Amazon MWAA 环境中的 Apache Airflow 版本。 -
AIRFLOW_CONN_AWS_DEFAULT
— 用于与其他 AWS 服务集成的默认 AWS 凭证。 -
AWS_DEFAULT_REGION
— 设置默认 AWS 区域,与其他 AWS 服务集成的默认凭证一起使用。 -
AWS_REGION
— 如果已定义此环境变量,它将覆盖环境变量AWS_DEFAULT_REGION
和配置文件设置区域中的值。 -
PYTHONUNBUFFERED
— 用于向容器日志发送stdout
和stderr
流。 -
AIRFLOW__METRICS__STATSD_ALLOW_LIST
— 用于配置以逗号分隔前缀的允许列表,以发送以列表元素开头的指标。 -
AIRFLOW__METRICS__STATSD_ON
— 激活向StatsD
发送指标。 -
AIRFLOW__METRICS__STATSD_HOST
— 用于连接到StatSD
进程守护程序。 -
AIRFLOW__METRICS__STATSD_PORT
— 用于连接到StatSD
进程守护程序。 -
AIRFLOW__METRICS__STATSD_PREFIX
— 用于连接到StatSD
进程守护程序。 -
AIRFLOW__CELERY__WORKER_AUTOSCALE
— 设置最大和最小并发度。 -
AIRFLOW__CORE__DAG_CONCURRENCY
— 设置计划程序可以在一个 DAG 中并行运行的任务实例数。 -
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG
— 设置每个 DAG 的最大活动任务数。 -
AIRFLOW__CORE__PARALLELISM
— 定义可以同时运行的最大任务实例数。 -
AIRFLOW__SCHEDULER__PARSING_PROCESSES
— 设置计划程序为调度 DAG 而解析的最大进程数。 -
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT
— 定义在将消息重新传送给其他工作线程之前,工作线程等待确认任务的秒数。 -
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION
— 为底层 Celery 传输设置 AWS 区域。 -
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__PREDEFINED_QUEUES
— 为底层 Celery 传输设置队列。 -
AIRFLOW_SCHEDULER_ALLOWED_RUN_ID_PATTERN
— 用于在触发 DAG 时验证run_id
参数输入的有效性。 -
AIRFLOW__WEBSERVER__BASE_URL
— 用于托管 Apache Airflow UI 的 Web 服务器的 URL。
非预留环境变量
您可以使用启动脚本来覆盖未保留的环境变量。以下列表提供了一些常见变量:
-
PATH
— 指定操作系统在其中搜索可执行文件和脚本的目录列表。在命令行中运行命令时,系统会检查PATH
中的目录以查找并执行该命令。在 Apache Airflow 中创建自定义运算符或任务时,可能需要依赖外部脚本或可执行文件。如果包含这些文件的目录不在PATH
变量中指定的目录中,则当系统无法找到它们时,任务将无法运行。通过将相应的目录添加到PATH
,Apache Airflow 任务可以找到并运行所需的可执行文件。 -
PYTHONPATH
— Python 解释器使用它来确定要在哪些目录中搜索导入的模块和程序包。它是您可以添加到默认搜索路径的目录列表。这使解释器可以查找和加载未包含在标准库中或未安装在系统目录中的 Python 库。使用此变量添加模块和自定义 Python 程序包,并将其与 DAG 一起使用。 -
LD_LIBRARY_PATH
— Linux 中的动态链接器和加载器使用的环境变量,用于查找和加载共享库。它指定了包含共享库的目录列表,这些共享库在默认系统库目录之前接受搜索。使用此变量来指定自定义二进制文件。 -
CLASSPATH
— 由 Java 运行时环境(JRE)和 Java 开发套件(JDK)用于在运行时查找和加载 Java 类、库和资源。它是一个包含已编译的 Java 代码的目录、JAR 文件和 ZIP 档案的列表。