Clase DynamicFrame Scala de AWS Glue - AWS Adherencia

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Clase DynamicFrame Scala de AWS Glue

Paquete: com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame es una colección distribuida de objetos DynamicRecord autodescriptivos.

Los elementos DynamicFrames se han diseñado para proporcionar un modelo de datos flexible para operaciones de ETL (extracción, transformación y carga). No necesitan un esquema para crearse y se pueden usar para leer y transformar datos que contienen valores y tipos confusos o incoherentes. Un esquema se puede calcular bajo demanda para las operaciones que necesiten uno.

Los objetos DynamicFrame proporcionan una serie de transformaciones para la limpieza de datos y ETL. También admiten la conversión a clases DataFrame de SparkSQL, y desde dichas clases, para integrarse con el código existente y las numerosas operaciones de análisis que proporcionan las clases DataFrame.

Los siguientes parámetros se comparten entre las numerosas transformaciones de AWS Glue que construyen objetos DynamicFrame:

  • transformationContext: identificador de este DynamicFrame. El parámetro transformationContext se utiliza como clave para el estado de marcador de flujo de trabajo que se conserva de una ejecución a otra.

  • callSite: proporciona información de contexto para los informes de error. Estos valores se establecen automáticamente cuando se realiza la llamada desde Python.

  • stageThreshold: número máximo de registros de error permitidos en el cálculo de este objeto DynamicFrame antes de generar una excepción, sin incluir los registros presentes en el objeto DynamicFrame anterior.

  • totalThreshold: número máximo registros de error totales antes de que se genere una excepción, incluidos los de marcos anteriores.

Val errorsCount

val errorsCount

El número de registros de error en este objeto DynamicFrame. Se incluyen errores de las operaciones anteriores.

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings: secuencia de mapeos para construir un objeto DynamicFrame nuevo.

  • caseSensitive: indica si hay que considerar que las columnas de origen distinguen entre mayúsculas y minúsculas. Configurar este valor como falso puede ser de ayuda al realizar integraciones con almacenes que no distinguen entre mayúsculas y minúsculas, como AWS Glue Data Catalog.

Selecciona, proyecta y convierte las columnas en función de una secuencia de mapeos.

Cada mapeo se compone de una columna y un tipo de origen y una columna y un tipo de destino. Los mapeos se pueden especificar como una tupla cuádruple (source_path, source_type, target_path, target_type) o un objeto MappingSpec que contiene la misma información.

Además de usar los mapeos para proyecciones y conversiones simples, también se pueden usar para anidar o aplanar campos separando los componentes de la ruta con "." (punto).

Por ejemplo, suponga que tiene una instancia de DynamicFrame con el siguiente esquema.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

Puede hacer la siguiente llamada para quitar el anidamiento en los campos state y zip:

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

El esquema resultante es el siguiente.

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

También puede utilizar applyMapping para volver a anidar columnas. Por ejemplo, el siguiente código invierte la transformación anterior y crea una estructura denominada address en el destino.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

Los nombres de los campos que contienen caracteres "." (punto) se pueden citar mediante acentos graves (``).

nota

Actualmente no se puede usar el método applyMapping para mapear columnas anidadas en matrices.

Def assertErrorThreshold

def assertErrorThreshold : Unit

Acción que fuerza el cómputo y verifica que el número de registros de error esté por debajo de stageThreshold y totalThreshold. Genera una excepción si no se cumple ninguna de las condiciones.

Def count

lazy def count

Devuelve el número de elementos en este objeto DynamicFrame.

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Devuelve un objeto DynamicFrame nuevo sin la columna especificada.

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Devuelve un objeto DynamicFrame nuevo sin las columnas especificadas.

Este método se puede utilizar para eliminar columnas anidadas, incluidas las que se encuentran en matrices, pero no para descartar elementos de matriz específicos.

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

Devuelve un objeto DynamicFrame nuevo sin las columnas nulas.

nota

Solo se eliminan las columnas del tipo NullType. Los valores nulos individuales de otras columnas no se eliminan ni modifican.

def errorsAsDynamicFrame

def errorsAsDynamicFrame

Devuelve un objeto DynamicFrame nuevo que contiene los registros de error de este DynamicFrame.

Def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Crea un objeto DynamicFrame nuevo que solo contiene los registros en los que la función "f" devuelve true. La función de filtro "f" no debe mutar el registro de entrada.

Def getName

def getName : String

Devuelve el nombre de este objeto DynamicFrame.

Def getNumPartitions

def getNumPartitions

Devuelve el número de particiones en este objeto DynamicFrame.

Def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

Devuelve el esquema si ya se ha calculado. No analiza los datos si aún no se ha calculado el esquema.

Def isSchemaComputed

def isSchemaComputed : Boolean

