Usando a DAG para escrever métricas personalizadas em CloudWatch - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando a DAG para escrever métricas personalizadas em CloudWatch

Você pode usar o exemplo de código a seguir para escrever um gráfico acíclico direcionado (DAG) que executa um PythonOperator para recuperar métricas no nível do sistema operacional para um ambiente Amazon. MWAA DAGEm seguida, publica os dados como métricas personalizadas na Amazon CloudWatch.

Métricas personalizadas em nível de sistema operacional fornecem visibilidade adicional sobre como seus funcionários do ambiente estão utilizando recursos como memória virtual e. CPU Você pode usar essas informações para selecionar a classe de ambiente mais adequada à sua workload.

Version (Versão)

Pré-requisitos

Para usar o código de exemplo nesta página, você precisará de:

Permissões

  • Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

Dependências

  • Nenhuma dependência adicional é necessária para usar o exemplo de código desta página.

Exemplo de código

  1. No prompt de comando, navegue até a pasta em que seu DAG código está armazenado. Por exemplo:

    cd dags
  2. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como dag-custom-metrics.py. Substitua MWAA-ENV-NAME pelo nome do seu ambiente.

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import os,json,boto3,psutil,socket def publish_metric(client,name,value,cat,unit='None'): environment_name = os.getenv("MWAA_ENV_NAME") value_number=float(value) hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print('writing value',value_number,'to metric',name) response = client.put_metric_data( Namespace='MWAA-Custom', MetricData=[ { 'MetricName': name, 'Dimensions': [ { 'Name': 'Environment', 'Value': environment_name }, { 'Name': 'Category', 'Value': cat }, { 'Name': 'Host', 'Value': ip_address }, ], 'Timestamp': datetime.now(), 'Value': value_number, 'Unit': unit }, ] ) print(response) return response def python_fn(**kwargs): client = boto3.client('cloudwatch') cpu_stats = psutil.cpu_stats() print('cpu_stats', cpu_stats) virtual = psutil.virtual_memory() cpu_times_percent = psutil.cpu_times_percent(interval=0) publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes') publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes') publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes') publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes') publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes') publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes') publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent') publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent') publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent') publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent') return "OK" with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag: t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
  3. Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acione-o DAG usando a interface do usuário do Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Se a DAG execução for bem-sucedida, você deverá ver algo semelhante ao seguinte em seus registros do Apache Airflow:

    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    ...
    [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
    [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0