This topic describes how to use the APIs to measure and manage data quality.
Prerequisites
Make sure that your boto3 version is up to date so that it includes the latest AWS Glue Data Quality API.
Make sure that your AWS CLI version is up to date so that it includes the latest CLI commands.
If you use an AWS Glue job to run these APIs, you can use the following option to update the boto3 library to the latest version:
--additional-python-modules boto3==<version>
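As an illustration, the option above is usually supplied as a job argument. The following minimal sketch only builds the argument dictionary. The helper name `boto3_upgrade_argument` is hypothetical, the version `1.34.0` is an arbitrary example, and merging the result into the arguments of your job definition is an assumption about how your job is configured.

```python
def boto3_upgrade_argument(version):
    """Return the job argument that pins boto3 to the given version.

    boto3_upgrade_argument is a hypothetical helper name, not part of boto3.
    """
    if not version:
        raise ValueError("a boto3 version string is required")
    return {'--additional-python-modules': f'boto3=={version}'}


# Example: merge the option into the arguments used when defining the job.
job_arguments = {'--job-language': 'python'}
job_arguments.update(boto3_upgrade_argument('1.34.0'))
```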
Working with AWS Glue Data Quality recommendations
To start an AWS Glue Data Quality recommendation run:
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
        """
        Starts a recommendation run that is used to generate rules when you don't know what rules to write. AWS Glue Data Quality
        analyzes the data and comes up with recommendations for a potential ruleset. You can then triage the ruleset
        and modify the generated ruleset to your liking.
        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want a recommendation
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
        """
        try:
            response = self.glue_client.start_data_quality_rule_recommendation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't start data quality recommendation run for table %s. Here's why: %s: %s", table_name,
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']
For a recommendation run, you can use pushDownPredicate or catalogPartitionPredicate to improve performance and run recommendations only on specific partitions of your catalog sources.
# client is a Boto3 AWS Glue client, for example boto3.client('glue')
client.start_data_quality_rule_recommendation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': database_name,
            'TableName': table_name,
            'AdditionalOptions': {
                'pushDownPredicate': "year=2022"
            }
        }
    },
    Role=role_arn,
    NumberOfWorkers=2,
    CreatedRulesetName='<rule_set_name>'
)
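The DataSource structure in the request above can be assembled by a small function so that the partition predicates are added only when they are provided. This is a minimal sketch; the helper name `glue_table_source` and its parameters are hypothetical and not part of boto3.

```python
def glue_table_source(database_name, table_name, push_down_predicate=None,
                      catalog_partition_predicate=None):
    """Build a DataSource dictionary for a Data Catalog table.

    Hypothetical helper: only the predicates that are provided are added
    under AdditionalOptions.
    """
    table = {'DatabaseName': database_name, 'TableName': table_name}
    options = {}
    if push_down_predicate:
        options['pushDownPredicate'] = push_down_predicate
    if catalog_partition_predicate:
        options['catalogPartitionPredicate'] = catalog_partition_predicate
    if options:
        table['AdditionalOptions'] = options
    return {'GlueTable': table}


# Restrict the run to the partition year=2022.
source = glue_table_source('<database_name>', '<table_name>',
                           push_down_predicate='year=2022')
```

The resulting dictionary can be passed as the DataSource argument of the call shown above.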
To get the results of an AWS Glue Data Quality recommendation run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_rule_recommendation_run(self, run_id):
        """
        Gets the specified recommendation run that was used to generate rules.
        :param run_id: The id of the data quality recommendation run
        """
        try:
            response = self.glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get data quality recommendation run %s. Here's why: %s: %s", run_id,
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
From the response object above, you can extract the ruleset recommended by the run to use in later steps:
print(response['RecommendedRuleset'])
Rules = [
    RowCount between 2000 and 8000,
    IsComplete "col1",
    IsComplete "col2",
    StandardDeviation "col3" between 58138330.8 and 64258155.09,
    ColumnValues "col4" between 1000042965 and 1214474826,
    IsComplete "col5"
]
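A recommendation run is asynchronous, so the recommended ruleset is only available once the run has finished. The following minimal sketch polls the run until it reaches a terminal status. The helper name `wait_for_recommendation_run`, the polling interval, and the set of terminal statuses are assumptions made for illustration.

```python
import time

# Statuses treated as final in this sketch (an assumption, adjust as needed).
TERMINAL_STATES = {'SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT'}


def wait_for_recommendation_run(client, run_id, poll_seconds=30, max_polls=60):
    """Poll a recommendation run until it reaches a terminal status.

    Hypothetical helper; client is a Boto3 AWS Glue client (or any object
    exposing get_data_quality_rule_recommendation_run).
    """
    for _ in range(max_polls):
        response = client.get_data_quality_rule_recommendation_run(RunId=run_id)
        if response['Status'] in TERMINAL_STATES:
            return response
        time.sleep(poll_seconds)
    raise TimeoutError(f"Run {run_id} did not finish after {max_polls} polls")
```

Check the returned status before reading RecommendedRuleset, because a run that failed or was stopped may not contain one.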
To list all of your recommendation runs, optionally filtered by data source:
response = client.list_data_quality_rule_recommendation_runs(
    Filter={
        'DataSource': {
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>'
            }
        }
    }
)
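A single list call may return only part of the runs. The sketch below follows the continuation token until every page has been read. The helper name `list_all_recommendation_runs` is hypothetical, and the code assumes the runs are returned under `Runs` with an optional `NextToken`.

```python
def list_all_recommendation_runs(client, database_name, table_name):
    """Collect recommendation runs for a table across all result pages.

    Hypothetical helper; assumes the response carries the runs under 'Runs'
    and an optional 'NextToken' for the next page.
    """
    kwargs = {
        'Filter': {
            'DataSource': {
                'GlueTable': {
                    'DatabaseName': database_name,
                    'TableName': table_name
                }
            }
        }
    }
    runs = []
    while True:
        page = client.list_data_quality_rule_recommendation_runs(**kwargs)
        runs.extend(page.get('Runs', []))
        token = page.get('NextToken')
        if not token:
            return runs
        # Ask for the next page on the following iteration.
        kwargs['NextToken'] = token
```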
To cancel an existing AWS Glue Data Quality recommendation run:
response = client.cancel_data_quality_rule_recommendation_run(
    RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
Working with AWS Glue Data Quality rulesets
To create an AWS Glue Data Quality ruleset:
response = client.create_data_quality_ruleset(
    Name='<ruleset_name>',
    Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
    TargetTable={
        'TableName': '<table_name>',
        'DatabaseName': '<database_name>'
    }
)
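The Ruleset argument above is a single string. When the rules are kept as a list in code, a small function can join them into that form. This is a minimal sketch with a hypothetical helper name, `build_ruleset`; it does not validate the rules themselves.

```python
def build_ruleset(rules):
    """Join individual DQDL rules into a single ruleset string.

    Hypothetical helper; each rule is expected to be valid DQDL already.
    """
    cleaned = [rule.strip() for rule in rules if rule.strip()]
    if not cleaned:
        raise ValueError("a ruleset needs at least one rule")
    return 'Rules = [' + ', '.join(cleaned) + ']'


# Produces the same ruleset string used in the request above.
ruleset = build_ruleset([
    'IsComplete "col1"',
    'IsPrimaryKey "col2"',
    'RowCount between 2000 and 8000',
])
```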
To get a data quality ruleset:
response = client.get_data_quality_ruleset(
    Name='<ruleset_name>'
)
print(response)
You can then extract the ruleset from the response of this API:
print(response['Ruleset'])
To list all of your data quality rulesets:
response = client.list_data_quality_rulesets()
You can use the filter condition in the API to list only the rulesets attached to a specific database or table:
response = client.list_data_quality_rulesets(
    Filter={
        'TargetTable': {
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    },
)
To update a data quality ruleset:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
        """
        Update an AWS Glue Data Quality Ruleset
        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to update
        :param ruleset_string: The DQDL ruleset string to update the ruleset with
        """
        try:
            response = self.glue_client.update_data_quality_ruleset(
                Name=ruleset_name,
                Ruleset=ruleset_string
            )
        except ClientError as err:
            logger.error(
                "Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
To delete a data quality ruleset:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def delete_data_quality_ruleset(self, ruleset_name):
        """
        Delete an AWS Glue Data Quality Ruleset
        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete
        """
        try:
            response = self.glue_client.delete_data_quality_ruleset(
                Name=ruleset_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
Working with AWS Glue Data Quality runs
To start an AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_arn, ruleset_list):
        """
        Start an AWS Glue Data Quality evaluation run
        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want to evaluate.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
        :param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
        """
        try:
            response = self.glue_client.start_data_quality_ruleset_evaluation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn,
                RulesetNames=ruleset_list
            )
        except ClientError as err:
            logger.error(
                "Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']
Remember that you can pass a pushDownPredicate or catalogPartitionPredicate parameter to make sure that your data quality run targets only a specific set of partitions in the catalog table. For example:
response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CloudWatchMetricsEnabled': False
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)
You can also configure how the composite rules in your ruleset are evaluated, at either the ROW or COLUMN level. For more information about how composite rules work, see How composite rules work in the documentation.
The following example sets the composite rule evaluation method in the request:
response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CompositeRuleEvaluationMethod': 'ROW'
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)
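Because the evaluation method is passed as a plain string, a typo would only surface when the request is sent. The following minimal sketch builds the AdditionalRunOptions dictionary and rejects anything other than ROW or COLUMN up front. The helper name `evaluation_run_options` and its parameters are hypothetical.

```python
def evaluation_run_options(composite_method=None, cloudwatch_metrics=None):
    """Build the AdditionalRunOptions dictionary for an evaluation run.

    Hypothetical helper; rejects evaluation methods other than ROW or COLUMN.
    """
    options = {}
    if composite_method is not None:
        method = composite_method.upper()
        if method not in ('ROW', 'COLUMN'):
            raise ValueError("composite_method must be 'ROW' or 'COLUMN'")
        options['CompositeRuleEvaluationMethod'] = method
    if cloudwatch_metrics is not None:
        options['CloudWatchMetricsEnabled'] = bool(cloudwatch_metrics)
    return options


# Row-level composite rule evaluation with CloudWatch metrics turned off.
run_options = evaluation_run_options('row', cloudwatch_metrics=False)
```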
To get information about an AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Get details about an AWS Glue Data Quality Run
        :param run_id: The AWS Glue Data Quality run ID to look up
        """
        try:
            response = self.glue_client.get_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
To get the results of an AWS Glue Data Quality run:
For a given AWS Glue Data Quality run, you can extract the evaluation results of the run as follows:
response = client.get_data_quality_ruleset_evaluation_run(
    RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
    ResultId=resultID
)
print(response['RuleResults'])
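Printing the raw rule results can be hard to read for large rulesets. The sketch below condenses them into a count per outcome and a list of the rules that did not pass. The helper name `summarize_rule_results` is hypothetical, and the code assumes each entry has `Name` and `Result` keys, where `Result` is a string such as `PASS` or `FAIL`.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Count rule outcomes and collect the names of rules that did not pass.

    Hypothetical helper; assumes each entry has 'Name' and 'Result' keys,
    where 'Result' is a string such as 'PASS' or 'FAIL'.
    """
    counts = Counter(rule.get('Result', 'UNKNOWN') for rule in rule_results)
    not_passed = [rule.get('Name') for rule in rule_results
                  if rule.get('Result') != 'PASS']
    return dict(counts), not_passed
```

For example, `counts, not_passed = summarize_rule_results(response['RuleResults'])` gives a quick overview before you inspect individual rules.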
To list all of your AWS Glue Data Quality runs:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
        """
        Lists all the AWS Glue Data Quality runs against a given table
        :param database_name: The name of the database where the data quality runs
        :param table_name: The name of the table against which the data quality runs were created
        """
        try:
            response = self.glue_client.list_data_quality_ruleset_evaluation_runs(
                Filter={
                    'DataSource': {
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    }
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't list the AWS Glue Quality runs. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
You can modify the filter clause to show only the results between certain times or the results of runs against specific tables.
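As a sketch of such a time-bounded filter, the function below adds a start-time window to the data source filter. The helper name `evaluation_run_filter` is hypothetical, and the `StartedAfter` and `StartedBefore` keys are assumed to be the filter fields for the start time; check them against the API reference for your boto3 version.

```python
from datetime import datetime, timedelta, timezone


def evaluation_run_filter(database_name, table_name, started_after=None,
                          started_before=None):
    """Build a Filter for listing evaluation runs in an optional time window.

    Hypothetical helper; StartedAfter and StartedBefore are assumed filter keys.
    """
    if started_after and started_before and started_after >= started_before:
        raise ValueError("started_after must be earlier than started_before")
    run_filter = {
        'DataSource': {
            'GlueTable': {
                'DatabaseName': database_name,
                'TableName': table_name
            }
        }
    }
    if started_after:
        run_filter['StartedAfter'] = started_after
    if started_before:
        run_filter['StartedBefore'] = started_before
    return run_filter


# Runs started during the last 24 hours.
now = datetime.now(timezone.utc)
last_day = evaluation_run_filter('<database_name>', '<table_name>',
                                 started_after=now - timedelta(days=1),
                                 started_before=now)
```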
To stop an in-progress AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def cancel_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Cancels a given AWS Glue Data Quality run
        :param run_id: The run ID of the AWS Glue Data Quality run to cancel
        """
        try:
            response = self.glue_client.cancel_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
Working with AWS Glue Data Quality results
To get the results of an AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_result(self, result_id):
        """
        Outputs the result of an AWS Glue Data Quality Result
        :param result_id: The result id of an AWS Glue Data Quality run
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
To view the statistics gathered for a given data quality result:
import boto3
from botocore.exceptions import ClientError
import logging

logger = logging.getLogger(__name__)

class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_profile_for_data_quality_result(self, result_id):
        """
        Outputs the statistic profile for an AWS Glue Data Quality Result
        :param result_id: The result id of an AWS Glue Data Quality run
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
            # the profile contains all statistics gathered for the result
            profile_id = response['ProfileId']
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId=profile_id
            )
            return profile
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality profile. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To view the time series of a statistic gathered across multiple data quality runs:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_statistics_for_data_quality_result(self, profile_id):
        """
        Outputs an array of datapoints for each statistic in the input result.
        :param profile_id: The profile id of an AWS Glue Data Quality run
        """
        try:
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId=profile_id
            )
            # one lookup per statistic returns its datapoints across runs
            statistics = [self.glue_client.list_data_quality_statistics(
                StatisticId=s['StatisticId']
            ) for s in profile['Statistics']]
            return statistics
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
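The list returned by the method above holds one response per statistic. To plot or compare the values over time, it can help to flatten those responses into sorted datapoints. This is a minimal sketch: the helper name `to_time_series` is hypothetical, and the `RecordedOn` and `DoubleValue` keys are assumptions about the shape of each statistic summary.

```python
def to_time_series(statistic_pages):
    """Flatten list_data_quality_statistics responses into sorted datapoints.

    Hypothetical helper; assumes each statistic summary has 'StatisticId',
    'RecordedOn', and 'DoubleValue' keys.
    """
    series = {}
    for page in statistic_pages:
        for stat in page.get('Statistics', []):
            series.setdefault(stat['StatisticId'], []).append(
                (stat['RecordedOn'], stat['DoubleValue']))
    # Sort each series chronologically by its recorded timestamp.
    return {stat_id: sorted(points) for stat_id, points in series.items()}
```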
To view the anomaly detection model for a specific statistic:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_result_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the details (bounds) of anomaly detection training for the given statistic at the given profile.
        :param statistic_id: the model's statistic (the timeseries it is tracking)
        :param profile_id: the profile associated with the model (a point in the timeseries)
        """
        try:
            model = self.glue_client.get_data_quality_model_result(
                ProfileId=profile_id, StatisticId=statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality model results. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
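Once the model result is available, its bounds can be compared with the observed values. The sketch below returns the datapoints whose actual value falls outside the bounds. The helper name `points_outside_bounds` is hypothetical, and the `Model`, `ActualValue`, `LowerBound`, and `UpperBound` keys are assumptions about the response shape.

```python
def points_outside_bounds(model_result):
    """Return the datapoints whose actual value falls outside the model bounds.

    Hypothetical helper; assumes each entry under 'Model' has 'ActualValue',
    'LowerBound', and 'UpperBound' keys.
    """
    outside = []
    for point in model_result.get('Model', []):
        actual = point.get('ActualValue')
        lower = point.get('LowerBound')
        upper = point.get('UpperBound')
        # Skip datapoints that lack a value or a bound.
        if actual is None or lower is None or upper is None:
            continue
        if not lower <= actual <= upper:
            outside.append(point)
    return outside
```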
To exclude a datapoint from the anomaly detection baseline of its statistic model:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
        """
        Annotate some points along a given statistic timeseries.
        This example excludes the provided values; INCLUDE can also be used to undo this action.
        :param statistic_id: the statistic timeseries to annotate
        :param profile_ids: the profiles we want to exclude (points in the timeseries)
        """
        try:
            response = self.glue_client.batch_put_data_quality_statistic_annotation(
                InclusionAnnotations=[
                    {'ProfileId': prof_id,
                     'StatisticId': statistic_id,
                     'InclusionAnnotation': 'EXCLUDE'} for prof_id in profile_ids
                ]
            )
            return response['FailedInclusionAnnotations']
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
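As the docstring above notes, the same call can undo an exclusion by sending INCLUDE. The following minimal sketch builds the annotation list for either action so that both directions share one code path. The helper name `build_inclusion_annotations` is hypothetical.

```python
def build_inclusion_annotations(statistic_id, profile_ids, annotation='EXCLUDE'):
    """Build the InclusionAnnotations list for a statistic timeseries.

    Hypothetical helper; annotation must be 'INCLUDE' or 'EXCLUDE'.
    """
    if annotation not in ('INCLUDE', 'EXCLUDE'):
        raise ValueError("annotation must be 'INCLUDE' or 'EXCLUDE'")
    return [
        {'ProfileId': profile_id,
         'StatisticId': statistic_id,
         'InclusionAnnotation': annotation}
        for profile_id in profile_ids
    ]
```

The result can be passed as the InclusionAnnotations argument of the call shown above, for example with `annotation='INCLUDE'` to restore previously excluded points.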
To view the status of anomaly detection model training for a specific statistic:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_status_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the status of anomaly detection training for the given statistic at the given profile.
        :param statistic_id: the model's statistic (the timeseries it is tracking)
        :param profile_id: the profile associated with the model (a point in the timeseries)
        """
        try:
            model = self.glue_client.get_data_quality_model(
                ProfileId=profile_id, StatisticId=statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality model status. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To exclude all results of a specific data quality run from the anomaly detection baselines:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_profile(self, profile_id):
        """
        Exclude datapoints produced by a run across statistic timeseries.
        This example excludes the provided values; INCLUDE can also be used to undo this action.
        :param profile_id: the profile we want to exclude (a point in each timeseries)
        """
        try:
            response = self.glue_client.put_data_quality_profile_annotation(
                ProfileId=profile_id,
                InclusionAnnotation="EXCLUDE"
            )
            return response
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To get the results of a given data quality run and display them:
With an AWS Glue Data Quality runID, you can extract the resultID to get the actual results, as shown below:
response = client.get_data_quality_ruleset_evaluation_run(
    RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
    ResultId=resultID
)
print(response['RuleResults'])
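The snippet above reads only the first result ID. A run that evaluates several rulesets can return more than one, so the following minimal sketch walks every entry in ResultIds. The helper name `collect_rule_results` is hypothetical; `client` is a Boto3 AWS Glue client or any object exposing the same two methods.

```python
def collect_rule_results(client, run_id):
    """Return a {result_id: rule_results} mapping for an evaluation run.

    Hypothetical helper; fetches the result of every ID listed in ResultIds.
    """
    run = client.get_data_quality_ruleset_evaluation_run(RunId=run_id)
    results = {}
    for result_id in run.get('ResultIds', []):
        result = client.get_data_quality_result(ResultId=result_id)
        results[result_id] = result.get('RuleResults', [])
    return results
```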