Devuelve true si el esquema se ha calculado para este objeto DynamicFrame, de lo contrario devuelve false. Si este método devuelve false, la llamada al método schema necesitará otra pasada por los registros en este objeto DynamicFrame.

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1: las columnas de este objeto DynamicFrame que se utilizarán para la combinación.

  • keys2: las columnas de frame2 que se utilizarán para la combinación. Deben tener la misma longitud que keys1.

  • frame2: el elemento DynamicFrame con el que debe realizarse la combinación.

Devuelve el resultado de realizar una combinación equijoin con frame2 utilizando las claves especificadas.

Def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Devuelve un objeto DynamicFrame nuevo creado aplicando la función "f" especificada a cada registro de este objeto DynamicFrame.

Este método copia cada registro antes de aplicar la función especificada, por lo que es seguro para mutar los registros. Si la función de mapeo genera una excepción en un registro determinado, dicho registro se marca como un error y la pila de seguimiento se guarda como una columna en el registro de errores.

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame: DynamicFrame provisional que se fusionará.

  • primaryKeys: la lista de campos de clave principal para hacer coincidir los registros de los DynamicFrame de origen y provisionales.

  • transformationContext: una cadena única que se utiliza para recuperar metadatos sobre la transformación actual (opcional).

  • options: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para esta transformación.

  • callSite: se utiliza para proporcionar información de contexto para los informes de error.

  • stageThreshold: un valor Long. Número de errores de la transformación especificada que provocarán que el proceso se termine.

  • totalThreshold: un valor Long. Número total de errores hasta esta transformación (incluida) que provocarán que el proceso se termine.

Combina este objeto DynamicFrame con una instancia provisional de DynamicFrame en función de las claves principales especificadas para identificar registros. Los registros duplicados (registros con las mismas claves principales) no se eliminan. Si no hay ningún registro que coincida en el marco provisional, se retienen todos los registros del origen (incluidos los duplicados). Si el marco provisional tiene registros coincidentes, estos sobrescriben a los registros del origen en AWS Glue.

La instancia de DynamicFrame devuelta contiene el registro A en los siguientes casos:

  1. Si A existe tanto en el marco de origen como en el marco provisional, se devuelve A en el marco provisional.

  2. Si A está en la tabla de origen y A.primaryKeys no está en stagingDynamicFrame (significa que A no se actualiza en la tabla provisional).

No es necesario que el marco de origen y el marco provisional tengan el mismo esquema.

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

Imprime el esquema de este objeto DynamicFrame en stdout en un formato fácil de leer.

Def recomputeSchema

def recomputeSchema : Schema

Fuerza un nuevo cálculo del esquema. Es necesario realizar un análisis de los datos, lo que podría "endurecer" el esquema si hay algunos campos del esquema actual que no están en los datos.

Devuelve el esquema calculado de nuevo.

Def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName: el nombre que se va a utilizar para el DynamicFrame base en la salida. Los DynamicFrame que se crean mediante matrices dinámicas comienzan con este prefijo.

  • stagingPath: la ruta de Amazon Simple Storage Service (Amazon S3) para escribir datos intermedios.

  • options: opciones y configuración de Rationalize. En la actualidad no se utiliza.

Aplana todas las estructuras anidadas y convierte las matrices en tablas independientes.

Puede utilizar esta operación para preparar datos con anidamiento profundo para su incorporación a una base de datos relacional. Las estructuras anidadas se aplanan de la misma manera en que lo haría la transformación Unnest. Además, las matrices se dividen en tablas independientes en las que cada elemento de matriz se convierte en una fila. Por ejemplo, supongamos que tiene una instancia de DynamicFrame con los siguientes datos.

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

Ejecute el siguiente código.

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

Se generan dos tablas. La primera tabla se denomina "people" y contiene lo siguiente.

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

Aquí, la matriz friends se ha reemplazado por una clave de combinación generada automáticamente. Se crea una tabla diferente llamada people.friends con el siguiente contenido.

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

En esta tabla, "id" es una clave de combinación que identifica de qué registro procede el elemento de matriz, "index" hace referencia a la posición en la matriz original y "val" es la entrada de matriz real.

El método relationalize devuelve la secuencia de objetos DynamicFrame creados aplicando este proceso de forma recursiva a todas las matrices.

nota

La biblioteca de AWS Glue genera automáticamente las claves de combinación de las tablas nuevas. Para garantizar que las claves de combinación sean únicas en las ejecuciones de flujo de trabajo, debe habilitar los marcadores de flujo de trabajo.

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName: el nombre original de la columna.

  • newName: el nuevo nombre de la columna.

Devuelve un DynamicFrame nuevo que incluye el campo especificado con un nuevo nombre.

