사용자 지정 플러그인 설치 - Amazon Managed Workflows for Apache Airflow

사용자 지정 플러그인 설치

Amazon Managed Workflows for Apache Airflow는 Apache Airflow의 내장 플러그인 관리자를 지원하므로 사용자는 사용자 지정 Apache Airflow 운영자, 후크, 센서 또는 인터페이스를 사용할 수 있습니다. 이 페이지에서는 plugins.zip 파일을 사용하여 Amazon MWAA 환경에 Apache Airflow 사용자 지정 플러그인을 설치하는 단계를 설명합니다.

사전 조건

이 페이지의 단계를 완료하려면 먼저 다음이 필요합니다.

  • 권한 — 관리자가 사용자 환경의 AmazonMWAAFullConsoleAccess 액세스 제어 정책에 대한 액세스 권한을 AWS 계정에 부여했어야 합니다. 또한 실행 역할이 Amazon MWAA 환경에서 사용자 환경에서 사용하는 AWS 리소스에 액세스할 수 있도록 허용해야 합니다.

  • 액세스 — 종속성을 웹 서버에 직접 설치하기 위해 퍼블릭 리포지토리에 액세스해야 하는 경우 퍼블릭 네트워크 웹 서버 액세스로 환경을 구성해야 합니다. 자세한 내용은 Apache Airflow 액세스 모드 단원을 참조하십시오.

  • Amazon S3 구성 — DAG, plugins.zip의 사용자 지정 플러그인 및 requirements.txt의 Python 종속성을 저장하는 데 사용되는 Amazon S3 버킷퍼블릭 액세스가 차단되고 버전 관리가 활성화된 상태로 구성되어야 합니다.

작동 방식

사용자 환경에서 사용자 지정 플러그인을 실행하려면 다음 세 가지 작업을 수행해야 합니다.

  1. 로컬에서 plugins.zip 파일을 생성합니다.

  2. Amazon S3 버킷에 로컬 plugins.zip 파일을 업로드합니다.

  3. Amazon MWAA 콘솔의 플러그인 파일 필드에 이 파일의 버전을 지정합니다.

참고

plugins.zip을 처음으로 Amazon S3 버킷에 업로드하는 경우 Amazon MWAA 콘솔에 파일 경로도 지정해야 합니다. 이 단계는 한 번만 완료하면 됩니다.

플러그인 사용 시기

플러그인은 Apache Airflow 설명서에 설명된 대로 Apache Airflow 사용자 인터페이스를 확장하는 데만 필요합니다. 사용자 지정 연산자는 DAG 코드와 함께 /dags 폴더에 직접 배치할 수 있습니다.

외부 시스템과 자체 통합을 생성해야 하는 경우 /dags 폴더 또는 그 안에 있는 하위 폴더에 배치하되 plugins.zip 폴더에는 배치하지 않습니다. Apache Airflow 2.x에서 플러그인은 주로 UI를 확장하는 데 사용됩니다.

마찬가지로 다른 종속성을 plugins.zip에 배치해서는 안 됩니다. 대신 Amazon S3 /dags 폴더 아래의 위치에 저장할 수 있으며, Apache Airflow가 시작되기 전에 각 Amazon MWAA 컨테이너에 동기화됩니다.

참고

Apache Airflow DAG 객체를 명시적으로 정의하지 않은 plugins.zip 폴더 또는 /dags의 모든 파일은 .airflowignore 파일에 나열되어야 합니다.

사용자 지정 플러그인 개요

Apache Airflow의 내장 플러그인 관리자는 파일을 $AIRFLOW_HOME/plugins 폴더에 놓기만 하면 외부 기능을 코어에 통합할 수 있습니다. 이를 통해 사용자 지정 Apache Airflow 운영자, 후크, 센서 또는 인터페이스를 사용할 수 있습니다. 다음 섹션에는 로컬 개발 환경의 플랫 및 중첩 디렉터리 구조의 예제 및 plugins.zip 내의 디렉터리 구조를 결정하는 결과 가져오기 문이 나와 있습니다.

사용자 지정 플러그인 디렉터리 및 크기 제한

