Invocation DAGs avec une fonction Lambda - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Invocation DAGs avec une fonction Lambda

L'exemple de code suivant utilise une AWS Lambdafonction pour obtenir un CLI jeton Apache Airflow et invoquer un graphe acyclique dirigé (DAG) dans un environnement AmazonMWAA.

Version

  • Vous pouvez utiliser l'exemple de code présenté sur cette page avec Apache Airflow v2 en Python 3.10.

Prérequis

Pour utiliser cet exemple de code, vous devez :

Note

Si la fonction Lambda et votre MWAA environnement Amazon sont identiquesVPC, vous pouvez utiliser ce code sur un réseau privé. Pour cette configuration, le rôle d'exécution de la fonction Lambda doit être autorisé à appeler l'opération Amazon Elastic Compute Cloud EC2 (Amazon) CreateNetworkInterfaceAPI. Vous pouvez fournir cette autorisation à l'aide de la politique AWSLambdaVPCAccessExecutionRole AWS gérée.

Autorisations

Pour utiliser l'exemple de code présenté sur cette page, le rôle d'exécution de votre MWAA environnement Amazon doit avoir accès pour effectuer l'airflow:CreateCliTokenaction. Vous pouvez fournir cette autorisation à l'aide de la politique AmazonMWAAAirflowCliAccess AWS gérée :

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Pour de plus amples informations, veuillez consulter CLIPolitique Apache Airflow : A mazonMWAAAirflow CliAccess.

Dépendances

Exemple de code

  1. Ouvrez la AWS Lambda console à l'adresse https://console.aws.amazon.com/lambda/.

  2. Choisissez votre fonction Lambda dans la liste des fonctions.

  3. Sur la page des fonctions, copiez le code suivant et remplacez-le par le nom de vos ressources :

    • YOUR_ENVIRONMENT_NAME— Le nom de votre MWAA environnement Amazon.

    • YOUR_DAG_NAME— Le nom de la DAG personne que vous souhaitez invoquer.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Choisissez Deploy (Déployer).

  5. Choisissez Test pour appeler votre fonction à l'aide de la console Lambda.

  6. Pour vérifier que votre Lambda a correctement invoqué votreDAG, utilisez la MWAA console Amazon pour accéder à l'interface utilisateur Apache Airflow de votre environnement, puis procédez comme suit :

    1. Sur la DAGspage, localisez votre nouvelle cible DAG dans la liste desDAGs.

    2. Sous Dernière exécution, vérifiez l'horodatage de la dernière DAG exécution. Cet horodatage doit correspondre étroitement à l'horodatage le plus récent de votre autre invoke_dag environnement.

    3. Sous Tâches récentes, vérifiez que la dernière exécution a été réussie.