Migração para um novo ambiente do Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Migração para um novo ambiente do Amazon MWAA

Examine as etapas a seguir para migrar a workload existente do Apache Airflow para um novo ambiente do Amazon MWAA. É possível usar essas etapas para migrar de uma versão mais antiga do Amazon MWAA para uma nova ou migrar sua implantação autogerenciada do Apache Airflow para o Amazon MWAA. Este tutorial pressupõe que você esteja migrando de um Apache Airflow v1.10.12 existente para um novo Amazon MWAA executando o Apache Airflow v2.5.1, mas é possível usar os mesmos procedimentos para migrar de ou para diferentes versões do Apache Airflow.

Pré-requisitos

Para poder concluir as etapas e migrar seu ambiente, você precisará de:

Etapa um: criar um novo ambiente Amazon MWAA executando a versão mais recente compatível do Apache Airflow

É possível criar um ambiente usando as etapas detalhadas em Introdução ao Amazon MWAA Guia do usuário do Amazon MWAA ou usando um modelo AWS CloudFormation. Se você estiver migrando de um ambiente Amazon MWAA existente e tiver usado um modelo AWS CloudFormation para criar seu ambiente antigo, poderá alterar a propriedade AirflowVersion para especificar a nova versão.

MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion: 2.5.1 DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true

Como alternativa, se estiver migrando de um ambiente Amazon MWAA existente, é possível copiar o seguinte script Python que usa o AWSSDK for Python (Boto3) para clonar seu ambiente. Você também pode fazer download do script .

# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)

Etapa 2: migrar seus recursos de fluxo de trabalho

O Apache Airflow v2 é um grande lançamento de versão. Se você estiver migrando do Apache Airflow v1, deverá preparar seus recursos de fluxo de trabalho e verificar as alterações feitas em seus DAGs, requisitos e plug-ins. Para fazer isso, recomendamos configurar uma versão bridge do Apache Airflow em seu sistema operacional local usando o Docker e o executor local Amazon MWAA. O executor local do Amazon MWAA fornece um utilitário de interface de linha de comando (CLI) que replica um ambiente do Amazon MWAA localmente.

Sempre que você estiver alterando as versões do Apache Airflow, certifique-se de referenciar o URL correto do --constraint no seu requirements.txt.

Para migrar seus recursos de fluxo de trabalho
  1. Crie uma bifurcação do repositório aws-mwaa-local-runner e clone uma cópia do executor local Amazon MWAA.

  2. Confira a ramificação v1.10.15 do repositório aws-mwaa-local-runner. O Apache Airflow lançou a versão 1.10.15 como uma versão intermediária para ajudar na migração para o Apache Airflow v2 e, embora o Amazon MWAA não ofereça suporte à versão 1.10.15, é possível usar o executor local do Amazon MWAA para testar seus recursos.

  3. Use a ferramenta CLI do executor local Amazon MWAA para criar a imagem do Docker e executar o Apache Airflow localmente. Para obter mais informações, consulte o executor local README no repositório GitHub.

  4. Ao usar o Apache Airflow executado localmente, siga as etapas descritas em Atualização da versão 1.10 para a 2 no site de documentação do Apache Airflow.

    1. Para atualizar seu requirements.txt, siga as melhores práticas que recomendamos em Gerenciar dependências do Python, no Guia do usuário do Amazon MWAA.

    2. Se você tiver empacotado seus operadores e sensores personalizados com seus plug-ins para seu ambiente Apache Airflow v1.10.12 existente, mova-os para sua pasta DAG. Para obter mais informações sobre as melhores práticas de gerenciamento de módulos para o Apache Airflow v2+, consulte Gerenciamento de módulos no site de documentação do Apache Airflow.

  5. Depois de fazer as alterações necessárias nos recursos do fluxo de trabalho, confira a ramificação v2.5.1 do repositório aws-mwaa-local-runner e teste localmente os DAGs, os requisitos e os plug-ins personalizados do fluxo de trabalho atualizados. Se você estiver migrando para uma versão diferente do Apache Airflow, é possível usar a ramificação local apropriada do executor para sua versão, em vez disso.

  6. Depois de testar com sucesso seus recursos de fluxo de trabalho, copie seus DAGs, requirements.txt e plug-ins para o bucket Amazon S3 do seu ambiente.

Etapa três: exportar os metadados do seu ambiente existente

As tabelas de metadados do Apache Airflow, como dag, dag_tag e dag_code são preenchidas automaticamente quando você copia os arquivos DAG atualizados para o bucket Amazon S3 do seu ambiente e o agendador os analisa. As tabelas relacionadas à permissão também são preenchidas automaticamente com base na permissão da perfil de execução do IAM. Você não precisa migrar.

É possível migrar dados relacionados ao histórico do DAG, variable, slot_pool, sla_misse, se necessário, tabelas xcom, job e log. O log da instância da tarefa é armazenado no CloudWatch Logs sob o grupo de logs airflow-{environment_name}. Se você quiser ver os logs da instância de tarefas de execuções mais antigas, esses logs devem ser copiados para o novo grupo de logs do ambiente. Recomendamos que você mova apenas alguns dias de logs para reduzir os custos associados.

