Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esegui la migrazione a un nuovo ambiente Amazon MWAA
Esplora i seguenti passaggi per migrare il tuo carico di lavoro Apache Airflow esistente in un nuovo ambiente Amazon. MWAA Puoi utilizzare questi passaggi per migrare da una versione precedente di Amazon MWAA a una nuova versione o migrare la tua distribuzione di Apache Airflow autogestita su Amazon. MWAA Questo tutorial presuppone che tu stia migrando da un Apache Airflow v1.10.12 esistente a un nuovo MWAA Amazon che esegue Apache Airflow v2.5.1, ma puoi utilizzare le stesse procedure per migrare da o verso diverse versioni di Apache Airflow.
Argomenti
- Prerequisiti
- Fase uno: creare un nuovo MWAA ambiente Amazon con l'ultima versione supportata di Apache Airflow
- Fase due: migra le risorse del flusso di lavoro
- Fase tre: Esportazione dei metadati dall'ambiente esistente
- Fase quattro: Importazione dei metadati nel nuovo ambiente
- Passaggi successivi
- Risorse correlate
Prerequisiti
Per completare i passaggi e migrare il tuo ambiente, avrai bisogno di quanto segue:
-
Una distribuzione di Apache Airflow. Può trattarsi di un MWAA ambiente Amazon autogestito o esistente.
-
Docker installato
per il tuo sistema operativo locale. -
AWS Command Line Interface versione 2 installata.
Fase uno: creare un nuovo MWAA ambiente Amazon con l'ultima versione supportata di Apache Airflow
Puoi creare un ambiente seguendo i passaggi dettagliati in Getting started with Amazon MWAA nella Amazon MWAA User Guide o utilizzando un AWS CloudFormation modello. Se stai migrando da un MWAA ambiente Amazon esistente e hai utilizzato un AWS CloudFormation modello per creare il tuo vecchio ambiente, puoi modificare la AirflowVersion
proprietà per specificare la nuova versione.
MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion:
2.5.1
DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true
In alternativa, se esegui la migrazione da un MWAA ambiente Amazon esistente, puoi copiare il seguente script Python che utilizza AWS SDKfor Python (Boto3) per
# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)
Fase due: migra le risorse del flusso di lavoro
Apache Airflow v2 è una versione principale. Se state effettuando la migrazione da Apache Airflow v1, dovete preparare le risorse del flusso di lavoro e verificare le modifiche apportate ai vostri requisiti e plugin. DAGs A tale scopo, ti consigliamo di configurare una versione bridge di Apache Airflow sul tuo sistema operativo locale utilizzando Docker e Amazon local runner. MWAA
Ogni volta che modifichi una versione di Apache Airflow, assicurati di fare riferimento a quella corretta nel tuo. --constraint
URL requirements.txt
Per migrare le risorse del flusso di lavoro
-
Crea un fork del aws-mwaa-local-runner
repository e clona una copia del runner locale di AmazonMWAA. -
Dai un'occhiata al
v1.10.15
ramo del repository -runner. aws-mwaa-local Apache Airflow ha rilasciato la versione 1.10.15 come versione bridge per facilitare la migrazione ad Apache Airflow v2 e, sebbene Amazon MWAA non supporti la versione 1.10.15, puoi utilizzare Amazon local runner per testare le tue risorse. MWAA -
Usa lo CLI strumento Amazon MWAA local runner per creare l'immagine Docker ed eseguire Apache Airflow localmente. Per ulteriori informazioni, consulta il runner README
locale nel repository. GitHub -
Utilizzando Apache Airflow in esecuzione localmente, segui i passaggi descritti in Aggiornamento da 1.10 a 2 nel sito Web della documentazione di Apache
Airflow. -
Per aggiornare le tue
requirements.txt
, segui le best practice consigliate nella sezione Managing Python dependencies, nella Amazon MWAA User Guide. -
Se hai combinato gli operatori e i sensori personalizzati con i plugin per l'ambiente Apache Airflow v1.10.12 esistente, spostali nella tua cartella. DAG Per ulteriori informazioni sulle migliori pratiche di gestione dei moduli per Apache Airflow v2+, consulta Module Management nel sito Web della documentazione di Apache Airflow.
-
-
Dopo aver apportato le modifiche necessarie alle risorse del flusso di lavoro, consultate il
v2.5.1
ramo del repository aws-mwaa-local -runner e testate localmente il flusso di lavoro aggiornato, i requisiti e i plugin personalizzati. DAGs Se stai migrando a una versione diversa di Apache Airflow, puoi invece utilizzare il ramo runner locale appropriato per la tua versione. -
Dopo aver testato con successo le risorse del flusso di lavoro, copia i tuoi DAGs e i plug-in nel bucket Amazon S3 che hai configurato con il tuo nuovo ambiente Amazon.
requirements.txt
MWAA
Fase tre: Esportazione dei metadati dall'ambiente esistente
Le tabelle di metadati di Apache Airflowdag
, ad esempiodag_tag
, dag_code
vengono compilate automaticamente quando copi DAG i file aggiornati nel bucket Amazon S3 del tuo ambiente e lo scheduler li analizza. Le tabelle relative alle autorizzazioni vengono inoltre compilate automaticamente in base all'autorizzazione del ruolo di esecuzione. IAM Non è necessario migrarle.
È possibile migrare i dati relativi alla DAG cronologia,variable
, slot_pool
sla_miss
, e, se necessario xcom
job
, alle tabelle. log
Il registro delle istanze delle attività viene archiviato nei CloudWatch registri del gruppo di airflow-
log. Se si desidera visualizzare i registri delle istanze di attività relativi alle esecuzioni precedenti, è necessario copiarli nel nuovo gruppo di registri di ambiente. Si consiglia di spostare solo i log di pochi giorni per ridurre i costi associati. {environment_name}
Se stai migrando da un MWAA ambiente Amazon esistente, non è possibile accedere direttamente al database dei metadati. Devi eseguire un DAG per esportare i metadati dal tuo MWAA ambiente Amazon esistente in un bucket Amazon S3 di tua scelta. I seguenti passaggi possono essere utilizzati anche per esportare i metadati di Apache Airflow se stai migrando da un ambiente autogestito.
Dopo l'esportazione dei dati, puoi eseguire un file DAG nel nuovo ambiente per importare i dati. Durante il processo di esportazione e importazione, tutti gli altri DAGs vengono messi in pausa.
Per esportare i metadati dall'ambiente esistente
-
Crea un bucket Amazon S3 utilizzando AWS CLI per archiviare i dati esportati. Sostituisci il
UUID
eregion
con le tue informazioni.$
aws s3api create-bucket \ --bucket mwaa-migration-
{UUID}
\ --region{region}
Nota
Se stai migrando dati sensibili, come le connessioni archiviate in variabili, ti consigliamo di abilitare la crittografia predefinita per il bucket Amazon S3.
-
Nota
Non si applica alla migrazione da un ambiente autogestito.
Modifica il ruolo di esecuzione dell'ambiente esistente e aggiungi la seguente politica per concedere l'accesso in scrittura al bucket creato nel primo passaggio.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-
{UUID}
/*" ] } ] } -
Clona il amazon-mwaa-examples
repository e accedi alla metadata-migration
sottodirectory per lo scenario di migrazione.$
git clone https://github.com/aws-samples/amazon-mwaa-examples.git
$
cd amazon-mwaa-examples/usecases/metadata-migration/
existing-version
-new-version
/ -
In
export_data.py
, sostituisci il valore della stringaS3_BUCKET
con il bucket Amazon S3 che hai creato per archiviare i metadati esportati.S3_BUCKET = 'mwaa-migration-
{UUID}
' -
Individua il
requirements.txt
file nella directory.metadata-migration
Se disponi già di un file dei requisiti per l'ambiente esistente, aggiungi i requisiti aggiuntivirequirements.txt
specificati nel file. Se non disponi di un file dei requisiti esistente, puoi semplicemente utilizzare quello fornito nellametadata-migration
directory. -
Copia
export_data.py
nella DAG directory del bucket Amazon S3 associato all'ambiente esistente. Se stai migrando da un ambiente autogestito, copialoexport_data.py
nella tua cartella./dags
-
Copia l'aggiornamento
requirements.txt
nel bucket Amazon S3 associato all'ambiente esistente, quindi modifica l'ambiente per specificare la nuova versione.requirements.txt
-
Dopo l'aggiornamento dell'ambiente, accedi all'interfaccia utente di Apache Airflow, riattiva la pausa e attiva l'esecuzione del
db_export
DAG flusso di lavoro. -
Verifica che i metadati vengano esportati
data/migration/
nelexisting-version
_to_new-version
/export/mwaa-migration-
bucket Amazon S3, con ogni tabella nel proprio file dedicato.{UUID}
Fase quattro: Importazione dei metadati nel nuovo ambiente
Per importare i metadati nel nuovo ambiente
-
In
import_data.py
, sostituisci i valori di stringa per quanto segue con le tue informazioni.-
Per la migrazione da un MWAA ambiente Amazon esistente:
S3_BUCKET = 'mwaa-migration-
{UUID}
' OLD_ENV_NAME='{old_environment_name}
' NEW_ENV_NAME='{new_environment_name}
' TI_LOG_MAX_DAYS ={number_of_days}
MAX_DAYS
controlla per quanti giorni di file di registro il flusso di lavoro copia nel nuovo ambiente. -
Per la migrazione da un ambiente autogestito:
S3_BUCKET = 'mwaa-migration-
{UUID}
' NEW_ENV_NAME='{new_environment_name}
'
-
-
(Facoltativo)
import_data.py
copia solo i registri delle attività non riuscite. Se desiderate copiare tutti i log delle attività, modificate lagetDagTasks
funzione e rimuovetelati.state = 'failed'
come mostrato nel seguente frammento di codice.def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
-
Modifica il ruolo di esecuzione del nuovo ambiente e aggiungi la seguente politica. La politica di autorizzazione consente MWAA ad Amazon di leggere dal bucket Amazon S3 in cui hai esportato i metadati Apache Airflow e di copiare i log delle istanze delle attività dai gruppi di log esistenti. Sostituisci tutti i segnaposto con le tue informazioni.
Nota
Se stai migrando da un ambiente autogestito, devi rimuovere le autorizzazioni relative ai CloudWatch registri dalla policy.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:
{region}
:{account_number}
:log-group:airflow-{old_environment_name}
*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}
", "arn:aws:s3:::mwaa-migration-{UUID}
/*" ] } ] } -
Copia
import_data.py
DAG nella directory del bucket Amazon S3 associato al nuovo ambiente, quindi accedi all'interfaccia utente di Apache Airflow per riattivare il flusso di lavoro e riavviare il flusso di lavoro.db_import
DAG Il nuovo DAG verrà visualizzato nell'interfaccia utente di Apache Airflow tra pochi minuti. -
Al termine dell'DAGesecuzione, verifica che la cronologia delle esecuzioni DAG venga copiata accedendo a ogni utente. DAG
Passaggi successivi
-
Per ulteriori informazioni sulle classi e le funzionalità di MWAA ambiente Amazon disponibili, consulta la classe di MWAA ambiente Amazon nella Amazon MWAA User Guide.
-
Per ulteriori informazioni su come Amazon MWAA gestisce gli operatori di autoscaling, consulta Amazon MWAAautomatic scaling nella Amazon User Guide. MWAA
-
Per ulteriori informazioni su Amazon MWAA RESTAPI, consulta Amazon MWAA REST API.
Risorse correlate
-
Modelli Apache Airflow
(documentazione Apache Airflow): scopri di più sui modelli di database di metadati Apache Airflow.