Menginstal plugin kustom - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menginstal plugin kustom

Alur Kerja Terkelola Amazon untuk Apache Airflow mendukung pengelola plugin bawaan Apache Airflow, memungkinkan Anda menggunakan operator, kait, sensor, atau antarmuka Apache Airflow khusus. Halaman ini menjelaskan langkah-langkah untuk menginstal plugin kustom Apache Airflow di MWAA lingkungan Amazon Anda menggunakan file. plugins.zip

Prasyarat

Anda akan memerlukan yang berikut ini sebelum Anda dapat menyelesaikan langkah-langkah di halaman ini.

  • Izin — AWS Akun Anda harus telah diberikan akses oleh administrator Anda ke kebijakan kontrol mazonMWAAFull ConsoleAccess akses A untuk lingkungan Anda. Selain itu, MWAA lingkungan Amazon Anda harus diizinkan oleh peran eksekusi Anda untuk mengakses AWS sumber daya yang digunakan oleh lingkungan Anda.

  • Akses — Jika Anda memerlukan akses ke repositori publik untuk menginstal dependensi langsung di server web, lingkungan Anda harus dikonfigurasi dengan akses server web jaringan publik. Untuk informasi selengkapnya, lihat Mode akses Apache Airflow.

  • Konfigurasi Amazon S3 - Bucket Amazon S3 yang digunakan untuk menyimpan plugin kustom DAGs Andaplugins.zip, dan dependensi Python harus dikonfigurasi dengan Akses Publik Diblokir dan requirements.txt Diaktifkan Versi.

Cara kerjanya

Untuk menjalankan plugin khusus di lingkungan Anda, Anda harus melakukan tiga hal:

  1. Buat plugins.zip file secara lokal.

  2. Unggah plugins.zip file lokal ke bucket Amazon S3 Anda.

  3. Tentukan versi file ini di bidang file Plugins di MWAA konsol Amazon.

catatan

Jika ini adalah pertama kalinya Anda mengunggah plugins.zip ke bucket Amazon S3 Anda, Anda juga perlu menentukan jalur ke file di konsol AmazonMWAA. Anda hanya perlu menyelesaikan langkah ini sekali.

Kapan menggunakan plugin

Plugin hanya diperlukan untuk memperluas antarmuka pengguna Apache Airflow, seperti yang diuraikan dalam dokumentasi Apache Airflow. Operator kustom dapat ditempatkan langsung di /dags folder di samping DAG kode Anda.

Jika Anda perlu membuat integrasi Anda sendiri dengan sistem eksternal, letakkan di dags folder/atau subfolder di dalamnya, tetapi tidak di plugins.zip folder. Di Apache Airflow 2.x, plugin terutama digunakan untuk memperluas UI.

Demikian pula, dependensi lain tidak boleh ditempatkan di. plugins.zip Sebagai gantinya, mereka dapat disimpan di lokasi di bawah /dags folder Amazon S3, di mana mereka akan disinkronkan ke setiap MWAA wadah Amazon sebelum Apache Airflow dimulai.

catatan

File apa pun di /dags folder atau plugins.zip yang tidak secara eksplisit mendefinisikan DAG objek Apache Airflow harus terdaftar dalam file. .airflowignore

Ikhtisar plugin kustom

Manajer plugin bawaan Apache Airflow dapat mengintegrasikan fitur eksternal ke intinya hanya dengan menjatuhkan file ke dalam folder. $AIRFLOW_HOME/plugins Ini memungkinkan Anda untuk menggunakan operator, kait, sensor, atau antarmuka Apache Airflow khusus. Bagian berikut memberikan contoh struktur direktori datar dan bersarang di lingkungan pengembangan lokal dan pernyataan impor yang dihasilkan, yang menentukan struktur direktori dalam plugins.zip.

Direktori plugin kustom dan batas ukuran