Se você estiver migrando de um ambiente Amazon MWAA existente, não há acesso direto ao banco de dados de metadados. Você deve executar um DAG para exportar os metadados do seu ambiente do Amazon MWAA existente para um bucket do Amazon S3 de sua escolha. As etapas a seguir também podem ser usadas para exportar metadados do Apache Airflow se você estiver migrando de um ambiente autogerenciado.

Depois que os dados são exportados, é possível executar um DAG em seu novo ambiente para importar os dados. Durante o processo de exportação e importação, todos os outros DAGs são pausados.

Para exportar os metadados do seu ambiente existente
  1. Crie um bucket do Amazon S3 usando AWS CLI para armazenar os dados exportados. Substitua UUID e region por suas próprias informações.

    $ aws s3api create-bucket \ --bucket mwaa-migration-{UUID}\ --region {region}
    nota

    Se você estiver migrando dados confidenciais, como conexões armazenadas em variáveis, recomendamos que você habilite a criptografia padrão para o bucket do Amazon S3.

  2. nota

    Não se aplica à migração de um ambiente autogerenciado.

    Modifique o perfil de execução do ambiente existente e adicione a política a seguir para conceder acesso de gravação ao bucket que você criou na etapa um.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  3. Clone o repositório amazon-mwaa-examples e navegue até o subdiretório metadata-migration do seu cenário de migração.

    $ git clone https://github.com/aws-samples/amazon-mwaa-examples.git $ cd amazon-mwaa-examples/usecases/metadata-migration/existing-version-new-version/
  4. Em export_data.py, substitua o valor da string pelo S3_BUCKET bucket do Amazon S3 que você criou para armazenar metadados exportados.

    S3_BUCKET = 'mwaa-migration-{UUID}'
  5. Localize o arquivo requirements.txt no diretório metadata-migration. Se você já tiver um arquivo de requisitos para seu ambiente existente, adicione os requisitos adicionais especificados em requirements.txt para seu arquivo. Se você não tiver um arquivo de requisitos existente, basta usar o fornecido no diretório metadata-migration.

  6. Copie export_data.py para o diretório do DAG do bucket do Amazon S3 associado ao seu ambiente existente. Se estiver migrando de um ambiente autogerenciado, copie export_data.py para sua pasta /dags.

  7. Copie seu requirements.txt atualizado para o bucket do Amazon S3 associado ao seu ambiente existente e, em seguida, edite o ambiente para especificar a nova versão requirements.txt.

  8. Depois que o ambiente for atualizado, acesse a IU do Apache Airflow, retome o DAG db_export e acione a execução do fluxo de trabalho.

  9. Verifique se os metadados são exportados para data/migration/existing-version_to_new-version/export/ o bucket do Amazon S3 mwaa-migration-{UUID}, com cada tabela em seu próprio arquivo dedicado.

Etapa quatro: importar os metadados para seu novo ambiente

Para importar os metadados para seu novo ambiente
  1. Em import_data.py, substitua os valores de string pelos seguintes com suas informações.

    • Para migração de um ambiente Amazon MWAA existente:

      S3_BUCKET = 'mwaa-migration-{UUID}' OLD_ENV_NAME='{old_environment_name}' NEW_ENV_NAME='{new_environment_name}' TI_LOG_MAX_DAYS = {number_of_days}

      MAX_DAYS controla quantos dias de arquivos de log o fluxo de trabalho copia para o novo ambiente.

    • Para migrar de um ambiente autogerenciado:

      S3_BUCKET = 'mwaa-migration-{UUID}' NEW_ENV_NAME='{new_environment_name}'
  2. (Opcional) import_data.py copia somente logs de tarefas com falha. Se você quiser copiar todos os logs de tarefas, modifique a função getDagTasks e remova ti.state = 'failed' conforme mostrado no trecho de código a seguir.

    def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
  3. Modifique o perfil de execução do seu novo ambiente e adicione a política a seguir. A política de permissão permite que o Amazon MWAA leia do bucket Amazon S3 para o qual você exportou os metadados do Apache Airflow e copie os logs de instâncias de tarefas dos grupo de logs existentes. Substitua todos os marcadores por suas próprias informações.

    nota

    Se você estiver migrando de um ambiente autogerenciado, deverá remover as permissões relacionadas ao CloudWatch Logs da política.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:{region}:{account_number}:log-group:airflow-{old_environment_name}*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}", "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  4. Copie import_data.py para o diretório DAG do bucket Amazon S3 associado ao seu novo ambiente e, em seguida, acesse a IU do Apache Airflow para interromper o DAG db_import e acionar o fluxo de trabalho. O novo DAG aparecerá na IU do Apache Airflow em alguns minutos.

  5. Depois que a execução do DAG for concluída, verifique se o histórico de execução do DAG foi copiado acessando cada DAG individual.

Próximas etapas

  • Modelos do Apache Airflow (documentação do Apache Airflow): saiba mais sobre os modelos de banco de dados de metadados do Apache Airflow.