Migração para um novo ambiente do Amazon MWAA
Examine as etapas a seguir para migrar a workload existente do Apache Airflow para um novo ambiente do Amazon MWAA. É possível usar essas etapas para migrar de uma versão mais antiga do Amazon MWAA para uma nova ou migrar sua implantação autogerenciada do Apache Airflow para o Amazon MWAA. Este tutorial pressupõe que você esteja migrando de um Apache Airflow v1.10.12 existente para um novo Amazon MWAA executando o Apache Airflow v2.5.1, mas é possível usar os mesmos procedimentos para migrar de ou para diferentes versões do Apache Airflow.
Tópicos
- Pré-requisitos
- Etapa um: criar um novo ambiente Amazon MWAA executando a versão mais recente compatível do Apache Airflow
- Etapa 2: migrar seus recursos de fluxo de trabalho
- Etapa três: exportar os metadados do seu ambiente existente
- Etapa quatro: importar os metadados para seu novo ambiente
- Próximas etapas
- Recursos relacionados
Pré-requisitos
Para poder concluir as etapas e migrar seu ambiente, você precisará de:
-
Uma implantação do Apache Airflow. Isso pode ser um ambiente Amazon MWAA autogerenciado ou existente.
-
O Docker instalado
para seu sistema operacional local. -
AWS Command Line Interface versão 2 instalado.
Etapa um: criar um novo ambiente Amazon MWAA executando a versão mais recente compatível do Apache Airflow
É possível criar um ambiente usando as etapas detalhadas em Introdução ao Amazon MWAA Guia do usuário do Amazon MWAA ou usando um modelo AWS CloudFormation. Se você estiver migrando de um ambiente Amazon MWAA existente e tiver usado um modelo AWS CloudFormation para criar seu ambiente antigo, poderá alterar a propriedade AirflowVersion
para especificar a nova versão.
MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion:
2.5.1
DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true
Como alternativa, se estiver migrando de um ambiente Amazon MWAA existente, é possível copiar o seguinte script Python que usa o AWSSDK for Python (Boto3)
# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)
Etapa 2: migrar seus recursos de fluxo de trabalho
O Apache Airflow v2 é um grande lançamento de versão. Se você estiver migrando do Apache Airflow v1, deverá preparar seus recursos de fluxo de trabalho e verificar as alterações feitas em seus DAGs, requisitos e plug-ins. Para fazer isso, recomendamos configurar uma versão bridge do Apache Airflow em seu sistema operacional local usando o Docker e o executor local Amazon MWAA
Sempre que você estiver alterando as versões do Apache Airflow, certifique-se de referenciar o URL correto do --constraint
no seu requirements.txt
.
Para migrar seus recursos de fluxo de trabalho
-
Crie uma bifurcação do repositório aws-mwaa-local-runner
e clone uma cópia do executor local Amazon MWAA. -
Confira a ramificação
v1.10.15
do repositório aws-mwaa-local-runner. O Apache Airflow lançou a versão 1.10.15 como uma versão intermediária para ajudar na migração para o Apache Airflow v2 e, embora o Amazon MWAA não ofereça suporte à versão 1.10.15, é possível usar o executor local do Amazon MWAA para testar seus recursos. -
Use a ferramenta CLI do executor local Amazon MWAA para criar a imagem do Docker e executar o Apache Airflow localmente. Para obter mais informações, consulte o executor local README
no repositório GitHub. -
Ao usar o Apache Airflow executado localmente, siga as etapas descritas em Atualização da versão 1.10 para a 2
no site de documentação do Apache Airflow. -
Para atualizar seu
requirements.txt
, siga as melhores práticas que recomendamos em Gerenciar dependências do Python, no Guia do usuário do Amazon MWAA. -
Se você tiver empacotado seus operadores e sensores personalizados com seus plug-ins para seu ambiente Apache Airflow v1.10.12 existente, mova-os para sua pasta DAG. Para obter mais informações sobre as melhores práticas de gerenciamento de módulos para o Apache Airflow v2+, consulte Gerenciamento de módulos
no site de documentação do Apache Airflow.
-
-
Depois de fazer as alterações necessárias nos recursos do fluxo de trabalho, confira a ramificação
v2.5.1
do repositório aws-mwaa-local-runner e teste localmente os DAGs, os requisitos e os plug-ins personalizados do fluxo de trabalho atualizados. Se você estiver migrando para uma versão diferente do Apache Airflow, é possível usar a ramificação local apropriada do executor para sua versão, em vez disso. -
Depois de testar com sucesso seus recursos de fluxo de trabalho, copie seus DAGs,
requirements.txt
e plug-ins para o bucket Amazon S3 do seu ambiente.
Etapa três: exportar os metadados do seu ambiente existente
As tabelas de metadados do Apache Airflow, como dag
, dag_tag
e dag_code
são preenchidas automaticamente quando você copia os arquivos DAG atualizados para o bucket Amazon S3 do seu ambiente e o agendador os analisa. As tabelas relacionadas à permissão também são preenchidas automaticamente com base na permissão da perfil de execução do IAM. Você não precisa migrar.
É possível migrar dados relacionados ao histórico do DAG, variable
, slot_pool
, sla_miss
e, se necessário, tabelas xcom
, job
e log
. O log da instância da tarefa é armazenado no CloudWatch Logs sob o grupo de logs airflow-
. Se você quiser ver os logs da instância de tarefas de execuções mais antigas, esses logs devem ser copiados para o novo grupo de logs do ambiente. Recomendamos que você mova apenas alguns dias de logs para reduzir os custos associados. {environment_name}
Se você estiver migrando de um ambiente Amazon MWAA existente, não há acesso direto ao banco de dados de metadados. Você deve executar um DAG para exportar os metadados do seu ambiente do Amazon MWAA existente para um bucket do Amazon S3 de sua escolha. As etapas a seguir também podem ser usadas para exportar metadados do Apache Airflow se você estiver migrando de um ambiente autogerenciado.
Depois que os dados são exportados, é possível executar um DAG em seu novo ambiente para importar os dados. Durante o processo de exportação e importação, todos os outros DAGs são pausados.
Para exportar os metadados do seu ambiente existente
-
Crie um bucket do Amazon S3 usando AWS CLI para armazenar os dados exportados. Substitua
UUID
eregion
por suas próprias informações.$
aws s3api create-bucket \ --bucket mwaa-migration-
{UUID}
\ --region{region}
nota
Se você estiver migrando dados confidenciais, como conexões armazenadas em variáveis, recomendamos que você habilite a criptografia padrão para o bucket do Amazon S3.
-
nota
Não se aplica à migração de um ambiente autogerenciado.
Modifique o perfil de execução do ambiente existente e adicione a política a seguir para conceder acesso de gravação ao bucket que você criou na etapa um.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-
{UUID}
/*" ] } ] } -
Clone o repositório amazon-mwaa-examples
e navegue até o subdiretório metadata-migration
do seu cenário de migração.$
git clone https://github.com/aws-samples/amazon-mwaa-examples.git
$
cd amazon-mwaa-examples/usecases/metadata-migration/
existing-version
-new-version
/ -
Em
export_data.py
, substitua o valor da string peloS3_BUCKET
bucket do Amazon S3 que você criou para armazenar metadados exportados.S3_BUCKET = 'mwaa-migration-
{UUID}
' -
Localize o arquivo
requirements.txt
no diretóriometadata-migration
. Se você já tiver um arquivo de requisitos para seu ambiente existente, adicione os requisitos adicionais especificados emrequirements.txt
para seu arquivo. Se você não tiver um arquivo de requisitos existente, basta usar o fornecido no diretóriometadata-migration
. -
Copie
export_data.py
para o diretório do DAG do bucket do Amazon S3 associado ao seu ambiente existente. Se estiver migrando de um ambiente autogerenciado, copieexport_data.py
para sua pasta/dags
. -
Copie seu
requirements.txt
atualizado para o bucket do Amazon S3 associado ao seu ambiente existente e, em seguida, edite o ambiente para especificar a nova versãorequirements.txt
. -
Depois que o ambiente for atualizado, acesse a IU do Apache Airflow, retome o DAG
db_export
e acione a execução do fluxo de trabalho. -
Verifique se os metadados são exportados para
data/migration/
o bucket do Amazon S3existing-version
_to_new-version
/export/mwaa-migration-
, com cada tabela em seu próprio arquivo dedicado.{UUID}
Etapa quatro: importar os metadados para seu novo ambiente
Para importar os metadados para seu novo ambiente
-
Em
import_data.py
, substitua os valores de string pelos seguintes com suas informações.-
Para migração de um ambiente Amazon MWAA existente:
S3_BUCKET = 'mwaa-migration-
{UUID}
' OLD_ENV_NAME='{old_environment_name}
' NEW_ENV_NAME='{new_environment_name}
' TI_LOG_MAX_DAYS ={number_of_days}
MAX_DAYS
controla quantos dias de arquivos de log o fluxo de trabalho copia para o novo ambiente. -
Para migrar de um ambiente autogerenciado:
S3_BUCKET = 'mwaa-migration-
{UUID}
' NEW_ENV_NAME='{new_environment_name}
'
-
-
(Opcional)
import_data.py
copia somente logs de tarefas com falha. Se você quiser copiar todos os logs de tarefas, modifique a funçãogetDagTasks
e removati.state = 'failed'
conforme mostrado no trecho de código a seguir.def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
-
Modifique o perfil de execução do seu novo ambiente e adicione a política a seguir. A política de permissão permite que o Amazon MWAA leia do bucket Amazon S3 para o qual você exportou os metadados do Apache Airflow e copie os logs de instâncias de tarefas dos grupo de logs existentes. Substitua todos os marcadores por suas próprias informações.
nota
Se você estiver migrando de um ambiente autogerenciado, deverá remover as permissões relacionadas ao CloudWatch Logs da política.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:
{region}
:{account_number}
:log-group:airflow-{old_environment_name}
*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}
", "arn:aws:s3:::mwaa-migration-{UUID}
/*" ] } ] } -
Copie
import_data.py
para o diretório DAG do bucket Amazon S3 associado ao seu novo ambiente e, em seguida, acesse a IU do Apache Airflow para interromper o DAGdb_import
e acionar o fluxo de trabalho. O novo DAG aparecerá na IU do Apache Airflow em alguns minutos. -
Depois que a execução do DAG for concluída, verifique se o histórico de execução do DAG foi copiado acessando cada DAG individual.
Próximas etapas
-
Para obter mais informações sobre as classes e capacidades de ambiente do Amazon MWAA disponíveis, consulte Classe de ambiente do Amazon MWAA no Guia do usuário do Amazon MWAA.
-
Para obter mais informações sobre como o Amazon MWAA lida com ajuste de escala automático de operadores, consulte Ajuste de escala automático do Amazon MWAA no Guia de usuário do Amazon MWAA.
-
Para obter mais informações sobre a API REST DO Amazon MWAA, consulte a API REST do Amazon MWAA.
Recursos relacionados
-
Modelos do Apache Airflow
(documentação do Apache Airflow): saiba mais sobre os modelos de banco de dados de metadados do Apache Airflow.