기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
CloudWatch에서 DAG를 사용하여 사용자 지정 지표 작성
다음 코드 예제를 사용하여 PythonOperator
를 실행하는 방향성 비순환 그래프(DAG)를 작성하여 Amazon MWAA 환경에 대한 OS 수준 지표를 검색할 수 있습니다. 그런 다음 DAG는 데이터를 Amazon CloudWatch에 사용자 지정 지표로 게시합니다.
사용자 지정 OS 수준 지표를 통해 환경 작업자가 가상 메모리 및 CPU와 같은 리소스를 어떻게 활용하고 있는지 추가로 파악할 수 있습니다. 이 정보를 사용하여 워크로드에 가장 적합한 환경 클래스를 선택할 수 있습니다.
버전
-
이 페이지의 코드 예제는 Python 3.10
의 Apache Airflow v2에서 사용할 수 있습니다.
필수 조건
이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.
권한
-
이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.
의존성
-
이 페이지의 코드 예제를 사용하는 데 추가 종속성이 필요하지 않습니다.
코드 예제
-
명령 프롬프트에서 DAG 코드가 저장된 폴더로 이동합니다. 예:
cd dags
-
다음 코드 예제의 콘텐츠를 복사하고 로컬에서
dag-custom-metrics.py
로 저장합니다. 환경 이름을MWAA-ENV-NAME
로 변경합니다.from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import os,json,boto3,psutil,socket def publish_metric(client,name,value,cat,unit='None'): environment_name = os.getenv("MWAA_ENV_NAME") value_number=float(value) hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print('writing value',value_number,'to metric',name) response = client.put_metric_data( Namespace='MWAA-Custom', MetricData=[ { 'MetricName': name, 'Dimensions': [ { 'Name': 'Environment', 'Value': environment_name }, { 'Name': 'Category', 'Value': cat }, { 'Name': 'Host', 'Value': ip_address }, ], 'Timestamp': datetime.now(), 'Value': value_number, 'Unit': unit }, ] ) print(response) return response def python_fn(**kwargs): client = boto3.client('cloudwatch') cpu_stats = psutil.cpu_stats() print('cpu_stats', cpu_stats) virtual = psutil.virtual_memory() cpu_times_percent = psutil.cpu_times_percent(interval=0) publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes') publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes') publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes') publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes') publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes') publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes') publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent') publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent') publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent') publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent') return "OK" with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag: t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
-
다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
DAG가 성공적으로 실행되면 Apache Airflow 로그에 다음과 비슷한 콘텐츠가 표시됩니다.
[2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0) [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}} [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}} [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}} ... [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446 [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0