Amazon Managed Workflows for Apache Airflow supports Apache Airflow's built-in plugin manager, allowing you to use custom Apache Airflow operators, hooks, sensors, or interfaces. This page describes the steps to install
Apache Airflow custom pluginsplugins.zip
file.
Contents
Prerequisites
You'll need the following before you can complete the steps on this page.
-
Permissions — Your AWS account must have been granted access by your administrator to the AmazonMWAAFullConsoleAccess access control policy for your environment. In addition, your Amazon MWAA environment must be permitted by your execution role to access the AWS resources used by your environment.
-
Access — If you require access to public repositories to install dependencies directly on the web server, your environment must be configured with public network web server access. For more information, see Apache Airflow access modes.
-
Amazon S3 configuration — The Amazon S3 bucket used to store your DAGs, custom plugins in
plugins.zip
, and Python dependencies inrequirements.txt
must be configured with Public Access Blocked and Versioning Enabled.
How it works
To run custom plugins on your environment, you must do three things:
-
Create a
plugins.zip
file locally. -
Upload the local
plugins.zip
file to your Amazon S3 bucket. -
Specify the version of this file in the Plugins file field on the Amazon MWAA console.
Note
If this is the first time you're uploading a plugins.zip
to your Amazon S3 bucket, you also need to specify the path to the file on the Amazon MWAA console. You only need to complete this step once.
When to use the plugins
Plugins are required only for extending the Apache Airflow user interface, as outlined in the Apache Airflow documentation/dags
folder alongside your DAG
code.
If you need to create your own integrations with external systems, place them in the
/dags
folder or a subfolder within it, but not in the
plugins.zip
folder. In Apache Airflow 2.x, plugins are primarily used for
extending the UI.
Similarly, other dependencies should not be placed in plugins.zip
.
Instead, they can be stored in a location under the Amazon S3 /dags
folder, where they will be synchronized to each Amazon MWAA container before Apache Airflow
starts.
Note
Any file in the /dags
folder or in plugins.zip
that does not
explicitly define an Apache Airflow DAG object must be listed in an
.airflowignore
file.
Custom plugins overview
Apache Airflow's built-in plugin manager can integrate external features to its core by simply dropping files in an $AIRFLOW_HOME/plugins
folder. It allows you to use custom Apache Airflow operators, hooks, sensors, or interfaces. The following section provides an example of flat and nested directory structures in a local development environment and the resulting import statements, which determines the directory structure within a plugins.zip.
Custom plugins directory and size limits
The Apache Airflow Scheduler and the Workers look for custom plugins during startup on the AWS-managed Fargate container for your environment at /usr/local/airflow/plugins/
.*
-
Directory structure. The directory structure (at
/
) is based on the contents of your*
plugins.zip
file. For example, if yourplugins.zip
contains theoperators
directory as a top-level directory, then the directory will be extracted to/usr/local/airflow/plugins/
on your environment.operators
-
Size limit. We recommend a
plugins.zip
file less than than 1 GB. The larger the size of aplugins.zip
file, the longer the startup time on an environment. Although Amazon MWAA doesn't limit the size of aplugins.zip
file explicitly, if dependencies can't be installed within ten minutes, the Fargate service will time-out and attempt to rollback the environment to a stable state.
Note
For environments using Apache Airflow v1.10.12 or Apache Airflow v2.0.2, Amazon MWAA limits outbound traffic on the Apache Airflow web server, and does not allow you to install plugins nor Python dependencies directly on the web server. Starting with Apache Airflow v2.2.2, Amazon MWAA can install plugins and dependencies directly on the web server.
Examples of custom plugins
The following section uses sample code in the Apache Airflow reference guide to show how to structure your local development environment.
Example using a flat directory structure in plugins.zip
The following example shows a plugins.zip
file with a flat directory structure for Apache Airflow v2.
Example flat directory with PythonVirtualenvOperator plugins.zip
The following example shows the top-level tree of a plugins.zip file for the PythonVirtualenvOperator custom plugin in Creating a custom plugin for Apache Airflow PythonVirtualenvOperator.
├── virtual_python_plugin.py
Example plugins/virtual_python_plugin.py
The following example shows the PythonVirtualenvOperator custom plugin.
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
Example using a nested directory structure in plugins.zip
The following example shows a plugins.zip
file with separate directories for hooks
, operators
, and a sensors
directory for Apache Airflow v2.
Example plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
The following example shows the import statements in the DAG (DAGs folder) that uses the custom plugins.
Example dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
Example plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
The following examples show each of the import statements needed in the custom plugin files.
Example hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
Example sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
Example operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
Example operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
Follow the steps in Testing custom plugins using the Amazon MWAA CLI utility, and then Creating a plugins.zip file to zip the contents within your plugins
directory. For example, cd plugins
.
Creating a plugins.zip file
The following steps describe the steps we recommend to create a plugins.zip file locally.
Step one: Test custom plugins using the Amazon MWAA CLI utility
-
The command line interface (CLI) utility replicates an Amazon Managed Workflows for Apache Airflow environment locally.
-
The CLI builds a Docker container image locally that’s similar to an Amazon MWAA production image. This allows you to run a local Apache Airflow environment to develop and test DAGs, custom plugins, and dependencies before deploying to Amazon MWAA.
-
To run the CLI, see the aws-mwaa-local-runner
on GitHub.
Step two: Create the plugins.zip file
You can use a built-in ZIP archive utility, or any other ZIP utility (such as 7zip
Note
The built-in zip utility for Windows OS may add subfolders when you create a .zip file. We recommend verifying the contents of the plugins.zip file before uploading to your Amazon S3 bucket to ensure no additional directories were added.
-
Change directories to your local Airflow plugins directory. For example:
myproject$
cd plugins
-
Run the following command to ensure that the contents have executable permissions (macOS and Linux only).
plugins$
chmod -R 755 .
-
Zip the contents within your
plugins
folder.plugins$
zip -r plugins.zip .
Uploading plugins.zip
to Amazon S3
You can use the Amazon S3 console or the AWS Command Line Interface (AWS CLI) to upload a plugins.zip
file to your Amazon S3 bucket.
Using the AWS CLI
The AWS Command Line Interface (AWS CLI) is an open source tool that enables you to interact with AWS services using commands in your command-line shell. To complete the steps on this page, you need the following:
To upload using the AWS CLI
-
In your command prompt, navigate to the directory where your
plugins.zip
file is stored. For example:cd plugins
-
Use the following command to list all of your Amazon S3 buckets.
aws s3 ls
-
Use the following command to list the files and folders in the Amazon S3 bucket for your environment.
aws s3 ls s3://
YOUR_S3_BUCKET_NAME
-
Use the following command to upload the
plugins.zip
file to the Amazon S3 bucket for your environment.aws s3 cp plugins.zip s3://
YOUR_S3_BUCKET_NAME
/plugins.zip
Using the Amazon S3 console
The Amazon S3 console is a web-based user interface that allows you to create and manage the resources in your Amazon S3 bucket.
To upload using the Amazon S3 console
-
Open the Environments page
on the Amazon MWAA console. -
Choose an environment.
-
Select the S3 bucket link in the DAG code in S3 pane to open your storage bucket on the Amazon S3 console.
-
Choose Upload.
-
Choose Add file.
-
Select the local copy of your
plugins.zip
, choose Upload.
Installing custom plugins on your environment
This section describes how to install the custom plugins you uploaded to your Amazon S3 bucket by specifying the path to the plugins.zip file, and specifying the version of the plugins.zip file each time the zip file is updated.
Specifying the path to plugins.zip
on the Amazon MWAA console (the first time)
If this is the first time you're uploading a plugins.zip
to your Amazon S3 bucket, you also need to specify the path to the file on the Amazon MWAA console. You only need to complete this step once.
-
Open the Environments page
on the Amazon MWAA console. -
Choose an environment.
-
Choose Edit.
-
On the DAG code in Amazon S3 pane, choose Browse S3 next to the Plugins file - optional field.
-
Select the
plugins.zip
file on your Amazon S3 bucket. -
Choose Choose.
-
Choose Next, Update environment.
Specifying the plugins.zip
version on the Amazon MWAA console
You need to specify the version of your plugins.zip
file on the Amazon MWAA console each time you upload a new version of your plugins.zip
in your Amazon S3 bucket.
-
Open the Environments page
on the Amazon MWAA console. -
Choose an environment.
-
Choose Edit.
-
On the DAG code in Amazon S3 pane, choose a
plugins.zip
version in the dropdown list. -
Choose Next.
Example use cases for plugins.zip
-
Learn how to create a custom plugin in Custom plugin with Apache Hive and Hadoop.
-
Learn how to create a custom plugin in Custom plugin to patch PythonVirtualenvOperator .
-
Learn how to create a custom plugin in Custom plugin with Oracle.
-
Learn how to create a custom plugin in Changing a DAG's timezone on Amazon MWAA.
What's next?
-
Test your DAGs, custom plugins, and Python dependencies locally using the aws-mwaa-local-runner
on GitHub.