알림, 배포 및 예약 설정
이 주제에서는 AWS Glue Data Quality에 대한 알림, 배포 및 예약을 설정하는 방법을 설명합니다.
목차
Amazon EventBridge 통합에서 알림 설정
AWS Glue Data Quality는 데이터 품질 규칙 세트 평가 실행 완료 시 생성되는 EventBridge 이벤트의 게시를 지원합니다. 이를 통해 데이터 품질 규칙 실패 시 알림을 쉽게 설정할 수 있습니다.
다음은 데이터 카탈로그에서 데이터 품질 규칙 세트를 평가할 때 발생하는 샘플 이벤트입니다. 이 정보를 사용하여 Amazon EventBridge에서 사용할 수 있는 데이터를 검토할 수 있습니다. 추가 API 직접 호출을 통해 자세한 내용을 확인할 수 있습니다. 예를 들어 결과 ID로 get_data_quality_result
API를 직접 호출하여 특정 실행의 세부 정보를 가져옵니다.
{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }
다음은 AWS Glue ETL 또는 AWS Glue Studio 노트북에서 데이터 품질 규칙 세트를 평가할 때 게시되는 샘플 이벤트입니다.
{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }
Data Catalog와 ETL 작업 모두에서 데이터 품질 평가를 실행하는 경우, 기본적으로 선택되는 Amazon CloudWatch에 지표 게시 옵션을 선택한 상태로 유지해야 EventBridge에 게시할 수 있습니다.
EventBridge 알림 설정

생성된 이벤트를 수신하고 대상을 정의하려면 Amazon EventBridge 규칙을 구성해야 합니다. 규칙을 생성하려면:
Amazon EventBridge 콘솔을 엽니다.
탐색 표시줄의 버스 섹션에서 규칙을 선택합니다.
[Create Rule]을 선택합니다.
규칙 세부 정보 정의에서:
이름에
myDQRule
을 입력합니다.설명을 입력합니다(선택 사항).
이벤트 버스의 경우 이벤트 버스를 선택합니다. 이벤트 버스가 아직 없는 경우 기본값으로 둡니다.
규칙 유형에서 이벤트 패턴이 있는 규칙을 선택하고 다음을 선택합니다.
이벤트 패턴 작성에서:
이벤트 소스에서 AWS 이벤트 또는 EventBridge 파트너 이벤트를 선택합니다.
샘플 이벤트 섹션은 건너뜁니다.
생성 방법으로 패턴 양식 사용을 선택합니다.
이벤트 패턴의 경우:
이벤트 소스에 대한 AWS 서비스를 선택합니다.
AWS 서비스에 대한 Glue Data Quality를 선택합니다.
이벤트 유형에 대해 사용할 수 있는 데이터 품질 평가 결과를 선택합니다.
특정 상태에 대해 실패를 선택합니다. 그러면 다음과 비슷한 이벤트 패턴이 나타납니다.
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
더 많은 구성 옵션은 이벤트 패턴의 추가 구성 옵션 섹션을 참조하세요.
대상 선택에서:
대상 유형에 대해 AWS 서비스를 선택합니다.
대상 선택 드롭다운을 사용하여 연결하려는 AWS 서비스(SNS, Lambda, SQS 등)를 선택하고 다음을 선택합니다.
태그 구성에서 새 태그 추가를 클릭하여 선택적 태그를 추가하고 다음을 선택합니다.
모든 선택 항목의 요약 페이지가 표시됩니다. 하단에서 규칙 생성을 선택합니다.
이벤트 패턴의 추가 구성 옵션
성공 또는 실패 시 이벤트 필터링 외에도 다양한 파라미터에서 이벤트를 추가로 필터링할 수 있습니다.
이렇게 하려면 이벤트 패턴 섹션으로 이동한 다음 패턴 편집을 선택하여 추가 파라미터를 지정합니다. 이벤트 패턴의 필드는 대소문자를 구분합니다. 다음은 이벤트 패턴 구성에 대한 예제입니다.
특정 규칙 세트를 평가하는 특정 테이블의 이벤트를 캡처하려면 다음 유형의 패턴을 사용합니다.
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }
ETL 환경에서 특정 작업의 이벤트를 캡처하려면 다음 유형의 패턴을 사용합니다.
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }
점수가 특정 임계값(예: 70%) 미만인 이벤트를 캡처하려면:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }
알림을 이메일 형식으로 지정
비즈니스 팀에 올바른 형식의 이메일 알림을 보내야 하는 경우가 있습니다. Amazon EventBridge와 AWS Lambda를 사용하면 됩니다.

