搭配 Amazon MWAA 使用啟動指令碼 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 Amazon MWAA 使用啟動指令碼

啟動指令碼是您在環境的 Amazon S3 儲存貯體中託管的 shell (.sh) 指令碼,類似於您的 DAG、需求和外掛程式。Amazon MWAA 會在啟動期間在每個個別的 Apache Airflow 元件 (工作者、排程器和網頁伺服器) 上執行此指令碼,然後再安裝需求並初始化 Apache 氣流程。使用啟動指令碼執行下列作業:

  • 安裝執行階段 — 安裝工作流程和連線所需的 Linux 執行階段。

  • 定環境變數 — 為每個 Apache 氣流元件設定環境變數。覆寫常見變數PATH,例如PYTHONPATH、和LD_LIBRARY_PATH

  • 管理金鑰和權杖 — 將自訂儲存庫的存取權杖傳遞給requirements.txt並設定安全金鑰。

下列主題說明如何使用 CloudWatch 記錄設定啟動指令碼來安裝 Linux 執行階段、設定環境變數,以及疑難排解相關問題。

設定啟動指令碼

若要將啟動指令碼與現有的 Amazon MWAA 環境搭配使用,請將.sh檔案上傳到環境的 Amazon S3 儲存貯體。然後,若要將指令碼與環境產生關聯,請在您的環境詳細資料中指定下列項目:

  • 指令碼的 Amazon S3 URL 路徑 — 儲存貯體中託管之指令碼的相對路徑,例如,s3://mwaa-environment/startup.sh

  • 指令碼的 Amazon S3 版本識別碼 — Amazon S3 儲存貯體中的啟動殼層指令碼版本。每次更新指令碼時,您都必須指定 Amazon S3 指派給檔案的版本識別碼。版本識別碼為 Unicode、UTF-8 編碼、網址就緒、不透明的字串,長度不超過 1,024 個位元組,例如。3sL4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo

若要完成本節中的步驟,請使用下列範例指令碼。該腳本輸出分配給的值MWAA_AIRFLOW_COMPONENT。此環境變數可識別執行指令碼的每個 Apache 氣流元件。

複製代碼並將其保存在本地startup.sh

#!/bin/sh ​ echo "Printing Apache Airflow component" echo $MWAA_AIRFLOW_COMPONENT

接下來,將指令碼上傳到您的 Amazon S3 儲存貯體。

AWS Management Console
若要上傳殼層指令碼 (主控台)
  1. 登入 AWS Management Console,並開啟位於 https://console.aws.amazon.com/s3/ 的 Amazon S3 主控台。

  2. 從「值區」清單中,選擇與您環境相關聯的值區名稱。

  3. Objects (物件)標籤上,選擇 Upload (上傳)

  4. 在「上傳」頁面上,拖放您建立的 shell 指令碼。

  5. 選擇上傳

指令碼會出現在物件清單中。Amazon S3 為該文件創建一個新的版本 ID。如果您更新指令碼並使用相同的檔案名稱再次上傳,則會為檔案指定新的版本 ID。

AWS CLI
若要建立並上傳命令介面指令碼 (CLI)
  1. 開啟新的命令提示字元,然後執行 Amazon S3 ls 命令以列出並識別與您的環境相關聯的儲存貯體。

    $ aws s3 ls
  2. 導覽至您儲存殼層指令碼的資料夾。cp在新的提示視窗中使用,將指令碼上傳至您的值區。用您的信息替換您的 S3 桶

    $ aws s3 cp startup.sh s3://your-s3-bucket/startup.sh

    如果成功,Amazon S3 會將 URL 路徑輸出至物件:

    upload: ./startup.sh to s3://your-s3-bucket/startup.sh
  3. 使用下列命令擷取指令碼的最新版本 ID。

    $ aws s3api list-object-versions --bucket your-s3-bucket --prefix startup --query 'Versions[?IsLatest].[VersionId]' --output text
    BbdVMmBRjtestta1EsVnbybZp1Wqh1J4

當您將指令碼與環境產生關聯時,您可以指定此版本識別碼。

現在,將指令碼與您的環境相關聯。

AWS Management Console
建立指令碼與環境 (主控台) 的關聯
  1. 在 Amazon MWAA 主控台上開啟「環境」頁面

  2. 選取您要更新之環境的列,然後選擇「編輯」。

  3. 在 [指定詳細資料] 頁面上,對於啟動指令碼檔案-用,輸入指令碼的 Amazon S3 URL,例如:s3://your-mwaa-bucket/startup-sh.

  4. 從下拉式清單中選擇最新版本,或瀏覽S3以尋找指令碼。

  5. 選擇「下一步」,然後前往「檢閱並儲存」頁面。

  6. 檢閱變更,然後選擇儲存

環境更新可能需要 10 到 30 分鐘。Amazon MWAA 會在環境中的每個元件重新啟動時執行啟動指令碼。

