As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Classe DynamicFrame em Scala do AWS Glue
Pacote: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
Um DynamicFrame
é uma coleção distribuída de objetos autodescritivos DynamicRecord.
DynamicFrame
s são projetados para fornecer um modelo de dados flexível para operações de ETL (extração, transformação e carregamento). Eles não exigem um esquema para criar e podem ser usados para ler e transformar dados com valores e tipos confusos ou inconsistentes. Um esquema pode ser calculado sob demanda para as operações que precisam de um.
DynamicFrame
fornece uma variedade de transformações para limpeza de dados e ETL. Eles também suportam conversão de e para os DataFrames SparkSQL para integração com o código existente e as muitas operações de análise que os DataFrames fornecem.
Os seguintes parâmetros são compartilhados em muitas das transformações do AWS Glue que constroem DynamicFrame
s:
transformationContext
: o identificador para esseDynamicFrame
. OtransformationContext
é usado como uma chave para o estado do marcador de trabalho que é mantido nas execuções.callSite
: fornece informações de contexto para o relatório de erros. Esses valores são definidos automaticamente quando chamados do Python.stageThreshold
— O número máximo de registros de erros permitido a partir da computação desseDynamicFrame
antes de lançar uma exceção, excluindo os registros presentes noDynamicFrame
anterior.totalThreshold
– O número máximo de registros de erros totais antes que uma exceção seja lançada, incluindo os de quadros anteriores.
Val errorsCount
val errorsCount
O número de registros de erros neste DynamicFrame
. Isso inclui erros de operações anteriores.
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
— Uma sequência de mapeamentos para construir um novoDynamicFrame
.caseSensitive
: informa se deve ou não tratar as colunas de origem como colunas que diferenciam letras maiúsculas de minúsculas. Definir essa opção como false pode ajudar na integração com armazenamentos que não diferenciam maiúsculas de minúsculas, como o AWS Glue Data Catalog.
Seleciona, projeta e converte colunas com base em uma sequência de mapeamentos.
Cada mapeamento é composto por uma coluna e um tipo de origem e uma coluna e um tipo de destino. Mapeamentos podem ser especificados como um objeto 4-tuple (source_path
, source_type
, target_path
, target_type
) or a MappingSpec que contém a mesma informação.
Além de usar mapeamentos para projeções simples e da conversão, você pode usar os mapeamentos para aninhar ou desaninhar campos, separando os componentes do caminho com '.
' (ponto).
Por exemplo, digamos que você tem um DynamicFrame
com o seguinte esquema.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Você pode fazer a seguinte chamada para desaninhar os campos state
e zip
.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
O esquema resultante será o seguinte.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
Você também pode usar applyMapping
para aninhar novamente as colunas. Por exemplo, o seguinte inverte a transformação anterior e cria uma estrutura chamada address
no destino.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Os nomes de campo que contêm caracteres '.
' (período) pode ser cotado usando acentos graves (``
).
nota
Atualmente, você não pode usar o método applyMapping
para mapear colunas que estão aninhadas sob arrays.
Def assertErrorThreshold
def assertErrorThreshold : Unit
Uma ação que força a computação e verifica se o número de registros de erros está abaixo de stageThreshold
e de totalThreshold
. Lança uma exceção se a condição falhar.
Def count
lazy
def count
Retorna o número de elementos neste DynamicFrame
.
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Retorna um novo DynamicFrame
com a coluna especificada removida.
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Retorna um novo DynamicFrame
com as colunas especificadas removidas.
Esse método pode ser usado para excluir colunas aninhadas, incluindo aquelas dentro de matrizes, mas não pode ser usado para descartar elementos de matriz específicos.
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Retorna um novo DynamicFrame
com todas as colunas nulas removidas.
nota
Isso remove apenas colunas do tipo NullType
. Valores nulos individuais em outras colunas não são removidos ou modificados.
Def errorsAsDynamicFrame
def errorsAsDynamicFrame
Retorna um novo DynamicFrame
contendo os registros de erro deste DynamicFrame
.
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Cria um novo DynamicFrame
que contém apenas os registros para os quais a função "f
" retorna true
. A função de filtro "f
" não deve alterar o registro de entrada.
Def getName
def getName : String
Retorna o nome deste DynamicFrame
.
Def getNumPartitions
def getNumPartitions
Retorna o número de partições neste DynamicFrame
.
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
Retorna o esquema, se ele já foi calculado. Não verifica os dados se o esquema ainda não tiver sido calculado.
Def isSchemaComputed
def isSchemaComputed : Boolean
Retorna true
se o esquema foi calculado para este DynamicFrame
ou false
, se não. Se esse método retorna falso, chamar o método schema
exige outra passagem pelos registros nesse DynamicFrame
.
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
– As colunas nesseDynamicFrame
para usar na junção.keys2
: as colunas noframe2
a serem usadas na junção. Deve ter o mesmo tamanho quekeys1
.frame2
— ODynamicFrame
para usar na junção.
Retorna o resultado da execução de uma junção equivalente com frame2
usando as chaves especificadas.
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Retorna um novo DynamicFrame
criado aplicando a função especificada "f
" para cada registro nesse DynamicFrame
.
Este método copia cada registro antes de aplicar a função especificada, portanto, é seguro alterar os registros. Se a função de mapeamento lançar uma exceção em um determinado registro, esse registro é marcado como um erro e o rastreamento de pilha é salvo como uma coluna no registro de erros.
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
: oDynamicFrame
de preparação para mesclar.primaryKeys
: a lista de campos de chave primária para corresponder aos registros da fonte eDynamicFrame
s de preparação.transformationContext
: uma string exclusiva usada para recuperar os metadados sobre a transformação atual (opcional).options
: uma string de pares nome-valor JSON que fornecem informações adicionais para essa transformação.callSite
: usado para fornecer informações de contexto para o relatório de erros.stageThreshold
: umaLong
. O número de erros na transformação para a qual o processamento precisa apresentar falhas.totalThreshold
: umaLong
. O número total de erros nessa transformação para os quais o processamento precisa apresentar falhas.
Mescla esse DynamicFrame
com uma preparação DynamicFrame
de acordo com as chaves primárias especificadas para identificar registros. Registros duplicados (com as mesmas chaves primárias) não são eliminados. Se não houver nenhum registro correspondente no quadro de preparação, todos os registros (incluindo os duplicados) serão retidos da origem. Se o quadro de preparação tiver registros correspondentes, os do quadro de preparação substituirão os da origem no AWS Glue.
O DynamicFrame
retornado contém registro A nos seguintes casos:
Se
A
existir no quadro de origem e no quadro de preparação, oA
do quadro de preparação será retornado.Se
A
estiver na tabela de origem eA.primaryKeys
não estiver nostagingDynamicFrame
(isso significa queA
não é atualizado na tabela de preparação).
O quadro de origem e o quadro de preparação não precisam ter o mesmo esquema.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
Imprime o esquema deste DynamicFrame
para stdout
em um formato legível.
Def recomputeSchema
def recomputeSchema : Schema
Força um recálculo do esquema. Isso requer uma verificação nos dados, mas pode "restringir" o esquema se houver alguns campos no esquema atual que não estejam presentes nos dados.
Retorna o esquema recalculado.
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
: o nome a ser usado para oDynamicFrame
base na saída.DynamicFrame
s criados pelo deslocamento de matrizes começam com isso como um prefixo.stagingPath
: o caminho do Amazon Simple Storage Service (Amazon S3) para gravação de dados intermediários.options
: opções e a configuração da função Relationalize. Não utilizado no momento.
Nivela todas as estruturas aninhadas e matrizes dinâmicas em tabelas separadas.
Você pode usar essa operação para preparar dados extremamente aninhados para adição em um banco de dados relacional. As estruturas aninhadas são niveladas da mesma maneira que a transformação Unnest. Além disso, as matrizes são dinamizadas em tabelas separadas, com cada elemento da matriz se tornando uma linha. Por exemplo, digamos que você tenha um DynamicFrame
com os seguintes dados.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Execute o código a seguir.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Isso produz duas tabelas. A primeira tabela é chamada "pessoas" e contém o seguinte.
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Aqui, a matriz de amigos foi substituída por uma chave de junção gerada automaticamente. Uma tabela separada chamada people.friends
é criada com o seguinte conteúdo.
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
Nessa tabela, 'id
' é uma chave de junção que identifica de qual registro o elemento da matriz é proveniente, 'index
' refere-se à posição na matriz original e 'val
' é a entrada real da matriz.
O método relationalize
retorna a sequência de DynamicFrame
criada aplicando esse processo recursivamente a todas as matrizes.
nota
A biblioteca do AWS Glue gera automaticamente chaves de junção para novas tabelas. Para garantir que as chaves de junção sejam exclusivas nas execuções do trabalho, você deve ativar as marcações de trabalho.
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
– O nome original da coluna.newName
– O novo nome da coluna.
Retorna um novo DynamicFrame
com campo especificado renomeado.
Você pode usar esse método para renomear campos aninhados. Por exemplo, o código a seguir renomearia state
como state_code
na estrutura de endereço.
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Retorna um novo DynamicFrame
com partições numPartitions
.
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
— Uma ação a ser aplicada a todas as colunasChoiceType
não listadas na sequência de especificações.database
: o banco de dados do Data Catalog a ser usado com a açãomatch_catalog
.tableName
: a tabela do Data Catalog a ser usada com a açãomatch_catalog
.
Retorna um novo DynamicFrame
substituindo um ou mais ChoiceType
por um tipo mais específico.
Há duas maneiras de usar resolveChoice
. A primeira é especificar uma sequência de colunas específicas e como resolvê-las. Elas são especificadas como tuplas compostas de pares (coluna, ação).
A seguir estão as ações possíveis:
cast:type
– Tenta converter todos os valores para o tipo especificado.make_cols
– Converte cada tipo distinto em uma coluna com o nomecolumnName_type
.make_struct
– Converte uma coluna em uma estrutura com chaves para cada tipo distinto.project:type
— Mantém apenas valores do tipo especificado.
O outro modo para resolveChoice
é especificar uma única resolução para todos os ChoiceType
s. Você pode usar isso em casos em que a lista completa de ChoiceType
s for desconhecida antes da execução. Além das ações listadas anteriormente, esse modo também aceita a seguinte ação:
match_catalog
ChoiceType
– Tenta converter cada para o tipo correspondente na tabela de catálogo especificada.
Exemplos:
Corrija a coluna user.id
convertendo para um inteiro e faça o campo address
reter apenas as estruturas.
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Resolva todos os ChoiceType
s convertendo cada opção em uma coluna separada.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Resolva todos os ChoiceType
s convertendo para os tipos na tabela de catálogo especificada.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
Retorna o esquema deste DynamicFrame
.
O esquema retornado é garantido para conter todos os domínios que estão presentes nesse registro DynamicFrame
. Mas em um pequeno número de casos, ele também pode conter campos adicionais. Você pode usar o método Unnest para "restringir" o esquema com base nos registros deste DynamicFrame
.
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Retorna um único campo como um DynamicFrame
.
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
— A sequência de nomes de coluna para selecionar.
Retorna um novo DynamicFrame
contendo as colunas especificadas.
nota
O método selectFields
só pode ser usado para selecionar as colunas de nível superior. O método applyMapping pode ser usado para selecionar as colunas aninhadas.
Def show
def show( numRows : Int = 20 ) : Unit
numRows
– O número de linhas a serem impressas.
Imprime linhas desse DynamicFrame
no formato JSON.
Def simplifyDDBJson
As exportações do DynamoDB feitas com o conector de exportação para DynamoDB do AWS Glue resultam em arquivos JSON de estruturas aninhadas específicas. Para obter mais informações, consulte Objetos de dados. simplifyDDBJson
Simplifica as colunas aninhadas em um DynamicFrame desse tipo de dados e retorna um novo DynamicFrame simplificado. Se houver vários tipos ou um tipo de mapa contidos em um tipo de lista, os elementos na lista não serão simplificados. Esse método oferece suporte somente a dados no for mato JSON de exportação do DynamoDB. Considere o unnest
para realizar alterações semelhantes em outros tipos de dados.
def simplifyDDBJson() : DynamicFrame
Esse método não aceita parâmetros.
Exemplo de entrada
Considere o seguinte esquema gerado por uma exportação do DynamoDB:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Código de exemplo
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
A transformação simplifyDDBJson
simplificará isso para:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Transformação de passagem que retorna os mesmos registros, mas grava um subconjunto de registros como um efeito secundário.
path
: o caminho no Amazon S3 no qual gravar a saída, no formatos3://bucket//path
.options
— Um mapa opcionalJsonOptions
que descreve o comportamento de amostragem.
Retorna um DynamicFrame
que contém os mesmos registros deste.
Por padrão, grava 100 registros arbitrários no local especificado por path
. Você pode personalizar esse comportamento usando o mapa options
. Chaves válidas incluem o seguinte:
topk
: especifica o número total de registros gravados. O padrão é 100.prob
: especifica a probabilidade (como um número decimal) de incluir um registro individual. O padrão é 1.
Por exemplo, a chamada a seguir seria um exemplo do conjunto de dados, selecionando cada registro com uma probabilidade de 20% e interrompendo após a gravação de 200 registros.
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
— Os caminhos a serem incluídos no primeiroDynamicFrame
.
Retorna uma sequência de dois DynamicFrame
s. O primeiro DynamicFrame
contém os caminhos especificados e o segundo, todas as outras colunas.
Exemplo
Este exemplo usa um DynamicFrame criado a partir da tabela persons
no banco de dados legislators
no catálogo de dados do Glue da AWS e divide o DynamicFrame em dois, com os campos especificados entrando no primeiro DynamicFrame e os campos restantes entrando em um segundo DynamicFrame. O exemplo então escolhe o primeiro DynamicFrame do resultado.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Divide linhas com base em predicados que comparam colunas com constantes.
paths
— As colunas a serem usadas para comparação.values
— Os valores constantes a serem usados para comparação.operators
— Os operadores a serem usados para comparação.
Retorna uma sequência de dois DynamicFrame
s. O primeiro contém linhas para as quais o predicado é verdadeiro, e o segundo contém linhas para as quais ele é falso.
Os predicados são especificados usando três sequências: "paths
" contém os nomes de coluna (possivelmente aninhados), "values
" contém os valores constantes para comparar e "operators
" contém os operadores a serem usados para comparação. Todas as três sequências devem ter o mesmo tamanho: o operador n
é usado para comparar a coluna n
com o valor n
.
Cada operador deve ser um de "!=
", "=
", "<=
", "<
", ">=
" ou ">
".
Como exemplo, a chamada a seguir dividiria um DynamicFrame
para que o primeiro quadro de saída contivesse registros de pessoas dos Estados Unidos com mais de 65 anos, e o segundo contivesse todos os outros registros.
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
Retorna o número de registros de erros criados durante o cálculo desse DynamicFrame
. Isso exclui erros de operações anteriores que foram passadas para este DynamicFrame
como entrada.
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Converte isso DynamicFrame
para um Apache Spark SQL DataFrame
com o mesmo esquema e registros.
nota
Como DataFrame
s não oferecem suporte para ChoiceType
s, esse método converte automaticamente colunas ChoiceType
em StructType
s. Para obter mais informações e opções para opções de resolução, consulte resolveChoice.
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
: a coluna a ser analisada. Deve ser uma string ou binária.format
– O formato a ser usado para a análise.optionString
– Opções para passar para o formato, como o separador CSV.
Analisa uma coluna de string ou binária incorporada de acordo com o formato especificado. Colunas analisadas são aninhadas sob uma estrutura com o nome da coluna original.
Por exemplo, suponha que há um arquivo CSV com uma coluna JSON incorporada,
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Após uma análise inicial, seria obtido um DynamicFrame
com o seguinte esquema.
{{{ root |-- name: string |-- age: int |-- address: string }}}
É possível chamar unbox
na coluna de endereço para analisar os componentes específicos.
{{{ df.unbox("address", "json") }}}
Isso nos dá um DynamicFrame
com o seguinte esquema.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Retorna um novo DynamicFrame
com todas as estruturas aninhadas niveladas. Os nomes são criados usando o caractere '.
' (ponto).
Por exemplo, digamos que você tem um DynamicFrame
com o seguinte esquema.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
A chamada a seguir desaninha a estrutura do endereço.
{{{ df.unnest() }}}
O esquema resultante será o seguinte.
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Esse método também desaninha estruturas aninhados dentro de matrizes. No entanto, por razões históricas, os nomes dos campos são prefixados com o nome do array de fechamento e ".val
".
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Desaninha colunas aninhadas em um DynamicFrame
que estão especificamente na estrutura JSON do DynamoDB e retorna um novo DynamicFrame
não aninhado. Colunas que pertençam a uma matriz de tipos de estrutura não serão desaninhadas. Observe que esse é um tipo específico de transformação de desaninhamento que se comporta diferentemente da transformação unnest
comum e requer que os dados já estejam na estrutura JSON do DynamoDB. Para mais informações, consulte JSON do DynamoDB.
Por exemplo, o esquema de uma leitura de uma exportação com a estrutura JSON do DynamoDB pode se parecer com o seguinte:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
A transformação unnestDDBJson()
converteria isso em:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
O exemplo de código a seguir mostra como usar o conector de exportação para DynamoDB do AWS Glue, invocar um desaninhamento de JSON do DynamoDB e imprimir o número de partições:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
: uma função que retorna o esquema a ser usado. Especificado como uma função de parâmetro zero para adiar a computação potencialmente dispendiosa.
Define o esquema desse DynamicFrame
com o valor especificado. Isso é usado principalmente de maneira interna para evitar a recomputação dispendiosa do esquema. O esquema transmitido deve conter todas as colunas presentes nos dados.
Def withName
def withName( name : String ) : DynamicFrame
name
— O novo nome a ser usado.
Retorna uma cópia desse DynamicFrame
com um novo nome.
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Retorna uma cópia desse DynamicFrame
com o contexto de transformação especificado.