이 주제에서는 API를 사용하여 데이터 품질을 측정하고 관리하는 방법에 대해 설명합니다.
목차
사전 조건
최신 AWS Glue Data Quality API가 포함되도록 boto3 버전이 최신 버전인지 확인합니다.
최신 CLI를 포함하도록 AWS CLI 버전이 최신인지 확인합니다.
AWS Glue 작업을 사용하여 이러한 API를 실행하는 경우 다음 옵션을 사용하여 boto3 라이브러리를 최신 버전으로 업데이트할 수 있습니다.
—additional-python-modules boto3==<version>
AWS Glue Data Quality 권장 사용
AWS Glue Data Quality 권장 실행을 시작하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
"""
Starts a recommendation run that is used to generate rules when you don't know what rules to write. AWS Glue Data Quality
analyzes the data and comes up with recommendations for a potential ruleset. You can then triage the ruleset
and modify the generated ruleset to your liking.
:param database_name: The name of the AWS Glue database which contains the dataset.
:param table_name: The name of the AWS Glue table against which we want a recommendation
:param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
"""
try:
response = self.client.start_data_quality_rule_recommendation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
},
Role=role_arn
)
except ClientError as err:
logger.error(
"Couldn't start data quality recommendation run %s. Here's why: %s: %s", name,
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response['RunId']
권장 실행의 경우 pushDownPredicates
또는 catalogPartitionPredicates
를 사용하여 성능을 개선하고 카탈로그 소스의 특정 파티션에서만 권장을 실행할 수 있습니다.
client.start_data_quality_rule_recommendation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name,
'AdditionalOptions': {
'pushDownPredicate': "year=2022"
}
}
},
Role=role_arn,
NumberOfWorkers=2,
CreatedRulesetName='<rule_set_name>'
)
AWS Glue Data Quality 권장 실행 결과를 얻으려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_rule_recommendation_run(self, run_id):
"""
Gets the specified recommendation run that was used to generate rules.
:param run_id: The id of the data quality recommendation run
"""
try:
response = self.client.get_data_quality_rule_recommendation_run(RunId=run_id)
except ClientError as err:
logger.error(
"Couldn't get data quality recommendation run %. Here's why: %s: %s", run_id,
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
위의 응답 객체에서 실행에서 권장한 규칙 세트를 추출하여 이후 단계에서 사용할 수 있습니다.
print(response['RecommendedRuleset'])
Rules = [
RowCount between 2000 and 8000,
IsComplete "col1",
IsComplete "col2",
StandardDeviation "col3" between 58138330.8 and 64258155.09,
ColumnValues "col4" between 1000042965 and 1214474826,
IsComplete "col5"
]
필터링 및 나열할 수 있는 모든 권장 실행 목록을 가져오려면:
response = client.list_data_quality_rule_recommendation_runs(
Filter={
'DataSource': {
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>'
}
}
)
기존 AWS Glue Data Quality 권장 작업을 취소하려면:
response = client.cancel_data_quality_rule_recommendation_run(
RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
AWS Glue Data Quality 규칙 세트 사용
AWS Glue Data Quality 규칙 세트를 생성하려면:
response = client.create_data_quality_ruleset(
Name='<ruleset_name>',
Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
TargetTable={
'TableName': '<table_name>',
'DatabaseName': '<database_name>'
}
)
데이터 품질 규칙 세트를 가져오려면:
response = client.get_data_quality_ruleset(
Name='<ruleset_name>'
)
print(response)
이 API를 사용하여 규칙 세트를 추출할 수 있습니다.
print(response['Ruleset'])
테이블의 모든 데이터 품질 규칙 세트를 나열하려면:
response = client.list_data_quality_rulesets()
API 내의 필터 조건을 사용하여 특정 데이터베이스 또는 테이블에 연결된 모든 규칙 세트를 필터링할 수 있습니다.
response = client.list_data_quality_rulesets(
Filter={
'TargetTable': {
'TableName': '<table_name>',
'DatabaseName': '<database_name>'
}
},
)
데이터 품질 규칙 세트를 업데이트하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
"""
Update an AWS Glue Data Quality Ruleset
:param ruleset_name: The name of the AWS Glue Data Quality ruleset to update
:param ruleset_string: The DQDL ruleset string to update the ruleset with
"""
try:
response = self.client.update_data_quality_ruleset(
Name=ruleset_name,
Ruleset=ruleset_string
)
except ClientError as err:
logger.error(
"Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
데이터 품질 규칙 세트를 삭제하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def delete_data_quality_ruleset(self, ruleset_name):
"""
Delete a AWS Glue Data Quality Ruleset
:param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete
"""
try:
response = self.client.delete_data_quality_ruleset(
Name=ruleset_name
)
except ClientError as err:
logger.error(
"Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
AWS Glue Data Quality 실행 사용
AWS Glue Data Quality 실행을 시작하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_name, ruleset_list):
"""
Start an AWS Glue Data Quality evaluation run
:param database_name: The name of the AWS Glue database which contains the dataset.
:param table_name: The name of the AWS Glue table against which we want to evaluate.
:param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.
:param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
"""
try:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
},
Role=role_name,
RulesetNames=ruleset_list
)
except ClientError as err:
logger.error(
"Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response['RunId']
pushDownPredicate
또는 catalogPartitionPredicate
파라미터를 전달하여 카탈로그 테이블 내의 특정 파티션 세트만 대상으로 데이터 품질 실행을 수행할 수 있습니다. 예:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>',
'AdditionalOptions': {
'pushDownPredicate': 'year=2023'
}
}
},
Role='<role_name>',
NumberOfWorkers=5,
Timeout=123,
AdditionalRunOptions={
'CloudWatchMetricsEnabled': False
},
RulesetNames=[
'<ruleset_name>',
]
)
또한 ROW 또는 COLUMN 수준에서 규칙 세트의 복합 규칙을 평가하는 방법도 구성할 수 있습니다. 복합 규칙의 작동 방식에 대한 자세한 내용은 설명서에서 복합 규칙 작동 방식을 참조하십시오.
요청에서 복합 규칙 평가 방법을 설정하는 방법을 보여주는 예:
response = client.start_data_quality_ruleset_evaluation_run(
DataSource={
'GlueTable': {
'DatabaseName': '<database_name>',
'TableName': '<table_name>',
'AdditionalOptions': {
'pushDownPredicate': 'year=2023'
}
}
},
Role='<role_name>',
NumberOfWorkers=5,
Timeout=123,
AdditionalRunOptions={
'CompositeRuleEvaluationMethod':ROW
},
RulesetNames=[
'<ruleset_name>',
]
)
AWS Glue Data Quality 실행에 대한 정보를 가져오려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_ruleset_evaluation_run(self, run_id):
"""
Get details about an AWS Glue Data Quality Run
:param run_id: The AWS Glue Data Quality run ID to look up
"""
try:
response = self.client.get_data_quality_ruleset_evaluation_run(
RunId=run_id
)
except ClientError as err:
logger.error(
"Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
AWS Glue Data Quality 실행 결과를 가져오려면:
해당 AWS Glue Data Quality 실행에 대해 다음 방법을 사용하여 실행 평가 결과를 추출할 수 있습니다.
response = client.get_data_quality_ruleset_evaluation_run(
RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
ResultId=resultID
)
print(response['RuleResults'])
AWS Glue Data Quality 실행을 모두 나열하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
"""
Lists all the AWS Glue Data Quality runs against a given table
:param database_name: The name of the database where the data quality runs
:param table_name: The name of the table against which the data quality runs were created
"""
try:
response = self.client.list_data_quality_ruleset_evaluation_runs(
Filter={
'DataSource': {
'GlueTable': {
'DatabaseName': database_name,
'TableName': table_name
}
}
}
)
except ClientError as err:
logger.error(
"Couldn't list the AWS Glue Quality runs. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
특정 시간 사이의 결과 또는 특정 테이블에 대해 실행된 결과만 표시하도록 filter 절을 수정할 수 있습니다.
진행 중인 AWS Glue Data Quality 실행을 중지하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def cancel_data_quality_ruleset_evaluation_run(self, result_id):
"""
Cancels a given AWS Glue Data Quality run
:param result_id: The result id of a AWS Glue Data Quality run to cancel
"""
try:
response = self.client.cancel_data_quality_ruleset_evaluation_run(
ResultId=result_id
)
except ClientError as err:
logger.error(
"Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
AWS Glue Data Quality 결과 사용
AWS Glue Data Quality 실행 결과를 가져오려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_data_quality_result(self, result_id):
"""
Outputs the result of an AWS Glue Data Quality Result
:param result_id: The result id of an AWS Glue Data Quality run
"""
try:
response = self.client.get_data_quality_result(
ResultId=result_id
)
except ClientError as err:
logger.error(
"Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
else:
return response
특정 데이터 품질 결과에 대해 수집된 통계를 보려면:
import boto3
from botocore.exceptions import ClientError
import logging
logger = logging.getLogger(__name__)
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_profile_for_data_quality_result(self, result_id):
"""
Outputs the statistic profile for a AWS Glue Data Quality Result
:param result_id: The result id of a AWS Glue Data Quality run
"""
try:
response = self.glue_client.get_data_quality_result(
ResultId=result_id
)
# the profile contains all statistics gathered for the result
profile_id = response['ProfileId']
profile = self.glue_client.list_data_quality_statistics(
ProfileId = profile_id
)
return profile
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality profile. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
여러 데이터 품질 실행에서 수집된 통계의 시계열을 보려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_statistics_for_data_quality_result(self, profile_id):
"""
Outputs an array of datapoints for each statistic in the input result.
:param result_id: The profile id of a AWS Glue Data Quality run
"""
try:
profile = self.glue_client.list_data_quality_statistics(
ProfileId = profile_id
)
statistics = [self.glue_client.list_data_quality_statistics(
StatisticId = s['StatisticId']
) for s in profile['Statistics']]
return statistics
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
특정 통계에 대한 이상 탐지 모델을 보려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_model_training_result_for_statistic(self, statistic_id, profile_id):
"""
Outputs the details (bounds) of anomaly detection training for the given statistic at the given profile.
:param statistic_id the model's statistic (the timeseries it is tracking)
:param profile_id the profile associated with the model (a point in the timeseries)
"""
try:
model = self.glue_client.get_data_quality_model_result(
ProfileId = profile_id, StatisticId = statistic_id
)
return model
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality model results. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
통계 모델의 이상 탐지 기준에서 데이터 포인트를 제외하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
"""
Annotate some points along a given statistic timeseries.
This example excludes the provided values; INCLUDE can also be used to undo this action.
:param statistic_id the statistic timeseries to annotate
:param profile_id the profiles we want to exclude (points in the timeseries)
"""
try:
response = self.glue_client.batch_put_data_quality_statistic_annotation(
InclusionAnnotations = [
{'ProfileId': prof_id,
'StatisticId': statistic_id,
'InclusionAnnotation': 'EXCLUDE'} for prof_id in profile_ids
]
)
return response['FailedInclusionAnnotations']
except ClientError as err:
logger.error(
"Couldn't store Data Quality annotations. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
특정 통계에 대한 이상 탐지 모델을 보려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def get_model_training_status_for_statistic(self, statistic_id, profile_id):
"""
Outputs the status of anomaly detection training for the given statistic at the given profile.
:param statistic_id the model's statistic (the timeseries it is tracking)
:param profile_id the profile associated with the model (a point in the timeseries)
"""
try:
model = self.glue_client.get_data_quality_model(
ProfileId = profile_id, StatisticId = statistic_id
)
return model
except ClientError as err:
logger.error(
"Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
이상 탐지 기준에서 특정 데이터 품질 실행의 모든 결과를 제외하려면:
class GlueWrapper:
"""Encapsulates AWS Glue actions."""
def __init__(self, glue_client):
"""
:param glue_client: A Boto3 AWS Glue client.
"""
self.glue_client = glue_client
def apply_exclusions_to_profile(self, profile_id):
"""
Exclude datapoints produced by a run across statistic timeseries.
This example excludes the provided values; INCLUDE can also be used to undo this action.
:param profile_id the profiles we want to exclude (points in the timeseries)
"""
try:
response = self.glue_client.put_data_quality_profile_annotation(
ProfileId = profile_id,
InclusionAnnotation = "EXCLUDE"
)
return response
except ClientError as err:
logger.error(
"Couldn't store Data Quality annotations. Here's why: %s: %s",
err.response['Error']['Code'], err.response['Error']['Message'])
raise
특정 데이터 품질 실행의 결과를 가져와 결과를 표시하려면:
AWS Glue Data Quality runID
를 사용되면 아래와 같이 resultID
을(를) 추출하여 실제 결과를 얻을 수 있습니다.
response = client.get_data_quality_ruleset_evaluation_run(
RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)
resultID = response['ResultIds'][0]
response = client.get_data_quality_result(
ResultId=resultID
)
print(resp['RuleResults'])