设置警报、部署和计划 - AWS Glue

设置警报、部署和计划

本主题介绍如何为 AWS Glue Data Quality 设置警报、部署和计划。

在 Amazon EventBridge 集成中设置警报和通知

AWS Glue Data Quality 支持发布 EventBridge 事件,这些事件是在 Data Quality 规则集评估运行完成后发出的。这样,当数据质量规则失效时,您可以轻松设置警报。

以下是您在 Data Catalog 中评估数据质量规则集时的示例事件。有了这些信息,您就可以查看 Amazon EventBridge 提供的数据。您可以发出其他 API 调用以获取更多详细信息。例如,使用结果 ID 调用 get_data_quality_result API 以获取特定执行的详细信息。

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

以下是您在评估 AWS Glue ETL 或 AWS Glue Studio 笔记本中的数据质量规则集时发布的示例事件。

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

要在 Data Catalog 和 ETL 作业中同时运行数据质量评估,必须保持选中默认处于选中状态的将指标发布到 Amazon CloudWatch 选项,EventBridge 才可以发布。

设置 EventBridge 通知

AWS CloudFormation 中的数据质量属性

要接收发出的事件并定义目标,必须配置 Amazon EventBridge 规则。要创建规则,请执行以下操作:

  1. 打开 Amazon EventBridge 控制台。

  2. 在导航栏的总线部分下选择规则

  3. 选择 Create Rule

  4. 定义规则详细信息中:

    1. 对于名称,请输入 myDQRule

    2. 输入描述(可选)。

    3. 对于事件总线,请选择您的事件总线。如果您没有,请将其保留为默认值。

    4. 对于“规则类型”,选择具有事件模式的规则,然后选择下一步

  5. 构建事件模式中:

    1. 对于事件源,选择 AWS 事件或 EventBridge 合作伙伴事件

    2. 跳过示例事件部分。

    3. 对于创建方法,选择使用模式表单

    4. 对于事件模式:

      1. 为事件源选择 AWS 服务

      2. 为 AWS 服务选择 Glue Data Quality

      3. 为“事件类型”选择可用的数据质量评估结果

      4. 为“特定状态”选择失败。然后,您将看到类似于以下内容的事件模式:

        { "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
      5. 有关更多连接选项,请参阅 事件模式的其他配置选项

  6. 选择目标上:

    1. 对于目标类型,选择 AWS 服务

    2. 使用选择目标下拉列表选择要连接的所需 AWS 服务(SNS、Lambda、SQS 等),然后选择下一步

  7. 配置标签上,单击添加新标签以添加可选标签,然后选择下一步

  8. 您会看到所有选择的摘要页面。选择底部的创建规则

事件模式的其他配置选项

除了根据成功或失败筛选事件外,您可能还需要根据不同的参数进一步筛选事件。

为此,请转到“事件模式”部分,然后选择编辑模式以指定其他参数。请注意,事件模式中的字段区分大小写。以下是配置事件模式的示例。

要从评估特定规则集的特定表中捕获事件,请使用以下类型的模式:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }

要在 ETL 体验中捕获来自特定作业的事件,请使用以下类型的模式:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }

要捕获分数低于特定阈值(例如 70%)的事件,请执行以下操作:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }

将通知格式化为电子邮件

有时,您需要向业务团队发送格式完善的电子邮件通知。您可以使用 Amazon EventBridge 和 AWS Lambda 实现这一目标。

格式化为电子邮件的数据质量通知

以下示例代码可用于格式化数据质量通知以生成电子邮件。

import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

在 CloudWatch 集成中设置警报和通知

我们推荐的方法是使用 Amazon EventBridge 设置数据质量警报,因为 Amazon EventBridge 需要一次性设置才能提醒客户。但是,由于熟悉的原因,有些客户更喜欢使用 Amazon CloudWatch。对于此类客户,我们提供与 Amazon CloudWatch 的集成。