Puede usar este método para cambiar el nombre de los campos anidados. Por ejemplo, el siguiente código cambiaría el nombre de state a state_code en la estructura de dirección.

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Devuelve un objeto DynamicFrame nuevo con numPartitions particiones.

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption: acción que se aplica a todas las columnas ChoiceType que no están indicadas en la secuencia specs.

  • database: la base de datos de Data Catalog que se usará con la acción match_catalog.

  • tableName: la tabla de Data Catalog que se usará con la acción match_catalog.

Devuelve un objeto DynamicFrame nuevo reemplazando uno o varios ChoiceType por un tipo más específico.

Hay dos formas de usar resolveChoice. La primera consiste en especificar una secuencia de columnas específicas y cómo resolverlas. Se especifican como tuplas compuestas de pares (columna, acción).

Los posibles valores son los siguientes:

  • cast:type: intenta convertir todos los valores al tipo especificado.

  • make_cols: convierte cada tipo diferenciado en una columna con el nombre columnName_type.

  • make_struct: convierte una columna en una estructura con claves para cada tipo diferenciado.

  • project:type: retiene solo valores del tipo especificado.

El otro modo de resolveChoice es especificar una resolución única para todas las columnas ChoiceType. Se puede utilizar en los casos en los que se desconozca la lista completa de ChoiceType antes de la ejecución. Además de las acciones enumeradas anteriormente, este modo también admite la siguiente acción:

  • match_catalog: intenta convertir cada ChoiceType al tipo correspondiente en la tabla de catálogos especificada.

Ejemplos:

Resolver la columna user.id mediante la conversión a int y hacer que el campo address solo conserve estructuras.

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

Resolver todas las columnas ChoiceType convirtiendo cada opción en una columna independiente.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

Resolver todas las columnas ChoiceType mediante la conversión a los tipos de la tabla de catálogos especificada.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

Devuelve el esquema de este objeto DynamicFrame.

El esquema devuelto contiene todos los campos presentes en un registro en este objeto DynamicFrame. Sin embargo, en un pequeño número de casos, es posible que también contenga campos adicionales. Puede usar el método Unnest para "ajustar" el esquema en función de los registros de este objeto DynamicFrame.

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Devuelve un campo individual como un objeto DynamicFrame.

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths: secuencia de los nombres de columna que se seleccionarán.

Devuelve un nuevo objeto DynamicFrame que contiene las columnas especificadas.

nota

Solo puede usar el método selectFields para seleccionar columnas de nivel superior. Puede usar el método applyMapping para seleccionar columnas anidadas.

Def show

def show( numRows : Int = 20 ) : Unit
  • numRows: número de filas que se imprimirán.

Imprime filas de este objeto DynamicFrame en formato JSON.

Def simplifyDDBJson

Las exportaciones de DynamoDB con el conector de exportaciones de DynamoDB de AWS Glue generan archivos JSON con estructuras anidadas específicas. Para obtener más información, consulte Objetos de datos. simplifyDDBJson Simplifica las columnas anidadas en un DynamicFrame de este tipo de datos y genera un DynamicFrame nuevo y simplificado. Si hay varios tipos y un tipo de mapa contenidos en tipo de lista, los elementos en la lista no se simplificarán. Este método solo es compatible con los datos exportados de DynamoDB con el formato JSON. Considere la acción de unnest para realizar cambios similares en otros tipos de datos.

def simplifyDDBJson() : DynamicFrame

Este método no toma ningún parámetro.

Ejemplo de entrada

Considere el siguiente esquema generado por una exportación a DynamoDB:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

Código de ejemplo

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

La transformación de simplifyDDBJson simplificará esto de la siguiente forma:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Transformación de paso a través que devuelve los mismos registros, pero escribe un subconjunto de registros como efecto secundario.

  • path: ruta de Amazon S3 en la que se escribirá la salida, con el formato s3://bucket//path.

  • options: mapa JsonOptions opcional que describe el comportamiento de muestreo.

Devuelve un objeto DynamicFrame que contiene los mismos registros que este.

De forma predeterminada, escribe 100 registros arbitrarios en la ubicación especificada por path. Puede personalizar este comportamiento mediante el mapa options. Entre las claves válidas se incluyen:

  • topk: especifica el número total de registros escritos. El valor predeterminado es 100.

  • prob: especifica la probabilidad de que se incluya un registro individual (como decimal). El valor predeterminado es 1.

Por ejemplo, la siguiente llamada realizaría un muestreo del conjunto de datos seleccionando cada registro con una probabilidad del 20 % y se detendría después de que se hubieran escrito 200 registros.

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths: las rutas que se incluirán en el primer objeto DynamicFrame.

Devuelve una secuencia de dos objetos DynamicFrame. El primer DynamicFrame contiene las rutas especificadas y el segundo contiene las demás columnas.

Ejemplo

