Using APIs to measure and manage data quality

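This topic describes how to use APIs to measure and manage data quality.

Prerequisites

  • Make sure your boto3 version is up to date so that it includes the latest AWS Glue Data Quality API.

  • Make sure your AWS CLI version is up to date so that it includes the latest CLI commands.

If you run these APIs from an AWS Glue job, you can use the following option to update the boto3 library to the latest version:

--additional-python-modules boto3==<version>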
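As a rough illustration of how the option above might be supplied programmatically, the sketch below builds an argument mapping of the kind a Glue job definition accepts as its default arguments. The helper name `build_default_arguments` and the version string are hypothetical and used only for illustration.

```python
def build_default_arguments(boto3_version):
    """Return job arguments asking AWS Glue to install a specific boto3 version.

    The function name is hypothetical; only the option key comes from the text above.
    """
    if not boto3_version:
        raise ValueError("boto3_version must be a non-empty string")
    return {"--additional-python-modules": f"boto3=={boto3_version}"}


# "1.34.0" is a placeholder version, not a recommendation.
default_arguments = build_default_arguments("1.34.0")
print(default_arguments)
```

The resulting dictionary could then be passed wherever the job's default arguments are configured.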

Working with AWS Glue Data Quality recommendations

To start an AWS Glue Data Quality recommendation run:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
        """
        Starts a recommendation run that is used to generate rules when you don't know
        what rules to write. AWS Glue Data Quality analyzes the data and comes up with
        recommendations for a potential ruleset. You can then triage the ruleset and
        modify the generated ruleset to your liking.

        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want a recommendation.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access
                         Management (IAM) role that grants permission to let AWS Glue
                         access the resources it needs.
        """
        try:
            response = self.glue_client.start_data_quality_rule_recommendation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn
            )
        except ClientError as err:
            # Log the table name: the original referenced an undefined variable here.
            logger.error(
                "Couldn't start data quality recommendation run for %s. Here's why: %s: %s",
                table_name, err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']

For recommendation runs, you can use pushDownPredicate or catalogPartitionPredicate to improve performance and to run recommendations only on specific partitions of a catalog source.

client.start_data_quality_rule_recommendation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': database_name,
            'TableName': table_name,
            'AdditionalOptions': {
                'pushDownPredicate': "year=2022"
            }
        }
    },
    Role=role_arn,
    NumberOfWorkers=2,
    CreatedRulesetName='<rule_set_name>'
)

To get the results of an AWS Glue Data Quality recommendation run:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_rule_recommendation_run(self, run_id):
        """
        Gets the specified recommendation run that was used to generate rules.

        :param run_id: The ID of the data quality recommendation run.
        """
        try:
            response = self.glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get data quality recommendation run %s. Here's why: %s: %s",
                run_id, err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

From the response object above, you can extract the ruleset recommended by the run for use in later steps:

print(response['RecommendedRuleset'])

Sample output:

Rules = [
    RowCount between 2000 and 8000,
    IsComplete "col1",
    IsComplete "col2",
    StandardDeviation "col3" between 58138330.8 and 64258155.09,
    ColumnValues "col4" between 1000042965 and 1214474826,
    IsComplete "col5"
]
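A recommendation run is started and queried in separate calls, so in practice the recommended ruleset is probably only worth reading once the run reports success. The following is a minimal polling sketch under that assumption. The helper name, the polling interval, and the set of terminal status strings are illustrative choices rather than part of the original examples.

```python
import time

# Statuses after which the run is assumed not to change any more.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def wait_for_recommended_ruleset(client, run_id, poll_seconds=10, max_attempts=60):
    """Poll a recommendation run until it finishes, then return its ruleset string.

    `client` is expected to be a Boto3 AWS Glue client (or an object with the same method).
    """
    for _ in range(max_attempts):
        response = client.get_data_quality_rule_recommendation_run(RunId=run_id)
        status = response.get("Status")
        if status == "SUCCEEDED":
            return response.get("RecommendedRuleset")
        if status in TERMINAL_STATUSES:
            raise RuntimeError(f"Recommendation run {run_id} ended with status {status}")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Recommendation run {run_id} did not finish in time")
```

A call such as `wait_for_recommended_ruleset(client, run_id)` would block until the run completes, which suits scripts but not latency-sensitive code paths.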

To list all recommendation runs, optionally filtered by data source:

response = client.list_data_quality_rule_recommendation_runs(
    Filter={
        'DataSource': {
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>'
            }
        }
    }
)
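List calls like this one return results in pages. The sketch below shows one way to walk every page, assuming the response carries the runs under `Runs` and a continuation token under `NextToken`. The generator name `iter_recommendation_runs` is hypothetical.

```python
def iter_recommendation_runs(client, database_name, table_name):
    """Yield every recommendation run for a table, following NextToken pages."""
    kwargs = {
        "Filter": {
            "DataSource": {
                "GlueTable": {
                    "DatabaseName": database_name,
                    "TableName": table_name,
                }
            }
        }
    }
    while True:
        page = client.list_data_quality_rule_recommendation_runs(**kwargs)
        yield from page.get("Runs", [])
        token = page.get("NextToken")
        if not token:
            break
        # Ask for the next page on the following iteration.
        kwargs["NextToken"] = token
```

Because it is a generator, callers can stop iterating early without fetching the remaining pages.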

To cancel an existing AWS Glue Data Quality recommendation run:

response = client.cancel_data_quality_rule_recommendation_run(
    RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)

Working with AWS Glue Data Quality rulesets

To create an AWS Glue Data Quality ruleset:

response = client.create_data_quality_ruleset(
    Name='<ruleset_name>',
    Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
    TargetTable={
        'TableName': '<table_name>',
        'DatabaseName': '<database_name>'
    }
)
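The ruleset is passed as a single string, which can be awkward to maintain by hand. As a small sketch, the hypothetical helper `build_ruleset` below assembles that string from a list of individual rules. It does no validation of the rule syntax itself.

```python
def build_ruleset(rules):
    """Join individual rule strings into a single 'Rules = [...]' definition."""
    cleaned = [rule.strip() for rule in rules if rule and rule.strip()]
    if not cleaned:
        raise ValueError("A ruleset needs at least one rule")
    return "Rules = [" + ", ".join(cleaned) + "]"


ruleset = build_ruleset([
    'IsComplete "col1"',
    'IsPrimaryKey "col2"',
    "RowCount between 2000 and 8000",
])
print(ruleset)
# prints: Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]
```

The resulting string can be supplied as the `Ruleset` argument when creating or updating a ruleset.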

To get a data quality ruleset:

response = client.get_data_quality_ruleset(
    Name='<ruleset_name>'
)
print(response)

You can then extract the ruleset from the response of this API:

print(response['Ruleset'])

To list all data quality rulesets:

response = client.list_data_quality_rulesets()

You can use the filter condition in the API to list only the rulesets attached to a specific database or table:

response = client.list_data_quality_rulesets(
    Filter={
        'TargetTable': {
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    },
)

To update a data quality ruleset:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
        """
        Update an AWS Glue Data Quality ruleset.

        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to update.
        :param ruleset_string: The DQDL ruleset string to update the ruleset with.
        """
        try:
            response = self.glue_client.update_data_quality_ruleset(
                Name=ruleset_name,
                Ruleset=ruleset_string
            )
        except ClientError as err:
            logger.error(
                "Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

To delete a data quality ruleset:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def delete_data_quality_ruleset(self, ruleset_name):
        """
        Delete an AWS Glue Data Quality ruleset.

        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete.
        """
        try:
            response = self.glue_client.delete_data_quality_ruleset(
                Name=ruleset_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

Working with AWS Glue Data Quality runs

To start an AWS Glue Data Quality run:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_arn, ruleset_list):
        """
        Start an AWS Glue Data Quality evaluation run.

        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want to evaluate.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access
                         Management (IAM) role that grants permission to let AWS Glue
                         access the resources it needs.
        :param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
        """
        try:
            response = self.glue_client.start_data_quality_ruleset_evaluation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn,
                RulesetNames=ruleset_list
            )
        except ClientError as err:
            logger.error(
                "Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']

Remember that you can pass the pushDownPredicate or catalogPartitionPredicate parameter to make sure your data quality run targets only a specific set of partitions in the catalog table. For example:

response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CloudWatchMetricsEnabled': False
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)

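You can also configure whether composite rules in the ruleset are evaluated at the row level or at the column level. For more information about how composite rules work, see "How composite rules work" in the documentation.

An example of setting the composite rule evaluation method in the request:

response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        # The method is passed as a string value.
        'CompositeRuleEvaluationMethod': 'ROW'
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)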
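Since the evaluation method is a plain string, a typo would only surface when the request is sent. The sketch below validates the value up front and assembles the `AdditionalRunOptions` dictionary. The helper name is hypothetical, and the accepted values are assumed to be `ROW` and `COLUMN`, matching the row-level and column-level options described above.

```python
# Assumed valid values, matching the row-level and column-level options.
VALID_COMPOSITE_METHODS = {"ROW", "COLUMN"}


def build_additional_run_options(composite_method=None, cloudwatch_metrics=None):
    """Build the AdditionalRunOptions dictionary, including only the options that are set."""
    options = {}
    if composite_method is not None:
        method = composite_method.upper()
        if method not in VALID_COMPOSITE_METHODS:
            raise ValueError(
                f"composite_method must be one of {sorted(VALID_COMPOSITE_METHODS)}"
            )
        options["CompositeRuleEvaluationMethod"] = method
    if cloudwatch_metrics is not None:
        options["CloudWatchMetricsEnabled"] = bool(cloudwatch_metrics)
    return options


print(build_additional_run_options(composite_method="row", cloudwatch_metrics=False))
```

The returned dictionary can be passed as `AdditionalRunOptions` in the request.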

To get information about an AWS Glue Data Quality run:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Get details about an AWS Glue Data Quality run.

        :param run_id: The AWS Glue Data Quality run ID to look up.
        """
        try:
            response = self.glue_client.get_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

To get the results of an AWS Glue Data Quality run:

For a given AWS Glue Data Quality run, you can extract the evaluation results as follows:

response = client.get_data_quality_ruleset_evaluation_run(
    RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)

resultID = response['ResultIds'][0]

response = client.get_data_quality_result(
    ResultId=resultID
)

print(response['RuleResults'])
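The rule results come back as a list of per-rule entries, which is easier to act on once summarized. The sketch below counts outcomes and collects the names of rules that did not pass. It assumes each entry has a `Name` and a `Result` field with values such as `PASS`, `FAIL`, or `ERROR`. The helper name and the sample data are illustrative only.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Return (counts per outcome, names of rules that did not pass)."""
    counts = Counter(rule.get("Result", "UNKNOWN") for rule in rule_results)
    not_passing = [rule.get("Name") for rule in rule_results if rule.get("Result") != "PASS"]
    return dict(counts), not_passing


# Made-up entries shaped like the assumed response, for illustration only.
sample_results = [
    {"Name": "Rule_1", "Result": "PASS"},
    {"Name": "Rule_2", "Result": "FAIL"},
    {"Name": "Rule_3", "Result": "ERROR"},
]
counts, not_passing = summarize_rule_results(sample_results)
print(counts, not_passing)
```

A pipeline could, for example, stop downstream processing whenever `not_passing` is non-empty.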

To list all AWS Glue Data Quality runs:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
        """
        Lists all the AWS Glue Data Quality runs against a given table.

        :param database_name: The name of the database where the data quality runs were created.
        :param table_name: The name of the table against which the data quality runs were created.
        """
        try:
            response = self.glue_client.list_data_quality_ruleset_evaluation_runs(
                Filter={
                    'DataSource': {
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    }
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't list the AWS Glue Quality runs. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

You can modify the filter clause to show only runs started within a specific time window or runs against a specific table.
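As a sketch of what such a modified filter might look like, the helper below adds an optional time window to the table filter. It assumes the filter accepts `StartedAfter` and `StartedBefore` timestamps alongside `DataSource`. The function name and the dates are illustrative.

```python
from datetime import datetime, timedelta, timezone


def build_run_filter(database_name, table_name, started_after=None, started_before=None):
    """Build a Filter dictionary restricted to a table and, optionally, a time window."""
    if started_after is not None and started_before is not None:
        if started_after >= started_before:
            raise ValueError("started_after must be earlier than started_before")
    run_filter = {
        "DataSource": {
            "GlueTable": {"DatabaseName": database_name, "TableName": table_name}
        }
    }
    if started_after is not None:
        run_filter["StartedAfter"] = started_after
    if started_before is not None:
        run_filter["StartedBefore"] = started_before
    return run_filter


# Example: runs started during the first week of 2023 (dates chosen arbitrarily).
week_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
example_filter = build_run_filter(
    "<database_name>", "<table_name>",
    started_after=week_start,
    started_before=week_start + timedelta(days=7),
)
```

The result can be passed as the `Filter` argument of the list call shown earlier.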

To stop an ongoing AWS Glue Data Quality run:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def cancel_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Cancels a given AWS Glue Data Quality run.

        :param run_id: The run ID of the AWS Glue Data Quality run to cancel.
        """
        try:
            # The cancel call identifies the run by its run ID, not by a result ID.
            response = self.glue_client.cancel_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

Working with AWS Glue Data Quality results

To get the result of an AWS Glue Data Quality run:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_result(self, result_id):
        """
        Outputs the result of an AWS Glue Data Quality run.

        :param result_id: The result ID of an AWS Glue Data Quality run.
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

To view the statistics gathered for a given data quality result:

import boto3
from botocore.exceptions import ClientError
import logging

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_profile_for_data_quality_result(self, result_id):
        """
        Outputs the statistic profile for an AWS Glue Data Quality result.

        :param result_id: The result ID of an AWS Glue Data Quality run.
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )

            # The profile contains all statistics gathered for the result.
            profile_id = response['ProfileId']
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId=profile_id
            )
            return profile
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality profile. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise

To view the time series of a statistic gathered across multiple data quality runs:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_statistics_for_data_quality_result(self, profile_id):
        """
        Outputs an array of datapoints for each statistic in the input profile.

        :param profile_id: The profile ID of an AWS Glue Data Quality run.
        """
        try:
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId=profile_id
            )
            # Fetch the full time series for every statistic found in the profile.
            statistics = [
                self.glue_client.list_data_quality_statistics(
                    StatisticId=s['StatisticId']
                )
                for s in profile['Statistics']
            ]
            return statistics
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise

To view the anomaly detection model for a specific statistic:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_result_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the details (bounds) of anomaly detection training for the given
        statistic at the given profile.

        :param statistic_id: The model's statistic (the time series it is tracking).
        :param profile_id: The profile associated with the model (a point in the time series).
        """
        try:
            model = self.glue_client.get_data_quality_model_result(
                ProfileId=profile_id,
                StatisticId=statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality model results. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
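Once the model result is available, one natural follow-up is to check which observed values fall outside the predicted bounds. The sketch below does that under the assumption that the response holds a `Model` list whose entries carry `LowerBound`, `UpperBound`, and `ActualValue`. The helper name and the sample values are made up for illustration, and the field names should be checked against the actual response.

```python
def find_out_of_bounds(model_result):
    """Return the model points whose actual value lies outside [LowerBound, UpperBound]."""
    anomalies = []
    for point in model_result.get("Model", []):
        actual = point.get("ActualValue")
        lower = point.get("LowerBound")
        upper = point.get("UpperBound")
        # Skip points that lack any of the three values instead of guessing.
        if actual is None or lower is None or upper is None:
            continue
        if actual < lower or actual > upper:
            anomalies.append(point)
    return anomalies


# Made-up data shaped like the assumed response.
sample_model_result = {
    "Model": [
        {"LowerBound": 10.0, "UpperBound": 20.0, "ActualValue": 15.0},
        {"LowerBound": 10.0, "UpperBound": 20.0, "ActualValue": 25.0},
        {"LowerBound": 10.0, "UpperBound": 20.0},
    ]
}
print(find_out_of_bounds(sample_model_result))
```

Points flagged this way are candidates for review, and possibly for exclusion from the baseline if they turn out to be bad data.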

To exclude a data point from the anomaly detection baseline of its statistic model:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
        """
        Annotate some points along a given statistic time series.
        This example excludes the provided values; INCLUDE can also be used to undo this action.

        :param statistic_id: The statistic time series to annotate.
        :param profile_ids: The profiles we want to exclude (points in the time series).
        """
        try:
            response = self.glue_client.batch_put_data_quality_statistic_annotation(
                InclusionAnnotations=[
                    {
                        'ProfileId': prof_id,
                        'StatisticId': statistic_id,
                        'InclusionAnnotation': 'EXCLUDE'
                    }
                    for prof_id in profile_ids
                ]
            )
            return response['FailedInclusionAnnotations']
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise

To view the anomaly detection model training status for a specific statistic:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_status_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the status of anomaly detection training for the given statistic
        at the given profile.

        :param statistic_id: The model's statistic (the time series it is tracking).
        :param profile_id: The profile associated with the model (a point in the time series).
        """
        try:
            model = self.glue_client.get_data_quality_model(
                ProfileId=profile_id,
                StatisticId=statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise

To exclude all results of a specific data quality run from the anomaly detection baseline:

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_profile(self, profile_id):
        """
        Exclude datapoints produced by a run across statistic time series.
        This example excludes the provided values; INCLUDE can also be used to undo this action.

        :param profile_id: The profile we want to exclude (a point in each time series).
        """
        try:
            response = self.glue_client.put_data_quality_profile_annotation(
                ProfileId=profile_id,
                InclusionAnnotation="EXCLUDE"
            )
            return response
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise

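To get the results of a given data quality run and display them:

Once you have the run ID of an AWS Glue Data Quality run, you can extract the result ID and then fetch the actual results, as follows:

response = client.get_data_quality_ruleset_evaluation_run(
    RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)

resultID = response['ResultIds'][0]

response = client.get_data_quality_result(
    ResultId=resultID
)

# The original printed an undefined variable here.
print(response['RuleResults'])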
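The snippet above reads only the first entry of `ResultIds`. When a run evaluates several rulesets it may produce more than one result, so the sketch below loops over all of them. The helper name `get_all_rule_results` is hypothetical, and `client` is assumed to be a Boto3 AWS Glue client.

```python
def get_all_rule_results(client, run_id):
    """Return a mapping of result ID to the rule results for every result of a run."""
    run = client.get_data_quality_ruleset_evaluation_run(RunId=run_id)
    results = {}
    for result_id in run.get("ResultIds", []):
        result = client.get_data_quality_result(ResultId=result_id)
        results[result_id] = result.get("RuleResults", [])
    return results
```

An empty mapping means the run has not produced any results, for example because it is still in progress.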