En este tema, se describe cómo utilizar las API para medir y gestionar la calidad de los datos.
Contenido
Requisitos previos
Asegúrese de que su versión de boto3 esté actualizada para que incluya la última API de Calidad de datos de AWS Glue.
Asegúrese de que la versión de AWS CLI esté actualizada para incluir la CLI más reciente.
Si utiliza un trabajo de AWS Glue para ejecutar estas API, puede usar la siguiente opción para actualizar la biblioteca boto3 a la última versión:
—additional-python-modules boto3==<version>
Cómo trabajar con las recomendaciones de Calidad de datos de AWS Glue
Para iniciar una recomendación de Calidad de datos de AWS Glue, ejecute lo siguiente:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
"""
Starts a recommendation run that is used to generate rules when you don't know what rules to write. AWS Glue Data Quality
analyzes the data and comes up with recommendations for a potential ruleset. You can then triage the ruleset
and modify the generated ruleset to your liking.
:param database_name: The name of the AWS Glue database which contains the dataset.
:param table_name: The name of the AWS Glue table against which we want a recommendation
:param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
"""
try:
response = self.client.start_data_quality_rule_recommendation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
},
Role=role_arn
)
except ClientError as err:
logger.error(
"Couldn't start data quality recommendation run %s. Here's why: %s: %s", name,
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response['RunId']
Para una ejecución de recomendaciones, puede utilizar sus recomendaciones pushDownPredicates
o catalogPartitionPredicates
para mejorar el rendimiento y ejecutar las recomendaciones solo en particiones específicas de los orígenes de su catálogo.
client.start_data_quality_rule_recommendation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name,
'AdditionalOptions': {
'pushDownPredicate': "year=2022"
}
}
},
Role=role_arn,
NumberOfWorkers=2,
CreatedRulesetName='<rule_set_name>'
)
Para obtener los resultados de una recomendación de Calidad de datos de AWS Glue, ejecute:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_rule_recommendation_run(self, run_id):
"""
Gets the specified recommendation run that was used to generate rules.
:param run_id: The id of the data quality recommendation run
"""
try:
response = self.client.get_data_quality_rule_recommendation_run(RunId=run_id)
except ClientError as err:
logger.error(
"Couldn't get data quality recommendation run %. Here's why: %s: %s", run_id,
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Del objeto de respuesta anterior, puede extraer el conjunto de reglas recomendado por la ejecución para usarlo en los siguientes pasos:
print(response['RecommendedRuleset'])
Rules = [
RowCount between 2000 and 8000,
IsComplete "col1",
IsComplete "col2",
StandardDeviation "col3" between 58138330.8 and 64258155.09,
ColumnValues "col4" between 1000042965 and 1214474826,
IsComplete "col5"
]
Para obtener una lista de todas las ejecuciones de recomendación que se puedan filtrar y enumerar:
response = client.list_data_quality_rule_recommendation_runs(
Filter={
'DataSource': {
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>'
}
}
)
Para cancelar las tareas de recomendación de Calidad de datos de AWS Glue existentes:
response = client.cancel_data_quality_rule_recommendation_run(
RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
Trabajar con los conjuntos de reglas de Calidad de datos de AWS Glue
Para crear un conjunto de reglas de Calidad de datos de AWS Glue:
response = client.create_data_quality_ruleset(
Name='<ruleset_name>',
Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
TargetTable={
'TableName': '<table_name>',
'DatabaseName': '<database_name>'
}
)
Para obtener un conjunto de reglas de calidad de datos:
response = client.get_data_quality_ruleset(
Name='<ruleset_name>'
)
print(response)
Puede utilizar esta API para, a continuación, extraer el conjunto de reglas:
print(response['Ruleset'])
Para enumerar todos los conjuntos de reglas de calidad de datos de una tabla:
response = client.list_data_quality_rulesets()
Puedes usar la condición de filtro de la API para filtrar todos los conjuntos de reglas adjuntos a una base de datos o tabla específica:
response = client.list_data_quality_rulesets(
Filter={
'TargetTable': {
'TableName': '<table_name>',
'DatabaseName': '<database_name>'
}
},
)
Para actualizar un conjunto de reglas de calidad de datos:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
"""
Update an AWS Glue Data Quality Ruleset
:param ruleset_name: The name of the AWS Glue Data Quality ruleset to update
:param ruleset_string: The DQDL ruleset string to update the ruleset with
"""
try:
response = self.client.update_data_quality_ruleset(
Name=ruleset_name,
Ruleset=ruleset_string
)
except ClientError as err:
logger.error(
"Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Para eliminar un conjunto de reglas de la calidad de los datos:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def delete_data_quality_ruleset(self, ruleset_name):
"""
Delete a AWS Glue Data Quality Ruleset
:param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete
"""
try:
response = self.client.delete_data_quality_ruleset(
Name=ruleset_name
)
except ClientError as err:
logger.error(
"Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Cómo trabajar con ejecuciones de Calidad de datos de AWS Glue
Para iniciar una ejecución de Calidad de datos de AWS Glue:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_name, ruleset_list):
"""
Start an AWS Glue Data Quality evaluation run
:param database_name: The name of the AWS Glue database which contains the dataset.
:param table_name: The name of the AWS Glue table against which we want to evaluate.
:param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
:param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
"""
try:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
},
Role=role_name,
RulesetNames=ruleset_list
)
except ClientError as err:
logger.error(
"Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response['RunId']
Recuerde que puede pasar un parámetro pushDownPredicate
o catalogPartitionPredicate
para garantizar que la ejecución de calidad de los datos solo se dirija a un conjunto específico de particiones de la tabla de catálogo. Por ejemplo:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>',
'AdditionalOptions': {
'pushDownPredicate': 'year=2023'
}
}
},
Role='<role_name>',
NumberOfWorkers=5,
Timeout=123,
AdditionalRunOptions={
'CloudWatchMetricsEnabled': False
},
RulesetNames=[
'<ruleset_name>',
]
)
También puede configurar cómo se evalúan las reglas compuestas de su conjunto de reglas, ya sea en el nivel de la FILA o de la COLUMNA. Para obtener más información sobre cómo funcionan las reglas compuestas, consulte Cómo funcionan las reglas compuestas en la documentación.
Ejemplo de cómo configurar el método de evaluación de reglas compuestas en su solicitud:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>',
'AdditionalOptions': {
'pushDownPredicate': 'year=2023'
}
}
},
Role='<role_name>',
NumberOfWorkers=5,
Timeout=123,
AdditionalRunOptions={
'CompositeRuleEvaluationMethod':ROW
},
RulesetNames=[
'<ruleset_name>',
]
)
Para obtener información sobre una ejecución de Calidad de datos de AWS Glue:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_ruleset_evaluation_run(self, run_id):
"""
Get details about an AWS Glue Data Quality Run
:param run_id: The AWS Glue Data Quality run ID to look up
"""
try:
response = self.client.get_data_quality_ruleset_evaluation_run(
RunId=run_id
)
except ClientError as err:
logger.error(
"Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Para obtener los resultados de una ejecución de Calidad de datos de AWS Glue:
Para una ejecución de Calidad de datos de AWS Glue determinada, puede extraer los resultados de la evaluación de la ejecución mediante el siguiente método:
response = client.get_data_quality_ruleset_evaluation_run(
RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
ResultId=resultID
)
print(response['RuleResults'])
Para ver una lista de todas sus ejecuciones de Calidad de datos de AWS Glue:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
"""
Lists all the AWS Glue Data Quality runs against a given table
:param database_name: The name of the database where the data quality runs
:param table_name: The name of the table against which the data quality runs were created
"""
try:
response = self.client.list_data_quality_ruleset_evaluation_runs(
Filter={
'DataSource': {
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
}
}
)
except ClientError as err:
logger.error(
"Couldn't list the AWS Glue Quality runs. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Puede modificar la cláusula de filtro para que solo muestre los resultados entre tiempos específicos o si se ejecutan en tablas específicas.
Para detener una ejecución continua de Calidad de datos de AWS Glue:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def cancel_data_quality_ruleset_evaluation_run(self, result_id):
"""
Cancels a given AWS Glue Data Quality run
:param result_id: The result id of a AWS Glue Data Quality run to cancel
"""
try:
response = self.client.cancel_data_quality_ruleset_evaluation_run(
ResultId=result_id
)
except ClientError as err:
logger.error(
"Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Cómo trabajar con los resultados de Calidad de datos de AWS Glue
Para obtener los resultados de ejecución de Calidad de datos de AWS Glue:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_result(self, result_id):
"""
Outputs the result of an AWS Glue Data Quality Result
:param result_id: The result id of an AWS Glue Data Quality run
"""
try:
response = self.client.get_data_quality_result(
ResultId=result_id
)
except ClientError as err:
logger.error(
"Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
Para ver las estadísticas recopiladas para un resultado de calidad de datos determinado:
import boto3
from botocore.exceptions import ClientError
import logging
logger = logging.getLogger(__name__)
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_profile_for_data_quality_result(self, result_id):
"""
Outputs the statistic profile for a AWS Glue Data Quality Result
:param result_id: The result id of a AWS Glue Data Quality run
"""
try:
response = self.glue_client.get_data_quality_result(
ResultId=result_id
)
# the profile contains all statistics gathered for the result
profile_id = response['ProfileId']
profile = self.glue_client.list_data_quality_statistics(
ProfileId = profile_id
)
return profile
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality profile. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Para ver las series temporales de una estadística recopilada en varias ejecuciones de calidad de datos:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_statistics_for_data_quality_result(self, profile_id):
"""
Outputs an array of datapoints for each statistic in the input result.
:param result_id: The profile id of a AWS Glue Data Quality run
"""
try:
profile = self.glue_client.list_data_quality_statistics(
ProfileId = profile_id
)
statistics = [self.glue_client.list_data_quality_statistics(
StatisticId = s['StatisticId']
) for s in profile['Statistics']]
return statistics
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Para ver el modelo de detección de anomalías de una estadística específica:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_model_training_result_for_statistic(self, statistic_id, profile_id):
"""
Outputs the details (bounds) of anomaly detection training for the given statistic at the given profile.
:param statistic_id the model's statistic (the timeseries it is tracking)
:param profile_id the profile associated with the model (a point in the timeseries)
"""
try:
model = self.glue_client.get_data_quality_model_result(
ProfileId = profile_id, StatisticId = statistic_id
)
return model
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality model results. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Para excluir un punto de datos de la referencia de la detección de anomalías de su modelo estadístico:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
"""
Annotate some points along a given statistic timeseries.
This example excludes the provided values; INCLUDE can also be used to undo this action.
:param statistic_id the statistic timeseries to annotate
:param profile_id the profiles we want to exclude (points in the timeseries)
"""
try:
response = self.glue_client.batch_put_data_quality_statistic_annotation(
InclusionAnnotations = [
{'ProfileId': prof_id,
'StatisticId': statistic_id,
'InclusionAnnotation': 'EXCLUDE'} for prof_id in profile_ids
]
)
return response['FailedInclusionAnnotations']
except ClientError as err:
logger.error(
"Couldn't store Data Quality annotations. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Para ver el estado del entrenamiento del modelo de detección de anomalías de una estadística específica:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_model_training_status_for_statistic(self, statistic_id, profile_id):
"""
Outputs the status of anomaly detection training for the given statistic at the given profile.
:param statistic_id the model's statistic (the timeseries it is tracking)
:param profile_id the profile associated with the model (a point in the timeseries)
"""
try:
model = self.glue_client.get_data_quality_model(
ProfileId = profile_id, StatisticId = statistic_id
)
return model
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Para excluir todos los resultados de una ejecución de calidad de datos específica a partir de las referencias de la detección de anomalías:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def apply_exclusions_to_profile(self, profile_id):
"""
Exclude datapoints produced by a run across statistic timeseries.
This example excludes the provided values; INCLUDE can also be used to undo this action.
:param profile_id the profiles we want to exclude (points in the timeseries)
"""
try:
response = self.glue_client.put_data_quality_profile_annotation(
ProfileId = profile_id,
InclusionAnnotation = "EXCLUDE"
)
return response
except ClientError as err:
logger.error(
"Couldn't store Data Quality annotations. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
Para obtener los resultados de una ejecución de calidad de datos determinada y mostrarlos:
Con un runID
de Calidad de datos de AWS Glue, puede extraer el resultID
para obtener los resultados reales, tal y como se muestra a continuación:
response = client.get_data_quality_ruleset_evaluation_run(
RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
ResultId=resultID
)
print(resp['RuleResults'])