Configurar alertas, implantações e agendamentos
Este tópico descreve como configurar alertas, implantações e agendamentos para o AWS Glue Data Quality.
Sumário
Configurar alertas e notificações na integração com o Amazon EventBridge
O AWS Glue Data Quality é compatível com a publicação de eventos do EventBridge, que são emitidos após a conclusão de uma execução de avaliação do conjunto de regras de qualidade de dados. Com isso, você pode facilmente configurar alertas quando as regras de qualidade de dados não forem atendidas.
Aqui está um exemplo de evento quando você avalia os conjuntos de regras de qualidade de dados no catálogo de dados. Com essas informações, você pode revisar os dados disponibilizados com o Amazon EventBridge. Você pode emitir chamadas de API adicionais para obter mais detalhes. Por exemplo, chame a API get_data_quality_result
com o ID do resultado para obter os detalhes de uma execução específica.
{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }
Este é um exemplo de evento que é publicado quando você avalia conjuntos de regras de qualidade de dados nos cadernos do AWS Glue ETL ou do AWS Glue Studio.
{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }
Para execuções de avaliação de qualidade de dados no catálogo de dados e em trabalhos de ETL, a opção Publicar métricas no Amazon CloudWatch, selecionada por padrão, deve permanecer selecionada para que a publicação do EventBridge funcione.
Configurar eventos e notificações
Para receber os eventos emitidos e definir destinos, você deve configurar as regras do Amazon EventBridge. Para criar regras:
Abra o console do Amazon EventBridge.
Escolha Regras na seção Barramentos da barra de navegação.
Selecione Create Rule.
Em Definir detalhes da regra:
Em Nome, insira
myDQRule
.Insira uma descrição (opcional).
Para barramento de eventos, selecione seu barramento de eventos. Se você não tiver um, deixe o padrão.
Em Tipo de regra, escolha Regra com um padrão de eventos.
Em Criar padrão de eventos:
Como a origem dos eventos, selecione Eventos da AWS ou eventos de parceiros do EventBridge.
Pule a seção de exemplo de evento.
Como método de criação, selecione Usar formulário de padrão.
Para padrão de eventos:
Selecione Serviços da AWS para Fonte do evento.
Selecione Glue Data Quality para serviço da AWS.
Selecione Resultados da avaliação de qualidade de dados disponíveis para Tipo de evento.
Selecione FALHA para Estados específicos. Em seguida, você verá um padrão de evento como este:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
Para obter mais opções de conexão, consulte Opções de configuração adicionais para o padrão de eventos.
Em Selecionar alvos:
Em Tipos de destino, selecione Serviço da AWS.
Use o menu suspenso Selecionar um destino para escolher o AWS serviço ao qual você deseja se conectar (SNS, Lambda, SQS etc.) e escolha Avançar.
Em Configurar tag(s), clique em Adicionar novas tags para adicionar tags opcionais e escolha Avançar.
Você verá uma página de resumo de todas as seleções. Escolha Criar regra na parte inferior.
Opções de configuração adicionais para o padrão de eventos
Além de filtrar o evento baseado em sucesso ou fracasso, talvez você queira filtrar ainda mais os eventos com base em outros parâmetros.
Para fazer isso, vá para a seção Padrão de eventos e selecione Editar padrão para especificar parâmetros adicionais. Observe que os campos no padrão de eventos diferenciam maiúsculas de minúsculas. Estes são exemplos de configuração do padrão de eventos.
Para capturar eventos de uma tabela específica avaliando conjuntos de regras específicos, use esse tipo de padrão:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }
Para capturar eventos de trabalhos específicos na experiência de ETL, use esse tipo de padrão:
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }
Para capturar eventos com uma pontuação abaixo de um limite específico (por exemplo, 70%):
{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }
Formatar notificações como e-mails
Às vezes, você precisa enviar uma notificação por e-mail bem formatada para as equipes da sua empresa. Você pode usar o Amazon EventBridge e o AWS Lambda para fazer isso.
O exemplo de código a seguir pode ser usado para formatar as notificações de qualidade de dados para gerar e-mails.
import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }
Configurar alertas e notificações na integração com o CloudWatch
Nossa abordagem recomendada é configurar alertas de qualidade de dados usando o Amazon EventBridge, porque o Amazon EventBridge só precisa ser configurado uma única vez para alertar os clientes. Porém, alguns clientes preferem o Amazon CloudWatch por sua familiaridade. Para esses clientes, oferecemos a integração com o Amazon CloudWatch.
Cada avaliação do AWS Glue Data Quality emite um par de métricas denominadas glue.data.quality.rules.passed
(indicando o número de regras aprovadas) e glue.data.quality.rules.failed
(indicando o número de regras que falharam) por execução de qualidade de dados. Você pode usar essa métrica emitida para criar alarmes para avisar os usuários se uma determinada execução de qualidade de dados ficar abaixo de um limite. Para começar a configurar um alarme que enviaria um e-mail por meio de uma notificação do Amazon SNS, siga as etapas abaixo:
Para começar a configurar um alarme que enviaria um e-mail por meio de uma notificação do Amazon SNS, siga as etapas abaixo:
Abra o console do Amazon CloudWatch.
Escolha Todas as métricas em Métricas. Você verá um namespace adicional em Namespaces personalizados intitulado Glue Data Quality.
nota
Ao iniciar uma execução do AWS Glue Data Quality, certifique-se de que a caixa de seleção Publicar métricas no Amazon CloudWatch esteja marcada. Caso contrário, as métricas dessa execução específica não serão publicadas no Amazon CloudWatch.
No namespace
Glue Data Quality
, você pode ver as métricas que estão sendo emitidas por tabela, por conjunto de regras. Para a finalidade deste tópico, usaremos a regraglue.data.quality.rules.failed
e o alarme se esse valor ultrapassar 1 (indicando que, se observarmos um número de avaliações de regras com falha maior do que 1, queremos ser notificados).Para criar o alarme, escolha Todos os alarmes em Alarmes.
Selecione Criar alarme.
Escolha Selecionar métrica.
Selecione a métrica
glue.data.quality.rules.failed
correspondente à tabela que você criou e depois escolha Selecionar métrica.Na guia Especificar métricas e condições, na seção Métricas:
Em Estatística, selecione Soma.
Em Período, escolha 1 minuto.
Na seção Condições:
Em Tipo de limite, escolha Estático.
para Sempre que glue.data.quality.rules.failed for..., selecione Maior/Igual.
Por do que... , insira 1 como o valor limite.
Essas seleções implicam que, se a métrica
glue.data.quality.rules.failed
emitir um valor maior ou igual a 1, acionaremos um alarme. Porém, se não houver dados, nós a trataremos como aceitável.Escolha Próximo.
Em Configurar ações:
Em Gatilho de estado do alarme, escolha Em alarme.
Em Enviar uma notificação para o seguinte tópico do SNS, escolha Criar um novo tópico para enviar uma notificação por um novo tópico do SNS.
Em Endpoints de e-mail que receberão a notificação, insira o endereço de e-mail. Depois clique em Criar tópico.
Escolha Avançar.
Em Nome secreto, insira
myFirstDQAlarm
e escolha Avançar.Você verá uma página de resumo de todas as seleções. Escolha Criar alarme na parte inferior.
Agora você pode ver o alarme sendo criado no painel de alarmes do Amazon CloudWatch.
Consultar resultados de qualidade de dados para criar painéis
Talvez você queira criar um painel para exibir seus resultados de qualidade de dados. Há duas maneiras de fazer isso:
Configure o Amazon EventBridge com o seguinte códigopara gravar os dados no Amazon S3:
import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }
Depois de gravar no Amazon S3, você pode usar os crawlers do AWS Glue para se registrar no Athena e consultar as tabelas.
Configure um local no Amazon S3 durante uma avaliação de qualidade de dados:
Ao executar tarefas de qualidade de dados no catálogo de dados do AWS Glue ou no AWS Glue ETL, você pode fornecer um local no Amazon S3 para gravar os resultados de qualidade de dados. Você pode usar a sintaxe abaixo para criar uma tabela referenciando o destino para o qual ler os resultados de qualidade dos dados.
Observe que você deve executar as consultas CREATE EXTERNAL TABLE
e MSCK REPAIR TABLE
separadamente.
CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;
Depois de criar a tabela acima, você poderá executar consultas analíticas usando o Amazon Athena.
Implantar regras de qualidade de dados usando o AWS CloudFormation
Você pode usar o AWS CloudFormation para criar regras de qualidade de dados. Para obter mais informações, consulte AWS CloudFormation para o AWS Glue.
Agendar regras de qualidade de dados
Você pode agendar as regras de qualidade de dados usando os seguintes métodos:
-
Agende as regras de qualidade no dados do catálogo de dados: nenhum usuário de código pode usar essa opção para agendar facilmente suas verificações de qualidade de dados. O AWS O Glue Data Quality criará a agenda no Amazon EventBridge. Para agendar regras de qualidade de dados:
-
Navegue até o conjunto de regras e clique em Executar.
-
Na Frequência de execução, selecione a agenda desejada e forneça um Nome de tarefa. Esse nome de tarefa será o nome da sua agenda no EventBridge.
-
Use o Amazon EventBridge e o AWS Step Functions para orquestrar avaliações e recomendações para regras de qualidade de dados.