AWS Glue Data Quality 目前为 AWS Glue 的预览版,可能会发生变化。 |
程序包:com.amazonaws.services.glue.dq
object EvaluateDataQuality
Def apply
def apply(frame: DynamicFrame,
ruleset: String,
publishingOptions: JsonOptions = JsonOptions.empty): DynamicFrame
根据 DynamicFrame
评估数据质量规则集,并返回一个包含评估结果的新 DynamicFrame
。要了解有关 AWS Glue 数据质量的更多信息,请参阅AWS Glue Data Quality。
-
frame
表示您想要评估的数据质量的DynamicFrame
。 -
ruleset
– 字符串格式的数据质量定义语言(DQDL)规则集。要了解有关 DQDL 的更多信息,请参阅数据质量定义语言(DQDL)引用指南。 -
publishingOptions
– 一个字典,用于为发布评估结果和指标指定以下选项:-
dataQualityEvaluationContext
– 一个字符串,用于指定 AWS Glue 应在哪个命名空间下发布 Amazon CloudWatch 指标和数据质量结果。汇总指标显示在 CloudWatch 中,而完整结果显示在 AWS Glue Studio 界面中。-
必需:否
-
默认值:
default_context
-
-
enableDataQualityCloudWatchMetrics
– 指定是否应将数据质量评估结果发布到 CloudWatch。您可以使用dataQualityEvaluationContext
选项为指标指定命名空间。-
必需:否
-
默认值:False
-
-
enableDataQualityResultsPublishing
– 指定是否应在 AWS Glue Studio 界面的 Data Quality(数据质量)选项卡上显示数据质量结果。-
必需:否
-
默认值:true
-
-
resultsS3Prefix
– 指定 AWS Glue 可以写入数据质量评估结果的 Amazon S3 位置。-
必需:否
-
默认值:""(空字符串)
-
-
示例
以下示例代码演示了在执行 SelectFields
转换之前如何评估 DynamicFrame
数据质量。该脚本在尝试转换之前会验证所有数据质量规则是否均已通过。
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import com.amazonaws.services.glue.dq.EvaluateDataQuality
object GlueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
// @params: [JOB_NAME]
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
// Create DynamicFrame with data
val Legislators_Area = glueContext.getCatalogSource(database="legislators", tableName="areas_json", transformationContext="S3bucket_node1").getDynamicFrame()
// Define data quality ruleset
val DQ_Ruleset = """
Rules = [ColumnExists "id"]
"""
// Evaluate data quality
val DQ_Results = EvaluateDataQuality.apply(frame=Legislators_Area, ruleset=DQ_Ruleset, publishingOptions=JsonOptions("""{"dataQualityEvaluationContext": "Legislators_Area", "enableDataQualityMetrics": "true", "enableDataQualityResultsPublishing": "true"}"""))
assert(DQ_Results.filter(_.getField("Outcome").contains("Failed")).count == 0, "Failing DQ rules for Legislators_Area caused the job to fail.")
// Script generated for node Select Fields
val SelectFields_Results = Legislators_Area.selectFields(paths=Seq("id", "name"), transformationContext="Legislators_Area")
Job.commit()
}
}