Package: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
A DynamicFrame is a distributed collection of self-describing DynamicRecord objects.
DynamicFrames are designed to provide a flexible data model for ETL (extract,
transform, and load) operations. They don't require a schema to create, and you can use them to
read and transform data that contains messy or inconsistent values and types. A schema can be
computed on demand for those operations that need one.
DynamicFrames provide a range of transformations for data cleaning and ETL.
They also support conversion to and from SparkSQL DataFrames to integrate with existing code and
the many analytics operations that DataFrames provide.
The following parameters are shared across many of the AWS Glue transformations that construct
DynamicFrames:
transformationContext — The identifier for this DynamicFrame. The transformationContext is used as a key for job bookmark state that is persisted across runs.
callSite — Provides context information for error reporting. These values are automatically set when calling from Python.
stageThreshold — The maximum number of error records that are allowed from the computation of this DynamicFrame before throwing an exception, excluding records that are present in the previous DynamicFrame.
totalThreshold — The maximum number of total error records before an exception is thrown, including those from previous frames.
Val errorsCount
val errorsCount
The number of error records in this DynamicFrame. This includes errors from
previous operations.
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings — A sequence of mappings to construct a new DynamicFrame.
caseSensitive — Whether to treat source columns as case sensitive. Setting this to false might help when integrating with case-insensitive stores like the AWS Glue Data Catalog.
Selects, projects, and casts columns based on a sequence of mappings.
Each mapping is made up of a source column and type and a target column and type. Mappings
can be specified as either a four-tuple (source_path, source_type, target_path, target_type)
or a MappingSpec object containing the same information.
In addition to using mappings for simple projections and casting, you can use them to nest
or unnest fields by separating components of the path with '.' (period).
For example, suppose that you have a DynamicFrame with the following schema.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
You can make the following call to unnest the state and zip fields.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
The resulting schema is as follows.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
You can also use applyMapping to re-nest columns. For example, the following
inverts the previous transformation and creates a struct named address in the target.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Field names that contain '.' (period) characters can be quoted by using backticks (``).
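As a sketch of the quoting rule, the following call maps a top-level field whose name literally contains a period to a plain name. The field name user.name, the target name user_name, and the wrapper object are hypothetical and chosen only for illustration.

```scala
import com.amazonaws.services.glue.DynamicFrame

object QuotedFieldExample {
  // Assumes `df` has a top-level string field literally named "user.name".
  // The backticks keep the period from being read as a nesting separator.
  def flattenDottedName(df: DynamicFrame): DynamicFrame =
    df.applyMapping(Seq(
      ("`user.name`", "string", "user_name", "string")
    ))
}
```

Without the backticks, the same source path would be read as the field name nested inside a struct named user.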
Note
Currently, you can't use the applyMapping method to map columns that are nested
under arrays.
Def assertErrorThreshold
def assertErrorThreshold : Unit
An action that forces computation and verifies that the number of error records falls
below stageThreshold and totalThreshold. Throws an exception if
either condition fails.
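The thresholds are set on the transformation that constructs a frame, and assertErrorThreshold checks them eagerly. The following is a minimal sketch of that pattern. The column name debug_info and the limit values are assumptions for illustration only.

```scala
import com.amazonaws.services.glue.DynamicFrame

object ThresholdExample {
  // "debug_info" is a hypothetical column name.
  def dropWithLimits(df: DynamicFrame): DynamicFrame = {
    val cleaned = df.dropFields(
      Seq("debug_info"),
      stageThreshold = 10,   // limit for error records produced by this step
      totalThreshold = 100   // limit for error records including earlier steps
    )
    // Forces computation and throws if either limit is violated.
    cleaned.assertErrorThreshold
    cleaned
  }
}
```

Calling assertErrorThreshold right after a risky step surfaces failures at that step instead of at the end of the job.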
Def count
lazy def count
Returns the number of elements in this DynamicFrame.
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Returns a new DynamicFrame with the specified column removed.
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Returns a new DynamicFrame with the specified columns removed.
You can use this method to delete nested columns, including those inside of arrays, but not to drop specific array elements.
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Returns a new DynamicFrame with all null columns removed.
Note
This only removes columns of type NullType. Individual null
values in other columns are not removed or modified.
Def errorsAsDynamicFrame
def errorsAsDynamicFrame
Returns a new DynamicFrame containing the error records from this DynamicFrame.
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Constructs a new DynamicFrame containing only those records for which the
function 'f' returns true. The filter function 'f'
should not mutate the input record.
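A minimal sketch of a filter predicate follows. It assumes the records carry an integer field named age (hypothetical) and that DynamicRecord exposes getField returning an Option, as described in the DynamicRecord reference. The predicate only reads the record and does not mutate it.

```scala
import com.amazonaws.services.glue.{DynamicFrame, DynamicRecord}

object FilterExample {
  // Keeps records whose hypothetical "age" field is an Int of at least 18.
  // Records with a missing or non-integer age are filtered out.
  def adultsOnly(df: DynamicFrame): DynamicFrame =
    df.filter((rec: DynamicRecord) =>
      rec.getField("age") match {
        case Some(age: Int) => age >= 18
        case _              => false
      })
}
```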
Def getName
def getName : String
Returns the name of this DynamicFrame.
Def getNumPartitions
def getNumPartitions
Returns the number of partitions in this DynamicFrame.
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
Returns the schema if it has already been computed. Does not scan the data if the schema has not already been computed.
Def isSchemaComputed
def isSchemaComputed : Boolean
Returns true if the schema has been computed for this DynamicFrame, or false
if not. If this method returns false, then calling the schema method requires
another pass over the records in this DynamicFrame.
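The two schema checks above can be combined to avoid an unplanned scan. The following sketch uses only getSchemaIfComputed. The wrapper object and the messages are illustrative.

```scala
import com.amazonaws.services.glue.DynamicFrame

object SchemaCheckExample {
  // Prints the schema only when it is already available, so this call
  // never triggers a pass over the data.
  def printSchemaIfCheap(df: DynamicFrame): Unit =
    df.getSchemaIfComputed match {
      case Some(schema) => println(s"cached schema: $schema")
      case None         => println("schema not computed; calling schema would scan the records")
    }
}
```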
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1 — The columns in this DynamicFrame to use for the join.
keys2 — The columns in frame2 to use for the join. Must be the same length as keys1.
frame2 — The DynamicFrame to join against.
Returns the result of performing an equijoin with frame2 using the specified keys.
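As an illustration of how the key sequences line up, the following sketch joins two frames on a single key pair. The frame names and the columns id and customer_id are hypothetical.

```scala
import com.amazonaws.services.glue.DynamicFrame

object JoinExample {
  // Equijoin: customers.id = orders.customer_id.
  // keys1 and keys2 are matched position by position, so they must have equal length.
  def customersWithOrders(customers: DynamicFrame, orders: DynamicFrame): DynamicFrame =
    customers.join(
      keys1 = Seq("id"),
      keys2 = Seq("customer_id"),
      frame2 = orders
    )
}
```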
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Returns a new DynamicFrame constructed by applying the specified function
'f' to each record in this DynamicFrame.
This method copies each record before applying the specified function, so it is safe to mutate the records. If the mapping function throws an exception on a given record, that record is marked as an error, and the stack trace is saved as a column in the error record.
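The error behavior described above can be sketched as follows. The example assumes a hypothetical integer field named age and relies on DynamicRecord.getField from the DynamicRecord reference. Records for which the function throws end up as error records rather than failing the call.

```scala
import com.amazonaws.services.glue.{DynamicFrame, DynamicRecord}

object MapExample {
  // Passes records with an Int "age" through unchanged and throws for the rest,
  // which marks those records as errors.
  def requireIntAge(df: DynamicFrame): DynamicFrame = {
    val checked = df.map(
      (rec: DynamicRecord) =>
        rec.getField("age") match {
          case Some(_: Int) => rec
          case other        => throw new IllegalStateException(s"unexpected age: $other")
        },
      errorMsg = "age is missing or not an integer"
    )
    // stageErrorsCount counts only the errors created by this map step.
    println(s"records rejected by map: ${checked.stageErrorsCount}")
    checked
  }
}
```

The rejected records can then be inspected with errorsAsDynamicFrame on the returned frame.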
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame — The staging DynamicFrame to merge.
primaryKeys — The list of primary key fields to match records from the source and staging DynamicFrames.
transformationContext — A unique string that is used to retrieve metadata about the current transformation (optional).
options — A string of JSON name-value pairs that provide additional information for this transformation.
callSite — Used to provide context information for error reporting.
stageThreshold — A Long. The number of errors in the given transformation for which the processing needs to error out.
totalThreshold — A Long. The total number of errors up to and including this transformation for which the processing needs to error out.
Merges this DynamicFrame with a staging DynamicFrame based on
the specified primary keys to identify records. Duplicate records (records with the same
primary keys) are not de-duplicated. If there is no matching record in the staging frame, all
records (including duplicates) are retained from the source. If the staging frame has matching
records, the records from the staging frame overwrite the records in the source in
AWS Glue.
The returned DynamicFrame contains record A in the following cases:
If A exists in both the source frame and the staging frame, then A in the staging frame is returned.
If A is in the source table and A.primaryKeys is not in the stagingDynamicFrame (that means A is not updated in the staging table).
The source frame and staging frame do not need to have the same schema.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
Prints the schema of this DynamicFrame to stdout in a
human-readable format.
Def recomputeSchema
def recomputeSchema : Schema
Forces a schema recomputation. This requires a scan over the data, but it might "tighten" the schema if there are some fields in the current schema that are not present in the data.
Returns the recomputed schema.
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName — The name to use for the base DynamicFrame in the output. DynamicFrames that are created by pivoting arrays start with this as a prefix.
stagingPath — The Amazon Simple Storage Service (Amazon S3) path for writing intermediate data.
options — Relationalize options and configuration. Currently unused.
Flattens all nested structures and pivots arrays into separate tables.
You can use this operation to prepare deeply nested data for ingestion into a relational
database. Nested structs are flattened in the same manner as the Unnest transform.
Additionally, arrays are pivoted into separate tables with each array element becoming a row.
For example, suppose that you have a DynamicFrame with the following data.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Run the following code.
{{{
df.relationalize("people", "s3://my_bucket/my_path", JsonOptions.empty)
}}}
This produces two tables. The first table is named "people" and contains the following.
{{{
{"name": "Nancy", "age": 47, "friends": 1}
{"name": "Stephanie", "age": 28, "friends": 2}
{"name": "Nathan", "age": 54, "friends": 3}
}}}
Here, the friends array has been replaced with an auto-generated join key. A separate
table named people.friends is created with the following content.
{{{
{"id": 1, "index": 0, "val": "Fred"}
{"id": 1, "index": 1, "val": "Lakshmi"}
{"id": 2, "index": 0, "val": "Yao"}
{"id": 2, "index": 1, "val": "Phil"}
{"id": 2, "index": 2, "val": "Alvin"}
{"id": 3, "index": 0, "val": "Nicolai"}
{"id": 3, "index": 1, "val": "Karen"}
}}}
In this table, 'id' is a join key that identifies which record the array
element came from, 'index' refers to the position in the original array, and
'val' is the actual array entry.
The relationalize method returns the sequence of DynamicFrames
created by applying this process recursively to all arrays.
Note
The AWS Glue library automatically generates join keys for new tables. To ensure that join keys are unique across job runs, you must enable job bookmarks.
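To make the pivot concrete, here is a plain-Scala model of the transformation applied to the example data. It is not the AWS Glue implementation and uses no Glue classes. The case classes are hypothetical, and the sequential join keys starting at 1 are an assumption chosen to match the example output, whereas AWS Glue generates its own keys.

```scala
object PivotModel {
  final case class Person(name: String, age: Int, friends: Seq[String])
  // In the root table the array is replaced by a join key.
  final case class RootRow(name: String, age: Int, friends: Long)
  // One row per array element: join key, position in the array, and the entry.
  final case class ChildRow(id: Long, index: Int, value: String)

  def pivot(people: Seq[Person]): (Seq[RootRow], Seq[ChildRow]) = {
    // Assign a join key to every input record (1, 2, 3, ... in this model).
    val keyed = people.zipWithIndex.map { case (p, i) => (p, i + 1L) }
    val root = keyed.map { case (p, id) => RootRow(p.name, p.age, id) }
    val child = keyed.flatMap { case (p, id) =>
      p.friends.zipWithIndex.map { case (friend, idx) => ChildRow(id, idx, friend) }
    }
    (root, child)
  }
}
```

Joining the child rows back to the root rows on id = friends recovers which person each array entry belonged to.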
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName — The original name of the column.
newName — The new name of the column.
Returns a new DynamicFrame with the specified field renamed.
You can use this method to rename nested fields. For example, the following code would
rename state to state_code inside the address struct.
{{{
df.renameField("address.state", "address.state_code")
}}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Returns a new DynamicFrame with numPartitions partitions.
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption — An action to apply to all ChoiceType columns not listed in the specs sequence.
database — The Data Catalog database to use with the match_catalog action.
tableName — The Data Catalog table to use with the match_catalog action.
Returns a new DynamicFrame by replacing one or more ChoiceTypes
with a more specific type.
There are two ways to use resolveChoice. The first is to specify a sequence
of specific columns and how to resolve them. These are specified as tuples made up of (column,
action) pairs.
The following are the possible actions:
cast:type — Attempts to cast all values to the specified type.
make_cols — Converts each distinct type to a column with the name columnName_type.
make_struct — Converts a column to a struct with keys for each distinct type.
project:type — Retains only values of the specified type.
The other mode for resolveChoice is to specify a single resolution for all
ChoiceTypes. You can use this in cases where the complete list of
ChoiceTypes is unknown before execution. In addition to the actions listed
above, this mode also supports the following action:
match_catalog — Attempts to cast each ChoiceType to the corresponding type in the specified catalog table.
Examples:
Resolve the user.id column by casting to an int, and make the
address field retain only structs.
{{{
df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct")))
}}}
Resolve all ChoiceTypes by converting each choice to a separate column.
{{{
df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols")))
}}}
Resolve all ChoiceTypes by casting to the types in the specified catalog table.
{{{
df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")),
database = Some("my_database"),
tableName = Some("my_table"))
}}}
Def schema
def schema : Schema
Returns the schema of this DynamicFrame.
The returned schema is guaranteed to contain every field that is present in a record in
this DynamicFrame. But in a small number of cases, it might also contain
additional fields. You can use the recomputeSchema method to
"tighten" the schema based on the records in this DynamicFrame.
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Returns a single field as a DynamicFrame.
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths — The sequence of column names to select.
Returns a new DynamicFrame containing the specified columns.
Note
You can only use the selectFields method to select top-level columns. You can use
the applyMapping method to select nested columns.
Def show
def show( numRows : Int = 20 ) : Unit
numRows — The number of rows to print.
Prints rows from this DynamicFrame in JSON format.
Def simplifyDDBJson
DynamoDB exports with the AWS Glue DynamoDB export connector result in JSON files with specific nested
structures. For more information, see Data objects.
simplifyDDBJson simplifies nested columns in a DynamicFrame of this type of data, and returns a new
simplified DynamicFrame. If a List type contains multiple types or a Map type, the elements in the List are not
simplified. This method only supports data in the DynamoDB export
JSON format. Consider unnest to perform similar changes on other kinds of data.
def simplifyDDBJson() : DynamicFrame
This method does not take any parameters.
Example input
Consider the following schema generated by a DynamoDB export:
root
|-- Item: struct
| |-- parentMap: struct
| | |-- M: struct
| | | |-- childMap: struct
| | | | |-- M: struct
| | | | | |-- appName: struct
| | | | | | |-- S: string
| | | | | |-- packageName: struct
| | | | | | |-- S: string
| | | | | |-- updatedAt: struct
| | | | | | |-- N: string
| |-- strings: struct
| | |-- SS: array
| | | |-- element: string
| |-- numbers: struct
| | |-- NS: array
| | | |-- element: string
| |-- binaries: struct
| | |-- BS: array
| | | |-- element: string
| |-- isDDBJson: struct
| | |-- BOOL: boolean
| |-- nullValue: struct
| | |-- NULL: boolean
Example code
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.export" -> "ddb",
        "dynamodb.tableArn" -> "ddbTableARN",
        "dynamodb.s3.bucket" -> "exportBucketLocation",
        "dynamodb.s3.prefix" -> "exportBucketPrefix",
        "dynamodb.s3.bucketOwner" -> "exportBucketAccountID"
      ))
    ).getDynamicFrame()

    val simplified = dynamicFrame.simplifyDDBJson()
    simplified.printSchema()

    Job.commit()
  }
}
The simplifyDDBJson transform will simplify this to:
root
|-- parentMap: struct
| |-- childMap: struct
| | |-- appName: string
| | |-- packageName: string
| | |-- updatedAt: string
|-- strings: array
| |-- element: string
|-- numbers: array
| |-- element: string
|-- binaries: array
| |-- element: string
|-- isDDBJson: boolean
|-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Passthrough transformation that returns the same records but writes out a subset of records as a side effect.
path — The path in Amazon S3 to write output to, in the form s3://bucket/path.
options — An optional JsonOptions map describing the sampling behavior.
Returns a DynamicFrame that contains the same records as this one.
By default, writes 100 arbitrary records to the location specified by path.
You can customize this behavior by using the options map. Valid keys include the
following:
topk — Specifies the total number of records written out. The default is 100.
prob — Specifies the probability (as a decimal) that an individual record is included. The default is 1.
For example, the following call would sample the dataset by selecting each record with a 20 percent probability and stopping after 200 records have been written.
{{{
df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2)))
}}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths — The paths to include in the first DynamicFrame.
Returns a sequence of two DynamicFrames. The first DynamicFrame
contains the specified paths, and the second contains all other columns.
Example
This example takes a DynamicFrame created from the persons table in the
legislators database in the AWS Glue Data Catalog and splits the DynamicFrame into two,
with the specified fields going into the first DynamicFrame and the remaining fields going
into a second DynamicFrame. The example then chooses the first DynamicFrame from the
result.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons",
transformationContext="InputFrame").getDynamicFrame()
val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note",
"links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang",
"other_names.note", "other_names.name"), transformationContext="SplitField_collection")
val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Splits rows based on predicates that compare columns to constants.
paths — The columns to use for comparison.
values — The constant values to use for comparison.
operators — The operators to use for comparison.
Returns a sequence of two DynamicFrames. The first contains rows for which
the predicate is true and the second contains those for which it is false.
Predicates are specified using three sequences: 'paths' contains the
(possibly nested) column names, 'values' contains the constant values to compare
to, and 'operators' contains the operators to use for comparison. All three
sequences must be the same length: The nth operator is used to compare the
nth column with the nth value.
Each operator must be one of "!=", "=", "<=", "<", ">=", or ">".
As an example, the following call would split a DynamicFrame so that the
first output frame would contain records of people aged 65 or older from the United States, and the
second would contain all other records.
{{{
df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "="))
}}}
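Because splitRows returns a two-element sequence, the result is usually unpacked by position. The following sketch mirrors the three-argument call shown above and returns the two frames as a pair. The wrapper object is hypothetical, and if your version of the library requires the remaining parameters from the signature, pass them explicitly.

```scala
import com.amazonaws.services.glue.DynamicFrame

object SplitRowsExample {
  // Returns (rows matching every predicate, all remaining rows).
  def splitSeniorsInUsa(df: DynamicFrame): (DynamicFrame, DynamicFrame) = {
    val frames = df.splitRows(
      Seq("age", "address.country"), // columns to compare
      Seq(65, "USA"),                // constants, one per column
      Seq(">=", "=")                 // operators, one per column
    )
    (frames(0), frames(1))
  }
}
```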
Def stageErrorsCount
def stageErrorsCount
Returns the number of error records created while computing this
DynamicFrame. This excludes errors from previous operations that were passed into
this DynamicFrame as input.
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Converts this DynamicFrame to an Apache Spark SQL DataFrame with
the same schema and records.
Note
Because DataFrames don't support ChoiceTypes, this method
automatically converts ChoiceType columns into StructTypes. For
more information and options for resolving choice, see resolveChoice.
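A common pattern is to convert to a DataFrame for a Spark SQL operation and then convert back. The sketch below assumes a numeric column named age (hypothetical) and uses the DynamicFrame companion object's apply(df, glueContext) method, described in the DynamicFrame object reference, for the return trip.

```scala
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.sql.DataFrame

object ToDFExample {
  // Filters with the DataFrame API, then wraps the result as a DynamicFrame again.
  def adultsViaSparkSql(frame: DynamicFrame, glueContext: GlueContext): DynamicFrame = {
    val df: DataFrame = frame.toDF()
    val adults: DataFrame = df.filter(df("age") >= 18)
    DynamicFrame(adults, glueContext)
  }
}
```

If the frame contains ChoiceType columns, calling resolveChoice before toDF gives control over the resulting column types instead of relying on the automatic struct conversion.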
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path — The column to parse. Must be a string or binary.
format — The format to use for parsing.
optionString — Options to pass to the format, such as the CSV separator.
Parses an embedded string or binary column according to the specified format. Parsed columns are nested under a struct with the original column name.
For example, suppose that you have a CSV file with an embedded JSON column.
name, age, address
Sally, 36, {"state": "NE", "city": "Omaha"}
...
After an initial parse, you would get a DynamicFrame with the following schema.
{{{
root
|-- name: string
|-- age: int
|-- address: string
}}}
You can call unbox on the address column to parse the specific components.
{{{
df.unbox("address", "json")
}}}
This gives us a DynamicFrame with the following schema.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- city: string
}}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Returns a new DynamicFrame with all nested structures flattened. Names are
constructed using the '.' (period) character.
For example, suppose that you have a DynamicFrame with the following schema.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- city: string
}}}
The following call unnests the address struct.
{{{ df.unnest() }}}
The resulting schema is as follows.
{{{
root
|-- name: string
|-- age: int
|-- address.state: string
|-- address.city: string
}}}
This method also unnests nested structs inside of arrays. But for historical reasons, the
names of such fields are prepended with the name of the enclosing array and ".val".
Def unnestDDBJson
def unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Unnests nested columns in a DynamicFrame that are specifically in the DynamoDB JSON structure, and returns a new unnested DynamicFrame. Columns that are of an array of struct types will not be unnested. Note that this is a specific type of unnesting transform that behaves differently from the regular unnest transform and requires the data to already be in the DynamoDB JSON structure. For more information, see DynamoDB JSON.
For example, the schema produced by reading an export with the DynamoDB JSON structure might look like the following:
root
|-- Item: struct
| |-- ColA: struct
| | |-- S: string
| |-- ColB: struct
| | |-- S: string
| |-- ColC: struct
| | |-- N: string
| |-- ColD: struct
| | |-- L: array
| | | |-- element: null
The unnestDDBJson() transform would convert this to:
root
|-- ColA: string
|-- ColB: string
|-- ColC: string
|-- ColD: array
| |-- element: null
The following code example shows how to use the AWS Glue DynamoDB export connector, invoke a DynamoDB JSON unnest, and print the number of partitions:
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read a DynamoDB export through the AWS Glue DynamoDB export connector.
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.export" -> "ddb",
        "dynamodb.tableArn" -> "<test_source>",
        "dynamodb.s3.bucket" -> "<bucket name>",
        "dynamodb.s3.prefix" -> "<bucket prefix>",
        "dynamodb.s3.bucketOwner" -> "<account_id of bucket>"
      ))
    ).getDynamicFrame()

    // Unnest the DynamoDB JSON structure and print the partition count.
    val unnested = dynamicFrame.unnestDDBJson()
    print(unnested.getNumPartitions)

    Job.commit()
  }
}
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
— A function that returns the schema to use. Specified as a zero-parameter function to defer potentially expensive computation.
Sets the schema of this DynamicFrame to the specified value. This is
primarily used internally to avoid costly schema recomputation. The passed-in schema must
contain all columns present in the data.
Def withName
def withName( name : String ) : DynamicFrame
name
— The new name to use.
Returns a copy of this DynamicFrame with a new name.
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Returns a copy of this DynamicFrame with the specified transformation context.