Bermigrasi ke lingkungan Amazon MWAA baru - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bermigrasi ke lingkungan Amazon MWAA baru

Jelajahi langkah-langkah berikut untuk memigrasikan beban kerja Apache Airflow yang ada ke lingkungan Amazon baru. MWAA Anda dapat menggunakan langkah-langkah ini untuk bermigrasi dari Amazon versi lama MWAA ke rilis versi baru, atau memigrasikan penyebaran Apache Airflow yang dikelola sendiri ke Amazon. MWAA Tutorial ini mengasumsikan Anda bermigrasi dari Apache Airflow v1.10.12 yang ada ke MWAA Amazon baru yang menjalankan Apache Airflow v2.5.1, tetapi Anda dapat menggunakan prosedur yang sama untuk bermigrasi dari, atau ke versi Apache Airflow yang berbeda.

Prasyarat

Untuk dapat menyelesaikan langkah-langkah dan memigrasikan lingkungan Anda, Anda memerlukan yang berikut:

Langkah satu: Buat MWAA lingkungan Amazon baru yang menjalankan versi Apache Airflow terbaru yang didukung

Anda dapat membuat lingkungan menggunakan langkah-langkah terperinci dalam Memulai Amazon MWAA di Panduan MWAA Pengguna Amazon, atau dengan menggunakan AWS CloudFormation templat. Jika Anda bermigrasi dari MWAA lingkungan Amazon yang ada, dan menggunakan AWS CloudFormation templat untuk membuat lingkungan lama, Anda dapat mengubah AirflowVersion properti untuk menentukan versi baru.

MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion: 2.5.1 DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true

Atau, jika bermigrasi dari MWAA lingkungan Amazon yang ada, Anda dapat menyalin skrip Python berikut yang menggunakan AWS SDKfor Python (Boto3) untuk mengkloning lingkungan Anda. Anda juga dapat mengunduh skrip.

# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)

Langkah kedua: Migrasikan sumber daya alur kerja Anda

Apache Airflow v2 adalah rilis versi utama. Jika Anda bermigrasi dari Apache Airflow v1, Anda harus menyiapkan sumber daya alur kerja Anda dan memverifikasi perubahan yang Anda buat pada, persyaratanDAGs, dan plugin Anda. Untuk melakukannya, kami sarankan untuk mengonfigurasi versi jembatan Apache Airflow di sistem operasi lokal Anda menggunakan Docker dan pelari lokal Amazon. MWAA Pelari MWAA lokal Amazon menyediakan utilitas antarmuka baris perintah (CLI) yang mereplikasi MWAA lingkungan Amazon secara lokal.

Setiap kali Anda mengubah versi Apache Airflow, pastikan Anda mereferensikan yang benar --constraint URL di versi Anda. requirements.txt

Untuk memigrasikan sumber daya alur kerja
  1. Buat fork aws-mwaa-local-runnerrepositori, dan kloning salinan pelari lokal AmazonMWAA.

  2. Periksa v1.10.15 cabang repositori aws-mwaa-local -runner. Apache Airflow merilis v1.10.15 sebagai rilis jembatan untuk membantu migrasi ke Apache Airflow v2, dan meskipun Amazon tidak mendukung v1.10.15, Anda dapat menggunakan pelari lokal MWAA Amazon untuk menguji sumber daya Anda. MWAA

  3. Gunakan CLI alat pelari MWAA lokal Amazon untuk membuat image Docker dan menjalankan Apache Airflow secara lokal. Untuk informasi selengkapnya, lihat pelari lokal READMEdi GitHub repositori.

  4. Menggunakan Apache Airflow berjalan secara lokal, ikuti langkah-langkah yang dijelaskan dalam Upgrade dari 1.10 ke 2 di situs dokumentasi Apache Airflow.

    1. Untuk memperbaruirequirements.txt, ikuti praktik terbaik yang kami rekomendasikan dalam Mengelola dependensi Python, di Panduan Pengguna Amazon. MWAA

    2. Jika Anda telah menggabungkan operator dan sensor khusus Anda dengan plugin Anda untuk lingkungan Apache Airflow v1.10.12 yang ada, pindahkan ke folder Anda. DAG Untuk informasi selengkapnya tentang praktik terbaik manajemen modul untuk Apache Airflow v2+, lihat Manajemen Modul di situs web dokumentasi Apache Airflow.

  5. Setelah Anda membuat perubahan yang diperlukan pada sumber daya alur kerja Anda, periksa v2.5.1 cabang repositori aws-mwaa-local -runner, dan uji alur kerja, persyaratanDAGs, dan plugin kustom Anda yang diperbarui secara lokal. Jika Anda bermigrasi ke versi Apache Airflow yang berbeda, Anda dapat menggunakan cabang runner lokal yang sesuai untuk versi Anda.

  6. Setelah berhasil menguji sumber daya alur kerja, salin DAGsrequirements.txt, dan plugin ke bucket Amazon S3 yang dikonfigurasi dengan lingkungan Amazon baru. MWAA

Langkah ketiga: Mengekspor metadata dari lingkungan yang ada

Tabel metadata Apache Airflow sepertidag,dag_tag, dan dag_code secara otomatis terisi saat Anda menyalin DAG file yang diperbarui ke bucket Amazon S3 lingkungan Anda dan penjadwal menguraikannya. Tabel terkait izin juga terisi secara otomatis berdasarkan izin peran IAM eksekusi Anda. Anda tidak perlu memigrasikan mereka.

Anda dapat memigrasikan data yang terkait dengan DAG riwayatvariable,slot_pool,sla_miss,, dan jika diperlukan, xcomjob, dan log tabel. Log contoh tugas disimpan dalam CloudWatch Log di bawah grup airflow-{environment_name} log. Jika Anda ingin melihat log instance tugas untuk proses yang lebih lama, log tersebut harus disalin ke grup log lingkungan baru. Kami menyarankan Anda memindahkan log hanya beberapa hari untuk mengurangi biaya terkait.