Apache Airflow 스케줄러작업자는 시작하는 동안 /usr/local/airflow/plugins/*에 사용자 환경의 AWS 관리형 Fargate 컨테이너에서 사용자 지정 플러그인을 찾습니다.

  • 디렉터리 구조. (/*의) 디렉터리 구조는 plugins.zip 파일 내용을 기반으로 합니다. 예를 들어, 사용자의 plugins.zip에 최상위 디렉터리로 operators 디렉터리가 포함되어 있는 경우 해당 디렉터리는 사용자 환경의 /usr/local/airflow/plugins/operators로 추출됩니다.

  • 크기 제한. 1GB 미만의 plugins.zip 파일을 사용하는 것이 좋습니다. plugins.zip 파일 크기가 클수록 환경의 시작 시간이 길어집니다. Amazon MWAA가 plugins.zip 파일의 크기를 명시적으로 제한하지 않지만, 종속성을 10분 이내에 설치할 수 없는 경우 Fargate 서비스는 시간을 초과하여 환경을 안정적인 상태로 롤백하려고 시도합니다.

참고

Apache Airflow v1.10.12 또는 Apache Airflow v2.0.2를 사용하는 환경의 경우 Amazon MWAA는 Apache Airflow 웹 서버에서 아웃바운드 트래픽을 제한하며, 웹 서버에 직접 플러그인이나 Python 종속성을 설치할 수 없습니다. Apache Airflow v2.2.2부터 Amazon MWAA는 플러그인 및 종속 항목을 웹 서버에 직접 설치할 수 있습니다.

사용자 지정 플러그인의 예제

다음 섹션에서는 Apache Airflow 참조 가이드의 샘플 코드를 사용하여 로컬 개발 환경을 구성하는 방법을 보여줍니다.

plugins.zip 내 플랫 디렉터리 구조 사용 예제

Apache Airflow v2

다음 예제에서는 Apache Airflow v2에 플랫 디렉터리 구조가 있는 plugins.zip 파일을 보여줍니다.

예 PythonVirtualEnvOperator plugins.zip이 있는 플랫 디렉터리

다음 예제에서는 Apache Airflow PythonVirtualenvOperator용 사용자 지정 플러그인 생성에서 PythonVirtualEnvOperator 사용자 지정 플러그인에 대한 plugins.zip 파일의 최상위 트리를 보여줍니다.

├── virtual_python_plugin.py
예 plugins/virtual_python_plugin.py

다음 예제에서는 PythonVirtualEnvOperator 사용자 지정 플러그인을 보여줍니다.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

다음 예제에서는 Apache Airflow v1에 플랫 디렉터리 구조가 있는 plugins.zip 파일을 보여줍니다.

예 PythonVirtualEnvOperator plugins.zip이 있는 플랫 디렉터리

다음 예제에서는 Apache Airflow PythonVirtualenvOperator용 사용자 지정 플러그인 생성에서 PythonVirtualEnvOperator 사용자 지정 플러그인에 대한 plugins.zip 파일의 최상위 트리를 보여줍니다.

├── virtual_python_plugin.py
예 plugins/virtual_python_plugin.py

다음 예제에서는 PythonVirtualEnvOperator 사용자 지정 플러그인을 보여줍니다.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

plugins.zip 내 중첩 디렉터리 구조 사용 예제

Apache Airflow v2

다음 예제에서는 Apache Airflow v2의 hooks, operatorssensors 디렉터리에 별도의 디렉터리가 있는 plugins.zip 파일을 보여줍니다.

예 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

다음 예제에서는 사용자 지정 플러그인을 사용하는 DAG(DAGs 폴더)의 가져오기 문을 보여줍니다.

예 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
예 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

다음 예제에서는 사용자 지정 플러그인 파일에 필요한 각 가져오기 문을 보여줍니다.

예 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
예 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
예 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
예 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Amazon MWAA CLI 유틸리티를 사용하여 사용자 지정 플러그인 테스트를 하고 plugins 디렉터리 내의 내용을 압축하는 plugins.zip 파일을 생성하는 단계를 따릅니다. 예: cd plugins.

Apache Airflow v1

다음 예제에서는 Apache Airflow v1.10.12의 hooks, operatorssensors 디렉터리에 별도의 디렉터리가 있는 plugins.zip 파일을 보여줍니다.

예 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

다음 예제에서는 사용자 지정 플러그인을 사용하는 DAG(DAGs 폴더)의 가져오기 문을 보여줍니다.

예 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
예 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

다음 예제에서는 사용자 지정 플러그인 파일에 필요한 각 가져오기 문을 보여줍니다.

예 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
예 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
예 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
예 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Amazon MWAA CLI 유틸리티를 사용하여 사용자 지정 플러그인 테스트를 하고 plugins 디렉터리 내의 내용을 압축하는 plugins.zip 파일을 생성하는 단계를 따릅니다. 예: cd plugins.

plugins.zip 파일 생성

다음 단계에서는 로컬에서 plugins.zip 파일을 생성할 때 권장하는 단계를 설명합니다.

1단계: Amazon MWAA CLI 유틸리티를 사용하여 사용자 지정 플러그인 테스트

  • 명령줄 인터페이스(CLI) 유틸리티는 Amazon Managed Workflows for Apache Airflow 환경을 로컬로 복제합니다.

  • CLI는 Amazon MWAA 프로덕션 이미지와 유사한 Docker 컨테이너 이미지를 로컬로 구축합니다. 이를 통해 Amazon MWAA에 배포하기 전에 로컬 Apache Airflow 환경을 실행하여 DAG, 사용자 지정 플러그인 및 종속성을 개발하고 테스트할 수 있습니다.

  • CLI를 실행하려면 GitHub의 aws-mwaa-local-runner를 참조하십시오.

2단계: plugins.zip 파일 생성

내장된 ZIP 아카이브 유틸리티 또는 기타 ZIP 유틸리티(예: 7zip)를 사용하여.zip 파일을 생성할 수 있습니다.

참고

Windows OS용 내장 zip 유틸리티는 .zip 파일을 생성할 때 하위 폴더를 추가할 수 있습니다. Amazon S3 버킷에 업로드하기 전에 plugins.zip 파일의 내용을 확인하여 추가 디렉터리가 추가되지 않도록 하는 것이 좋습니다.

  1. 디렉터리를 로컬 Airflow 플러그인 디렉터리로 변경합니다. 예:

    myproject$ cd plugins
  2. 다음 명령을 실행하여 내용에 실행 권한이 있는지 확인합니다(macOS 및 Linux만 해당).

    plugins$ chmod -R 755 .
  3. plugins 폴더 내용을 압축합니다.

    plugins$ zip -r plugins.zip .

Amazon S3에 plugins.zip 업로드

Amazon S3 콘솔 또는 AWS Command Line Interface(AWS CLI)를 사용하여 Amazon S3 버킷에 plugins.zip 파일을 업로드할 수 있습니다.

AWS CLI 사용

AWS Command Line Interface(AWS CLI)는 명령줄 쉘의 명령을 사용하여 AWS 서비스와 상호 작용할 수 있는 오픈 소스 도구입니다. 이 페이지에서 단계를 완료하려면 다음이 필요합니다.

AWS CLI를 사용하여 업로드하려면
  1. 명령 프롬프트에서 plugins.zip 파일이 저장된 디렉터리로 이동합니다. 예:

    cd plugins
  2. 다음 명령을 사용하여 Amazon S3 버킷을 모두 나열합니다.

    aws s3 ls
  3. 다음 명령을 사용하여 사용자 환경의 Amazon S3 버킷에 있는 파일과 폴더를 나열합니다.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. 다음 명령을 사용해서 사용자 환경의 Amazon S3 버킷에 plugins.zip 파일을 업로드합니다.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Amazon S3 콘솔 사용

Amazon S3 콘솔은 Amazon S3 버킷의 리소스를 생성 및 관리할 수 있는 웹 기반 사용자 인터페이스입니다.

Amazon S3 콘솔을 사용하여 업로드하려면
  1. Amazon MWAA 콘솔에서 환경 페이지를 엽니다.

  2. 환경을 선택합니다.

  3. S3 창의 DAG 코드에서 S3 버킷 링크를 선택하여 Amazon S3 콘솔에서 스토리지 버킷을 엽니다.

  4. 업로드를 선택합니다.

  5. 파일 추가를 선택합니다.

  6. plugins.zip의 로컬 사본을 선택하고 업로드를 선택합니다.

사용자 환경에 사용자 지정 플러그인 설치

이 섹션에서는 plugins.zip 파일의 경로를 지정하고 zip 파일이 업데이트될 때마다 plugins.zip 파일의 버전을 지정하여 Amazon S3 버킷에 업로드한 사용자 지정 플러그인을 설치하는 방법을 설명합니다.

Amazon MWAA 콘솔에서 plugins.zip의 경로 지정(처음)

plugins.zip을 처음으로 Amazon S3 버킷에 업로드하는 경우 Amazon MWAA 콘솔에 파일 경로도 지정해야 합니다. 이 단계는 한 번만 완료하면 됩니다.

  1. Amazon MWAA 콘솔에서 환경 페이지를 엽니다.

  2. 환경을 선택합니다.

  3. 편집을 선택합니다.

  4. Amazon S3의 DAG 코드 창에서 플러그인 파일 - 옵션 필드 옆의 S3 찾아보기를 선택합니다.

  5. Amazon S3 버킷에 있는 plugins.zip 파일을 선택합니다.

  6. 선택을 선택합니다.

  7. 다음, 환경 업데이트를 선택합니다.

Amazon MWAA 콘솔에서 plugins.zip 버전 지정

Amazon S3 버킷에 사용자 plugins.zip의 새 버전을 업로드할 때마다 Amazon MWAA 콘솔에서 plugins.zip 파일의 버전을 지정해야 합니다.

  1. Amazon MWAA 콘솔에서 환경 페이지를 엽니다.

  2. 환경을 선택합니다.

  3. 편집을 선택합니다.

  4. Amazon S3의 DAG 코드 창의 드롭다운 목록에서 plugins.zip 버전을 선택합니다.

  5. Next(다음)를 선택합니다.

plugins.zip 사용 사례 예제

다음 단계

  • GitHub의 aws-mwaa-local-runner를 사용하여 DAG, 사용자 지정 플러그인, Python 종속성을 로컬에서 테스트합니다.