Penjadwal Aliran Udara Apache dan Pekerja mencari plugin khusus selama startup pada wadah AWS Fargate yang dikelola untuk lingkungan Anda di. /usr/local/airflow/plugins/*

  • Struktur direktori. Struktur direktori (at/*) didasarkan pada isi plugins.zip file Anda. Misalnya, jika Anda plugins.zip berisi operators direktori sebagai direktori tingkat atas, maka direktori akan diekstraksi ke lingkungan /usr/local/airflow/plugins/operators Anda.

  • Batas ukuran. Kami merekomendasikan plugins.zip file kurang dari 1 GB. Semakin besar ukuran plugins.zip file, semakin lama waktu startup pada suatu lingkungan. Meskipun Amazon MWAA tidak membatasi ukuran plugins.zip file secara eksplisit, jika dependensi tidak dapat diinstal dalam waktu sepuluh menit, layanan Fargate akan habis waktu dan mencoba mengembalikan lingkungan ke status stabil.

catatan

Untuk lingkungan yang menggunakan Apache Airflow v1.10.12 atau Apache Airflow v2.0.2, Amazon MWAA membatasi lalu lintas keluar di server web Apache Airflow, dan tidak memungkinkan Anda untuk menginstal plugin atau dependensi Python langsung di server web. Dimulai dengan Apache Airflow v2.2.2, MWAA Amazon dapat menginstal plugin dan dependensi langsung di server web.

Contoh plugin kustom

Bagian berikut menggunakan kode contoh dalam panduan referensi Apache Airflow untuk menunjukkan cara menyusun lingkungan pengembangan lokal Anda.

Contoh menggunakan struktur direktori datar di plugins.zip

Apache Airflow v2

Contoh berikut menunjukkan plugins.zip file dengan struktur direktori datar untuk Apache Airflow v2.

contoh direktori datar dengan PythonVirtualenvOperator plugins.zip

Contoh berikut menunjukkan pohon tingkat atas file plugins.zip untuk plugin PythonVirtualenvOperator kustom diMembuat plugin khusus untuk Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
contoh plugins/virtual_python_plugin.py

Contoh berikut menunjukkan plugin PythonVirtualenvOperator kustom.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

Contoh berikut menunjukkan plugins.zip file dengan struktur direktori datar untuk Apache Airflow v1.

contoh direktori datar dengan PythonVirtualenvOperator plugins.zip

Contoh berikut menunjukkan pohon tingkat atas file plugins.zip untuk plugin PythonVirtualenvOperator kustom diMembuat plugin khusus untuk Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
contoh plugins/virtual_python_plugin.py

Contoh berikut menunjukkan plugin PythonVirtualenvOperator kustom.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Contoh menggunakan struktur direktori bersarang di plugins.zip

Apache Airflow v2

Contoh berikut menunjukkan plugins.zip file dengan direktori terpisah untukhooks,operators, dan sensors direktori untuk Apache Airflow v2.

contoh plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

Contoh berikut menunjukkan pernyataan impor di DAG (DAGsfolder) yang menggunakan plugin kustom.

contoh dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
contoh plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Contoh berikut menunjukkan setiap pernyataan impor yang diperlukan dalam file plugin kustom.

contoh hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
contoh sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
contoh operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
contoh operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Ikuti langkah-langkah dalam Menguji plugin kustom menggunakan MWAA CLI utilitas Amazon, lalu Membuat file plugins.zip untuk zip konten dalam plugins direktori Anda. Misalnya, cd plugins.

Apache Airflow v1

Contoh berikut menunjukkan plugins.zip file dengan direktori terpisah untukhooks,operators, dan sensors direktori untuk Apache Airflow v1.10.12.

contoh plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

Contoh berikut menunjukkan pernyataan impor di DAG (DAGsfolder) yang menggunakan plugin kustom.

contoh dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
contoh plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Contoh berikut menunjukkan setiap pernyataan impor yang diperlukan dalam file plugin kustom.

contoh hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
contoh sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
contoh operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
contoh operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Ikuti langkah-langkah dalam Menguji plugin kustom menggunakan MWAA CLI utilitas Amazon, lalu Membuat file plugins.zip untuk zip konten dalam plugins direktori Anda. Misalnya, cd plugins.

Membuat file plugins.zip

Langkah-langkah berikut menjelaskan langkah-langkah yang kami sarankan untuk membuat file plugins.zip secara lokal.

Langkah satu: Uji plugin khusus menggunakan utilitas Amazon MWAA CLI

  • Utilitas antarmuka baris perintah (CLI) mereplikasi Alur Kerja Terkelola Amazon untuk lingkungan Apache Airflow secara lokal.

  • CLIMembangun image container Docker secara lokal yang mirip dengan image produksi AmazonMWAA. Ini memungkinkan Anda menjalankan lingkungan Apache Airflow lokal untuk mengembangkan dan mengujiDAGs, plugin khusus, dan dependensi sebelum menerapkan ke Amazon. MWAA

  • Untuk menjalankanCLI, lihat aws-mwaa-local-runnerdi GitHub.

Langkah kedua: Buat file plugins.zip

Anda dapat menggunakan utilitas ZIP arsip bawaan, atau ZIP utilitas lainnya (seperti 7zip) untuk membuat file.zip.

catatan

Utilitas zip bawaan untuk OS Windows dapat menambahkan subfolder saat Anda membuat file.zip. Sebaiknya verifikasi konten file plugins.zip sebelum mengunggah ke bucket Amazon S3 Anda untuk memastikan tidak ada direktori tambahan yang ditambahkan.

  1. Ubah direktori ke direktori plugin Airflow lokal Anda. Sebagai contoh:

    myproject$ cd plugins
  2. Jalankan perintah berikut untuk memastikan bahwa konten memiliki izin yang dapat dieksekusi (hanya macOS dan Linux).

    plugins$ chmod -R 755 .
  3. Zip konten di dalam plugins folder Anda.

    plugins$ zip -r plugins.zip .

Mengunggah plugins.zip ke Amazon S3

Anda dapat menggunakan konsol Amazon S3 atau AWS Command Line Interface (AWS CLI) untuk mengunggah plugins.zip file ke bucket Amazon S3 Anda.

Menggunakan AWS CLI

The AWS Command Line Interface (AWS CLI) adalah alat open source yang memungkinkan Anda berinteraksi dengan AWS layanan menggunakan perintah di shell baris perintah Anda. Untuk menyelesaikan langkah-langkah di halaman ini, Anda memerlukan yang berikut:

Untuk mengunggah menggunakan AWS CLI
  1. Di prompt perintah Anda, arahkan ke direktori tempat plugins.zip file Anda disimpan. Sebagai contoh:

    cd plugins
  2. Gunakan perintah berikut untuk membuat daftar semua bucket Amazon S3 Anda.

    aws s3 ls
  3. Gunakan perintah berikut untuk mencantumkan file dan folder di bucket Amazon S3 untuk lingkungan Anda.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Gunakan perintah berikut untuk mengunggah plugins.zip file ke bucket Amazon S3 untuk lingkungan Anda.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Menggunakan konsol Amazon S3

Konsol Amazon S3 adalah antarmuka pengguna berbasis web yang memungkinkan Anda membuat dan mengelola sumber daya di bucket Amazon S3 Anda.

Untuk mengunggah menggunakan konsol Amazon S3
  1. Buka halaman Lingkungan di MWAA konsol Amazon.

  2. Pilih lingkungan.

  3. Pilih tautan bucket S3 di DAGkode di panel S3 untuk membuka bucket penyimpanan Anda di konsol Amazon S3.

  4. Pilih Unggah.

  5. Pilih Tambahkan file.

  6. Pilih salinan lokal Andaplugins.zip, pilih Unggah.

Menginstal plugin khusus di lingkungan Anda

Bagian ini menjelaskan cara menginstal plugin khusus yang Anda unggah ke bucket Amazon S3 Anda dengan menentukan jalur ke file plugins.zip, dan menentukan versi file plugins.zip setiap kali file zip diperbarui.

Menentukan jalur ke MWAA konsol plugins.zip Amazon (pertama kali)

Jika ini adalah pertama kalinya Anda mengunggah plugins.zip ke bucket Amazon S3 Anda, Anda juga perlu menentukan jalur ke file di konsol AmazonMWAA. Anda hanya perlu menyelesaikan langkah ini sekali.

  1. Buka halaman Lingkungan di MWAA konsol Amazon.

  2. Pilih lingkungan.

  3. Pilih Edit.

  4. Pada DAGkode di panel Amazon S3, pilih Browse S3 di sebelah file Plugins - bidang opsional.

  5. Pilih plugins.zip file di bucket Amazon S3 Anda.

  6. Pilih Tutup.

  7. Pilih Berikutnya, Perbarui lingkungan.

Menentukan plugins.zip versi di konsol Amazon MWAA

Anda perlu menentukan versi plugins.zip file Anda di MWAA konsol Amazon setiap kali Anda mengunggah versi baru Anda plugins.zip di bucket Amazon S3 Anda.

  1. Buka halaman Lingkungan di MWAA konsol Amazon.

  2. Pilih lingkungan.

  3. Pilih Edit.

  4. Pada DAGkode di panel Amazon S3, pilih plugins.zip versi dalam daftar dropdown.

  5. Pilih Berikutnya.

Contoh kasus penggunaan untuk plugins.zip

Apa selanjutnya?

  • UjiDAGs, plugin kustom, dan dependensi Python Anda secara lokal menggunakan on. aws-mwaa-local-runner GitHub