Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Questo argomento descrive come utilizzare per APIs misurare e gestire la qualità dei dati.
Indice
Prerequisiti
Assicurati che la tua versione di boto3 sia aggiornata in modo che includa la versione più recente di AWS Glue Data Quality. API
Assicurati che la tua AWS CLI versione sia aggiornata, in modo da includere la versione più recente. CLI
Se stai usando un lavoro AWS Glue per eseguirliAPIs, puoi utilizzare la seguente opzione per aggiornare la libreria boto3 alla versione più recente:
—additional-python-modules boto3==<version>
Utilizzo dei consigli di AWS Glue Data Quality
Per iniziare una raccomandazione di AWS Glue Data Quality, esegui:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
"""
Starts a recommendation run that is used to generate rules when you don't know what rules to write. AWS Glue Data Quality
analyzes the data and comes up with recommendations for a potential ruleset. You can then triage the ruleset
and modify the generated ruleset to your liking.
:param database_name: The name of the AWS Glue database which contains the dataset.
:param table_name: The name of the AWS Glue table against which we want a recommendation
:param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
"""
try:
response = self.client.start_data_quality_rule_recommendation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
},
Role=role_arn
)
except ClientError as err:
logger.error(
"Couldn't start data quality recommendation run %s. Here's why: %s: %s", name,
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response['RunId']
Per l'esecuzione di un suggerimento, puoi utilizzare pushDownPredicates
o per catalogPartitionPredicates
migliorare le prestazioni ed eseguire i suggerimenti solo su partizioni specifiche delle origini del catalogo.
client.start_data_quality_rule_recommendation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name,
'AdditionalOptions': {
'pushDownPredicate': "year=2022"
}
}
},
Role=role_arn,
NumberOfWorkers=2,
CreatedRulesetName='<rule_set_name>'
)
Per ottenere i risultati di una raccomandazione di AWS Glue Data Quality, esegui:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_rule_recommendation_run(self, run_id):
"""
Gets the specified recommendation run that was used to generate rules.
:param run_id: The id of the data quality recommendation run
"""
try:
response = self.client.get_data_quality_rule_recommendation_run(RunId=run_id)
except ClientError as err:
logger.error(
"Couldn't get data quality recommendation run %. Here's why: %s: %s", run_id,
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Dall'oggetto di risposta precedente, è possibile estrarre RuleSet quanto consigliato dall'esecuzione, da utilizzare nei passaggi successivi:
print(response['RecommendedRuleset'])
Rules = [
RowCount between 2000 and 8000,
IsComplete "col1",
IsComplete "col2",
StandardDeviation "col3" between 58138330.8 and 64258155.09,
ColumnValues "col4" between 1000042965 and 1214474826,
IsComplete "col5"
]
Per ottenere un elenco di tutte le esecuzioni suggerite che è possibile filtrare ed elencare:
response = client.list_data_quality_rule_recommendation_runs(
Filter={
'DataSource': {
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>'
}
}
)
Per annullare le attività di raccomandazione esistenti di AWS Glue Data Quality:
response = client.cancel_data_quality_rule_recommendation_run(
RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
Utilizzo dei set di regole AWS Glue Data Quality
Per creare un set di regole AWS Glue Data Quality:
response = client.create_data_quality_ruleset(
Name='<ruleset_name>',
Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
TargetTable={
'TableName': '<table_name>',
'DatabaseName': '<database_name>'
}
)
Per ottenere un set di regole di qualità dei dati:
response = client.get_data_quality_ruleset(
Name='<ruleset_name>'
)
print(response)
Puoi usarlo API per poi estrarre il set di regole:
print(response['Ruleset'])
Per elencare tutti i set di regole sulla qualità dei dati per una tabella:
response = client.list_data_quality_rulesets()
Puoi utilizzare la condizione di filtro all'interno di API per filtrare tutti i set di regole collegati a un database o a una tabella specifici:
response = client.list_data_quality_rulesets(
Filter={
'TargetTable': {
'TableName': '<table_name>',
'DatabaseName': '<database_name>'
}
},
)
Per aggiornare un set di regole di qualità dei dati:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
"""
Update an AWS Glue Data Quality Ruleset
:param ruleset_name: The name of the AWS Glue Data Quality ruleset to update
:param ruleset_string: The DQDL ruleset string to update the ruleset with
"""
try:
response = self.client.update_data_quality_ruleset(
Name=ruleset_name,
Ruleset=ruleset_string
)
except ClientError as err:
logger.error(
"Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Per eliminare un set di regole di qualità dei dati:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def delete_data_quality_ruleset(self, ruleset_name):
"""
Delete a AWS Glue Data Quality Ruleset
:param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete
"""
try:
response = self.client.delete_data_quality_ruleset(
Name=ruleset_name
)
except ClientError as err:
logger.error(
"Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
L'utilizzo di AWS Glue Data Quality funziona
Per avviare un'esecuzione di AWS Glue Data Quality:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_name, ruleset_list):
"""
Start an AWS Glue Data Quality evaluation run
:param database_name: The name of the AWS Glue database which contains the dataset.
:param table_name: The name of the AWS Glue table against which we want to evaluate.
:param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
:param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
"""
try:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
},
Role=role_name,
RulesetNames=ruleset_list
)
except ClientError as err:
logger.error(
"Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response['RunId']
Ricorda che puoi trasmettere un parametro pushDownPredicate
o catalogPartitionPredicate
per garantire che l'esecuzione della qualità dei dati riguardi solo un set specifico di partizioni all'interno della tabella del catalogo. Per esempio:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>',
'AdditionalOptions': {
'pushDownPredicate': 'year=2023'
}
}
},
Role='<role_name>',
NumberOfWorkers=5,
Timeout=123,
AdditionalRunOptions={
'CloudWatchMetricsEnabled': False
},
RulesetNames=[
'<ruleset_name>',
]
)
Puoi anche configurare il modo in cui vengono valutate le regole composite del tuo set di regole, a livello OR. ROW COLUMN Per ulteriori informazioni sul funzionamento delle regole composite, consulta la sezione Come funzionano le regole composite nella documentazione.
Esempio su come impostare il metodo di valutazione delle regole composite nella richiesta:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>',
'AdditionalOptions': {
'pushDownPredicate': 'year=2023'
}
}
},
Role='<role_name>',
NumberOfWorkers=5,
Timeout=123,
AdditionalRunOptions={
'CompositeRuleEvaluationMethod':ROW
},
RulesetNames=[
'<ruleset_name>',
]
)
Per ottenere informazioni su un programma AWS Glue Data Quality esegui:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_ruleset_evaluation_run(self, run_id):
"""
Get details about an AWS Glue Data Quality Run
:param run_id: The AWS Glue Data Quality run ID to look up
"""
try:
response = self.client.get_data_quality_ruleset_evaluation_run(
RunId=run_id
)
except ClientError as err:
logger.error(
"Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Per ottenere i risultati di un AWS Glue Data Quality esegui:
Per una determinata esecuzione di AWS Glue Data Quality, puoi estrarre i risultati della valutazione dell'esecuzione utilizzando il seguente metodo:
response = client.get_data_quality_ruleset_evaluation_run(
RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
ResultId=resultID
)
print(response['RuleResults'])
Per elencare tutte le tue esecuzioni di AWS Glue Data Quality:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
"""
Lists all the AWS Glue Data Quality runs against a given table
:param database_name: The name of the database where the data quality runs
:param table_name: The name of the table against which the data quality runs were created
"""
try:
response = self.client.list_data_quality_ruleset_evaluation_runs(
Filter={
'DataSource': {
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
}
}
)
except ClientError as err:
logger.error(
"Couldn't list the AWS Glue Quality runs. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
È possibile modificare la clausola di filtro in modo che mostri solo i risultati compresi entro orari specifici o in base a tabelle specifiche.
Per interrompere una corsa in corso di AWS Glue Data Quality:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def cancel_data_quality_ruleset_evaluation_run(self, result_id):
"""
Cancels a given AWS Glue Data Quality run
:param result_id: The result id of a AWS Glue Data Quality run to cancel
"""
try:
response = self.client.cancel_data_quality_ruleset_evaluation_run(
ResultId=result_id
)
except ClientError as err:
logger.error(
"Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Utilizzo dei risultati di AWS Glue Data Quality
Per ottenere i risultati della tua corsa AWS Glue Data Quality:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_result(self, result_id):
"""
Outputs the result of an AWS Glue Data Quality Result
:param result_id: The result id of an AWS Glue Data Quality run
"""
try:
response = self.client.get_data_quality_result(
ResultId=result_id
)
except ClientError as err:
logger.error(
"Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Per visualizzare le statistiche raccolte per un determinato risultato di qualità dei dati:
import boto3
from botocore.exceptions import ClientError
import logging
logger = logging.getLogger(__name__)
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_profile_for_data_quality_result(self, result_id):
"""
Outputs the statistic profile for a AWS Glue Data Quality Result
:param result_id: The result id of a AWS Glue Data Quality run
"""
try:
response = self.glue_client.get_data_quality_result(
ResultId=result_id
)
# the profile contains all statistics gathered for the result
profile_id = response['ProfileId']
profile = self.glue_client.list_data_quality_statistics(
ProfileId = profile_id
)
return profile
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality profile. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Per visualizzare le serie temporali di una statistica raccolta su più cicli di valutazione della qualità dei dati:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_statistics_for_data_quality_result(self, profile_id):
"""
Outputs an array of datapoints for each statistic in the input result.
:param result_id: The profile id of a AWS Glue Data Quality run
"""
try:
profile = self.glue_client.list_data_quality_statistics(
ProfileId = profile_id
)
statistics = [self.glue_client.list_data_quality_statistics(
StatisticId = s['StatisticId']
) for s in profile['Statistics']]
return statistics
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Per visualizzare il modello di rilevamento delle anomalie per una statistica specifica:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_model_training_result_for_statistic(self, statistic_id, profile_id):
"""
Outputs the details (bounds) of anomaly detection training for the given statistic at the given profile.
:param statistic_id the model's statistic (the timeseries it is tracking)
:param profile_id the profile associated with the model (a point in the timeseries)
"""
try:
model = self.glue_client.get_data_quality_model_result(
ProfileId = profile_id, StatisticId = statistic_id
)
return model
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality model results. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Per escludere un datapoint dalla linea di base di rilevamento delle anomalie del relativo modello statistico:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
"""
Annotate some points along a given statistic timeseries.
This example excludes the provided values; INCLUDE can also be used to undo this action.
:param statistic_id the statistic timeseries to annotate
:param profile_id the profiles we want to exclude (points in the timeseries)
"""
try:
response = self.glue_client.batch_put_data_quality_statistic_annotation(
InclusionAnnotations = [
{'ProfileId': prof_id,
'StatisticId': statistic_id,
'InclusionAnnotation': 'EXCLUDE'} for prof_id in profile_ids
]
)
return response['FailedInclusionAnnotations']
except ClientError as err:
logger.error(
"Couldn't store Data Quality annotations. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Per visualizzare lo stato dell'addestramento del modello di rilevamento delle anomalie per una statistica specifica:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_model_training_status_for_statistic(self, statistic_id, profile_id):
"""
Outputs the status of anomaly detection training for the given statistic at the given profile.
:param statistic_id the model's statistic (the timeseries it is tracking)
:param profile_id the profile associated with the model (a point in the timeseries)
"""
try:
model = self.glue_client.get_data_quality_model(
ProfileId = profile_id, StatisticId = statistic_id
)
return model
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Per escludere tutti i risultati da una specifica esecuzione sulla qualità dei dati dalle linee di base per il rilevamento delle anomalie:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def apply_exclusions_to_profile(self, profile_id):
"""
Exclude datapoints produced by a run across statistic timeseries.
This example excludes the provided values; INCLUDE can also be used to undo this action.
:param profile_id the profiles we want to exclude (points in the timeseries)
"""
try:
response = self.glue_client.put_data_quality_profile_annotation(
ProfileId = profile_id,
InclusionAnnotation = "EXCLUDE"
)
return response
except ClientError as err:
logger.error(
"Couldn't store Data Quality annotations. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Per ottenere i risultati di un determinato ciclo di qualità dei dati e visualizzare i risultati:
Con un AWS Glue Data QualityrunID
, puoi estrarre i dati resultID
per poi ottenere i risultati effettivi, come mostrato di seguito:
response = client.get_data_quality_ruleset_evaluation_run(
RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
ResultId=resultID
)
print(resp['RuleResults'])