En este ejemplo, se toma un objeto DynamicFrame creado a partir de la tabla persons en la base de datos legislators de Data Catalog de AWS Glue y divide el DynamicFrame en dos, con los campos especificados incluidos en el primer DynamicFrame y los campos restantes incluidos en un segundo DynamicFrame. Luego, el ejemplo elige el primer DynamicFrame del resultado.

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

Divide las filas en función de predicados que comparan columnas con constantes.

  • paths: columnas que se van a utilizar para la comparación.

  • values: valores constantes que se van a utilizar para la comparación.

  • operators: operadores que se van a utilizar para la comparación.

Devuelve una secuencia de dos objetos DynamicFrame. El primero contiene filas en las que se aplica el predicado y el segundo contiene las que no se les aplica.

Los predicados se especifican usando tres secuencias: "paths" contiene los nombres de columna (posiblemente anidadas), "values" contiene los valores constantes con los que se realizará la comparación y "operators" contiene los operadores que se usarán para la comparación. Las tres secuencias deben tener la misma longitud: el operador que ocupa la posición n se usa para comparar la columna n con el valor n.

Cada operador debe ser uno de los siguientes: "!=", "=", "<=", "<", ">=" o ">".

A modo de ejemplo, la siguiente llamada dividiría un objeto DynamicFrame de modo que el primer marco de salida contuviera los registros de personas de más de 65 años de Estados Unidos y el segundo contuviera todos los demás registros.

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

Devuelve el número de registros de error creado al calcular este objeto DynamicFrame. Se excluyen los errores de operaciones anteriores que se pasaron a este objeto DynamicFrame como entrada.

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

Convierte este DynamicFrame en un DataFrame Apache Spark SQL con el mismo esquema y registros.

nota

Dado que los objetos DataFrame no admiten ChoiceType, este método convierte automáticamente las columnas ChoiceType en StructType. Para obtener más información y opciones para la resolución de elección, consulte resolveChoice.

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path: la columna que se va a analizar. Debe tener formato de cadena o binario.

  • format: el formato que se utilizará para el análisis.

  • optionString: opciones que se pasarán al formato, como el separador CSV.

Analiza una cadena insertada o una columna binaria de acuerdo con el formato especificado. Las columnas analizadas se anidan en una estructura con el nombre de columna original.

Por ejemplo, suponga que tiene un archivo CSV con una columna JSON insertada.

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

Después de un análisis inicial, obtendrá un objeto DynamicFrame con el siguiente esquema.

{{{ root |-- name: string |-- age: int |-- address: string }}}

Puede llamar a unbox en la columna de dirección para analizar los componentes específicos.

{{{ df.unbox("address", "json") }}}

Esto nos proporciona un objeto DynamicFrame con el siguiente esquema.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Devuelve un objeto DynamicFrame nuevo con todas las estructuras anidadas aplanadas. Los nombres se crean con el carácter "." (punto).

Por ejemplo, suponga que tiene una instancia de DynamicFrame con el siguiente esquema.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

La siguiente llamada elimina el anidamiento de la estructura de dirección:

{{{ df.unnest() }}}

El esquema resultante es el siguiente.

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

Este método también aplana las estructuras anidadas de dentro de las matrices. Sin embargo, por razones históricas, los nombres de dichos campos tienen anexado previamente el nombre de la matriz contenedora y ".val".

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

Desanida las columnas anidadas de un elemento DynamicFrame que se encuentren específicamente en la estructura JSON de DynamoDB y devuelve un nuevo elemento DynamicFrame no anidado. Las columnas que sean de una matriz de tipos de estructuras no se desanidarán. Tenga en cuenta que esta transformación de desanidamiento es un tipo específico que se comporta de modo diferente a la transformación unnest normal y requiere que los datos ya estén en la estructura JSON de DynamoDB. Para obtener más información, consulte JSON de DynamoDB.

Por ejemplo, el esquema de una lectura de una exportación con la estructura JSON de DynamoDB puede parecerse al siguiente:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

La transformación unnestDDBJson() convertiría esto en:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

En el siguiente ejemplo de código, se muestra cómo utilizar el conector de exportación de DynamoDB de AWS Glue, invocar un desanidamiento JSON de DynamoDB e imprimir el número de particiones:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema: función que devuelve el esquema que se va a utilizar. Se especifica como una función sin parámetros para diferir el cómputo potencialmente costoso.

Establece el esquema de este objeto DynamicFrame en el valor especificado. Se utiliza principalmente de forma interna para evitar un nuevo y costoso cálculo de esquema. El esquema pasado debe contener todas las columnas presentes en los datos.

Def withName

def withName( name : String ) : DynamicFrame
  • name: nuevo nombre que se usará.

Devuelve una copia de este objeto DynamicFrame con un nuevo nombre.

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

Devuelve una copia de este objeto DynamicFrame con el contexto de transformación especificado.