Lambda 함수를 사용한 DAG 호출 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Lambda 함수를 사용한 DAG 호출

다음 코드 예제는 Amazon MWAA 환경에서 AWS Lambda 함수를 사용하여 Apache Airflow CLI 토큰을 가져오고 방향성 비순환 그래프(DAG)를 호출합니다.

버전

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

사전 조건

이 코드 예제를 활용하려면 다음과 같이 해야 합니다.

참고

Lambda 함수와 Amazon MWAA 환경이 동일한 VPC에 있는 경우, 프라이빗 네트워크에서 이 코드를 사용할 수 있습니다. 이 구성의 경우 Lambda 함수의 실행 역할에 Amazon Elastic Compute Cloud(Amazon EC2) CreateNetworkInterface API 작업을 호출할 수 있는 권한이 필요합니다. AWSLambdaVPCAccessExecutionRole AWS 관리형 정책을 사용하여 이 권한을 제공할 수 있습니다.

권한

이 페이지의 코드 예제를 사용하려면 Amazon MWAA 환경의 실행 역할에 airflow:CreateCliToken 작업을 수행할 수 있는 액세스 권한이 필요합니다. AmazonMWAAAirflowCliAccess AWS 관리형 정책을 사용하여 이 권한을 제공할 수 있습니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

자세한 내용은 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccessWord 섹션을 참조하십시오.

의존성

  • 이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v2 기본 설치를 사용합니다.

코드 예제

  1. https://console.aws.amazon.com/lambda/에서 AWS Lambda 콘솔을 엽니다.

  2. 함수 목록에서 Lambda 함수를 선택합니다.

  3. 함수 페이지에서 다음 코드를 복사하고 다음을 리소스 이름으로 바꿉니다.

    • YOUR_ENVIRONMENT_NAME – Amazon MWAA 환경의 이름입니다.

    • YOUR_DAG_NAME – 호출하려는 DAG의 이름입니다.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. 배포를 선택합니다.

  5. Lambda 콘솔을 사용하여 함수를 호출하려면 테스트를 선택합니다.

  6. Lambda가 DAG를 성공적으로 호출했는지 확인하려면 Amazon MWAA를 사용하여 환경의 Apache Airflow UI로 이동한 후 다음을 수행합니다.

    1. DAG 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

    2. 마지막 실행에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 invoke_dag에 대한 최신 타임스탬프와 거의 일치해야 합니다.

    3. 최근 작업에서 마지막 실행이 성공했는지 확인합니다.