Apache Airflow PythonVirtualenvOperator용 사용자 지정 플러그인 생성
다음 샘플은 Amazon Managed Workflows for Apache Airflow에서 사용자 지정 플러그인을 사용하여 Apache Airflow PythonVirtualenvOperator를 패치하는 방법을 보여줍니다.
버전
사전 조건
이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
권한
요구 사항
이 페이지의 샘플 코드를 사용하려면 다음 종속성을 사용자 requirements.txt
에 추가합니다. 자세한 내용은 Python 종속성 설치 단원을 참조하십시오.
virtualenv
사용자 지정 플러그인 샘플 코드
Apache Airflow는 스타트업 시 플러그인 폴더에 있는 Python 파일의 콘텐츠를 실행합니다. 이 플러그인은 시작 프로세스 중에 내장 PythonVirtualenvOperator
을 패치하여 Amazon MWAA와 호환되도록 합니다. 다음 단계는 사용자 지정 플러그인의 샘플 코드를 보여줍니다.
- Apache Airflow v2
-
-
명령 프롬프트에서 위의 plugins
디렉터리로 이동합니다. 예:
cd plugins
-
다음 코드 샘플의 내용을 복사하고 로컬에서 virtual_python_plugin.py
로 저장합니다.
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
- Apache Airflow v1
-
-
명령 프롬프트에서 위의 plugins
디렉터리로 이동합니다. 예:
cd plugins
-
다음 코드 샘플의 내용을 복사하고 로컬에서 virtual_python_plugin.py
로 저장합니다.
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.python_operator import PythonVirtualenvOperator
def _generate_virtualenv_cmd(self, tmp_dir):
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if self.system_site_packages:
cmd.append('--system-site-packages')
if self.python_version is not None:
cmd.append('--python=python{}'.format(self.python_version))
return cmd
PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd
class EnvVarPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
Plugins.zip
다음 단계에서는 plugins.zip
을 생성하는 방법을 보여줍니다.
-
명령 프롬프트에서 위의 virtual_python_plugin.py
이 포함된 디렉터리로 이동합니다. 예:
cd plugins
-
plugins
폴더 내 콘텐츠를 압축합니다.
zip plugins.zip virtual_python_plugin.py
코드 샘플
다음 단계에서는 사용자 지정 플러그인의 DAG 코드를 생성하는 방법을 설명합니다.
- Apache Airflow v2
-
-
명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:
cd dags
-
다음 코드 샘플의 내용을 복사하고 로컬에서 virtualenv_test.py
로 저장합니다.
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from airflow.utils.dates import days_ago
import os
os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
def virtualenv_fn():
import boto3
print("boto3 version ",boto3.__version__)
with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_task",
python_callable=virtualenv_fn,
requirements=["boto3>=1.17.43"],
system_site_packages=False,
dag=dag,
)
- Apache Airflow v1
-
-
명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:
cd dags
-
다음 코드 샘플의 내용을 복사하고 로컬에서 virtualenv_test.py
로 저장합니다.
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.utils.dates import days_ago
import os
os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
def virtualenv_fn():
import boto3
print("boto3 version ",boto3.__version__)
with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_task",
python_callable=virtualenv_fn,
requirements=["boto3>=1.17.43"],
system_site_packages=False,
dag=dag,
)
Airflow 구성 옵션
Apache Airflow v2를 사용하는 경우 Apache Airflow 구성 옵션으로 core.lazy_load_plugins : False
을 추가합니다. 자세한 내용은 2에서 구성 옵션을 사용하여 플러그인 로드를 참조하십시오.
다음 단계
-
이 예제의 requirements.txt
파일을 Python 종속성 설치의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.
-
이 예제의 DAG 코드를 DAG 추가 또는 업데이트에서 Amazon S3 버킷의 dags
폴더에 업로드하는 방법을 알아봅니다.
-
이 예제의 plugins.zip
파일을 사용자 지정 플러그인 설치의 Amazon S3 버킷에 업로드하는 방법에 대해 자세히 알아봅니다.