本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
AWS Glue Scala DynamicFrame 类
程序包:com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
DynamicFrame
是自描述的 DynamicRecord 对象的分布式集合。
DynamicFrame
旨在为 ETL(提取、转换和加载)操作提供灵活的数据模型。它们不需要创建架构,可用于读取和转换具有杂乱或不一致的值和类型的数据。可以按需为需要架构的那些操作计算架构。
DynamicFrame
为数据清理和 ETL 提供了一系列转换。它们还支持转换为 SparkSQL DataFrame 和从其转换以与现有代码和 DataFrame 提供的许多分析操作集成。
跨构造 DynamicFrame
的许多 AWS Glue 转换共享以下参数:
transformationContext
— 此DynamicFrame
的标识符。transformationContext
用作跨运行保存的作业书签状态的密钥。callSite
– 为错误报告提供上下文信息。在从 Python 调用时,会自动设置这些值。stageThreshold
– 在引发异常之前允许的来自此DynamicFrame
计算的最大错误记录数,不包括以前的DynamicFrame
中存在的记录。totalThreshold
– 引发异常之前的最大错误记录总数,包括以前的帧中的记录。
Val errorsCount
val errorsCount
此 DynamicFrame
中的错误记录数。这包括来自以前的操作的错误。
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
– 用于构造新DynamicFrame
的映射序列。caseSensitive
– 是否将源列视为区分大小写。在与不区分大小写的存储(如 AWS Glue 数据目录)集成时,将此项设置为 false 可能很有帮助。
基于一系列的映射选择、投影和转换列。
每个映射由源列和类型以及目标列和类型构成。映射可指定为四元组 (source_path
、source_type
、 target_path
、target_type
) 或包含相同信息的 MappingSpec 对象。
除了将映射用于简单的投影和转换外,还可以通过使用“.
”(句点)分隔路径的组件来用于嵌套或取消嵌套字段。
例如,假设您有一个包含以下架构的 DynamicFrame
。
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
您可以进行以下调用来取消嵌套 state
和 zip
字段。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
生成的架构如下所示。
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
您还可以使用 applyMapping
来重新嵌套列。例如,以下代码反转以前的转换,并在目标中创建一个名为 address
的结构。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
包含“.
”(句点)字符的字段名称可以使用反引号 (``
) 括起来。
注意
目前,您不能使用 applyMapping
方法映射嵌套在数组下的列。
Def assertErrorThreshold
def assertErrorThreshold : Unit
强制计算并验证错误记录数是否低于 stageThreshold
和 totalThreshold
的操作。如果任一条件失败,则引发异常。
Def count
lazy
def count
返回 DynamicFrame
中的元素数量。
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回已删除指定列的新 DynamicFrame
。
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回已删除指定列的新 DynamicFrame
。
您可以使用此方法删除嵌套列(包括数组中的列),但不能用于删除特定数组元素。
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
返回已删除所有空列的新 DynamicFrame
。
注意
这只删除类型为 NullType
的列。不删除或修改其他列中的单个空值。
Def errorsAsDynamicFrame
def errorsAsDynamicFrame
返回包含此 DynamicFrame
中的错误记录的新 DynamicFrame
。
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
构造只包含函数“f
”为其返回 true
的那些记录的新 DynamicFrame
。筛选器函数“f
”不应转变输入记录。
Def getName
def getName : String
返回此 DynamicFrame
的名称。
Def getNumPartitions
def getNumPartitions
返回 DynamicFrame
中的分区数量。
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
如果架构已经过计算,则返回该架构。如果架构尚未经过计算,则不扫描数据。
Def isSchemaComputed
def isSchemaComputed : Boolean
如果已为此 DynamicFrame
计算架构,则返回 true
;否则返回 false
。如果此方法返回 false,则调用 schema
方法将需要再次扫描此 DynamicFrame
中的记录。
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
– 此DynamicFrame
中用于联接的列。keys2
–frame2
中用于联接的列。必须与keys1
的长度相同。frame2
– 要联接的DynamicFrame
。
返回使用指定的键对 frame2
执行 equijoin 的结果。
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回通过对此 DynamicFrame
中的每个记录应用指定函数“f
”构造的新 DynamicFrame
。
此方法先复制每个记录,然后再应用指定函数,因此可以安全地转变记录。如果映射函数在给定记录上引发异常,则该记录将标记为错误,并且堆栈跟踪将另存为错误记录中的一个列。
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
– 要合并的暂存DynamicFrame
。primaryKeys
– 要匹配源和暂存DynamicFrame
中的记录的主键字段列表。transformationContext
– 用于检索有关当前转换的元数据的唯一字符串(可选)。options
– 为此转换提供其他信息的 JSON 名称-值对的字符串。callSite
– 用于为错误报告提供上下文信息。stageThreshold
— 一个Long
。给定转换中处理需要排除的错误的数目。totalThreshold
— 一个Long
。此转换中处理需要排除的错误的总数。
基于指定主键的将此 DynamicFrame
与暂存 DynamicFrame
合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。
在以下情况下,返回的 DynamicFrame
将包含记录 A:
如果
A
在源帧和暂存帧中都存在,则返回暂存帧中的A
。如果
A
在源表中,且A.primaryKeys
不在stagingDynamicFrame
中(这意味着,未在暂存表中更新A
)。
源帧和暂存帧不需要具有相同的架构。
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
以人类可读的格式将此 DynamicFrame
的架构输出到 stdout
。
Def recomputeSchema
def recomputeSchema : Schema
强制重新计算架构。这需要扫描数据,但如果数据中不存在当前架构中的一些字段,则可能会“压缩”架构。
返回经过重新计算的架构。
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
– 在输出中用于基本DynamicFrame
的名称。通过透视数组创建的DynamicFrame
以此作为前缀开头。stagingPath
– 用于写入中间数据的 Amazon Simple Storage Service(Amazon S3)路径。options
– 关系化选项和配置。目前未使用。
展平所有嵌套的结构并将数组透视为单独的表。
您可以使用此操作准备深度嵌套的数据以提取到关系数据库中。使用与 Unnest 转换相同的方式展平嵌套结构。此外,将数组透视为单独的表,其中每个数组元素成为一行。例如,假设您有一个包含以下数据的 DynamicFrame
。
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
运行以下代码。
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
这会生成两个表。第一个表名为“people”,其中包含以下内容。
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
在这里,friends 数组已被替换为自动生成的联接键。创建了名为 people.friends
的单独表,其中包含以下内容。
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
在此表中,“id
”是标识数组元素来自哪个记录的联接键,“index
”是指在原始数组中的位置,“val
”是实际的数组条目。
relationalize
方法返回通过以递归方式对所有数组应用此过程创建的 DynamicFrame
序列。
注意
AWS Glue 库自动为新表生成联接键。为确保联接键在作业运行中是唯一的,您必须启用作业书签。
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
– 列的原始名称。newName
– 列的新名称。
返回新 DynamicFrame
,其中的指定字段进行了重命名。
您可以使用此方法重命名嵌套字段。例如,以下代码在 address 结构中将 state
重命名为 state_code
。
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回具有 numPartitions
分区的新 DynamicFrame
。
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
– 应用于 specs 序列中未列出的所有ChoiceType
列的操作。database
– 与match_catalog
操作一起使用的数据目录数据库。tableName
– 与match_catalog
操作一起使用的数据目录表。
通过将一个或多个 ChoiceType
替换为更具体的类型来返回新 DynamicFrame
。
可通过两种方式使用 resolveChoice
。第一种是指定一系列特定列以及如何解析它们。这些指定为由 (列, 操作) 对组成的元组。
可能的操作如下:
cast:type
– 尝试将所有值转换为指定的类型。make_cols
– 将每个不同的类型转换为名为columnName_type
的列。make_struct
– 将列转换为具有每个不同类型的键的结构。project:type
– 仅保留指定类型的值。
resolveChoice
的另一种模式是为所有 ChoiceType
指定单个解析方法。这可以在执行前不知道 ChoiceType
的完整列表的情况下使用。除了上面列出的操作外,此模式还支持以下操作:
match_catalog
ChoiceType
– 尝试将每个 转换为指定目录表中的对应类型。
示例:
通过转换为 int 来解析 user.id
列,并使 address
字段仅保留结构。
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
通过将每个选择转换为单独的列来解析所有 ChoiceType
。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
通过转换为指定目录表中的类型来解析所有 ChoiceType
。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
返回此 DynamicFrame
的架构。
保证返回的架构包含此 DynamicFrame
中的记录中存在的每个字段。但在少数情况下,它可能还包含其他字段。您可以使用 Unnest 方法基于此 DynamicFrame
中的记录来“压缩”架构。
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回作为 DynamicFrame
的单个字段。
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
– 要选择的列名称序列。
返回包含指定列的新 DynamicFrame
。
注意
您只能使用 selectFields
方法选择顶级列。您可以使用 applyMapping 方法选择嵌套列。
Def show
def show( numRows : Int = 20 ) : Unit
numRows
– 要输出的行数。
以 JSON 格式输出此 DynamicFrame
中的行。
Def simplifyDDBJson
使用 AWS Glue DynamoDB 导出连接器进行 DynamoDB 导出时会生成具有特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象。simplifyDDBJson
简化此数据类型的 DynamicFrame 中的嵌套列,并返回新的简化 DynamicFrame。如果 List 类型中包含多种类型或 Map 类型,则不会简化 List 中的元素。此方法仅支持 DynamoDB 导出 JSON 格式的数据。考虑使用 unnest
来对其他类型的数据进行类似更改。
def simplifyDDBJson() : DynamicFrame
此方法不使用任何参数。
示例输入
考虑 DynamoDB 导出生成的以下架构:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
示例代码
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
simplifyDDBJson
转换会将其简化为:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回相同记录但写出一部分记录作为副作用的传递转换。
path
– 将输出写入的 Amazon S3 中的路径,格式为s3://bucket//path
。options
– 描述取样行为的可选的JsonOptions
映射。
返回包含与这一个相同的记录的 DynamicFrame
。
默认情况下,将 100 个任意记录写入通过 path
指定的位置。您可以使用 options
映射自定义此行为。有效键包括:
topk
– 指定写出的记录的总数。默认值为 100。prob
– 指定包含单个记录的概率(小数形式)。默认值为 1。
例如,以下调用将对数据集进行取样,方法是以 20% 的概率选择每个记录并在写入 200 个记录后停止。
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
— 包括在第一个DynamicFrame
中的路径。
返回两个 DynamicFrame
的序列。第一个 DynamicFrame
包含指定路径,第二个包含所有其他列。
示例
此示例采用从 AWS Glue 数据目录中的 legislators
数据库中的 persons
表创建的 DynamicFrame,并将 DynamicFrame 拆分为两个,其中指定的字段进入第一个 DynamicFrame,其余字段进入第二个 DynamicFrame。然后,示例将从结果中选择第一个 DynamicFrame。
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
基于将列与常量比较的谓词来拆分行。
paths
– 用于比较的列。values
– 用于比较的常量值。operators
– 用于比较的运算符。
返回两个 DynamicFrame
的序列。第一个包含其谓词为 true 的行,第二个包含其谓词为 false 的行。
使用三个序列指定谓词:“paths
”包含(可能嵌套的)列名称,“values
”包含要与其进行比较的常量值,“operators
”包含用于比较的运算符。所有这三个序列必须长度相同:第 n
个运算符将用于将第 n
个列与第 n
个值进行比较。
每个运算符必须是以下运算符之一:“!=
”、“=
”、“<=
”、“<
”、“>=
”或“>
”。
例如,以下调用将拆分 DynamicFrame
,以便第一个输出帧将包含美国 65 岁以上的人员记录,第二个将包含所有其他记录。
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
返回在计算此 DynamicFrame
时创建的错误记录数。这不包括作为输入传递给此 DynamicFrame
的以前操作中的错误。
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
将此 DynamicFrame
转换为具有相同架构和记录的 Apache Spark SQL DataFrame
。
注意
由于 DataFrame
不支持 ChoiceType
,因此此方法自动将 ChoiceType
列转换为 StructType
。有关更多信息和用于解析选择的选项,请参阅 resolveChoice。
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
– 要分析的列。必须是字符串或二进制。format
– 用于分析的格式。optionString
– 传递到格式的选项,如 CSV 分隔符。
根据指定的格式分析嵌入式字符串或二进制列。已分析的列嵌套在具有原始列名称的结构下。
例如,假设您具有包含一个嵌入式 JSON 列的 CSV 文件。
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
在初始分析后,您将获取具有以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: string }}}
您可以对地址列调用 unbox
来分析特定组件。
{{{ df.unbox("address", "json") }}}
这为我们提供了具有以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回平展了所有嵌套结构的新 DynamicFrame
。构造名称时使用“.
”(句点)字符。
例如,假设您有一个包含以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
以下调用取消嵌套 address 结构。
{{{ df.unnest() }}}
生成的架构如下所示。
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
此方法还取消嵌套数组中的嵌套结构。但由于历史原因,此类字段的名称前附加了括起来的数组的名称和“.val
”。
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
解除 DynamicFrame
中的嵌套列,具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame
。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest
转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON。
例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnestDDBJson()
转换会将此转换为:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
以下代码示例演示了如何使用 AWS Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令,以及打印分区数量:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
– 返回要使用的架构的函数。指定为零参数函数来推迟可能昂贵的计算。
将此 DynamicFrame
的架构设置为指定值。这主要在内部使用以避免成本高昂的架构重新计算。传入的架构必须包含数据中存在的所有列。
Def withName
def withName( name : String ) : DynamicFrame
name
– 要使用的新名称。
返回使用新名称的此 DynamicFrame
的副本。
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
返回具有指定转换上下文的此 DynamicFrame
的副本。