Migrieren Sie zu einer neuen MWAA Amazon-Umgebung - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Migrieren Sie zu einer neuen MWAA Amazon-Umgebung

Erkunden Sie die folgenden Schritte, um Ihren vorhandenen Apache Airflow-Workload auf eine neue MWAA Amazon-Umgebung zu migrieren. Sie können diese Schritte verwenden, um von einer älteren Version von Amazon MWAA zu einer neuen Version zu migrieren oder Ihre selbstverwaltete Apache Airflow-Bereitstellung zu Amazon zu migrieren. MWAA In diesem Tutorial wird davon ausgegangen, dass Sie von einem vorhandenen Apache Airflow v1.10.12 zu einem neuen Amazon migrieren, auf MWAA dem Apache Airflow v2.5.1 ausgeführt wird. Sie können jedoch dieselben Verfahren verwenden, um von oder zu verschiedenen Apache Airflow-Versionen zu migrieren.

Voraussetzungen

Um die Schritte abschließen und Ihre Umgebung migrieren zu können, benötigen Sie Folgendes:

Schritt eins: Erstellen Sie eine neue MWAA Amazon-Umgebung, in der die neueste unterstützte Apache Airflow-Version ausgeführt wird

Sie können eine Umgebung mithilfe der detaillierten Schritte unter Erste Schritte mit Amazon MWAA im MWAAAmazon-Benutzerhandbuch oder mithilfe einer AWS CloudFormation Vorlage erstellen. Wenn Sie aus einer vorhandenen MWAA Amazon-Umgebung migrieren und eine AWS CloudFormation Vorlage verwendet haben, um Ihre alte Umgebung zu erstellen, können Sie die AirflowVersion Eigenschaft ändern, um die neue Version anzugeben.

MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion: 2.5.1 DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true

Wenn Sie aus einer vorhandenen MWAA Amazon-Umgebung migrieren, können Sie alternativ das folgende Python-Skript kopieren, das AWS SDKfor Python (Boto3) verwendet, um Ihre Umgebung zu klonen. Sie können das Skript auch herunterladen.

# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)

Schritt zwei: Migrieren Sie Ihre Workflow-Ressourcen

Apache Airflow v2 ist eine Hauptversion. Wenn Sie von Apache Airflow v1 migrieren, müssen Sie Ihre Workflow-Ressourcen vorbereiten und die Änderungen überprüfenDAGs, die Sie an Ihren Anforderungen und Plugins vornehmen. Zu diesem Zweck empfehlen wir, eine Bridge-Version von Apache Airflow auf Ihrem lokalen Betriebssystem mithilfe von Docker und dem Amazon MWAA Local Runner zu konfigurieren. Der Amazon MWAA Local Runner bietet ein Befehlszeilen-Interface (CLI) -Hilfsprogramm, das eine MWAA Amazon-Umgebung lokal repliziert.

Wenn Sie Apache Airflow-Versionen ändern, stellen Sie sicher, dass Sie --constraint URL in Ihrer Version auf die richtige Referenz verweisen. requirements.txt

Um Ihre Workflow-Ressourcen zu migrieren
  1. Erstellen Sie einen Fork des aws-mwaa-local-runnerRepositorys und klonen Sie eine Kopie des Amazon MWAA Local Runners.

  2. Checken Sie den v1.10.15 Zweig des aws-mwaa-local -runner-Repositorys aus. Apache Airflow hat v1.10.15 als Bridge-Version veröffentlicht, um die Migration zu Apache Airflow v2 zu unterstützen. Obwohl Amazon v1.10.15 MWAA nicht unterstützt, können Sie den Amazon MWAA Local Runner verwenden, um Ihre Ressourcen zu testen.

  3. Verwenden Sie das Amazon MWAA Local CLI Runner-Tool, um das Docker-Image zu erstellen und Apache Airflow lokal auszuführen. Weitere Informationen finden Sie unter dem lokalen Runner READMEim GitHub Repository.

  4. Wenn Apache Airflow lokal ausgeführt wird, folgen Sie den Schritten, die unter Upgrade von 1.10 auf 2 auf der Apache Airflow-Dokumentationswebsite beschrieben sind.

    1. Um Ihre zu aktualisierenrequirements.txt, folgen Sie den bewährten Methoden, die wir unter Verwaltung von Python-Abhängigkeiten im MWAAAmazon-Benutzerhandbuch empfehlen.

    2. Wenn Sie Ihre benutzerdefinierten Operatoren und Sensoren mit Ihren Plugins für Ihre bestehende Apache Airflow v1.10.12-Umgebung gebündelt haben, verschieben Sie sie in Ihren Ordner. DAG Weitere Informationen zu bewährten Methoden zur Modulverwaltung für Apache Airflow v2+ finden Sie unter Modulverwaltung auf der Apache Airflow-Dokumentationswebsite.

  5. Nachdem Sie die erforderlichen Änderungen an Ihren Workflow-Ressourcen vorgenommen haben, checken Sie den v2.5.1 Zweig des aws-mwaa-local -runner-Repositorys aus und testen Sie Ihren aktualisierten WorkflowDAGs, Ihre Anforderungen und Ihre benutzerdefinierten Plugins lokal. Wenn Sie zu einer anderen Apache Airflow-Version migrieren, können Sie stattdessen den entsprechenden lokalen Runner-Branch für Ihre Version verwenden.

  6. Nachdem Sie Ihre Workflow-Ressourcen erfolgreich getestet haben, kopieren Sie Ihre DAGsrequirements.txt, und Plugins in den Amazon S3 S3-Bucket, den Sie mit Ihrer neuen MWAA Amazon-Umgebung konfiguriert haben.