Jika Anda bermigrasi dari MWAA lingkungan Amazon yang ada, tidak ada akses langsung ke database metadata. Anda harus menjalankan a DAG untuk mengekspor metadata dari MWAA lingkungan Amazon yang ada ke bucket Amazon S3 pilihan Anda. Langkah-langkah berikut juga dapat digunakan untuk mengekspor metadata Apache Airflow jika Anda bermigrasi dari lingkungan yang dikelola sendiri.

Setelah data diekspor, Anda kemudian dapat menjalankan DAG di lingkungan baru Anda untuk mengimpor data. Selama proses ekspor dan impor, semua lainnya DAGs dijeda.

Untuk mengekspor metadata dari lingkungan yang ada
  1. Buat bucket Amazon S3 menggunakan AWS CLI untuk menyimpan data yang diekspor. Ganti UUID dan region dengan informasi Anda.

    $ aws s3api create-bucket \ --bucket mwaa-migration-{UUID}\ --region {region}
    catatan

    Jika Anda memigrasikan data sensitif, seperti koneksi yang disimpan dalam variabel, sebaiknya aktifkan enkripsi default untuk bucket Amazon S3.

  2. catatan

    Tidak berlaku untuk migrasi dari lingkungan yang dikelola sendiri.

    Ubah peran eksekusi lingkungan yang ada dan tambahkan kebijakan berikut untuk memberikan akses tulis ke bucket yang Anda buat di langkah pertama.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  3. Kloning amazon-mwaa-examplesrepositori, dan arahkan ke metadata-migration subdirektori untuk skenario migrasi Anda.

    $ git clone https://github.com/aws-samples/amazon-mwaa-examples.git $ cd amazon-mwaa-examples/usecases/metadata-migration/existing-version-new-version/
  4. Diexport_data.py, ganti nilai string S3_BUCKET dengan bucket Amazon S3 yang Anda buat untuk menyimpan metadata yang diekspor.

    S3_BUCKET = 'mwaa-migration-{UUID}'
  5. Temukan requirements.txt file di metadata-migration direktori. Jika Anda sudah memiliki file persyaratan untuk lingkungan yang ada, tambahkan persyaratan tambahan yang ditentukan requirements.txt ke file Anda. Jika Anda tidak memiliki file persyaratan yang ada, Anda cukup menggunakan yang disediakan di metadata-migration direktori.

  6. Salin export_data.py ke DAG direktori bucket Amazon S3 yang terkait dengan lingkungan Anda yang ada. Jika bermigrasi dari lingkungan yang dikelola sendiri, salin export_data.py ke folder Anda/dags.

  7. Salin pembaruan Anda requirements.txt ke bucket Amazon S3 yang terkait dengan lingkungan yang ada, lalu edit lingkungan untuk menentukan versi barurequirements.txt.

  8. Setelah lingkungan diperbarui, akses Apache Airflow UI, unpause db_exportDAG, dan memicu alur kerja untuk berjalan.

  9. Verifikasi bahwa metadata diekspor ke data/migration/existing-version_to_new-version/export/ bucket mwaa-migration-{UUID} Amazon S3, dengan setiap tabel dalam file khusus itu sendiri.

Langkah empat: Mengimpor metadata ke lingkungan baru Anda

Untuk mengimpor metadata ke lingkungan baru Anda
  1. Diimport_data.py, ganti nilai string untuk yang berikut ini dengan informasi Anda.

    • Untuk migrasi dari MWAA lingkungan Amazon yang ada:

      S3_BUCKET = 'mwaa-migration-{UUID}' OLD_ENV_NAME='{old_environment_name}' NEW_ENV_NAME='{new_environment_name}' TI_LOG_MAX_DAYS = {number_of_days}

      MAX_DAYSmengontrol berapa hari file log yang disalin alur kerja ke lingkungan baru.

    • Untuk migrasi dari lingkungan yang dikelola sendiri:

      S3_BUCKET = 'mwaa-migration-{UUID}' NEW_ENV_NAME='{new_environment_name}'
  2. (Opsional) hanya import_data.py menyalin log tugas yang gagal. Jika Anda ingin menyalin semua log tugas, memodifikasi getDagTasks fungsi, dan menghapus ti.state = 'failed' seperti yang ditunjukkan dalam cuplikan kode berikut.

    def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
  3. Ubah peran eksekusi lingkungan baru Anda dan tambahkan kebijakan berikut. Kebijakan izin MWAA memungkinkan Amazon membaca dari bucket Amazon S3 tempat Anda mengekspor metadata Apache Airflow, dan menyalin log instance tugas dari grup log yang ada. Ganti semua placeholder dengan informasi Anda.

    catatan

    Jika Anda bermigrasi dari lingkungan yang dikelola sendiri, Anda harus menghapus izin terkait CloudWatch Log dari kebijakan.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:{region}:{account_number}:log-group:airflow-{old_environment_name}*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}", "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  4. Salin import_data.py ke DAG direktori bucket Amazon S3 yang terkait dengan lingkungan baru Anda, lalu akses Apache Airflow UI untuk menghentikan jeda dan memicu alur kerja. db_import DAG Yang baru DAG akan muncul di Apache Airflow UI dalam beberapa menit.

  5. Setelah proses DAG selesai, verifikasi bahwa riwayat proses DAG Anda disalin dengan mengakses setiap individu. DAG

Langkah selanjutnya

  • Model Apache Airflow (Apache Airflow Documentation) - Pelajari lebih lanjut tentang model database metadata Apache Airflow.