Menyiapkan peringatan, penerapan, dan penjadwalan - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menyiapkan peringatan, penerapan, dan penjadwalan

Topik ini menjelaskan cara mengatur peringatan, penerapan, dan penjadwalan untuk Kualitas Data AWS Glue.

Menyiapkan peringatan dan pemberitahuan dalam integrasi Amazon EventBridge

AWS Glue Data Quality mendukung penerbitan EventBridge peristiwa, yang dipancarkan setelah menyelesaikan evaluasi aturan Kualitas Data. Dengan ini, Anda dapat dengan mudah mengatur peringatan ketika aturan kualitas data gagal.

Berikut adalah contoh peristiwa saat Anda mengevaluasi kumpulan aturan kualitas data di Katalog Data. Dengan informasi ini, Anda dapat meninjau data yang tersedia dengan Amazon EventBridge. Anda dapat mengeluarkan panggilan API tambahan untuk mendapatkan detail selengkapnya. Misalnya, panggil get_data_quality_result API dengan ID hasil untuk mendapatkan detail eksekusi tertentu.

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

Berikut adalah contoh peristiwa yang dipublikasikan saat Anda mengevaluasi set aturan kualitas data di notebook Glue ETL atau AWS AWS Glue Studio.

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

Untuk evaluasi Kualitas Data berjalan baik di Katalog Data dan dalam pekerjaan ETL, Amazon CloudWatch opsi Publikasikan metrik ke, yang dipilih secara default, harus tetap dipilih agar EventBridge penerbitan berfungsi.

Menyiapkan EventBridge notifikasi

Properti kualitas data di AWS CloudFormation

Untuk menerima peristiwa yang dipancarkan dan menentukan target, Anda harus mengonfigurasi aturan Amazon EventBridge . Untuk membuat aturan:

  1. Buka EventBridge konsol Amazon.

  2. Pilih Aturan di bawah bagian Bus pada bilah navigasi.

  3. Pilih Buat aturan.

  4. Tentang Tentukan Rincian Aturan:

    1. Untuk Nama, masukkanmyDQRule.

    2. Masukkan deskripsi (opsional).

    3. Untuk bus acara, pilih bus acara Anda. Jika Anda tidak memilikinya, biarkan sebagai default.

    4. Untuk Jenis aturan pilih Aturan dengan pola acara lalu pilih Berikutnya.

  5. Pada Pola Acara Bangun:

    1. Untuk sumber acara pilih AWS acara atau acara EventBridge mitra.

    2. Lewati bagian contoh acara.

    3. Untuk metode pembuatan pilih Gunakan formulir pola.

    4. Untuk pola acara:

      1. Pilih AWS layanan untuk sumber Acara.

      2. Pilih Glue Data Quality untuk AWS layanan.

      3. Pilih Hasil Evaluasi Kualitas Data yang Tersedia untuk jenis Acara.

      4. Pilih GAGAL untuk status tertentu. Kemudian Anda melihat pola acara yang mirip dengan berikut ini:

        { "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
      5. Untuk opsi konfigurasi lainnya, lihatOpsi konfigurasi tambahan untuk pola acara.

  6. Pada Target Pilih:

    1. Untuk Jenis Target pilih AWS layanan.

    2. Gunakan menu tarik-turun Pilih target untuk memilih AWS layanan yang Anda inginkan untuk terhubung (SNS, Lambda, SQS, dll.), Lalu pilih Berikutnya.

  7. Pada Konfigurasi tag klik Tambahkan tag baru untuk menambahkan tag opsional lalu pilih Berikutnya.

  8. Anda melihat halaman ringkasan dari semua pilihan. Pilih Buat aturan di bagian bawah.

Opsi konfigurasi tambahan untuk pola acara

Selain memfilter acara Anda tentang keberhasilan atau kegagalan, Anda mungkin ingin memfilter lebih lanjut peristiwa pada parameter yang berbeda.

Untuk melakukan ini, buka bagian Pola Acara, dan pilih Edit pola untuk menentukan parameter tambahan. Perhatikan bahwa bidang dalam pola peristiwa peka huruf besar/kecil. Berikut ini adalah contoh konfigurasi pola acara.

Untuk menangkap peristiwa dari tabel tertentu yang mengevaluasi kumpulan aturan tertentu, gunakan jenis pola ini:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }

Untuk menangkap peristiwa dari pekerjaan tertentu dalam pengalaman ETL, gunakan jenis pola ini:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }

Untuk menangkap peristiwa dengan skor di bawah ambang tertentu (misalnya 70%):

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }

Memformat pemberitahuan sebagai email

Terkadang Anda perlu mengirim pemberitahuan email yang diformat dengan baik ke tim bisnis Anda. Anda dapat menggunakan Amazon EventBridge dan AWS Lambda untuk mencapai ini.

Pemberitahuan kualitas data diformat sebagai email

Kode contoh berikut dapat digunakan untuk memformat pemberitahuan kualitas data Anda untuk menghasilkan email.

import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

Siapkan peringatan dan notifikasi dalam integrasi CloudWatch

Pendekatan yang kami rekomendasikan adalah menyiapkan peringatan kualitas data menggunakan Amazon EventBridge, karena Amazon EventBridge memerlukan penyiapan satu kali untuk mengingatkan pelanggan. Namun, beberapa pelanggan lebih suka Amazon CloudWatch karena keakraban. Untuk pelanggan seperti itu, kami menawarkan integrasi dengan Amazon CloudWatch.