AWS CLI
將指令碼與環境 (CLI) 產生關聯的步驟
  • 開啟命令提示字元,並update-environment使用指定指令碼的 Amazon S3 URL 和版本識別碼。

    $ aws mwaa update-environment \ --name your-mwaa-environment \ --startup-script-s3-path startup.sh \ --startup-script-s3-object-version BbdVMmBRjtestta1EsVnbybZp1Wqh1J4

    如果成功,Amazon MWAA 返回環境的 Amazon 資源名稱(ARN):

    arn:aws::airflow:us-west-2:123456789012:environment/your-mwaa-environment 

環境更新可能需要 10 到 30 分鐘。Amazon MWAA 會在環境中的每個元件重新啟動時執行啟動指令碼。

最後,擷取記錄事件以驗證指令碼是否如預期般運作。當您為每個 Apache 氣流元件啟動記錄時,Amazon MWAA 會建立新的日誌群組和日誌串流。如需詳細資訊,請參閱 Apache 氣流記錄檔類型

AWS Management Console
若要檢查 Apache 氣流記錄資料流 (主控台)
  1. 在 Amazon MWAA 主控台上開啟「環境」頁面

  2. 選擇您的環境。

  3. 在 [視] 窗格中,選擇要檢視其記錄檔的記錄群組,例如 Airflow 排程器記錄群組

  4. 在 CloudWatch 主控台的「記錄串流」清單中,選擇具有下列前置詞的串流:startup_script_exection_ip

  5. 在 [記錄事件] 窗格中,您將看到列印值的命令輸出MWAA_AIRFLOW_COMPONENT。例如,對於排程器記錄,您將會執行下列動作:

    Printing Apache Airflow component
    scheduler
    Finished running startup script. Execution time: 0.004s.
    Running verification
    Verification completed

您可以重複上述步驟來檢視 Worker 和 Web 伺服器記錄。

使用啟動指令碼安裝 Linux 執行階段

使用啟動指令碼更新 Apache Airflow 元件的作業系統,並安裝其他執行階段程式庫以搭配您的工作流程使用。例如,執行下列指令碼yum update來更新作業系統。

在啟動指令碼yum update中執行時,您必須使用--exclude=python*範例中所示排除 Python。為了讓您的環境能夠執行,Amazon MWAA 會安裝與您的環境相容的特定 Python 版本。因此,您無法使用啟動指令碼更新環境的 Python 版本。

#!/bin/sh echo "Updating operating system" sudo yum update -y --exclude=python*

若要在特定的 Apache Airflow 元件上安裝執行階段,請使用MWAA_AIRFLOW_COMPONENTiffi條件陳述式。此範例會執行單一命令,在排程器和 Worker 上安裝程式libaio庫,但不會在 Web 伺服器上安裝程式庫。

重要
  • 如果您已設定私人 Web 伺服器,則必須使用下列條件,或在本機提供所有安裝檔案,以避免安裝逾時。

  • sudo於執行需要管理權限的作業。

#!/bin/sh if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]] then sudo yum -y install libaio fi

您可以使用啟動腳本來檢查 Python 版本。

#!/bin/sh export PYTHON_VERSION_CHECK=`python -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}.{2}".format(*version))'` echo "Python version is $PYTHON_VERSION_CHECK"

Amazon MWAA 不支援覆寫預設 Python 版本,因為這可能會導致與已安裝的 Apache 氣流程式庫不相容。

使用啟動指令碼設定環境變數

使用啟動指令碼來設定環境變數並修改 Apache 氣流組態。下面定義了一個新的變量,ENVIRONMENT_STAGE。您可以在 DAG 或自訂模組中參考此變數。

#!/bin/sh export ENVIRONMENT_STAGE="development" echo "$ENVIRONMENT_STAGE"

使用啟動指令碼覆寫常見的 Apache 氣流或系統變數。例如,您設定LD_LIBRARY_PATH為指示 Python 在您指定的路徑中尋找二進位檔案。這可讓您使用外掛程式為工作流程提供自訂二進位檔案:

#!/bin/sh export LD_LIBRARY_PATH=/usr/local/airflow/plugins/your-custom-binary

預留環境變數