다음 샘플 코드를 사용하여 데이터 품질 알림 형식을 지정하여 이메일을 생성할 수 있습니다.
import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }
CloudWatch 통합에서 경보 및 알림 설정
Amazon EventBridge를 사용하여 데이터 품질 알림을 설정하는 것이 좋습니다. Amazon EventBridge는 고객에게 알림을 보낼 때 일회성 설정을 요구하기 때문입니다. 하지만 익숙하기 때문에 Amazon CloudWatch를 선호하는 고객도 있습니다. 이러한 고객을 위해 Amazon CloudWatch와의 통합을 제공합니다.
각 AWS Glue Data Quality 평가에서는 데이터 품질 실행당 glue.data.quality.rules.passed
(통과한 규칙의 수를 나타냄) 및 glue.data.quality.rules.failed
(실패한 규칙의 수를 나타냄)와 같은 지표 페어를 생성합니다. 이 생성된 지표를 사용하여 해당 데이터 품질 실행이 임계값 아래로 떨어질 경우 사용자에게 알리는 경보를 생성할 수 있습니다. Amazon SNS 알림을 통해 이메일을 보내는 경보 설정을 시작하려면 아래 단계를 수행합니다.
Amazon SNS 알림을 통해 이메일을 보내는 경보 설정을 시작하려면 아래 단계를 수행합니다.
Amazon CloudWatch 콘솔을 엽니다.
지표 아래에서 모든 지표를 선택합니다. Glue Data Quality라는 사용자 지정 네임스페이스 아래에 추가 네임스페이스가 표시됩니다.
참고
AWS Glue 데이터 품질 실행을 시작할 때 Amazon CloudWatch에 지표 게시 확인란이 활성화되어 있는지 확인합니다. 그렇지 않으면 특정 실행에 대한 지표가 Amazon CloudWatch에 게시되지 않습니다.
Glue Data Quality
네임스페이스 아래에서 테이블 및 규칙 세트별로 지표가 생성되었음을 알 수 있습니다. 이 주제의 목적에 따라 이 값이 1을 초과하는 경우glue.data.quality.rules.failed
규칙 및 경보를 사용합니다(즉, 실패한 규칙 평가 수가 1보다 크면 알림을 제공하려고 함).경보를 생성하려면 경보 아래에서 모든 경보를 선택합니다.
경보 생성을 선택하세요.
지표 선택을 선택하세요.
생성한 테이블에 해당하는
glue.data.quality.rules.failed
지표를 선택한 다음 지표 선택을 선택합니다.지표 및 조건 지정 탭의 지표 섹션 아래에서:
Statistic(통계)에서 Sum(합계)를 선택합니다.
기간에서 1분을 선택합니다.
조건 섹션에서:
임계값 유형(Threshold type)에서 정적(Static)을 선택합니다.
glue.data.quality.rules.failed 조건이 다음과 같을 때마다...에서 이상을 선택합니다.
기준...에서 임계값으로 1을 입력합니다.
이렇게 선택하면
glue.data.quality.rules.failed
지표가 1 이상의 값을 생성하는 경우 경보가 트리거됩니다. 하지만 데이터가 없으면 허용 가능한 값으로 간주합니다.다음을 선택합니다.
작업 구성에서:
경보 상태 트리거에서 경보 내를 선택합니다.
다음 SNS 주제로 알림 보내기 섹션에서 새 주제를 생성하여 새 SNS 주제를 통해 알림 보내기를 선택합니다.
알림을 수신할 이메일 엔드포인트에 이메일 주소를 입력합니다. 그리고 주제 생성을 클릭합니다.
Next(다음)를 선택합니다.
경보 이름에
myFirstDQAlarm
을 입력한 후 다음을 선택합니다.모든 선택 항목의 요약 페이지가 표시됩니다. 하단에서 경보 생성을 선택합니다.
이제 Amazon CloudWatch 경보 대시보드에서 경보가 생성되는 것을 볼 수 있습니다.
데이터 품질 결과를 쿼리하여 대시보드 구축
대시보드를 구축하여 데이터 품질 결과를 표시할 수 있습니다. 이렇게 하는 방법은 두 가지입니다.
다음 코드를 사용하여 Amazon S3에 데이터를 쓰도록 Amazon EventBridge 설정:
import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }
Amazon S3에 데이터를 쓴 후 AWS Glue 크롤러를 사용하여 Athena에 등록하고 테이블을 쿼리할 수 있습니다.
데이터 품질 평가 중에 Amazon S3 위치 구성:
AWS Glue 데이터 카탈로그 또는 AWS Glue ETL에서 데이터 품질 작업을 실행할 때 Amazon S3 위치를 제공하여 데이터 품질 결과를 Amazon S3에 쓸 수 있습니다. 아래 구문을 통해 데이터 품질 결과를 읽기 위해 대상을 참조하여 테이블을 생성할 수 있습니다.
CREATE EXTERNAL TABLE
및 MSCK REPAIR TABLE
쿼리는 별도로 실행해야 합니다.
CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;
위 테이블을 생성한 후에 Amazon Athena를 사용하여 분석 쿼리를 실행할 수 있습니다.
AWS CloudFormation을 사용하여 데이터 품질 규칙 배포
AWS CloudFormation을 사용하여 데이터 품질 규칙을 생성할 수 있습니다. 자세한 내용은 AWS Glue에 대한 AWS CloudFormation을 참조하세요.
데이터 품질 규칙 예약
다음 방법을 사용하여 데이터 품질 규칙을 예약할 수 있습니다.
-
데이터 카탈로그에서 데이터 품질 규칙을 예약합니다. 코드를 사용하지 않는 사용자도 이 옵션을 사용하여 데이터 품질 검사를 쉽게 예약할 수 있습니다. AWS Glue Data Quality는 Amazon EventBridge에서 예약을 생성합니다. 데이터 품질 규칙을 예약하려면:
-
규칙 세트로 이동한 다음 실행을 클릭합니다.
-
실행 빈도에서 원하는 예약을 선택하고 작업 이름을 제공합니다. 이 작업 이름은 EventBridge에서의 예약 이름입니다.
-
Amazon EventBridge 및 AWS Step Functions를 사용하여 데이터 품질 규칙에 대한 평가 및 권장 사항을 조정합니다.