Setiap evaluasi Kualitas Data AWS Glue memancarkan sepasang metrik bernama glue.data.quality.rules.passed (menunjukkan sejumlah aturan yang lolos) dan glue.data.quality.rules.failed (menunjukkan jumlah aturan yang gagal) per kualitas data yang dijalankan. Anda dapat menggunakan metrik yang dipancarkan ini untuk membuat alarm untuk mengingatkan pengguna jika kualitas data tertentu berjalan di bawah ambang batas. Untuk memulai dengan mengatur alarm yang akan mengirim email melalui pemberitahuan Amazon SNS, ikuti langkah-langkah di bawah ini:

Untuk memulai dengan mengatur alarm yang akan mengirim email melalui pemberitahuan Amazon SNS, ikuti langkah-langkah di bawah ini:

  1. Buka CloudWatch konsol Amazon.

  2. Pilih Semua metrik di bawah Metrik. Anda akan melihat namespace tambahan di bawah ruang nama Kustom berjudul Glue Data Quality.

    catatan

    Saat memulai proses AWS Glue Data Quality, pastikan CloudWatch kotak centang Publikasikan metrik ke Amazon diaktifkan. Jika tidak, metrik untuk proses tertentu tidak akan dipublikasikan ke Amazon CloudWatch.

    Di bawah Glue Data Quality namespace, Anda dapat melihat metrik yang dipancarkan per tabel, per kumpulan aturan. Untuk tujuan topik ini, kami akan menggunakan glue.data.quality.rules.failed aturan dan alarm jika nilai ini melebihi 1 (menunjukkan bahwa, jika kami melihat sejumlah evaluasi aturan yang gagal lebih besar dari 1, kami ingin diberi tahu).

  3. Untuk membuat alarm, pilih Semua alarm di bawah Alarm.

  4. Pilih Buat alarm.

  5. Pilih Pilih Metrik.

  6. Pilih glue.data.quality.rules.failed metrik yang sesuai dengan tabel yang telah Anda buat, lalu pilih Pilih metrik.

  7. Di bawah tab Tentukan metrik dan kondisi, di bawah bagian Metrik:

    1. Untuk Statistik pilih Jumlah.

    2. Untuk Periode, pilih 1 menit.

  8. Di bawah bagian Kondisi:

    1. Untuk Jenis ambang batas, pilih Statis.

    2. Untuk Setiap kali glue.data.quality.rules.failed adalah... , pilih Greater/Equal.

    3. Untuk dari... , masukkan 1 sebagai nilai ambang batas.

    Pilihan ini menyiratkan bahwa jika glue.data.quality.rules.failed metrik memancarkan nilai lebih besar dari atau sama dengan 1, kami akan memicu alarm. Namun, jika tidak ada data, kami akan memperlakukannya sebagai dapat diterima.

  9. Pilih Selanjutnya.

  10. Pada tindakan Konfigurasi:

    1. Untuk bagian Pemicu status alarm, pilih Dalam alarm.

    2. Untuk Kirim pemberitahuan ke bagian topik SNS berikut, pilih Buat topik baru untuk mengirim pemberitahuan melalui topik SNS baru.

    3. Untuk titik akhir Email yang akan menerima notifikasi, masukkan alamat email Anda. Kemudian klik Buat Topik.

    4. Pilih Selanjutnya.

  11. Untuk nama Alarm, masukkanmyFirstDQAlarm, lalu pilih Berikutnya.

  12. Anda melihat halaman ringkasan dari semua pilihan. Pilih Buat alarm di bagian bawah.

Anda sekarang dapat melihat alarm dibuat dari dasbor CloudWatch alarm Amazon.

Menanyakan hasil kualitas data untuk membangun dasbor

Anda mungkin ingin membangun dasbor untuk menampilkan hasil kualitas data Anda. Ada dua cara untuk melakukan hal ini:

Siapkan Amazon EventBridge dengan kode berikut untuk menulis data ke Amazon S3:

import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

Setelah menulis ke Amazon S3, Anda dapat menggunakan perayap AWS Glue untuk mendaftar ke Athena dan menanyakan tabel.

Konfigurasikan lokasi Amazon S3 selama evaluasi kualitas data::

Saat menjalankan tugas kualitas data di AWS Glue Data Catalog atau AWS Glue ETL, Anda dapat memberikan lokasi Amazon S3 untuk menulis hasil kualitas data ke Amazon S3. Anda dapat menggunakan sintaks di bawah ini untuk membuat tabel dengan mereferensikan target untuk membaca hasil kualitas data.

Perhatikan bahwa Anda harus menjalankan MSCK REPAIR TABLE kueri CREATE EXTERNAL TABLE dan secara terpisah.

CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;

Setelah Anda membuat tabel di atas, Anda dapat menjalankan kueri analitik menggunakan Amazon Athena.

Menerapkan aturan kualitas data menggunakan AWS CloudFormation

Anda dapat menggunakan AWS CloudFormation untuk membuat aturan kualitas data. Untuk informasi lebih lanjut, lihat AWS CloudFormation untuk AWS Glue.

Menjadwalkan aturan kualitas data

Anda dapat menjadwalkan aturan kualitas data menggunakan metode berikut:

  • Jadwalkan aturan kualitas data dari Katalog Data: tidak ada pengguna kode yang dapat menggunakan opsi ini untuk dengan mudah menjadwalkan pemindaian kualitas data mereka. AWS Glue Data Quality akan membuat jadwal di Amazon EventBridge. Untuk menjadwalkan aturan kualitas data:

    • Arahkan ke ruleset dan klik Run.

    • Dalam frekuensi Jalankan, pilih jadwal yang diinginkan dan berikan Nama Tugas. Nama Tugas ini adalah nama jadwal Anda di EventBridge.

  • Gunakan Amazon EventBridge dan AWS Step Functions untuk mengatur evaluasi dan rekomendasi untuk aturan kualitas data.