

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon Managed Workflows for Apache Airflow용 코드 예제
<a name="sample-code"></a>

이 가이드에는 Amazon Managed Workflows for Apache Airflow 환경에서 사용할 수 있는 DAG 및 사용자 지정 플러그인을 비롯한 코드 샘플이 포함되어 있습니다. AWS 서비스와 함께 Apache Airflow를 사용하는 추가 예제는 Apache Airflow GitHub 리포지토리의 [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags) 디렉터리를 참조하세요.

**Topics**
+ [DAG를 사용하여 CLI에서 변수 가져오기](samples-variables-import.md)
+ [`SSHOperator`을(를) 사용하여 SSH 연결 생성](samples-ssh.md)
+ [Apache Airflow Snowflake 연결에 AWS Secrets Manager의 암호 키 사용](samples-sm-snowflake.md)
+ [CloudWatch에서 DAG를 사용하여 사용자 지정 지표 작성](samples-custom-metrics.md)
+ [Amazon MWAA 환경에서 Aurora PostgreSQL 데이터베이스 정리](samples-database-cleanup.md)
+ [Amazon S3의 CSV 파일로 환경 메타데이터 내보내기](samples-dag-run-info-to-csv.md)
+ [Apache Airflow 변수에 AWS Secrets Manager 암호 키 사용](samples-secrets-manager-var.md)
+ [Apache Airflow 연결을 위한 AWS Secrets Manager의 암호 키 사용](samples-secrets-manager.md)
+ [Oracle을 이용한 사용자 지정 플러그인 생성](samples-oracle.md)
+ [Amazon MWAA에서 DAG 시간대 변경](samples-plugins-timezone.md)
+ [CodeArtifact 토큰 새로고침](samples-code-artifact.md)
+ [Apache Hive 및 Hadoop을 사용하여 사용자 지정 플러그인 생성](samples-hive.md)
+ [Apache Airflow PythonVirtualenvOperator용 사용자 지정 플러그인 생성](samples-virtualenv.md)
+ [Lambda 함수를 사용한 DAG 호출](samples-lambda.md)
+ [여러 Amazon MWAA 환경에서 DAG 호출](samples-invoke-dag.md)
+ [Amazon RDS for Microsoft SQL Server와 함께 Amazon MWAA 사용](samples-sql-server.md)
+ [Amazon EKS에서 Amazon MWAA 사용](mwaa-eks-example.md)
+ [`ECSOperator`를 사용하여 Amazon ECS에 연결](samples-ecs-operator.md)
+ [Amazon MWAA에서 dbt 사용](samples-dbt.md)
+ [AWS 블로그 및 자습서](#samples-blogs-tutorials)

# DAG를 사용하여 CLI에서 변수 가져오기
<a name="samples-variables-import"></a>

다음 샘플 코드는 Amazon Managed Workflows for Apache Airflow의 CLI를 사용하여 변수를 가져옵니다.

**Topics**
+ [버전](#samples-variables-import-version)
+ [사전 조건](#samples-variables-import-prereqs)
+ [권한](#samples-variables-import-permissions)
+ [종속성](#samples-variables-import-dependencies)
+ [코드 샘플](#samples-variables-import-code)
+ [다음 단계](#samples-variables-import-next-up)

## 버전
<a name="samples-variables-import-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-variables-import-prereqs"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## 권한
<a name="samples-variables-import-permissions"></a>

AWS 계정은 `AmazonMWAAAirflowCliAccess` 정책에 대한 액세스 권한이 필요합니다. 자세한 내용은 [Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess](access-policies.md) 섹션을 참조하세요.

## 종속성
<a name="samples-variables-import-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 샘플
<a name="samples-variables-import-code"></a>

다음 샘플 코드는 Amazon MWAA 환경 이름(`mwaa_env`), 환경의 AWS 리전 리전(`var_file`) 및 가져오려는 변수가 포함된 로컬 파일(`aws_region`)의 세 가지 입력을 사용합니다.

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## 다음 단계
<a name="samples-variables-import-next-up"></a>
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.

# `SSHOperator`을(를) 사용하여 SSH 연결 생성
<a name="samples-ssh"></a>

다음 예제는 DAG(유방향 비순환 그래프)에서 `SSHOperator`을(를) 사용하여 Amazon Managed Workflows for Apache Airflow 환경에서 원격 Amazon EC2 인스턴스에 연결하는 방법을 설명합니다. 비슷한 접근 방식을 사용하여 SSH 액세스가 있는 모든 원격 인스턴스에 연결할 수 있습니다.

다음 예제에서는 Amazon S3에 있는 사용자 환경의 `dags` 디렉터리에 SSH 암호 키(`.pem`)를 업로드합니다. 그런 다음 `requirements.txt`을(를) 사용하여 필요한 종속성을 설치하고 UI에 새 Apache Airflow 연결을 생성합니다. 마지막으로 원격 인스턴스에 대한 SSH 연결을 생성하는 DAG를 작성합니다.

**Topics**
+ [버전](#samples-ssh-version)
+ [사전 조건](#samples-ssh-prereqs)
+ [권한](#samples-ssh-permissions)
+ [요구 사항](#samples-ssh-dependencies)
+ [암호 키를 Amazon S3에 복사](#samples-ssh-secret)
+ [새 Apache Airflow 연결 생성](#samples-ssh-connection)
+ [코드 샘플](#samples-ssh-code)

## 버전
<a name="samples-ssh-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-ssh-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ SSH 암호 키. 코드 샘플은 Amazon MWAA 환경과 동일한 리전에 Amazon EC2 인스턴스와 `.pem`이(가) 있다고 가정합니다. 키가 없는 경우 *Amazon EC2 사용 설명서*의 [키 쌍 생성 또는 가져오기](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)를 참조하세요.

## 권한
<a name="samples-ssh-permissions"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## 요구 사항
<a name="samples-ssh-dependencies"></a>

웹 서버에 `apache-airflow-providers-ssh` 패키지를 설치하려면 다음 파라미터를 `requirements.txt`에 추가합니다. 환경이 업데이트되고 Amazon MWAA가 종속성을 성공적으로 설치하면 UI에 새 **SSH** 연결 유형이 표시됩니다.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**참고**  
`-c`는 `requirements.txt`의 제약 조건 URL을 정의합니다. 이렇게 하면 Amazon MWAA가 사용자 환경에 맞는 올바른 패키지 버전을 설치할 수 있습니다.

## 암호 키를 Amazon S3에 복사
<a name="samples-ssh-secret"></a>

다음 AWS Command Line Interface 명령을 사용하여 Amazon S3의 환경 `dags` 디렉터리에 `.pem` 키를 복사합니다.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA는 `.pem` 키를 포함해 `dags`의 콘텐츠를 로컬 `/usr/local/airflow/dags/` 디렉터리에 복사합니다. 이렇게 하면 Apache Airflow가 키에 액세스할 수 있습니다.

## 새 Apache Airflow 연결 생성
<a name="samples-ssh-connection"></a>

**Apache Airflow UI를 사용하여 새 SSH 연결을 생성하려면**

1. Amazon MWAA 콘솔에서 [환경 페이지](https://console.aws.amazon.com/mwaa/home#/environments)를 엽니다.

1. 환경 목록에서 사용자 환경에 맞는 **Airflow UI 열기**를 선택합니다.

1. Apache Airflow UI 페이지의 기본 내비게이션 바에서 **관리자**를 선택하여 드롭다운 목록을 확장한 다음 **연결**을 선택합니다.

1. **연결 목록** 페이지에서 **\$1**를 선택하거나 **새 레코드 추가** 버튼을 선택하여 새 연결을 추가합니다.

1. **연결 추가** 페이지에서 다음 정보를 추가합니다.

   1. **연결 ID**에 **ssh\$1new**를 입력합니다.

   1. **연결 유형**의 경우 드롭다운 목록에서 **SSH**를 선택합니다.
**참고**  
목록에 **SSH** 연결 유형이 없는 경우 Amazon MWAA가 필요한 `apache-airflow-providers-ssh` 패키지를 설치하지 않은 것입니다. 이 패키지를 포함하도록 `requirements.txt` 파일을 업데이트한 다음 다시 시도하세요.

   1. **Host**에 연결하려는 Amazon EC2 인스턴스의 IP 주소를 입력합니다. 예를 들어 **12.345.67.89**입니다.

   1. Amazon EC2 인스턴스에 연결 중인 경우 **사용자 이름**에 **ec2-user**을(를) 입력합니다. Apache Airflow를 연결하려는 원격 인스턴스의 유형에 따라 사용자 이름이 달라질 수 있습니다.

   1. **Extra**에는 다음 키-값 쌍을 JSON 형식으로 입력합니다.

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      이 키-값 쌍은 Apache Airflow가 로컬 `/dags` 디렉터리에서 암호 키를 검색하도록 지시합니다.

## 코드 샘플
<a name="samples-ssh-code"></a>

다음 DAG는 `SSHOperator`을(를) 사용하여 대상 Amazon EC2 인스턴스에 연결한 다음, `hostname` Linux 명령을 실행하여 인스턴스 이름을 인쇄합니다. 원격 인스턴스에서 모든 명령 또는 스크립트를 실행하도록 DAG를 수정할 수 있습니다.

1. 터미널을 열고 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `ssh.py`로 저장합니다.

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 성공하면 `ssh_operator_example` DAG의 `ssh_task`에 대한 작업 로그에 다음과 비슷한 출력을 얻게 됩니다.

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Apache Airflow Snowflake 연결에 AWS Secrets Manager의 암호 키 사용
<a name="samples-sm-snowflake"></a>

다음 샘플은 Amazon Managed Workflows for Apache Airflow에서 Apache Airflow Snowflake 연결의 암호 키를 얻기 위해 AWS Secrets Manager을(를) 호출합니다. [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)의 단계를 완료했다고 가정합니다.

**Topics**
+ [버전](#samples-sm-snowflake-version)
+ [사전 조건](#samples-sm-snowflake-prereqs)
+ [권한](#samples-sm-snowflake-permissions)
+ [요구 사항](#samples-sm-snowflake-dependencies)
+ [코드 샘플](#samples-sm-snowflake-code)
+ [다음 단계](#samples-sm-snowflake-next-up)

## 버전
<a name="samples-sm-snowflake-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-sm-snowflake-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Apache Airflow 구성 옵션인 Secrets Manager 백엔드.
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Secrets Manager의 Apache Airflow 연결 문자열.

## 권한
<a name="samples-sm-snowflake-permissions"></a>
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Secrets Manager 권한.

## 요구 사항
<a name="samples-sm-snowflake-dependencies"></a>

이 페이지의 샘플 코드를 사용하려면 다음 종속성을 사용자 `requirements.txt`에 추가합니다. 자세한 내용은 [Python 종속성 설치](working-dags-dependencies.md) 섹션을 참조하세요.

```
apache-airflow-providers-snowflake==1.3.0
```

## 코드 샘플
<a name="samples-sm-snowflake-code"></a>

다음 단계는 Secrets Manager를 호출하여 암호를 가져오는 DAG 코드를 만드는 방법을 설명합니다.

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `snowflake_connection.py`로 저장합니다.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## 다음 단계
<a name="samples-sm-snowflake-next-up"></a>
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.

# CloudWatch에서 DAG를 사용하여 사용자 지정 지표 작성
<a name="samples-custom-metrics"></a>

다음 코드 예제를 사용하여 `PythonOperator`를 실행하는 방향성 비순환 그래프(DAG)를 작성하여 Amazon MWAA 환경에 대한 OS 수준 지표를 검색할 수 있습니다. 그런 다음 DAG는 데이터를 Amazon CloudWatch에 사용자 지정 지표로 게시합니다.

사용자 지정 OS 수준 지표를 통해 환경 작업자가 가상 메모리 및 CPU와 같은 리소스를 어떻게 활용하고 있는지 추가로 파악할 수 있습니다. 이 정보를 사용하여 워크로드에 가장 적합한 [환경 클래스](environment-class.md)를 선택할 수 있습니다.

**Topics**
+ [버전](#samples-custom-metrics-version)
+ [사전 조건](#samples-custom-metrics-prereqs)
+ [권한](#samples-custom-metrics-permissions)
+ [종속성](#samples-custom-metrics-dependencies)
+ [코드 예제](#samples-custom-metrics-code)

## 버전
<a name="samples-custom-metrics-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-custom-metrics-prereqs"></a>

이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).

## 권한
<a name="samples-custom-metrics-permissions"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## 종속성
<a name="samples-custom-metrics-dependencies"></a>
+ 이 페이지의 코드 예제를 사용하는 데 추가 종속성이 필요하지 않습니다.

## 코드 예제
<a name="samples-custom-metrics-code"></a>

1. 명령 프롬프트에서 DAG 코드가 저장된 폴더로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 예제의 콘텐츠를 복사하고 로컬에서 `dag-custom-metrics.py`로 저장합니다. 환경 이름을 `MWAA-ENV-NAME` 로 변경합니다.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. DAG가 성공적으로 실행되면 Apache Airflow 로그에 다음과 비슷한 콘텐츠를 볼 수 있습니다.

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Amazon MWAA 환경에서 Aurora PostgreSQL 데이터베이스 정리
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow는 Aurora PostgreSQL 데이터베이스를 Apache Airflow 메타데이터 베이스로 사용하며, 여기서 DAG가 실행되고 작업 인스턴스가 저장됩니다. 다음 샘플 코드는 Amazon MWAA 환경의 전용 Aurora PostgreSQL 데이터베이스에서 항목을 정기적으로 정리합니다.

**Topics**
+ [버전](#samples-database-cleanup-version)
+ [사전 조건](#samples-database-cleanup-prereqs)
+ [종속성](#samples-sql-server-dependencies)
+ [코드 샘플](#samples-database-cleanup-code)

## 버전
<a name="samples-database-cleanup-version"></a>

이 페이지의 코드 샘플은 Amazon MWAA에서 지원되는 Apache Airflow v2에만 해당됩니다. [지원되는 Apache Airflow 버전](airflow-versions.md)을 참조하세요.

**작은 정보**  
**Apache Airflow v3 사용자의 경우**: 데이터베이스를 정리(메타스토어 테이블에서 이전 레코드 제거)하려는 경우, `db clean` CLI 명령을 실행합니다.

## 사전 조건
<a name="samples-database-cleanup-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).

## 종속성
<a name="samples-sql-server-dependencies"></a>

이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 샘플
<a name="samples-database-cleanup-code"></a>

다음 DAG는 `TABLES_TO_CLEAN`에 지정된 테이블에 대한 메타데이터 데이터베이스를 정리합니다. 이 예제는 지정된 테이블에서 30일이 넘은 데이터를 삭제합니다. 항목이 삭제되는 기간을 조정하려면 `MAX_AGE_IN_DAYS`를 다른 값으로 설정합니다.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Amazon S3의 CSV 파일로 환경 메타데이터 내보내기
<a name="samples-dag-run-info-to-csv"></a>

다음 코드 예제를 사용하여 다양한 DAG 실행 정보에 대해 데이터베이스를 쿼리하고 Amazon S3에 저장된 `.csv` 파일에 데이터를 쓰는 DAG(방향성 비순환 그래프)를 생성합니다.

데이터를 로컬에서 검사하거나, 이를 객체 스토리지에 보관하거나, [Amazon S3에서 Amazon Redshift 운영자](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) 및 [데이터베이스 정리](samples-database-cleanup.md)와 같은 도구와 결합하기 위해, Amazon MWAA 메타데이터를 환경 외부로 이동하고 향후 분석을 위해 보존하기 위해 사용자 환경의 Aurora PostgreSQL 데이터베이스에서 정보를 내보내고자 할 수 있습니다.

[Apache Airflow 모델](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models)에 나열된 모든 객체에 대해 데이터베이스를 쿼리할 수 있습니다. 이 코드 샘플은 DAG 실행과 관련된 정보를 제공하는 세 가지 모델인 `DagRun`, `TaskFail` 및 `TaskInstance`를 사용합니다.

**Topics**
+ [버전](#samples-dag-run-info-to-csv-version)
+ [사전 조건](#samples-dag-run-info-to-csv-prereqs)
+ [권한](#samples-dag-run-info-to-csv-permissions)
+ [요구 사항](#samples-dag-run-info-to-csv-dependencies)
+ [코드 샘플](#samples-dag-run-info-to-csv-code)

## 버전
<a name="samples-dag-run-info-to-csv-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-dag-run-info-to-csv-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ 메타데이터 정보를 내보내고자 하는 [새 Amazon S3 버킷](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html).

## 권한
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA에는 쿼리된 메타데이터 정보를 Amazon S3에 쓰기 위한 Amazon S3 작업`s3:PutObject`에 대해 권한이 필요합니다. 다음 정책 문을 사용자 환경의 실행 역할에 추가합니다.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

이 정책은 쓰기 액세스 권한을 *amzn-s3-demo-bucket*으로만 제한합니다.

## 요구 사항
<a name="samples-dag-run-info-to-csv-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 샘플
<a name="samples-dag-run-info-to-csv-code"></a>

다음 단계는 Aurora PostgreSQL을 쿼리하고 새 Amazon S3 버킷에 결과를 쓰는 DAG를 생성하는 방법을 설명합니다.

1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예제:

   ```
   cd dags
   ```

1. 다음 코드 예제의 콘텐츠를 복사하고 로컬에서 `metadata_to_csv.py`로 저장합니다. 사용자 DAG가 메타데이터 데이터베이스에서 쿼리하는 가장 오래된 기록의 보존 기간을 제어하기 위해 `MAX_AGE_IN_DAYS`에 할당된 값을 변경할 수 있습니다.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경의 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 성공적으로 실행되면 `export_db` 작업의 작업 로그에 다음과 유사한 출력이 표시됩니다.

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   이제 새 Amazon S3 버킷에서 내보낸 `.csv` 파일에 액세스하여 `/files/export/`에 다운로드할 수 있습니다.

# Apache Airflow 변수에 AWS Secrets Manager 암호 키 사용
<a name="samples-secrets-manager-var"></a>

다음 샘플에서는 Amazon Managed Workflows for Apache Airflow에서 Apache Airflow 변수에 대한 암호 키를 가져오기 위해 AWS Secrets Manager를 직접적으로 호출합니다. [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)의 단계를 완료했다고 가정합니다.

**Topics**
+ [버전](#samples-secrets-manager-var-version)
+ [사전 조건](#samples-secrets-manager-var-prereqs)
+ [권한](#samples-secrets-manager-var-permissions)
+ [요구 사항](#samples-hive-dependencies)
+ [코드 샘플](#samples-secrets-manager-var-code)
+ [다음 단계](#samples-secrets-manager-var-next-up)

## 버전
<a name="samples-secrets-manager-var-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-secrets-manager-var-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Apache Airflow 구성 옵션인 Secrets Manager 백엔드.
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Secrets Manager의 Apache Airflow 변수 문자열.

## 권한
<a name="samples-secrets-manager-var-permissions"></a>
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Secrets Manager 권한.

## 요구 사항
<a name="samples-hive-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 샘플
<a name="samples-secrets-manager-var-code"></a>

다음 단계는 Secrets Manager를 호출하여 암호를 가져오는 DAG 코드를 만드는 방법을 설명합니다.

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `secrets-manager-var.py`로 저장합니다.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## 다음 단계
<a name="samples-secrets-manager-var-next-up"></a>
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.

# Apache Airflow 연결을 위한 AWS Secrets Manager의 암호 키 사용
<a name="samples-secrets-manager"></a>

다음 샘플에서는 Amazon Managed Workflows for Apache Airflow에서 Apache Airflow 연결을 위한 암호 키를 가져오기 위해 AWS Secrets Manager를 직접적으로 호출합니다. [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)의 단계를 완료했다고 가정합니다.

**Topics**
+ [버전](#samples-secrets-manager-version)
+ [사전 조건](#samples-secrets-manager-prereqs)
+ [권한](#samples-secrets-manager-permissions)
+ [요구 사항](#samples-hive-dependencies)
+ [코드 샘플](#samples-secrets-manager-code)
+ [다음 단계](#samples-secrets-manager-next-up)

## 버전
<a name="samples-secrets-manager-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-secrets-manager-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Apache Airflow 구성 옵션인 Secrets Manager 백엔드.
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Secrets Manager의 Apache Airflow 연결 문자열.

## 권한
<a name="samples-secrets-manager-permissions"></a>
+ [AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성](connections-secrets-manager.md)에 나열된 바와 같은 Secrets Manager 권한.

## 요구 사항
<a name="samples-hive-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 샘플
<a name="samples-secrets-manager-code"></a>

다음 단계는 Secrets Manager를 호출하여 암호를 가져오는 DAG 코드를 만드는 방법을 설명합니다.

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `secrets-manager.py`로 저장합니다.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## 다음 단계
<a name="samples-secrets-manager-next-up"></a>
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.

# Oracle을 이용한 사용자 지정 플러그인 생성
<a name="samples-oracle"></a>

다음 샘플은 Amazon MWAA를 위해 Oracle을 이용한 사용자 지정 플러그인을 생성하는 단계를 안내하며, plugins.zip 파일에서 다른 사용자 지정 플러그인 및 바이너리와 결합할 수 있습니다.

**Contents**
+ [버전](#samples-oracle-version)
+ [사전 조건](#samples-oracle-prereqs)
+ [권한](#samples-oracle-permissions)
+ [요구 사항](#samples-oracle-dependencies)
+ [코드 샘플](#samples-oracle-code)
+ [사용자 지정 플러그인 만들기](#samples-oracle-create-pluginszip-steps)
  + [다운로드 종속성](#samples-oracle-install)
  + [사용자 지정 플러그인](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Airflow 구성 옵션](#samples-oracle-airflow-config)
+ [다음 단계](#samples-oracle-next-up)

## 버전
<a name="samples-oracle-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-oracle-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ 사용자 환경의 모든 로그 수준, `CRITICAL` 또는 이전 섹션에서 작업자 로깅이 활성화됨. Amazon MWAA 로그 유형 및 로그 그룹 관리 방법에 대한 자세한 내용은 [Amazon CloudWatch에서 Airflow 로그 액세스](monitoring-airflow.md) 섹션을 참조하세요.

## 권한
<a name="samples-oracle-permissions"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## 요구 사항
<a name="samples-oracle-dependencies"></a>

이 페이지의 샘플 코드를 사용하려면 다음 종속성을 사용자 `requirements.txt`에 추가합니다. 자세한 내용은 [Python 종속성 설치](working-dags-dependencies.md) 섹션을 참조하세요.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## 코드 샘플
<a name="samples-oracle-code"></a>

다음 단계에서는 사용자 지정 플러그인을 테스트할 DAG 코드를 생성하는 방법을 설명합니다.

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `oracle.py`로 저장합니다.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## 사용자 지정 플러그인 만들기
<a name="samples-oracle-create-pluginszip-steps"></a>

이 섹션에서는 종속성을 다운로드하고, 사용자 지정 플러그인과 plugins.zip 파일을 만드는 방법을 설명합니다.

### 다운로드 종속성
<a name="samples-oracle-install"></a>

Amazon MWAA는 plugins.zip 콘텐츠를 각 Amazon MWAA 스케줄러 및 작업자 컨테이너에 있는 `/usr/local/airflow/plugins`로 추출합니다. 이는 환경에 바이너리를 추가하는 데 사용됩니다. 다음 단계에서는 사용자 지정 플러그인에 필요한 파일을 조합하는 방법을 설명합니다.

**Amazon Linux 컨테이너 이미지 가져오기**

1. 명령 프롬프트에서 Amazon Linux 컨테이너 이미지를 가져와서 로컬에서 컨테이너를 실행합니다. 예:

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   명령 프롬프트에서 bash 명령줄을 간접적으로 호출할 수 있습니다. 예:

   ```
   bash-4.2#
   ```

1. Linux 네이티브 비동기 I/O 기능(libaio)을 설치합니다.

   ```
   yum -y install libaio
   ```

1. 후속 단계를 위해 이 창을 열어 둡니다. `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1` 파일을 로컬로 복사할 예정입니다.

**클라이언트 폴더 다운로드**

1. unzip 패키지를 로컬에 설치합니다. 예:

   ```
   sudo yum install unzip
   ```

1. `oracle_plugin` 디렉터리를 생성합니다. 예:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. 다음 curl 명령을 사용하여 [Linux x86-64용 Oracle Instant 클라이언트 다운로드(64비트)](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html)에서 [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip)을 다운로드합니다.

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. `client.zip` 파일의 압축을 풉니다. 예:

   ```
   unzip *.zip
   ```

**Docker에서 파일을 추출합니다.**

1. 새 명령 프롬프트에서 Docker 컨테이너 ID를 표시하고 기록해 둡니다. 예:

   ```
   docker container ls
   ```

   명령 프롬프트는 모든 컨테이너와 해당 ID를 반환할 수 있습니다. 예:

   ```
   debc16fd6970
   ```

1. `oracle_plugin` 디렉터리에서 `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1` 파일을 로컬 `instantclient_18_5` 폴더로 추출합니다. 예:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### 사용자 지정 플러그인
<a name="samples-oracle-plugins-code"></a>

Apache Airflow는 스타트업 시 플러그인 폴더에 있는 Python 파일의 콘텐츠를 실행합니다. 이는 환경 변수를 설정하고 수정하는 데 사용됩니다. 다음 단계에서는 사용자 지정 플러그인의 샘플 코드를 설명합니다.
+ 다음 코드 샘플의 내용을 복사하고 로컬에서 `env_var_plugin_oracle.py`로 저장합니다.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

다음 단계에서는 `plugins.zip`을 생성하는 방법에 대해 설명합니다. 이 예제의 내용은 다른 플러그인 및 바이너리와 결합하여 단일 `plugins.zip` 파일로 만들 수 있습니다.

**플러그인 디렉터리의 콘텐츠를 압축합니다.**

1. 명령 프롬프트에서 `oracle_plugin` 디렉터리로 이동합니다. 예:

   ```
   cd oracle_plugin
   ```

1. `instantclient_18_5` 디렉터리를 plugins.zip으로 압축합니다. 예:

   ```
   zip -r ../plugins.zip ./
   ```

   명령 프롬프트에 다음이 표시됩니다.

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. `client.zip` 파일을 제거합니다. 예:

   ```
   rm client.zip
   ```

**env\$1var\$1plugin\$1oracle.py 파일을 압축합니다.**

1. plugins.zip. 파일의 루트에 `env_var_plugin_oracle.py` 파일을 추가합니다. 예:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. 이제 plugins.zip 파일에 다음 정보가 포함됩니다.

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Airflow 구성 옵션
<a name="samples-oracle-airflow-config"></a>

Apache Airflow v2를 사용하는 경우 Apache Airflow 구성 옵션으로 `core.lazy_load_plugins : False`을 추가합니다. 자세한 내용은 [2에서 구성 옵션을 사용하여 플러그인 로드](configuring-env-variables.md#configuring-2.0-airflow-override)를 참조하세요.

## 다음 단계
<a name="samples-oracle-next-up"></a>
+ 이 예제의 `requirements.txt` 파일을 [Python 종속성 설치](working-dags-dependencies.md)의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.
+ 이 예제의 `plugins.zip` 파일을 [사용자 지정 플러그인 설치](configuring-dag-import-plugins.md)의 Amazon S3 버킷에 업로드하는 방법에 대해 자세히 알아봅니다.

# Amazon MWAA에서 DAG 시간대 변경
<a name="samples-plugins-timezone"></a>

Apache Airflow는 기본적으로 방향성 비순환 그래프(DAG)를 UTC\$10으로 스케줄링합니다. 다음 단계는 Amazon MWAA가 [Pendulum](https://pypi.org/project/pendulum/)을 사용하여 DAG를 실행하는 시간대를 변경하는 방법을 보여줍니다. 선택적으로 이 주제에서는 사용자 지정 플러그인을 생성하여 사용자 환경의 Apache Airflow 로그의 시간대를 변경하는 방법을 보여줍니다.

**Topics**
+ [버전](#samples-plugins-timezone-version)
+ [사전 조건](#samples-plugins-timezone-prerequisites)
+ [권한](#samples-plugins-timezone-permissions)
+ [Airflow 로그의 시간대를 변경하는 플러그인을 생성합니다.](#samples-plugins-timezone-custom-plugin)
+ [생성`plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [코드 샘플](#samples-plugins-timezone-dag)
+ [다음 단계](#samples-plugins-timezone-plugins-next-up)

## 버전
<a name="samples-plugins-timezone-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-plugins-timezone-prerequisites"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).

## 권한
<a name="samples-plugins-timezone-permissions"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## Airflow 로그의 시간대를 변경하는 플러그인을 생성합니다.
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow는 시작시 `plugins` 디렉터리의 Python 파일을 실행합니다. 다음 플러그인을 사용하면 실행자의 시간대를 재정의하여 Apache Airflow가 로그를 기록하는 시간대를 수정할 수 있습니다.

1. 사용자 지정 플러그인에 대해 지정된 이름 `plugins` 디렉터리를 생성하고 디렉터리로 이동합니다. 예:

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. 다음 코드 샘플의 콘텐츠를 복사하고 로컬의 `plugins` 폴더에 `dag-timezone-plugin.py`로 저장합니다.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. `plugins` 디렉터리에서 `__init__.py`라는 빈 Python 파일을 만듭니다. `plugins` 디렉터리는 다음과 비슷해야 합니다.

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## 생성`plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

다음 단계에서는 `plugins.zip`을 생성하는 방법에 대해 설명합니다. 이 예제의 콘텐츠를 다른 플러그인 및 바이너리와 결합하여 단일 `plugins.zip` 파일로 만들 수 있습니다.

1. 명령 프롬프트에서 이전 단계의 `plugins` 디렉터리로 이동합니다. 예:

   ```
   cd plugins
   ```

1. `plugins` 디렉터리 내의 콘텐츠를 압축합니다.

   ```
   zip -r ../plugins.zip ./
   ```

1. S3 버킷에 `plugins.zip`를 업로드합니다.

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## 코드 샘플
<a name="samples-plugins-timezone-dag"></a>

DAG가 실행되는 기본 시간대(UTC\$10)를 변경하려면 시간대를 인식하는 날짜/시간을 처리하는 Python 라이브러리인 [Pendulum](https://pypi.org/project/pendulum/)이라는 라이브러리를 사용합니다.

1. 명령 프롬프트에서 DAG가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 예제의 콘텐츠를 복사하고 `tz-aware-dag.py`로 저장합니다.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 성공하면 `tz_test` DAG의 `tz_aware_task`에 대한 작업 로그에 다음과 비슷한 결과가 출력됩니다.

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## 다음 단계
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ 이 예제의 `plugins.zip` 파일을 [사용자 지정 플러그인 설치](configuring-dag-import-plugins.md)의 Amazon S3 버킷에 업로드하는 방법에 대해 자세히 알아봅니다.

# CodeArtifact 토큰 새로고침
<a name="samples-code-artifact"></a>

CodeArtifact를 사용하여 Python 종속성을 설치하는 경우 Amazon MWAA에는 활성 토큰이 필요합니다. Amazon MWAA가 런타임의 CodeArtifact 리포지토리에 액세스할 수 있도록 하려면 [시작 스크립트](using-startup-script.md)를 사용하고 토큰과 [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url)을 설정하면 됩니다.

다음 주제에서는 [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) CodeArtifact API 작업을 사용하여 환경이 시작되거나 업데이트될 때마다 새 토큰을 검색하는 시작 스크립트를 만드는 방법을 설명합니다.

**Topics**
+ [버전](#samples-code-artifact-version)
+ [사전 조건](#samples-code-artifact-prereqs)
+ [권한](#samples-code-artifact-permissions)
+ [코드 샘플](#samples-code-artifact-code)
+ [다음 단계](#samples-code-artifact-next-up)

## 버전
<a name="samples-code-artifact-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-code-artifact-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ 환경에 대한 종속성을 저장하는 [CodeArtifact 리포지토리](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html)입니다.

## 권한
<a name="samples-code-artifact-permissions"></a>

CodeArtifact 토큰을 새로 고치고 결과를 Amazon S3에 쓰려면 Amazon MWAA의 실행 역할에 다음과 같은 권한이 있어야 합니다.
+ 이 `codeartifact:GetAuthorizationToken` 작업을 통해 Amazon MWAA는 CodeArtifact에서 새 토큰을 검색할 수 있습니다. 다음 정책은 사용자가 생성하는 모든 CodeArtifact 도메인에 권한을 부여합니다. 명령문의 리소스 값을 수정하고 사용자 환경에서 액세스할 도메인만 지정하여 도메인에 대한 액세스를 추가로 제한할 수 있습니다.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) API 오퍼레이션을 호출하려면 `sts:GetServiceBearerToken` 액션이 필요합니다. 이 작업은 `pip`와(과) CodeArtifact 같은 패키지 관리자를 사용할 때 사용해야 하는 토큰을 반환합니다. CodeArtifact 리포지토리와 함께 패키지 관리자를 사용하려면 사용자 환경의 실행 역할 역할이 다음 정책 설명에 나열된 바와 같이 `sts:GetServiceBearerToken`을 허용해야 합니다.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## 코드 샘플
<a name="samples-code-artifact-code"></a>

다음 단계는 CodeArtifact 토큰을 업데이트하는 시작 스크립트를 만드는 방법을 설명합니다.

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `code_artifact_startup_script.sh`로 저장합니다.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. 스크립트를 저장한 폴더로 이동합니다. 새 프롬프트 창에서 `cp`를 사용하여 스크립트를 버킷에 업로드합니다. *amzn-s3-demo-bucket*을 자신의 정보로 바꿉니다.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   성공하면 Amazon S3가 객체에 대한 URL 경로를 출력합니다.

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   스크립트를 업로드하면 시작 시 환경이 업데이트되고 스크립트가 실행됩니다.

## 다음 단계
<a name="samples-code-artifact-next-up"></a>
+ 시작 스크립트를 사용하여 [Amazon MWAA에서 시작 스크립트 사용](using-startup-script.md)에서 환경을 사용자 지정하는 방법을 알아봅니다.
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.
+ 이 예제의 `plugins.zip` 파일을 [사용자 지정 플러그인 설치](configuring-dag-import-plugins.md)의 Amazon S3 버킷에 업로드하는 방법에 대해 자세히 알아봅니다.

# Apache Hive 및 Hadoop을 사용하여 사용자 지정 플러그인 생성
<a name="samples-hive"></a>

Amazon MWAA는 `plugins.zip`의 콘텐츠를 `/usr/local/airflow/plugins`로 추출합니다. 이를 사용하여 컨테이너에 바이너리를 추가할 수 있습니다. 또한 Apache Airflow는 *스타트업* 시 `plugins` 폴더에 있는 Python 파일의 콘텐츠를 실행하므로 환경 변수를 설정하고 수정할 수 있습니다. 다음 샘플은 Amazon Managed Workflows for Apache Airflow 환경에서 Apache Hive 및 Hadoop을 사용하여 사용자 지정 플러그인을 생성하고 다른 사용자 지정 플러그인 및 바이너리와 결합할 수 있는 단계를 안내합니다.

**Topics**
+ [버전](#samples-hive-version)
+ [사전 조건](#samples-hive-prereqs)
+ [권한](#samples-hive-permissions)
+ [요구 사항](#samples-hive-dependencies)
+ [다운로드 종속성](#samples-hive-install)
+ [사용자 지정 플러그인](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [코드 샘플](#samples-hive-code)
+ [Airflow 구성 옵션](#samples-hive-airflow-config)
+ [다음 단계](#samples-hive-next-up)

## 버전
<a name="samples-hive-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-hive-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).

## 권한
<a name="samples-hive-permissions"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## 요구 사항
<a name="samples-hive-dependencies"></a>

이 페이지의 샘플 코드를 사용하려면 다음 종속성을 사용자 `requirements.txt`에 추가합니다. 자세한 내용은 [Python 종속성 설치](working-dags-dependencies.md) 섹션을 참조하세요.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## 다운로드 종속성
<a name="samples-hive-install"></a>

Amazon MWAA는 plugins.zip 콘텐츠를 각 Amazon MWAA 스케줄러 및 작업자 컨테이너에 있는 `/usr/local/airflow/plugins`로 추출합니다. 이는 환경에 바이너리를 추가하는 데 사용됩니다. 다음 단계에서는 사용자 지정 플러그인에 필요한 파일을 조합하는 방법을 설명합니다.

1. 명령 프롬프트에서 플러그인을 만들려는 디렉터리로 이동합니다. 예:

   ```
   cd plugins
   ```

1. [미러](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz)에서 [Hadoop](https://hadoop.apache.org/)을 다운로드합니다. 예를 들면 다음과 같습니다.

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. [미러](https://www.apache.org/dyn/closer.cgi/hive/)에서 [Hive](https://hive.apache.org/)를 다운로드합니다. 예를 들면 다음과 같습니다.

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. 디렉터리를 생성합니다. 예:

   ```
   mkdir hive_plugin
   ```

1. Hadoop을 추출합니다.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Hive를 추출합니다.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## 사용자 지정 플러그인
<a name="samples-hive-plugins-code"></a>

Apache Airflow는 스타트업 시 플러그인 폴더에 있는 Python 파일의 콘텐츠를 실행합니다. 이는 환경 변수를 설정하고 수정하는 데 사용됩니다. 다음 단계에서는 사용자 지정 플러그인의 샘플 코드를 설명합니다.

1. 명령 프롬프트에서 `hive_plugin` 디렉터리로 이동합니다. 예:

   ```
   cd hive_plugin
   ```

1. 다음 코드 샘플의 콘테츠를 복사하여 로컬의 `hive_plugin` 디렉터리에 `hive_plugin.py`로 저장합니다.

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. 다음 텍스트의 콘텐츠를 복사하여 로컬의 `hive_plugin` 디렉터리에 `.airflowignore`로 저장합니다.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

다음 단계에서는 `plugins.zip`을 생성하는 방법에 대해 설명합니다. 이 예제의 내용은 다른 플러그인 및 바이너리와 결합하여 단일 `plugins.zip` 파일로 만들 수 있습니다.

1. 명령 프롬프트에서 이전 단계의 `hive_plugin` 디렉터리로 이동합니다. 예:

   ```
   cd hive_plugin
   ```

1. `plugins` 폴더 내 콘텐츠를 압축합니다.

   ```
   zip -r ../hive_plugin.zip ./
   ```

## 코드 샘플
<a name="samples-hive-code"></a>

다음 단계에서는 사용자 지정 플러그인을 테스트할 DAG 코드를 생성하는 방법을 설명합니다.

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `hive.py`로 저장합니다.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Airflow 구성 옵션
<a name="samples-hive-airflow-config"></a>

Apache Airflow v2를 사용하는 경우 Apache Airflow 구성 옵션으로 `core.lazy_load_plugins : False`을 추가합니다. 자세한 내용은 [2에서 구성 옵션을 사용하여 플러그인 로드](configuring-env-variables.md#configuring-2.0-airflow-override)를 참조하세요.

## 다음 단계
<a name="samples-hive-next-up"></a>
+ 이 예제의 `requirements.txt` 파일을 [Python 종속성 설치](working-dags-dependencies.md)의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.
+ 이 예제의 `plugins.zip` 파일을 [사용자 지정 플러그인 설치](configuring-dag-import-plugins.md)의 Amazon S3 버킷에 업로드하는 방법에 대해 자세히 알아봅니다.

# Apache Airflow PythonVirtualenvOperator용 사용자 지정 플러그인 생성
<a name="samples-virtualenv"></a>

다음 샘플은 Amazon Managed Workflows for Apache Airflow에서 사용자 지정 플러그인을 사용하여 Apache Airflow `PythonVirtualenvOperator`를 패치하는 방법을 설명합니다.

**Topics**
+ [버전](#samples-virtualenv-version)
+ [사전 조건](#samples-virtualenv-prereqs)
+ [권한](#samples-virtualenv-permissions)
+ [요구 사항](#samples-virtualenv-dependencies)
+ [사용자 지정 플러그인 샘플 코드](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [코드 샘플](#samples-virtualenv-code)
+ [Airflow 구성 옵션](#samples-virtualenv-airflow-config)
+ [다음 단계](#samples-virtualenv-next-up)

## 버전
<a name="samples-virtualenv-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-virtualenv-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).

## 권한
<a name="samples-virtualenv-permissions"></a>

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

## 요구 사항
<a name="samples-virtualenv-dependencies"></a>

이 페이지의 샘플 코드를 사용하려면 다음 종속성을 사용자 `requirements.txt`에 추가합니다. 자세한 내용은 [Python 종속성 설치](working-dags-dependencies.md) 섹션을 참조하세요.

```
virtualenv
```

## 사용자 지정 플러그인 샘플 코드
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow는 스타트업 시 플러그인 폴더에 있는 Python 파일의 콘텐츠를 실행합니다. 이 플러그인은 시작 프로세스 중에 내장 `PythonVirtualenvOperator`을 패치하여 Amazon MWAA와 호환되도록 합니다. 다음 단계는 사용자 지정 플러그인의 샘플 코드를 보여줍니다.

1. 명령 프롬프트에서 이전 섹션의 `plugins` 디렉터리로 이동합니다. 예:

   ```
   cd plugins
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `virtual_python_plugin.py`로 저장합니다.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

다음 단계에서는 `plugins.zip`을 생성하는 방법에 대해 설명합니다.

1. 명령 프롬프트에서 이전 섹션의 `virtual_python_plugin.py`를 포함한 디렉터리로 이동합니다. 예:

   ```
   cd plugins
   ```

1. `plugins` 폴더 내 콘텐츠를 압축합니다.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## 코드 샘플
<a name="samples-virtualenv-code"></a>

다음 단계에서는 사용자 지정 플러그인의 DAG 코드를 생성하는 방법을 설명합니다.

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `virtualenv_test.py`로 저장합니다.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Airflow 구성 옵션
<a name="samples-virtualenv-airflow-config"></a>

Apache Airflow v2를 사용하는 경우 Apache Airflow 구성 옵션으로 `core.lazy_load_plugins : False`을 추가합니다. 자세한 내용은 [2에서 구성 옵션을 사용하여 플러그인 로드](configuring-env-variables.md#configuring-2.0-airflow-override)를 참조하세요.

## 다음 단계
<a name="samples-virtualenv-next-up"></a>
+ 이 예제의 `requirements.txt` 파일을 [Python 종속성 설치](working-dags-dependencies.md)의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.
+ 이 예제의 `plugins.zip` 파일을 [사용자 지정 플러그인 설치](configuring-dag-import-plugins.md)의 Amazon S3 버킷에 업로드하는 방법에 대해 자세히 알아봅니다.

# Lambda 함수를 사용한 DAG 호출
<a name="samples-lambda"></a>

다음 코드 예제는 Amazon MWAA 환경에서 [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) 함수를 사용하여 Apache Airflow CLI 토큰을 가져오고 방향성 비순환 그래프(DAG)를 호출합니다.

**Topics**
+ [버전](#samples-lambda-version)
+ [사전 조건](#samples-lambda-prereqs)
+ [권한](#samples-lambda-permissions)
+ [종속성](#samples-lambda-dependencies)
+ [코드 예제](#samples-lambda-code)

## 버전
<a name="samples-lambda-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-lambda-prereqs"></a>

이 코드 예제를 활용하려면 다음과 같이 해야 합니다.
+ [Amazon MWAA 환경](get-started.md)에서는 [퍼블릭 네트워크 액세스 모드](configuring-networking.md#webserver-options-public-network-onconsole)를 사용합니다.
+ 최신 Python 런타임을 사용하는 [Lambda 함수](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html)를 보유합니다.

**참고**  
Lambda 함수와 Amazon MWAA 환경이 동일한 VPC에 있는 경우, 프라이빗 네트워크에서 이 코드를 사용할 수 있습니다. 이 구성의 경우 Lambda 함수의 실행 역할에 Amazon Elastic Compute Cloud(Amazon EC2) **CreateNetworkInterface** API 작업을 호출할 수 있는 권한이 필요합니다. [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS관리형 정책을 사용하여이 권한을 제공할 수 있습니다.

## 권한
<a name="samples-lambda-permissions"></a>

이 페이지의 코드 예제를 사용하려면 Amazon MWAA 환경의 실행 역할에 `airflow:CreateCliToken` 작업을 수행할 수 있는 액세스 권한이 필요합니다. `AmazonMWAAAirflowCliAccess` AWS관리형 정책을 사용하여이 권한을 제공할 수 있습니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

자세한 정보는 [Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) 섹션을 참조하세요.

## 종속성
<a name="samples-lambda-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 예제
<a name="samples-lambda-code"></a>

1. [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/) AWS Lambda 콘솔을 엽니다.

1. **함수** 목록에서 Lambda 함수를 선택합니다.

1. 함수 페이지에서 다음 코드를 복사하고 다음을 리소스 이름으로 바꿉니다.
   + `YOUR_ENVIRONMENT_NAME` – Amazon MWAA 환경의 이름입니다.
   + `YOUR_DAG_NAME` – 호출하려는 DAG의 이름입니다.

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. **배포(Deploy)**를 선택합니다.

1. Lambda 콘솔을 사용하여 함수를 호출하려면 **테스트**를 선택합니다.

1. Lambda가 DAG를 성공적으로 호출했는지 확인하려면 Amazon MWAA를 사용하여 환경의 Apache Airflow UI로 이동한 후 다음을 수행합니다.

   1. **DAG** 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

   1. **마지막 실행**에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 `invoke_dag`에 대한 최신 타임스탬프와 거의 일치해야 합니다.

   1. **최근 작업**에서 마지막 실행이 성공했는지 확인합니다.

# 여러 Amazon MWAA 환경에서 DAG 호출
<a name="samples-invoke-dag"></a>

다음 코드 예제는 Apache Airflow CLI 토큰을 생성합니다. 그런 다음 코드는 한 Amazon MWAA 환경에서 방향성 비순환 그래프(DAG)를 사용하여 다른 Amazon MWAA 환경에서 DAG를 호출합니다.

**Topics**
+ [버전](#samples-invoke-dag-version)
+ [사전 조건](#samples-invoke-dag-prereqs)
+ [권한](#samples-invoke-dag-permissions)
+ [종속성](#samples-invoke-dag-dependencies)
+ [코드 예제](#samples-invoke-dag-code)

## 버전
<a name="samples-invoke-dag-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-invoke-dag-prereqs"></a>

이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.
+ **퍼블릭 네트워크** 웹 서버에 액세스할 수 있는 두 개의 [Amazon MWAA 환경](get-started.md)(현재 환경 포함).
+ 대상 환경의 Amazon Simple Storage Service(S3) 버킷에 업로드된 샘플 DAG.

## 권한
<a name="samples-invoke-dag-permissions"></a>

이 페이지의 코드 예제를 사용하려면 사용자 환경의 실행 역할에 Apache Airflow CLI 토큰을 생성할 권한이 있어야 합니다. AWS관리형 정책을 연결하여이 권한을 부여`AmazonMWAAAirflowCliAccess`할 수 있습니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

자세한 정보는 [Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) 섹션을 참조하세요.

## 종속성
<a name="samples-invoke-dag-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 예제
<a name="samples-invoke-dag-code"></a>

다음 코드 예제는 현재 환경에서 DAG를 사용하여 다른 환경에서 DAG를 호출한다고 가정합니다.

1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예제:

   ```
   cd dags
   ```

1. 다음 코드 예제의 내용을 복사하고 로컬에서 `invoke_dag.py`로 저장합니다. 다음 값을 사용자의 정보로 교체합니다.
   + `your-new-environment-name`— DAG를 호출하려는 다른 환경의 이름.
   + `your-target-dag-id`— 호출하려는 다른 환경에서 DAG의 ID.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경의 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. DAG가 성공적으로 실행되면 `invoke_dag_task`의 작업 로그에 다음과 유사한 출력을 얻게 됩니다.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   DAG가 성공적으로 호출되었는지 확인하려면 새 환경에 대한 Apache Airflow UI로 이동한 다음 다음을 수행합니다.

   1. **DAG** 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

   1. **마지막 실행**에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 `invoke_dag`에 대한 최신 타임스탬프와 거의 일치해야 합니다.

   1. **최근 작업**에서 마지막 실행이 성공했는지 확인합니다.

# Amazon RDS for Microsoft SQL Server와 함께 Amazon MWAA 사용
<a name="samples-sql-server"></a>

Amazon Managed Workflows for Apache Airflow를 사용하여 [RDS for SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html)에 연결할 수 있습니다. 다음 샘플 코드는 Amazon Managed Workflows for Apache Airflow 환경에서 DAG를 사용하여 Amazon RDS for Microsoft SQL Server 서버에 연결하고 쿼리를 실행합니다.

**Topics**
+ [버전](#samples-sql-server-version)
+ [사전 조건](#samples-sql-server-prereqs)
+ [종속성](#samples-sql-server-dependencies)
+ [Apache Airflow v2 연결](#samples-sql-server-conn)
+ [코드 샘플](#samples-sql-server-code)
+ [다음 단계](#samples-sql-server-next-up)

## 버전
<a name="samples-sql-server-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-sql-server-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ Amazon MWAA와 RDS for SQL Server 는 동일한 Amazon VPC에서 실행/
+ Amazon MWAA 및 서버의 VPC 보안 그룹은 다음과 같은 연결로 구성됩니다.
  + Amazon MWAA 보안 그룹의 Amazon RDS에 대해 열려 있는 포트 `1433`에 대한 인바운드 규칙
  + 또는 Amazon MWAA에서 RDS로 열려 있는 `1433` 포트에 대한 아웃바운드 규칙
+ RDS for SQL Server에 대한 Apache Airflow 연결에는 이전 프로세스에서 만든 Amazon RDS SQL 서버 데이터베이스의 호스트 이름, 포트, 사용자 이름 및 암호가 반영됩니다.

## 종속성
<a name="samples-sql-server-dependencies"></a>

이 섹션의 샘플 코드를 사용하려면 다음 종속성을 사용자 `requirements.txt`에 추가합니다. 자세한 내용은 [Python 종속성 설치](working-dags-dependencies.md) 섹션을 참조하세요.

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Apache Airflow v2 연결
<a name="samples-sql-server-conn"></a>

Apache Airflow v2에서 연결을 사용하는 경우 Airflow 연결 객체에 다음과 같은 키-값 페어가 포함되어 있는지 확인합니다.

1. **연결 ID: ** mssql\$1default

1. **연결 유형: ** Amazon Web Services

1. **호스트: ** `YOUR_DB_HOST`

1. **스키마: **

1. **로그인: ** 관리자

1. **암호: **

1. **포트: ** 1433

1. **추가: **

## 코드 샘플
<a name="samples-sql-server-code"></a>

1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하고 로컬에서 `sql-server.py`로 저장합니다.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## 다음 단계
<a name="samples-sql-server-next-up"></a>
+ 이 예제의 `requirements.txt` 파일을 [Python 종속성 설치](working-dags-dependencies.md)의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.
+ 이 예제의 DAG 코드를 [DAG 추가 또는 업데이트](configuring-dag-folder.md)에서 Amazon S3 버킷의 `dags` 폴더에 업로드하는 방법을 알아봅니다.
+ 예제 스크립트와 기타 [pymssql 모듈 예제](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html)를 살펴봅니다.
+ *Apache Airflow 참조 가이드*에서 [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator)를 사용하여 특정 Microsoft SQL 데이터베이스에서 SQL 코드를 실행하는 방법에 대해 자세히 알아봅니다.

# Amazon EKS에서 Amazon MWAA 사용
<a name="mwaa-eks-example"></a>

다음 샘플은 Amazon EKS와 함께 Amazon Managed Workflows for Apache Airflow를 사용하는 방법을 보여줍니다.

**Topics**
+ [버전](#mwaa-eks-example-version)
+ [사전 조건](#eksctl-prereqs)
+ [Amazon EC2용 퍼블릭 키 생성](#eksctl-create-key)
+ [클러스터 생성](#create-cluster-eksctl)
+ [`mwaa` 네임스페이스 생성](#eksctl-namespace)
+ [`mwaa` 네임스페이스에 대한 역할 생성](#eksctl-role)
+ [Amazon EKS 클러스터에 대한 IAM 역할 생성 및 연결](#eksctl-iam-role)
+ [requirements.txt 파일 생성](#eksctl-requirements)
+ [Amazon EKS에 대한 자격 증명 매핑 생성](#eksctl-identity-map)
+ [`kubeconfig` 생성](#eksctl-kube-config)
+ [DAG 생성](#eksctl-create-dag)
+ [Amazon S3 버킷에 DAG 및 `kube_config.yaml` 추가](#eksctl-dag-bucket)
+ [예제 활성화 및 트리거](#eksctl-trigger-pod)

## 버전
<a name="mwaa-eks-example-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="eksctl-prereqs"></a>

이 항목의 예제를 사용하려면 다음이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ eksctl. 자세한 내용은 [eksctl 설치](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl)를 참조하세요.
+ kubectl. 자세한 내용은 [kubectl 설치 및 설정](https://kubernetes.io/docs/tasks/tools/install-kubectl/)을 참조하세요. 경우에 따라 eksctl과 함께 설치되기도 합니다.
+ Amazon MWAA 환경을 생성하는 리전의 EC2 키 페어입니다. 자세한 내용은 [키 페어 생성 또는 가져오기](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair)를 참조하세요.

**참고**  
`eksctl` 명령을 사용할 때 `--profile`를 포함하여 기본값 이외의 프로필을 지정할 수 있습니다.

## Amazon EC2용 퍼블릭 키 생성
<a name="eksctl-create-key"></a>

다음 명령을 사용하여 프라이빗 키 페어에서 퍼블릭 키를 생성합니다.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

자세한 내용은 [키 페어에 대한 퍼블릭 키 검색](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key)을 참조하세요.

## 클러스터 생성
<a name="create-cluster-eksctl"></a>

다음 명령을 사용하여 클러스터를 생성합니다. 클러스터에 사용자 지정 이름을 지정하거나 다른 리전에 생성하려면 이름 및 리전 값을 바꿉니다. Amazon MWAA 환경을 생성하는 동일한 리전에서 클러스터를 생성해야 합니다. Amazon MWAA에 사용하는 Amazon VPC 네트워크의 서브넷과 일치하도록 서브넷 값을 바꿉니다. `ssh-public-key`의 값을 사용하는 키와 일치하도록 바꿉니다. 동일한 리전에 있는 Amazon EC2의 기존 키를 사용하거나 Amazon MWAA 환경을 생성한 동일한 리전에서 새 키를 생성할 수 있습니다.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

클러스터 생성을 완료하는 데 시간이 걸립니다. 완료되면 다음 명령을 사용하여 클러스터가 성공적으로 생성되었고 IAM OIDC 공급자가 구성되어 있는지 확인할 수 있습니다.

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## `mwaa` 네임스페이스 생성
<a name="eksctl-namespace"></a>

클러스터가 성공적으로 생성되었는지 확인한 후 다음 명령을 사용하여 포드에 대한 네임스페이스를 생성합니다.

```
kubectl create namespace mwaa
```

## `mwaa` 네임스페이스에 대한 역할 생성
<a name="eksctl-role"></a>

네임스페이스를 생성한 후, MWAA 네임스페이스에서 포드를 실행할 수 있는 EKS의 Amazon MWAA 사용자에 대한 역할 및 역할 바인딩을 생성합니다. 네임스페이스에 다른 이름을 사용한 경우 `-n mwaa`의 mwaa를 사용한 이름으로 바꿉니다.

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

다음 명령을 실행하여 새 역할이 Amazon EKS 클러스터에 액세스할 수 있는지 확인합니다. *mwaa*를 사용하지 않았다면 반드시 올바른 이름을 사용하세요.

```
kubectl get pods -n mwaa --as mwaa-service
```

다음과 같은 메시지가 표시됩니다.

```
No resources found in mwaa namespace.
```

## Amazon EKS 클러스터에 대한 IAM 역할 생성 및 연결
<a name="eksctl-iam-role"></a>

IAM 역할을 생성한 다음 Amazon EKS(k8s) 클러스터에 바인딩해야 IAM을 통한 인증에 사용할 수 있습니다. 역할은 클러스터에 로그인하는 데만 사용되며 콘솔 또는 API 호출에 대한 권한은 없습니다.

[Amazon MWAA 실행 역할](mwaa-create-role.md)의 단계를 사용하여 Amazon MWAA 환경을 위한 새 역할을 생성합니다. 하지만 해당 항목에 설명된 정책을 생성하고 첨부하는 대신 다음 정책을 첨부하세요.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

역할을 생성한 후 Amazon MWAA 환경을 편집하여 생성한 역할을 환경의 실행 역할로 사용합니다. 역할을 변경하려면 사용할 환경을 편집합니다. **권한**에서 실행 역할을 선택합니다.

**알려진 문제:**
+ Amazon EKS로 인증할 수 없는 하위 경로가 있는 역할 ARN에 알려진 문제가 있습니다. 이에 대한 해결 방법은 Amazon MWAA에서 자체적으로 생성한 서비스 역할을 사용하는 대신 서비스 역할을 수동으로 생성하는 것입니다. 자세한 내용은 [aws-auth configmap의 ARN에 경로가 포함된 경우 경로가 있는 역할이 작동하지 않음](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)을 참조하세요.
+ IAM에서 Amazon MWAA 서비스 목록을 사용할 수 없는 경우 Amazon EC2와 같은 대체 서비스 정책을 선택한 다음, 역할의 신뢰 정책을 다음과 일치하도록 업데이트해야 합니다.

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  자세한 내용은 [IAM 역할에 신뢰 정책을 사용하는 방법](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/)을 참조하세요.

## requirements.txt 파일 생성
<a name="eksctl-requirements"></a>

이 섹션의 샘플 코드를 사용하려면 다음 데이터베이스 옵션 중 하나를 `requirements.txt`에 추가했는지 확인합니다. 자세한 내용은 [Python 종속성 설치](working-dags-dependencies.md) 섹션을 참조하세요.

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Amazon EKS에 대한 자격 증명 매핑 생성
<a name="eksctl-identity-map"></a>

다음 명령에서 생성한 역할의 ARN을 사용하여 Amazon EKS에 대한 자격 증명 매핑을 생성합니다. 리전 *us-east-1*을 사용자가 환경을 생성한 리전으로 바꿉니다. 역할의 ARN을 바꾸고, 마지막으로 *mwaa-execution-role*을 사용자 환경의 실행 역할로 바꿉니다.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## `kubeconfig` 생성
<a name="eksctl-kube-config"></a>

다음 명령을 실행해 `kubeconfig`을 생성합니다.

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

`update-kubeconfig` 실행 시 특정 프로필을 사용한 경우 kube\$1config.yaml 파일에 추가된 `env:` 섹션을 제거해야 Amazon MWAA에서 제대로 작동할 수 있습니다. 이렇게 하려면 파일에서 다음을 삭제한 다음 저장합니다.

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## DAG 생성
<a name="eksctl-create-dag"></a>

다음 코드 예제를 사용하여 DAG에 대한 `mwaa_pod_example.py`와 같은 Python 파일을 만듭니다.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## Amazon S3 버킷에 DAG 및 `kube_config.yaml` 추가
<a name="eksctl-dag-bucket"></a>

생성한 DAG와 `kube_config.yaml` 파일을 Amazon MWAA 환경용 Amazon S3 버킷에 넣습니다. Amazon S3 콘솔 또는 AWS Command Line Interface를 사용하여 파일을 버킷에 넣을 수 있습니다.

## 예제 활성화 및 트리거
<a name="eksctl-trigger-pod"></a>

Apache Airflow에서 예제를 활성화한 다음 트리거합니다.

성공적으로 실행되고 완료되면 다음 명령을 사용하여 포드를 확인합니다.

```
kubectl get pods -n mwaa
```

출력은 다음과 비슷합니다.

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

이후 다음 명령을 사용하여 포드의 출력을 확인할 수 있습니다. 이름 값을 이전 명령에서 반환된 값으로 바꿉니다.

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# `ECSOperator`를 사용하여 Amazon ECS에 연결
<a name="samples-ecs-operator"></a>

이 주제에서는 `ECSOperator`를 사용하여 Amazon MWAA에서 Amazon Elastic Container Service(Amazon ECS) 컨테이너에 연결하기 위한 샘플 코드를 설명합니다. 다음 단계에서는 환경의 실행 역할에 필요한 권한을 추가하고, CloudFormation 템플릿을 사용하여 Amazon ECS Fargate 클러스터를 생성하고, 마지막으로 새 클러스터에 연결되는 DAG를 생성하고 업로드합니다.

**Topics**
+ [버전](#samples-ecs-operator-version)
+ [사전 조건](#samples-ecs-operator-prereqs)
+ [권한](#samples-ecs-operator-permissions)
+ [Amazon ECS 클러스터를 생성 ](#create-cfn-template)
+ [코드 샘플](#samples-ecs-operator-code)

## 버전
<a name="samples-ecs-operator-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-ecs-operator-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).

## 권한
<a name="samples-ecs-operator-permissions"></a>
+ 환경의 실행 역할에는 Amazon ECS에서 작업을 실행할 수 있는 권한이 필요합니다. [AWS AmazonECS\$1FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) 관리형 정책을 실행 역할에 연결하거나 다음 정책을 생성하여 실행 역할에 연결할 수 있습니다.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Amazon ECS에서 작업을 실행하는 데 필요한 권한을 추가하는 것 외에도 다음에 나열된 바와 같이 Amazon MWAA 실행 역할에서 CloudWatch Logs 정책 설명을 수정하여 Amazon ECS 작업 로그 그룹에 대한 액세스를 허용해야 합니다. Amazon ECS 로그 그룹은의 CloudFormation 템플릿에 의해 생성됩니다[Amazon ECS 클러스터를 생성 ](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Amazon MWAA 실행 역할 및 정책 연결 방법에 대한 자세한 내용은 [실행 역할](mwaa-create-role.md) 섹션을 참조하세요.

## Amazon ECS 클러스터를 생성 
<a name="create-cfn-template"></a>

다음 CloudFormation 템플릿을 사용하여 Amazon MWAA 워크플로에 사용할 Amazon ECS Fargate 클러스터를 빌드합니다. 자세한 내용을 알아보려면 *Amazon Elastic Container Service 개발자 안내서*의 [작업 정의 생성](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition)을 참조하세요.

1. 다음 코드로 JSON 파일을 생성하고 `ecs-mwaa-cfn.json`으로 저장합니다.

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. 명령 프롬프트에서 다음 AWS CLI 명령을 사용하여 새 스택을 생성합니다. `SecurityGroups` 값과 `SubnetIds` 값을 Amazon MWAA 환경의 보안 그룹 및 서브넷의 값으로 바꿔야 합니다.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   또는 다음 쉘 스크립트를 사용할 수 있습니다. 스크립트는 `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI 명령을 사용하여 환경의 보안 그룹 및 서브넷에 필요한 값을 검색한 다음 그에 따라 스택을 생성합니다. 스크립트를 실행하려면 다음을 수행합니다.

   1. 스크립트를 복사하고 CloudFormation 템플릿과 동일한 디렉터리`ecs-stack-helper.sh`에 로 저장합니다.

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. 다음 명령을 사용하여 스크립트를 실행합니다. `environment-name` 및 `stack-name`을(를) 자신의 정보로 대체합니다.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   성공하면 새 CloudFormation 스택 ID를 보여주는 다음 출력을 참조합니다.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

 CloudFormation 스택이 완료되고 Amazon ECS 리소스를 AWS 프로비저닝한 후에는 DAG를 생성하고 업로드할 준비가 된 것입니다.

## 코드 샘플
<a name="samples-ecs-operator-code"></a>

1. 명령 프롬프트를 열고 DAG 코드가 저장된 디렉터리로 이동합니다. 예제:

   ```
   cd dags
   ```

1. 다음 코드 샘플의 내용을 복사하여 로컬에 `mwaa-ecs-operator.py`로 저장한 다음 새 DAG를 Amazon S3에 업로드합니다.

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**참고**  
예제 DAG에서는 `awslogs_group`의 경우 Amazon ECS 작업 로그 그룹의 이름을 사용하여 로그 그룹을 수정해야 할 수 있습니다. 예제에서는 `mwaa-ecs-zero`이라는 이름의 로그 그룹을 가정합니다. `awslogs_stream_prefix`의 경우 Amazon ECS 작업 로그 스트림 접두사를 사용합니다. 이 예제에서는 로그 스트림 접두사, `ecs`를 가정합니다.

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경의 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 성공하면 `ecs_fargate_dag` DAG의 `ecs_operator_task`에 대한 작업 로그에 다음과 비슷한 출력을 얻게 됩니다.

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Amazon MWAA에서 dbt 사용
<a name="samples-dbt"></a>

이 주제에서는 Amazon MWAA으로 dbt 및 Postgres를 사용하는 방법을 보여줍니다. 다음 단계에서는 필수 종속성을 `requirements.txt`에 추가하고 샘플 dbt 프로젝트를 환경의 Amazon S3 버킷에 업로드합니다. 그런 다음 샘플 DAG를 사용하여 Amazon MWAA가 종속성을 설치했는지 확인하고 마지막으로 `BashOperator`를 사용하여 dbt 프로젝트를 실행합니다.

**Topics**
+ [버전](#samples-dbt-version)
+ [사전 조건](#samples-dbt-prereqs)
+ [종속성](#samples-dbt-dependencies)
+ [Amazon S3에 dbt 프로젝트를 업로드합니다.](#samples-dbt-upload-project)
+ [DAG를 사용하여 dbt 종속성 설치를 확인합니다.](#samples-dbt-test-dependencies)
+ [DAG를 사용하여 dbt 프로젝트를 실행합니다.](#samples-dbt-run-project)

## 버전
<a name="samples-dbt-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-dbt-prereqs"></a>

다음 단계를 완료하려면 먼저 다음을 수행해야 합니다.
+ Apache Airflow v2.2.2를 사용하는 [Amazon MWAA 환경](get-started.md). 이 샘플은 v2.2.2로 작성 및 테스트되었습니다. 다른 Apache Airflow 버전과 함께 사용하려면 샘플을 수정해야 할 수 있습니다.
+ 샘플 dbt 프로젝트. Amazon MWAA에서 dbt 사용을 시작하려면 포크를 생성하고 dbt-labs GitHub 리포지토리에서 [dbt 스타터 프로젝트](https://github.com/dbt-labs/dbt-starter-project)를 복제하면 됩니다.

## 종속성
<a name="samples-dbt-dependencies"></a>

Amazon MWAA에서 dbt를 사용하려면 환경에 다음 시작 스크립트를 추가합니다. 자세한 내용은 [Amazon MWAA에서 시작 스크립트 사용](using-startup-script.md)을 참조하세요.

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

다음 섹션에서는 dbt 프로젝트 디렉터리를 Amazon S3에 업로드하고 Amazon MWAA가 필수 dbt 종속성을 성공적으로 설치했는지 여부를 검증하는 DAG를 실행합니다.

## Amazon S3에 dbt 프로젝트를 업로드합니다.
<a name="samples-dbt-upload-project"></a>

Amazon MWAA 환경에서 dbt 프로젝트를 사용할 수 있으려면 전체 프로젝트 디렉터리를 환경 `dags` 폴더에 업로드하면 됩니다. 환경이 업데이트되면 Amazon MWAA는 dbt 디렉터리를 로컬 `usr/local/airflow/dags/` 폴더에 다운로드합니다.

**Amazon S3에 dbt 프로젝트를 업로드하려면**

1. dbt 스타터 프로젝트를 복제한 디렉터리로 이동합니다.

1. 다음 Amazon S3 AWS CLI 명령을 실행하여 `--recursive` 파라미터를 사용하여 프로젝트의 콘텐츠를 환경의 `dags` 폴더에 재귀적으로 복사합니다. 이 명령은 모든 dbt 프로젝트에 사용할 수 있는 `dbt`라고 하는 하위 디렉터리를 생성합니다. 하위 디렉터리가 이미 있는 경우 프로젝트 파일은 기존 디렉터리에 복사되며 새 디렉터리는 생성되지 않습니다. 또한 이 명령은 이 특정 스타터 프로젝트의 `dbt` 디렉터리 내에 하위 디렉터리를 생성합니다.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   프로젝트 하위 디렉터리에 다른 이름을 사용하여 상위 `dbt` 디렉터리 내에 여러 dbt 프로젝트를 구성할 수 있습니다.

## DAG를 사용하여 dbt 종속성 설치를 확인합니다.
<a name="samples-dbt-test-dependencies"></a>

다음 DAG는 `BashOperator` 및 bash 명령을 사용하여 Amazon MWAA가 `requirements.txt`에 지정된 dbt 종속성을 성공적으로 설치했는지 확인합니다.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

작업 로그에 액세스하고 dbt 및 해당 종속성이 설치되었는지 확인하려면 다음을 수행합니다.

1. Amazon MWAA 콘솔로 이동한 다음 사용 가능한 환경 목록에서 **Open Airflow UI**를 선택합니다.

1. Apache Airflow UI의 목록에서 `dbt-installation-test` DAG를 찾은 다음 `Last Run` 열에서 날짜를 선택하여 마지막으로 성공한 작업을 엽니다.

1. **그래프 보기**를 사용하여 `bash_command` 작업을 선택하여 작업 인스턴스 세부 정보를 엽니다.

1. **로그**를 선택하여 작업 로그를 연 다음, 로그에 `requirements.txt` 지정된 dbt 버전이 성공적으로 나열되는지 확인합니다.

## DAG를 사용하여 dbt 프로젝트를 실행합니다.
<a name="samples-dbt-run-project"></a>

다음 DAG는 `BashOperator`를 사용하여 Amazon S3에 업로드한 dbt 프로젝트를 로컬 `usr/local/airflow/dags/` 디렉터리에서 쓰기 액세스 가능한 `/tmp` 디렉터리로 복사한 다음 dbt 프로젝트를 실행합니다. bash 명령은 제목이 `dbt-starter-project`인 스타터 dbt 프로젝트를 가정합니다. 프로젝트 디렉터리 이름에 따라 디렉터리 이름을 수정합니다.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS 블로그 및 자습서
<a name="samples-blogs-tutorials"></a>
+ [Apache Airflow v2.x용 Amazon EKS 및 Amazon MWAA 작업](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)