在 AWS Glue 中使用 JSON 格式 - AWS Glue

在 AWS Glue 中使用 JSON 格式

AWS Glue 从源中检索数据,并将数据写入以各种数据格式存储和传输的目标。如果您的数据以 JSON 数据格式存储或传输,本文档将向您介绍供您使用 AWS Glue 中的数据的可用功能。

AWS Glue 支持使用 JSON 格式。此格式表示形状一致但内容灵活且并非基于行或列的数据结构。JSON 由多个权威机构发布的平行标准定义,其中一项标准便是 ECMA-404。有关常引用的源对该格式的说明,请参阅 Introducing JSON(JSON 简介)。

您可以使用 AWS Glue 从 Amazon S3 读取 JSON 文件、bzipgzip 压缩 JSON 文件。请在 S3 连接参数 上而非本页中讨论的配置中配置压缩行为。

读取 写入 流式处理读取 对小文件进行分组 作业书签
支持 支持 支持 支持 支持

示例:从 S3 读取 JSON 文件或文件夹

先决条件:需要待读取的 JSON 文件或文件夹的 S3 路径 (s3path)。

配置:在函数选项中,请指定 format="json"。在您的 connection_options 中,请使用 paths 键指定 s3path。您可以在连接选项中进一步更改读取操作遍历 S3 的方式,请参阅 Amazon S3 连接选项参考,了解详细信息。您可以配置读取器解释 format_options 中 JSON 文件的方式。有关详细信息,请参阅 JSON Configuration Reference(JSON 配置参考)。

以下 AWS Glue ETL 脚本显示了从 S3 读取 JSON 文件或文件夹的过程:

Python

在本示例中,使用 create_dynamic_frame.from_options 方法。

# Example: Read JSON from S3 # For show, we handle a nested JSON file that we can limit with the JsonPath parameter # For show, we also handle a JSON where a single entry spans multiple lines # Consider whether optimizePerformance is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://s3path"]}, format="json", format_options={ "jsonPath": "$.id", "multiline": True, # "optimizePerformance": True, -> not compatible with jsonPath, multiline } )

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

dataFrame = spark.read\ .option("multiline", "true")\ .json("s3://s3path")
Scala

在本示例中,使用 getSourceWithFormat 操作。

// Example: Read JSON from S3 // For show, we handle a nested JSON file that we can limit with the JsonPath parameter // For show, we also handle a JSON where a single entry spans multiple lines // Consider whether optimizePerformance is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( formatOptions=JsonOptions("""{"jsonPath": "$.id", "multiline": true, "optimizePerformance":false}"""), connectionType="s3", format="json", options=JsonOptions("""{"paths": ["s3://s3path"], "recurse": true}""") ).getDynamicFrame() } }

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

val dataFrame = spark.read .option("multiline", "true") .json("s3://s3path")

示例:将 JSON 文件和文件夹写入 Amazon S3

先决条件:需要初始化的 DataFrame (dataFrame) 或 DynamicFrame (dynamicFrame)。您还需要预期 S3 输出路径 s3path

配置:在函数选项中,请指定 format="json"。在您的 connection_options 中,请使用 paths 键指定 s3path。您可以在 connection_options 中进一步修改编写器与 S3 的交互方式。有关详细信息,请参阅 AWS Glue 中的 ETL 输入和输出的数据格式选项:Amazon S3 连接选项参考。您可以配置读取器解释 format_options 中的 JSON 文件的方式。有关详细信息,请参阅 JSON Configuration Reference(JSON 配置参考)。

以下 AWS Glue ETL 脚本显示了从 S3 写入 JSON 文件或文件夹的过程:

Python

在本示例中,使用 write_dynamic_frame.from_options 方法。

# Example: Write JSON to S3 from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", connection_options={"path": "s3://s3path"}, format="json" )

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

df.write.json("s3://s3path/")
Scala

在本示例中,请使用 getSinkWithFormat 方法。

// Example: Write JSON to S3 import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="json" ).writeDynamicFrame(dynamicFrame) } }

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

df.write.json("s3://s3path")

JSON 配置参考

您可以将以下 format_options 值与 format="json" 结合使用:

  • jsonPathJsonPath 表达式,标识要读取到记录中的对象。当文件包含嵌套在外部数组内的记录时,此表达式尤其有用。例如,以下 JsonPath 表达式面向 JSON 对象的 id 字段。

    format="json", format_options={"jsonPath": "$.id"}
  • multiline – 指定单个记录能否跨越多行的布尔值。当字段包含带引号的换行符时,会出现此选项。如果有记录跨越多个行,您必须将此选项设置为 "true"。默认值为 "false",它允许在分析过程中更积极地拆分文件。

  • optimizePerformance – 一个布尔值,用于指定是否将高级 SIMD JSON 读取器与基于 Apache Arrow 的列式内存格式结合使用。仅适用于 AWS Glue 3.0。不兼容 multilinejsonPath。提供这两个选项中的任何一个都将指示 AWS Glue 回滚到标准读取器。

  • withSchema – 一个字符串值,以 手动指定 XML 架构 中描述的格式指定表 Schema。仅从非目录连接读取时与 optimizePerformance 结合使用。

将矢量化 SIMD JSON 读取器与 Apache Arrow 列式格式结合使用

AWS Glue 版本 3.0 增加了适用于 JSON 数据的矢量化读取器。与标准读取器相比,它在某些条件下的执行速度可提高 2 倍。此读取器存在一些需要用户在使用前注意的限制,详见本节的说明。

要使用优化的读取器,请将在 format_options 或表属性中将 "optimizePerformance" 设置为 True。除非从目录中读取,否则您还需要提供 withSchemawithSchema 需要有一个 手动指定 XML 架构 中描述的输入

// Read from S3 data source glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = {"paths": ["s3://s3path"]}, format = "json", format_options={ "optimizePerformance": True, "withSchema": SchemaString }) // Read from catalog table glueContext.create_dynamic_frame.from_catalog( database = database, table_name = table, additional_options = { // The vectorized reader for JSON can read your schema from a catalog table property. "optimizePerformance": True, })

有关在 AWS Glue 库中构建 SchemaString 的更多信息,请参阅 PySpark 扩展类型

矢量化 CSV 读取器的限制

请注意以下限制:

  • 不支持具有嵌套对象或数组值的 JSON 元素。AWS Glue 将回滚到标准读取器(如有提供)。

  • 必须从目录或使用 withSchema 参数提供一个 Schema。

  • 不兼容 multilinejsonPath。提供这两个选项中的任何一个都将指示 AWS Glue 回滚到标准读取器。

  • 如果提供的输入记录与输入 Schema 不一致,将会导致读取器失败。

  • 将不会创建错误记录

  • 不支持具有多字节字符(如日语或中文字符)的 JSON 文件。