每次 AWS Glue Data Quality 评估都会在每次数据质量运行时发出一对名为 glue.data.quality.rules.passed(表示通过的规则数量)和 glue.data.quality.rules.failed(表示失败的规则数)的指标。您可以使用此发出的指标创建警报,以便在给定的数据质量运行低于阈值时提醒用户。要开始设置可通过 Amazon SNS 通知发送电子邮件的警报,请按照以下步骤操作:

要开始设置可通过 Amazon SNS 通知发送电子邮件的警报,请按照以下步骤操作:

  1. 打开 Amazon CloudWatch 控制台。

  2. 指标下选择所有指标。您将在自定义命名空间下看到一个名为 Glue Data Quality 的额外命名空间。

    注意

    启动 AWS Glue Data Quality 运行时,请确保已启用将指标发布到 Amazon CloudWatch复选框。否则,该特定运行的指标将不会发布到 Amazon CloudWatch。

    Glue Data Quality 命名空间下,您可以看到每个表、每个规则集发出的指标。对于本主题,如果该值超过 1,我们将使用 glue.data.quality.rules.failed 规则和警报(表示,如果我们看到许多失败的规则评估大于 1,我们希望收到通知)。

  3. 要创建警报,请在警报下选择所有警报

  4. 选择创建警报

  5. 选择选择指标

  6. 选择与您创建的表格相对应的 glue.data.quality.rules.failed 指标,然后选择选择指标

  7. 指定指标和条件选项卡下的指标部分下:

    1. 对于 Statistic(统计数据),选择 Sum(总计)

    2. 对于周期,选择 1 分钟

  8. 条件部分下:

    1. 对于阈值类型,选择静态

    2. 对于每当 glue.data.quality.rules.failed 为...,选择大于/等于

    3. 对于不止于...,输入 1 作为阈值。

    这些选择意味着,如果 glue.data.quality.rules.failed 指标发出的值大于或等于 1,我们将触发警报。但是,如果没有数据,我们会将其视为可接受。

  9. 选择下一步

  10. 配置操作中:

    1. 对于警报状态触发器,选择报警中

    2. 对于向以下 SNS 主题发送通知部分,选择创建新主题以通过新的 SNS 主题发送通知

    3. 对于将接收通知的电子邮件端点,请输入您的电子邮件地址。然后单击创建主题

    4. 选择下一步

  11. 对于警报名称,输入 myFirstDQAlarm,然后选择下一步

  12. 您会看到所有选择的摘要页面。选择底部的创建警报

现在,您可以从 Amazon CloudWatch 警报控制面板中看到正在创建的警报。

查询数据质量结果以构建控制面板

您可能需要构建一个控制面板来显示您的数据质量结果。有两种方式可执行此操作:

使用以下代码设置 Amazon EventBridge,将数据写入 Amazon S3:

import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

写入 Amazon S3 后,您可以使用 AWS Glue 爬网程序注册到 Athena 并查询表。

在数据质量评估期间配置 Amazon S3 位置:

在 AWS Glue Data Catalog 或 AWS Glue ETL 中运行数据质量任务时,您可以提供 Amazon S3 位置来将数据质量结果写入 Amazon S3。您可以使用以下语法通过引用目标来创建表,以读取数据质量结果。

请注意:必须分别运行 CREATE EXTERNAL TABLEMSCK REPAIR TABLE 查询。

CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;

创建上表后,即可使用 Amazon Athena 运行分析查询。

使用 AWS CloudFormation 部署数据质量规则

您可以使用 AWS CloudFormation 创建数据质量规则。有关更多信息,请参阅 AWS CloudFormation for AWS Glue

计划数据质量规则

您可以使用以下方法计划数据质量规则:

  • 从 Data Catalog 中计划数据质量规则:任何代码用户都无法使用此选项轻松计划数据质量扫描。AWSGlue Data Quality 将在 Amazon EventBridge 中创建计划。要计划数据质量规则,请执行以下操作:

    • 导航到规则集并单击运行

    • 运行频率中,选择所需的计划并提供任务名称。此任务名称是您在 EventBridge 中计划的名称。

  • 使用 Amazon EventBridge 和 AWS Step Functions 来编排数据质量规则的评估和建议。