Schritt drei: Exportieren der Metadaten aus Ihrer bestehenden Umgebung

Apache Airflow-Metadatentabellen wie dagdag_tag,, und dag_code werden automatisch aufgefüllt, wenn Sie die aktualisierten DAG Dateien in den Amazon S3 S3-Bucket Ihrer Umgebung kopieren und der Scheduler sie analysiert. Tabellen, die sich auf Berechtigungen beziehen, werden ebenfalls automatisch auf der Grundlage Ihrer Ausführungsrollenberechtigung aufgefüllt. IAM Sie müssen sie nicht migrieren.

Sie können Daten migrieren, die sich auf den DAG Verlauf variableslot_pool,sla_miss, und, falls erforderlichxcom,job, und log Tabellen beziehen. Das Protokoll der Taskinstanz wird in den CloudWatch Protokollen unter der airflow-{environment_name} Protokollgruppe gespeichert. Wenn Sie die Protokolle der Task-Instanz für ältere Läufe sehen möchten, müssen diese Protokolle in die neue Umgebungsprotokollgruppe kopiert werden. Wir empfehlen, dass Sie nur Protokolle für einige Tage verschieben, um die damit verbundenen Kosten zu senken.

Wenn Sie aus einer bestehenden MWAA Amazon-Umgebung migrieren, gibt es keinen direkten Zugriff auf die Metadaten-Datenbank. Sie müssen a ausführenDAG, um die Metadaten aus Ihrer vorhandenen MWAA Amazon-Umgebung in einen Amazon S3-Bucket Ihrer Wahl zu exportieren. Die folgenden Schritte können auch verwendet werden, um Apache Airflow-Metadaten zu exportieren, wenn Sie aus einer selbstverwalteten Umgebung migrieren.

Nachdem die Daten exportiert wurden, können Sie DAG in Ihrer neuen Umgebung a ausführen, um die Daten zu importieren. Während des Export- und Importvorgangs DAGs werden alle anderen angehalten.

Um die Metadaten aus Ihrer vorhandenen Umgebung zu exportieren
  1. Erstellen Sie einen Amazon S3 S3-Bucket mit dem AWS CLI , um die exportierten Daten zu speichern. Ersetzen Sie das UUID und region durch Ihre Informationen.

    $ aws s3api create-bucket \ --bucket mwaa-migration-{UUID}\ --region {region}
    Anmerkung

    Wenn Sie sensible Daten migrieren, z. B. Verbindungen, die Sie in Variablen speichern, empfehlen wir Ihnen, die Standardverschlüsselung für den Amazon S3 S3-Bucket zu aktivieren.

  2. Anmerkung

    Gilt nicht für die Migration aus einer selbstverwalteten Umgebung.

    Ändern Sie die Ausführungsrolle der vorhandenen Umgebung und fügen Sie die folgende Richtlinie hinzu, um Schreibzugriff auf den Bucket zu gewähren, den Sie in Schritt 1 erstellt haben.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  3. Klonen Sie das amazon-mwaa-examplesRepository und navigieren Sie zum metadata-migration Unterverzeichnis für Ihr Migrationsszenario.

    $ git clone https://github.com/aws-samples/amazon-mwaa-examples.git $ cd amazon-mwaa-examples/usecases/metadata-migration/existing-version-new-version/
  4. Ersetzen Sie in export_data.py den Zeichenkettenwert für S3_BUCKET durch den Amazon S3 S3-Bucket, den Sie zum Speichern exportierter Metadaten erstellt haben.

    S3_BUCKET = 'mwaa-migration-{UUID}'
  5. Suchen Sie die requirements.txt Datei im metadata-migration Verzeichnis. Wenn Sie bereits über eine Anforderungsdatei für Ihre bestehende Umgebung verfügen, fügen Sie Ihrer Datei die zusätzlichen Anforderungen requirements.txt hinzu, die in angegeben sind. Wenn Sie noch keine Anforderungsdatei haben, können Sie einfach die Datei verwenden, die im metadata-migration Verzeichnis bereitgestellt wird.

  6. Kopieren export_data.py Sie in das DAG Verzeichnis des Amazon S3 S3-Buckets, der Ihrer vorhandenen Umgebung zugeordnet ist. Wenn Sie aus einer selbstverwalteten Umgebung migrieren, kopieren Sie export_data.py in Ihren /dags Ordner.

  7. Kopieren Sie Ihre Aktualisierung requirements.txt in den Amazon S3 S3-Bucket, der mit Ihrer vorhandenen Umgebung verknüpft ist, und bearbeiten Sie dann die Umgebung, um die neue requirements.txt Version anzugeben.

  8. Nachdem die Umgebung aktualisiert wurde, greifen Sie auf die Apache Airflow-Benutzeroberfläche zu, heben Sie die db_export DAG Pause auf und lösen Sie die Ausführung des Workflows aus.

  9. Stellen Sie sicher, dass die Metadaten data/migration/existing-version_to_new-version/export/ in den mwaa-migration-{UUID} Amazon S3 S3-Bucket exportiert werden, wobei sich jede Tabelle in einer eigenen Datei befindet.

