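This topic describes how to use APIs to measure and manage data quality.
Prerequisites
Make sure your boto3 version is up to date so that it includes the latest AWS Glue Data Quality API.
Make sure your AWS CLI version is up to date so that it includes the latest CLI commands.
If you use an AWS Glue job to call these APIs, you can use the following option to update the boto3 library to the latest version:
--additional-python-modules boto3==<version>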
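One way to confirm that the installed boto3 is recent enough is to check whether the Glue client exposes the operations used in this topic. The following is a minimal sketch, not part of the AWS API: the helper name `missing_data_quality_operations` is hypothetical, and the list of method names is taken from the examples in this topic.

```python
# Method names used by the examples in this topic.
REQUIRED_OPERATIONS = (
    "start_data_quality_rule_recommendation_run",
    "get_data_quality_rule_recommendation_run",
    "create_data_quality_ruleset",
    "start_data_quality_ruleset_evaluation_run",
    "get_data_quality_result",
    "list_data_quality_statistics",
)


def missing_data_quality_operations(glue_client, required=REQUIRED_OPERATIONS):
    """Return the operation names that the given client does not expose.

    Hypothetical helper: it only inspects the client object and makes no AWS calls.
    """
    return [name for name in required
            if not callable(getattr(glue_client, name, None))]
```

Calling `missing_data_quality_operations(boto3.client('glue'))` and getting an empty list suggests that the installed version covers these calls; any names returned point to operations that need a newer boto3.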
Working with AWS Glue Data Quality recommendations
To start an AWS Glue Data Quality recommendation run:
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
        """
        Starts a recommendation run that is used to generate rules when you don't know what rules to write. AWS Glue Data Quality
        analyzes the data and comes up with recommendations for a potential ruleset. You can then triage the ruleset
        and modify the generated ruleset to your liking.

        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want a recommendation.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
        """
        try:
            response = self.glue_client.start_data_quality_rule_recommendation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn
            )
        except ClientError as err:
            # Log the table that the run was requested for, then re-raise.
            logger.error(
                "Couldn't start data quality recommendation run for %s.%s. Here's why: %s: %s",
                database_name, table_name,
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']
For a recommendation run, you can use pushDownPredicate or catalogPartitionPredicate to improve performance and to run recommendations only on specific partitions of your catalog sources.
client.start_data_quality_rule_recommendation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': database_name,
            'TableName': table_name,
            'AdditionalOptions': {
                'pushDownPredicate': "year=2022"
            }
        }
    },
    Role=role_arn,
    NumberOfWorkers=2,
    CreatedRulesetName='<rule_set_name>'
)
To get the results of an AWS Glue Data Quality recommendation run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_rule_recommendation_run(self, run_id):
        """
        Gets the specified recommendation run that was used to generate rules.

        :param run_id: The ID of the data quality recommendation run.
        """
        try:
            response = self.glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get data quality recommendation run %s. Here's why: %s: %s", run_id,
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
From the response object above, you can extract the ruleset that the run recommended, for use in later steps:
print(response['RecommendedRuleset'])

Rules = [
    RowCount between 2000 and 8000,
    IsComplete "col1",
    IsComplete "col2",
    StandardDeviation "col3" between 58138330.8 and 64258155.09,
    ColumnValues "col4" between 1000042965 and 1214474826,
    IsComplete "col5"
]
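A recommendation run is asynchronous, so RecommendedRuleset is only populated once the run has finished successfully. The following is a rough sketch of polling for it. The helper name `wait_for_recommended_ruleset`, the polling interval, and the attempt limit are illustrative assumptions; the status values are the ones the run reports in its `Status` field.

```python
import time

# Statuses after which the run will not change any further.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def wait_for_recommended_ruleset(glue_client, run_id, poll_seconds=30, max_attempts=60):
    """Poll a recommendation run and return its RecommendedRuleset once it succeeds.

    Hypothetical helper: poll_seconds and max_attempts are arbitrary defaults.
    """
    for _ in range(max_attempts):
        response = glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        status = response.get("Status")
        if status == "SUCCEEDED":
            return response.get("RecommendedRuleset")
        if status in TERMINAL_STATUSES:
            # FAILED, STOPPED, or TIMEOUT: no ruleset will be produced.
            raise RuntimeError(f"Recommendation run {run_id} ended with status {status}")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Recommendation run {run_id} did not finish after {max_attempts} polls")
```

The helper raises instead of returning None when a run fails, so that a missing ruleset is not silently passed to later steps.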
To list all of your recommendation runs, optionally filtered:
response = client.list_data_quality_rule_recommendation_runs(
    Filter={
        'DataSource': {
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>'
            }
        }
    }
)
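A single list call may return only part of the runs, together with a NextToken for the next page. The following sketch shows one way to collect every page; the helper name `list_all_recommendation_runs` is hypothetical, and it assumes the response carries the runs under the `Runs` key.

```python
def list_all_recommendation_runs(glue_client, database_name, table_name):
    """Collect every recommendation run for a table by following NextToken.

    Hypothetical helper built on list_data_quality_rule_recommendation_runs.
    """
    kwargs = {
        "Filter": {
            "DataSource": {
                "GlueTable": {"DatabaseName": database_name, "TableName": table_name}
            }
        }
    }
    runs = []
    while True:
        page = glue_client.list_data_quality_rule_recommendation_runs(**kwargs)
        runs.extend(page.get("Runs", []))
        token = page.get("NextToken")
        if not token:
            # No further pages to fetch.
            return runs
        kwargs["NextToken"] = token
```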
To cancel an existing AWS Glue Data Quality recommendation run:
response = client.cancel_data_quality_rule_recommendation_run(
    RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
Working with AWS Glue Data Quality rulesets
To create an AWS Glue Data Quality ruleset:
response = client.create_data_quality_ruleset(
    Name='<ruleset_name>',
    Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
    TargetTable={
        'TableName': '<table_name>',
        'DatabaseName': '<database_name>'
    }
)
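The Ruleset argument is a single DQDL string, which becomes awkward to edit by hand as the number of rules grows. One possible approach, sketched below, is to keep the rules in a Python list and join them; `build_ruleset` is a hypothetical helper, and it does not validate the DQDL syntax of the individual rules.

```python
def build_ruleset(rules):
    """Join individual DQDL rule strings into a 'Rules = [...]' ruleset string.

    Hypothetical helper: it only trims and joins the rules; it does not
    check that each rule is valid DQDL.
    """
    cleaned = [rule.strip() for rule in rules if rule and rule.strip()]
    if not cleaned:
        raise ValueError("A ruleset needs at least one rule")
    return "Rules = [" + ", ".join(cleaned) + "]"
```

For the three rules in the example above, `build_ruleset(['IsComplete "col1"', 'IsPrimaryKey "col2"', 'RowCount between 2000 and 8000'])` produces the same string that is passed as Ruleset.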
To get a data quality ruleset:
response = client.get_data_quality_ruleset(
    Name='<ruleset_name>'
)
print(response)
You can then use the response from this API to retrieve the ruleset:
print(response['Ruleset'])
To list all of your data quality rulesets:
response = client.list_data_quality_rulesets()
You can use the filter in the API to return only the rulesets attached to a specific database or table:
response = client.list_data_quality_rulesets(
    Filter={
        'TargetTable': {
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    },
)
To update a data quality ruleset:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
        """
        Update an AWS Glue Data Quality ruleset.

        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to update.
        :param ruleset_string: The DQDL ruleset string to update the ruleset with.
        """
        try:
            response = self.glue_client.update_data_quality_ruleset(
                Name=ruleset_name,
                Ruleset=ruleset_string
            )
        except ClientError as err:
            logger.error(
                "Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
To delete a data quality ruleset:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def delete_data_quality_ruleset(self, ruleset_name):
        """
        Delete an AWS Glue Data Quality ruleset.

        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete.
        """
        try:
            response = self.glue_client.delete_data_quality_ruleset(
                Name=ruleset_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
Working with AWS Glue Data Quality runs
To start an AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_arn, ruleset_list):
        """
        Start an AWS Glue Data Quality evaluation run.

        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want to evaluate.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
        :param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
        """
        try:
            response = self.glue_client.start_data_quality_ruleset_evaluation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn,
                RulesetNames=ruleset_list
            )
        except ClientError as err:
            logger.error(
                "Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']
Remember that you can pass a pushDownPredicate or catalogPartitionPredicate parameter so that the data quality run targets only a specific set of partitions in your catalog table. For example:
response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CloudWatchMetricsEnabled': False
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)
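You can also configure how composite rules in your ruleset are evaluated, either at the ROW or COLUMN level. For more information about how composite rules work, see How composite rules work in the documentation.
An example of how to set the composite rule evaluation method in your request:
response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        # The evaluation method is passed as a string: 'ROW' or 'COLUMN'.
        'CompositeRuleEvaluationMethod': 'ROW'
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)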
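Because the evaluation method is a plain string, a typo or an unquoted name is easy to miss until the call fails. The sketch below assembles the request arguments in one place and rejects any value other than ROW or COLUMN before the API is called. `build_evaluation_run_request` and its parameter names are hypothetical; only the keys of the returned dictionary come from the examples above.

```python
VALID_COMPOSITE_METHODS = {"ROW", "COLUMN"}


def build_evaluation_run_request(database_name, table_name, role, ruleset_names,
                                 push_down_predicate=None, composite_method=None):
    """Build keyword arguments for start_data_quality_ruleset_evaluation_run.

    Hypothetical helper: it only assembles and validates a dictionary.
    """
    glue_table = {"DatabaseName": database_name, "TableName": table_name}
    if push_down_predicate:
        # Restrict the run to the partitions that match the predicate.
        glue_table["AdditionalOptions"] = {"pushDownPredicate": push_down_predicate}
    request = {
        "DataSource": {"GlueTable": glue_table},
        "Role": role,
        "RulesetNames": list(ruleset_names),
    }
    if composite_method is not None:
        if composite_method not in VALID_COMPOSITE_METHODS:
            raise ValueError(
                f"composite_method must be one of {sorted(VALID_COMPOSITE_METHODS)}")
        request["AdditionalRunOptions"] = {
            "CompositeRuleEvaluationMethod": composite_method}
    return request
```

The result can then be unpacked into the call, for example `client.start_data_quality_ruleset_evaluation_run(**request)`.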
To get information about an AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Get details about an AWS Glue Data Quality run.

        :param run_id: The AWS Glue Data Quality run ID to look up.
        """
        try:
            response = self.glue_client.get_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
To get the results from an AWS Glue Data Quality run:
For a given AWS Glue Data Quality run, you can extract the results of the evaluation as follows:
response = client.get_data_quality_ruleset_evaluation_run(
    RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
    ResultId=resultID
)
print(response['RuleResults'])
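RuleResults is a list with one entry per evaluated rule, so printing it directly can be hard to scan. The following is a minimal sketch of summarizing it. `summarize_rule_results` is a hypothetical helper, and it assumes each entry carries a `Name` and a `Result` of PASS, FAIL, or ERROR.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Return (counts per outcome, names of rules that did not pass).

    Hypothetical helper: entries without a 'Result' key are counted as UNKNOWN.
    """
    counts = Counter(rule.get("Result", "UNKNOWN") for rule in rule_results)
    not_passed = [rule.get("Name") for rule in rule_results
                  if rule.get("Result") != "PASS"]
    return dict(counts), not_passed
```

For example, `counts, not_passed = summarize_rule_results(response['RuleResults'])` gives a quick overview before you inspect the individual evaluation messages.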
To list all of your AWS Glue Data Quality runs:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
        """
        Lists all the AWS Glue Data Quality runs against a given table.

        :param database_name: The name of the database that contains the table.
        :param table_name: The name of the table against which the data quality runs were created.
        """
        try:
            response = self.glue_client.list_data_quality_ruleset_evaluation_runs(
                Filter={
                    'DataSource': {
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    }
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't list the AWS Glue Data Quality runs. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
You can modify the filter clause to show only runs that started within a specific time range or that ran against specific tables.
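A filter restricted to a time window could be built as in the sketch below. The helper name `build_run_filter` is hypothetical, and the sketch assumes that the filter accepts `StartedAfter` and `StartedBefore` timestamps alongside `DataSource`; check the API reference for your boto3 version before relying on these keys.

```python
def build_run_filter(database_name, table_name, started_after=None, started_before=None):
    """Build a Filter for list_data_quality_ruleset_evaluation_runs.

    Hypothetical helper. started_after and started_before are datetime objects;
    the StartedAfter and StartedBefore keys are an assumption stated in the text.
    """
    if (started_after is not None and started_before is not None
            and started_after >= started_before):
        raise ValueError("started_after must be earlier than started_before")
    run_filter = {
        "DataSource": {
            "GlueTable": {"DatabaseName": database_name, "TableName": table_name}
        }
    }
    if started_after is not None:
        run_filter["StartedAfter"] = started_after
    if started_before is not None:
        run_filter["StartedBefore"] = started_before
    return run_filter
```

The returned dictionary is passed as the Filter argument, for example `client.list_data_quality_ruleset_evaluation_runs(Filter=run_filter)`.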
To stop an ongoing AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def cancel_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Cancels a given AWS Glue Data Quality run.

        :param run_id: The run ID of the AWS Glue Data Quality run to cancel.
        """
        try:
            # The cancel operation identifies the run by RunId, not by a result ID.
            response = self.glue_client.cancel_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
Working with AWS Glue Data Quality results
To get the results of an AWS Glue Data Quality run:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_result(self, result_id):
        """
        Outputs the result of an AWS Glue Data Quality run.

        :param result_id: The result ID of an AWS Glue Data Quality run.
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
To view the statistics gathered for a given data quality result:
import boto3
from botocore.exceptions import ClientError
import logging

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_profile_for_data_quality_result(self, result_id):
        """
        Outputs the statistic profile for an AWS Glue Data Quality result.

        :param result_id: The result ID of an AWS Glue Data Quality run.
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
            # The profile contains all statistics gathered for the result.
            profile_id = response['ProfileId']
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId=profile_id
            )
            return profile
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality profile. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To view the time series of statistics gathered across multiple data quality runs:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_statistics_for_data_quality_result(self, profile_id):
        """
        Outputs an array of datapoints for each statistic in the input profile.

        :param profile_id: The profile ID of an AWS Glue Data Quality run.
        """
        try:
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId=profile_id
            )
            # Look up each statistic by ID to get its datapoints across runs.
            statistics = [self.glue_client.list_data_quality_statistics(
                StatisticId=s['StatisticId']
            ) for s in profile['Statistics']]
            return statistics
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
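The list returned above holds one response per statistic, which is not yet in a shape that is convenient to plot or compare. The following sketch flattens such responses into one sorted series per statistic. `to_time_series` is a hypothetical helper, and it assumes each entry under `Statistics` carries `StatisticId`, `RecordedOn`, and `DoubleValue` fields.

```python
def to_time_series(statistics_responses):
    """Group datapoints by StatisticId as (RecordedOn, DoubleValue) pairs, oldest first.

    Hypothetical helper: the field names are assumptions stated in the text.
    """
    series = {}
    for response in statistics_responses:
        for stat in response.get("Statistics", []):
            point = (stat["RecordedOn"], stat.get("DoubleValue"))
            series.setdefault(stat["StatisticId"], []).append(point)
    for points in series.values():
        # Sort each series chronologically by its timestamp.
        points.sort(key=lambda point: point[0])
    return series
```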
To view the anomaly detection model for a specific statistic:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_result_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the details (bounds) of anomaly detection training for the given statistic at the given profile.

        :param statistic_id: The model's statistic (the timeseries it is tracking).
        :param profile_id: The profile associated with the model (a point in the timeseries).
        """
        try:
            model = self.glue_client.get_data_quality_model_result(
                ProfileId=profile_id, StatisticId=statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality model results. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
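Once you have the model result, you may want to see which datapoints fall outside the trained bounds. The sketch below does this under the assumption that the response holds a `Model` list whose entries carry `ActualValue`, `LowerBound`, and `UpperBound`; the helper name `find_out_of_bounds_points` is hypothetical.

```python
def find_out_of_bounds_points(model_result):
    """Return the model entries whose ActualValue lies outside [LowerBound, UpperBound].

    Hypothetical helper: entries that lack any of the three values are skipped.
    """
    flagged = []
    for point in model_result.get("Model", []):
        actual = point.get("ActualValue")
        lower = point.get("LowerBound")
        upper = point.get("UpperBound")
        if actual is None or lower is None or upper is None:
            continue
        if actual < lower or actual > upper:
            flagged.append(point)
    return flagged
```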
To exclude datapoints from the anomaly detection baseline of a statistic's model:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
        """
        Annotate some points along a given statistic timeseries.
        This example excludes the provided values; INCLUDE can also be used to undo this action.

        :param statistic_id: The statistic timeseries to annotate.
        :param profile_ids: The profiles we want to exclude (points in the timeseries).
        """
        try:
            response = self.glue_client.batch_put_data_quality_statistic_annotation(
                InclusionAnnotations=[
                    {'ProfileId': prof_id,
                     'StatisticId': statistic_id,
                     'InclusionAnnotation': 'EXCLUDE'} for prof_id in profile_ids
                ]
            )
            return response['FailedInclusionAnnotations']
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To view the status of anomaly detection model training for a specific statistic:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_status_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the status of anomaly detection training for the given statistic at the given profile.

        :param statistic_id: The model's statistic (the timeseries it is tracking).
        :param profile_id: The profile associated with the model (a point in the timeseries).
        """
        try:
            model = self.glue_client.get_data_quality_model(
                ProfileId=profile_id, StatisticId=statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality model status. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To exclude all results of a specific data quality run from anomaly detection baselines:
class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_profile(self, profile_id):
        """
        Exclude datapoints produced by a run across statistic timeseries.
        This example excludes the provided values; INCLUDE can also be used to undo this action.

        :param profile_id: The profile we want to exclude (a point in each timeseries).
        """
        try:
            response = self.glue_client.put_data_quality_profile_annotation(
                ProfileId=profile_id,
                InclusionAnnotation="EXCLUDE"
            )
            return response
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
To get the results of a given data quality run and display them:
With an AWS Glue Data Quality runID, you can extract the resultID to get the actual results, as shown below:
response = client.get_data_quality_ruleset_evaluation_run(
    RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
    ResultId=resultID
)
print(response['RuleResults'])