Invocare DAGs con una funzione Lambda - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Invocare DAGs con una funzione Lambda

Il seguente esempio di codice utilizza una AWS Lambdafunzione per ottenere un CLI token Apache Airflow e richiamare un grafo aciclico diretto () DAG in un ambiente Amazon. MWAA

Versione

Prerequisiti

Per utilizzare questo esempio di codice, devi:

Nota

Se la funzione Lambda e il tuo MWAA ambiente Amazon coincidonoVPC, puoi utilizzare questo codice su una rete privata. Per questa configurazione, il ruolo di esecuzione della funzione Lambda richiede l'autorizzazione per chiamare l'operazione Amazon Elastic Compute Cloud (AmazonEC2). CreateNetworkInterface API Puoi fornire questa autorizzazione utilizzando la policy AWSLambdaVPCAccessExecutionRole AWS gestita.

Autorizzazioni

Per utilizzare l'esempio di codice in questa pagina, il ruolo di esecuzione del tuo MWAA ambiente Amazon deve avere accesso per eseguire l'airflow:CreateCliTokenazione. Puoi fornire questa autorizzazione utilizzando la politica AmazonMWAAAirflowCliAccess AWS gestita:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Per ulteriori informazioni, consulta CLIPolitica Apache Airflow: A mazonMWAAAirflow CliAccess.

Dipendenze

  • Per utilizzare questo esempio di codice con Apache Airflow v2, non sono richieste dipendenze aggiuntive. Il codice utilizza l'installazione di base di Apache Airflow v2 nell'ambiente in uso.

esempio di codice

  1. Apri la console all' AWS Lambda indirizzo. https://console.aws.amazon.com/lambda/

  2. Scegli la tua funzione Lambda dall'elenco Funzioni.

  3. Nella pagina della funzione, copia il codice seguente e sostituiscilo con i nomi delle tue risorse:

    • YOUR_ENVIRONMENT_NAME— Il nome del tuo MWAA ambiente Amazon.

    • YOUR_DAG_NAME— Il nome del DAG file che desideri richiamare.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Seleziona Deploy (Implementa).

  5. Scegli Test per richiamare la tua funzione utilizzando la console Lambda.

  6. Per verificare che Lambda abbia richiamato correttamente il tuoDAG, usa la MWAA console Amazon per accedere all'interfaccia utente Apache Airflow del tuo ambiente, quindi procedi come segue:

    1. Nella DAGspagina, individua il nuovo obiettivo DAG nell'elenco di. DAGs

    2. In Ultima esecuzione, controlla il timestamp dell'ultima DAG esecuzione. Questo timestamp dovrebbe corrispondere molto da vicino al timestamp più recente utilizzato nell'altro ambiente. invoke_dag

    3. In Attività recenti, verifica che l'ultima esecuzione sia andata a buon fine.