Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Invocare DAGs con una funzione Lambda
Il seguente esempio di codice utilizza una AWS Lambdafunzione per ottenere un CLI token Apache Airflow e richiamare un grafo aciclico diretto () DAG in un ambiente Amazon. MWAA
Versione
Prerequisiti
Per utilizzare questo esempio di codice, devi:
-
Usa la modalità di accesso alla rete pubblica per il tuo MWAAambiente Amazon.
-
Disponi di una funzione Lambda che utilizzi l'ultimo runtime di Python.
Nota
Se la funzione Lambda e il tuo MWAA ambiente Amazon coincidonoVPC, puoi utilizzare questo codice su una rete privata. Per questa configurazione, il ruolo di esecuzione della funzione Lambda richiede l'autorizzazione per chiamare l'operazione Amazon Elastic Compute Cloud (AmazonEC2). CreateNetworkInterface API Puoi fornire questa autorizzazione utilizzando la policy AWSLambdaVPCAccessExecutionRole
Autorizzazioni
Per utilizzare l'esempio di codice in questa pagina, il ruolo di esecuzione del tuo MWAA ambiente Amazon deve avere accesso per eseguire l'airflow:CreateCliToken
azione. Puoi fornire questa autorizzazione utilizzando la politica AmazonMWAAAirflowCliAccess
AWS gestita:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }
Per ulteriori informazioni, consulta CLIPolitica Apache Airflow: A mazonMWAAAirflow CliAccess.
Dipendenze
-
Per utilizzare questo esempio di codice con Apache Airflow v2, non sono richieste dipendenze aggiuntive. Il codice utilizza l'installazione di base di Apache Airflow v2
nell'ambiente in uso.
esempio di codice
-
Apri la console all' AWS Lambda indirizzo. https://console.aws.amazon.com/lambda/
-
Scegli la tua funzione Lambda dall'elenco Funzioni.
-
Nella pagina della funzione, copia il codice seguente e sostituiscilo con i nomi delle tue risorse:
-
YOUR_ENVIRONMENT_NAME
— Il nome del tuo MWAA ambiente Amazon. -
YOUR_DAG_NAME
— Il nome del DAG file che desideri richiamare.
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
-
-
Seleziona Deploy (Implementa).
-
Scegli Test per richiamare la tua funzione utilizzando la console Lambda.
-
Per verificare che Lambda abbia richiamato correttamente il tuoDAG, usa la MWAA console Amazon per accedere all'interfaccia utente Apache Airflow del tuo ambiente, quindi procedi come segue:
-
Nella DAGspagina, individua il nuovo obiettivo DAG nell'elenco di. DAGs
-
In Ultima esecuzione, controlla il timestamp dell'ultima DAG esecuzione. Questo timestamp dovrebbe corrispondere molto da vicino al timestamp più recente utilizzato nell'altro ambiente.
invoke_dag
-
In Attività recenti, verifica che l'ultima esecuzione sia andata a buon fine.
-