Amazon MWAA 保留一組關鍵環境變數。如果覆寫保留變數,Amazon MWAA 會將其還原為預設值。以下列出了保留變量:

  • MWAA__AIRFLOW__COMPONENT— 用來識別具有下列其中一個值的 Apache 氣流元件:schedulerworker、或webserver

  • AIRFLOW__WEBSERVER__SECRET_KEY— 用於在 Apache 氣流網頁伺服器中安全簽署工作階段 Cookie 的密鑰。

  • AIRFLOW__CORE__FERNET_KEY— 用於加密和解密中繼資料資料庫中儲存之敏感資料的金鑰,例如連線密碼。

  • AIRFLOW_HOME— Apache 氣流主目錄的路徑,其中組態檔和 DAG 檔案儲存在本機。

  • AIRFLOW__CELERY__BROKER_URL— 用於 Apache 氣流排程器和 Celly 工作者節點之間通訊的訊息代理程式 URL。

  • AIRFLOW__CELERY__RESULT_BACKEND— 用來儲存 Celery 工作結果的資料庫 URL。

  • AIRFLOW__CORE__EXECUTOR-阿帕奇氣流應該使用的執行程序類。在 Amazon MWAA,這是一個 CeleryExecutor

  • AIRFLOW__CORE__LOAD_EXAMPLES— 用於啟動或停用範例 DAG 的載入。

  • AIRFLOW__METRICS__METRICS_BLOCK_LIST— 用於管理 Amazon MWAA 中發出和捕獲的 Apache 氣流指標。 CloudWatch

  • SQL_ALCHEMY_CONN— RDS 適用於 PostgreSQL 資料庫的連接字串,用於將阿帕奇氣流中繼資料儲存在 Amazon MWAA 中。

  • AIRFLOW__CORE__SQL_ALCHEMY_CONN— 用於與之相同的用途SQL_ALCHEMY_CONN,但遵循新的 Apache 氣流命名慣例。

  • AIRFLOW__CELERY__DEFAULT_QUEUE— Apache 氣流中芹菜工作的預設佇列。

  • AIRFLOW__OPERATORS__DEFAULT_QUEUE— 使用特定 Apache 氣流運算子之工作的預設佇列。

  • AIRFLOW_VERSION— 在 Amazon MWAA 環境中安裝的 Apache 氣流版本。

  • AIRFLOW_CONN_AWS_DEFAULT— 用來與中的其他AWS服務整合的預設認AWS證。

  • AWS_DEFAULT_REGION— 設置與默認憑據一起使用的默認AWS區域以與其他AWS服務集成。

  • AWS_REGION— 如果已定義,此環境變數會覆寫環境變數AWS_DEFAULT_REGION和設定檔設定區域中的值。

  • PYTHONUNBUFFERED-用於發送stdoutstderr流式傳輸到容器日誌。

  • AIRFLOW__METRICS__STATSD_ALLOW_LIST— 用於配置逗號分隔前綴的允許列表,以發送以列表元素開頭的指標。

  • AIRFLOW__METRICS__STATSD_ON— 啟動傳送量度至StatsD

  • AIRFLOW__METRICS__STATSD_HOST-用於連接到StatSD守護進程。

  • AIRFLOW__METRICS__STATSD_PORT-用於連接到StatSD守護進程。

  • AIRFLOW__METRICS__STATSD_PREFIX-用於連接到StatSD守護進程。

  • AIRFLOW__CELERY__WORKER_AUTOSCALE— 設定最大和最小並行。

  • AIRFLOW__CORE__DAG_CONCURRENCY— 設定一個 DAG 中排程器可同時執行的工作執行個體數目。

  • AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG— 設定每個 DAG 的使用中工作數目上限。

  • AIRFLOW__CORE__PARALLELISM— 定義可同時執行的工作實例數目上限。

  • AIRFLOW__SCHEDULER__PARSING_PROCESSES— 設定排程器剖析以排程 DAG 的最大處理程序數目。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT— 定義 Worker 在將訊息重新傳遞給另一個 Worker 之前等待確認工作的秒數。

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION— 設置AWS區域的基礎芹菜運輸.

  • AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__PREDEFINED_QUEUES— 為基礎 Celery 傳輸設定佇列。

  • AIRFLOW_SCHEDULER_ALLOWED_RUN_ID_PATTERN— 用於在觸發 DAG 時驗證run_id參數輸入的有效性。

  • AIRFLOW__WEBSERVER__BASE_URL— 用於託管 Apache 氣流使用者介面的網頁伺服器 URL。

未預留的環境變數

您可以使用啟動指令碼覆寫未保留的環境變數。下面列出了一些這些常見的變量:

  • PATH— 指定作業系統在其中搜尋可執行檔和程序檔的目錄清單。當指令在指令行中執行時,系統會檢查目錄以尋找並執行指令。PATH當您在 Apache Airflow 中建立自訂運算子或工作時,您可能需要仰賴外部指令碼或可執行檔。如果包含這些檔案的目錄不在PATH變數中指定的,則當系統找不到工作時,工作將無法執行。透過將適當的目錄新增至PATH,Apache Airflow 工作可以尋找並執行所需的可執行檔。

  • PYTHONPATH— 由 Python 解譯器用於確定要搜索導入模塊和包的目錄。這是您可以新增至預設搜尋路徑的目錄清單。這使解釋器可以查找並加載未包含在標準庫中或安裝在系統目錄中的 Python 庫。使用此變數可新增您的模組和自訂 Python 套件,並將它們與 DAG 搭配使用。

  • LD_LIBRARY_PATH— Linux 中動態鏈接器和加載器用於查找和加載共享庫的環境變量。它指定包含共享庫的目錄列表,這些目錄在默認系統庫目錄之前進行搜索。使用此變數可指定您的自訂二進位檔案。

  • CLASSPATH— 由 Java 執行階段環境 (JRE) 和 Java 開發套件 (JDK) 用來在執行階段尋找和載入 Java 類別、程式庫和資源。它是包含編譯的 Java 代碼的目錄,JAR 文件和 ZIP 存檔的列表。