Schritt vier: Importieren der Metadaten in Ihre neue Umgebung

Um die Metadaten in Ihre neue Umgebung zu importieren
  1. Ersetzen Sie in import_data.py die folgenden Zeichenfolgenwerte durch Ihre Informationen.

    • Für die Migration aus einer bestehenden MWAA Amazon-Umgebung:

      S3_BUCKET = 'mwaa-migration-{UUID}' OLD_ENV_NAME='{old_environment_name}' NEW_ENV_NAME='{new_environment_name}' TI_LOG_MAX_DAYS = {number_of_days}

      MAX_DAYSsteuert, wie viele Tage Protokolldateien der Workflow in die neue Umgebung kopiert.

    • Für die Migration aus einer selbstverwalteten Umgebung:

      S3_BUCKET = 'mwaa-migration-{UUID}' NEW_ENV_NAME='{new_environment_name}'
  2. (Optional) import_data.py kopiert nur die Protokolle fehlgeschlagener Aufgaben. Wenn Sie alle Aufgabenprotokolle kopieren möchten, ändern Sie die getDagTasks Funktion und entfernen Sie ti.state = 'failed' sie, wie im folgenden Codeausschnitt gezeigt.

    def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
  3. Ändern Sie die Ausführungsrolle Ihrer neuen Umgebung und fügen Sie die folgende Richtlinie hinzu. Die Berechtigungsrichtlinie ermöglicht es AmazonMWAA, aus dem Amazon S3 S3-Bucket zu lesen, in den Sie die Apache Airflow-Metadaten exportiert haben, und Task-Instance-Protokolle aus vorhandenen Protokollgruppen zu kopieren. Ersetzen Sie alle Platzhalter durch Ihre Informationen.

    Anmerkung

    Wenn Sie aus einer selbstverwalteten Umgebung migrieren, müssen Sie die Berechtigungen für CloudWatch Logs aus der Richtlinie entfernen.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:{region}:{account_number}:log-group:airflow-{old_environment_name}*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}", "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  4. Kopieren Sie import_data.py in das DAG Verzeichnis des Amazon S3 S3-Buckets, der mit Ihrer neuen Umgebung verknüpft ist, und greifen Sie dann auf die Apache Airflow-Benutzeroberfläche zu, um die Pause aufzuheben db_import DAG und den Workflow auszulösen. Das neue DAG wird in wenigen Minuten in der Apache Airflow-Benutzeroberfläche angezeigt.

  5. Stellen Sie nach Abschluss des DAG Laufs sicher, dass Ihr DAG Laufverlauf kopiert wurde, indem Sie auf jeden einzelnen DAG Benutzer zugreifen.

Nächste Schritte

  • Weitere Informationen zu den verfügbaren MWAA Amazon-Umgebungsklassen und -funktionen finden Sie unter MWAAAmazon-Umgebungsklasse im MWAAAmazon-Benutzerhandbuch.

  • Weitere Informationen darüber, wie Amazon MWAA mit Autoscaling-Workern umgeht, finden Sie unter Amazon MWAA Automatic Scaling im MWAAAmazon-Benutzerhandbuch.

  • Weitere Informationen zum Amazonas MWAA REST API finden Sie unter Amazon MWAA REST API.

  • Apache Airflow-Modelle (Apache Airflow-Dokumentation) — Erfahren Sie mehr über Apache Airflow-Metadaten-Datenbankmodelle.