设置警报、部署和计划
本主题介绍如何为 AWS Glue Data Quality 设置警报、部署和计划。
目录
在 Amazon EventBridge 集成中设置警报和通知
AWS Glue Data Quality 支持发布 EventBridge 事件,这些事件是在 Data Quality 规则集评估运行完成后发出的。这样,当数据质量规则失效时,您可以轻松设置警报。
以下是您在 Data Catalog 中评估数据质量规则集时的示例事件。有了这些信息,您就可以查看 Amazon EventBridge 提供的数据。您可以发出其他 API 调用以获取更多详细信息。例如,使用结果 ID 调用 get_data_quality_result
API 以获取特定执行的详细信息。
{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }
以下是您在评估 AWS Glue ETL 或 AWS Glue Studio 笔记本中的数据质量规则集时发布的示例事件。
{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }
要在 Data Catalog 和 ETL 作业中同时运行数据质量评估,必须保持选中默认处于选中状态的将指标发布到 Amazon CloudWatch 选项,EventBridge 才可以发布。
设置 EventBridge 通知
要接收发出的事件并定义目标,必须配置 Amazon EventBridge 规则。要创建规则,请执行以下操作:
打开 Amazon EventBridge 控制台。
在导航栏的总线部分下选择规则。
选择 Create Rule。
在定义规则详细信息中:
对于名称,请输入
myDQRule
。输入描述(可选)。
对于事件总线,请选择您的事件总线。如果您没有,请将其保留为默认值。
对于“规则类型”,选择具有事件模式的规则,然后选择下一步。
在构建事件模式中:
对于事件源,选择 AWS 事件或 EventBridge 合作伙伴事件。
跳过示例事件部分。
对于创建方法,选择使用模式表单。
对于事件模式:
为事件源选择 AWS 服务。
为 AWS 服务选择 Glue Data Quality。
为“事件类型”选择可用的数据质量评估结果。
为“特定状态”选择失败。然后,您将看到类似于以下内容的事件模式:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
有关更多连接选项,请参阅 事件模式的其他配置选项。
在选择目标上:
对于目标类型,选择 AWS 服务。
使用选择目标下拉列表选择要连接的所需 AWS 服务(SNS、Lambda、SQS 等),然后选择下一步。
在配置标签上,单击添加新标签以添加可选标签,然后选择下一步。
您会看到所有选择的摘要页面。选择底部的创建规则。
事件模式的其他配置选项
除了根据成功或失败筛选事件外,您可能还需要根据不同的参数进一步筛选事件。
为此,请转到“事件模式”部分,然后选择编辑模式以指定其他参数。请注意,事件模式中的字段区分大小写。以下是配置事件模式的示例。
要从评估特定规则集的特定表中捕获事件,请使用以下类型的模式:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }
要在 ETL 体验中捕获来自特定作业的事件,请使用以下类型的模式:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }
要捕获分数低于特定阈值(例如 70%)的事件,请执行以下操作:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }
将通知格式化为电子邮件
有时,您需要向业务团队发送格式完善的电子邮件通知。您可以使用 Amazon EventBridge 和 AWS Lambda 实现这一目标。
以下示例代码可用于格式化数据质量通知以生成电子邮件。
import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }
在 CloudWatch 集成中设置警报和通知
我们推荐的方法是使用 Amazon EventBridge 设置数据质量警报,因为 Amazon EventBridge 需要一次性设置才能提醒客户。但是,由于熟悉的原因,有些客户更喜欢使用 Amazon CloudWatch。对于此类客户,我们提供与 Amazon CloudWatch 的集成。
每次 AWS Glue Data Quality 评估都会在每次数据质量运行时发出一对名为 glue.data.quality.rules.passed
(表示通过的规则数量)和 glue.data.quality.rules.failed
(表示失败的规则数)的指标。您可以使用此发出的指标创建警报,以便在给定的数据质量运行低于阈值时提醒用户。要开始设置可通过 Amazon SNS 通知发送电子邮件的警报,请按照以下步骤操作:
要开始设置可通过 Amazon SNS 通知发送电子邮件的警报,请按照以下步骤操作:
打开 Amazon CloudWatch 控制台。
在指标下选择所有指标。您将在自定义命名空间下看到一个名为 Glue Data Quality 的额外命名空间。
注意
启动 AWS Glue Data Quality 运行时,请确保已启用将指标发布到 Amazon CloudWatch复选框。否则,该特定运行的指标将不会发布到 Amazon CloudWatch。
在
Glue Data Quality
命名空间下,您可以看到每个表、每个规则集发出的指标。对于本主题,如果该值超过 1,我们将使用glue.data.quality.rules.failed
规则和警报(表示,如果我们看到许多失败的规则评估大于 1,我们希望收到通知)。要创建警报,请在警报下选择所有警报。
选择创建警报。
选择选择指标。
选择与您创建的表格相对应的
glue.data.quality.rules.failed
指标,然后选择选择指标。在指定指标和条件选项卡下的指标部分下:
对于 Statistic(统计数据),选择 Sum(总计)。
对于周期,选择 1 分钟。
在条件部分下:
对于阈值类型,选择静态。
对于每当 glue.data.quality.rules.failed 为...,选择大于/等于。
对于不止于...,输入 1 作为阈值。
这些选择意味着,如果
glue.data.quality.rules.failed
指标发出的值大于或等于 1,我们将触发警报。但是,如果没有数据,我们会将其视为可接受。选择下一步。
在配置操作中:
对于警报状态触发器,选择报警中。
对于向以下 SNS 主题发送通知部分,选择创建新主题以通过新的 SNS 主题发送通知。
对于将接收通知的电子邮件端点,请输入您的电子邮件地址。然后单击创建主题。
选择下一步。
对于警报名称,输入
myFirstDQAlarm
,然后选择下一步。您会看到所有选择的摘要页面。选择底部的创建警报。
现在,您可以从 Amazon CloudWatch 警报控制面板中看到正在创建的警报。
查询数据质量结果以构建控制面板
您可能需要构建一个控制面板来显示您的数据质量结果。有两种方式可执行此操作:
使用以下代码设置 Amazon EventBridge,将数据写入 Amazon S3:
import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }
写入 Amazon S3 后,您可以使用 AWS Glue 爬网程序注册到 Athena 并查询表。
在数据质量评估期间配置 Amazon S3 位置:
在 AWS Glue Data Catalog 或 AWS Glue ETL 中运行数据质量任务时,您可以提供 Amazon S3 位置来将数据质量结果写入 Amazon S3。您可以使用以下语法通过引用目标来创建表,以读取数据质量结果。
请注意:必须分别运行 CREATE EXTERNAL TABLE
和 MSCK REPAIR TABLE
查询。
CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;
创建上表后,即可使用 Amazon Athena 运行分析查询。
使用 AWS CloudFormation 部署数据质量规则
您可以使用 AWS CloudFormation 创建数据质量规则。有关更多信息,请参阅 AWS CloudFormation for AWS Glue。
计划数据质量规则
您可以使用以下方法计划数据质量规则:
-
从 Data Catalog 中计划数据质量规则:任何代码用户都无法使用此选项轻松计划数据质量扫描。AWSGlue Data Quality 将在 Amazon EventBridge 中创建计划。要计划数据质量规则,请执行以下操作:
-
导航到规则集并单击运行。
-
在运行频率中,选择所需的计划并提供任务名称。此任务名称是您在 EventBridge 中计划的名称。
-
使用 Amazon EventBridge 和 AWS Step Functions 来编排数据质量规则的评估和建议。