APIs GlueContext em Scala do AWS Glue
Pacote: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
é o ponto de entrada para leitura e gravação de um DynamicFrame de e para o Amazon Simple Storage Service (Amazon S3), AWS Glue Data Catalog, JDBC e assim por diante. Essa classe fornece funções de utilitário para criar objetos Característica do DataSource e DataSink que, por sua vez, podem ser usados para ler e gravar DynamicFrame
s.
GlueContext
também pode ser usado para definir um número de partições de destino (o padrão é 20) no DynamicFrame
se o número de partições criado da origem é menor do que um limite mínimo para as partições (o padrão é 10).
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Acrescenta colunas de tempo de ingestão, como ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
, para o DataFrame
de entrada. Essa função é gerada automaticamente no script gerado pelo AWS Glue, quando você especifica uma tabela do catálogo de dados com o Amazon S3 como destino. Essa função atualiza automaticamente a partição com colunas de tempo de ingestão na tabela de saída. Isso permite que os dados de saída sejam particionados automaticamente no tempo de ingestão sem exigir colunas de tempo de ingestão explícitas nos dados de entrada.
-
dataFrame
: odataFrame
ao qual anexar as colunas de tempo de ingestão. -
timeGranularity
: o detalhamento das colunas de tempo. Os valores válidos são “day
”, “hour
” e “minute
”. Por exemplo, se “hour
” é transmitido para a função, odataFrame
original terá as colunas de tempo “ingest_year
”, “ingest_month
”, “ingest_day
” e “ingest_hour
” anexadas.
Retorna o quadro de dados após anexar as colunas de detealhamento de tempo.
Exemplo:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Retorna um DataFrame
criado com a conexão e o formato especificados. Use essa função apenas com fontes de transmissão do AWS Glue.
connectionType
: o tipo de conexão de transmissão. Os valores válidos sãokinesis
ekafka
.-
connectionOptions
: opções de conexão, a quais são diferentes para Kinesis e Kafka. Você pode encontrar a lista de todas as opções de conexão para cada origem dos dados de transmissão em Tipos e opções de conexão para ETL no AWS Glue para Spark. Observe as seguintes diferenças nas opções de conexão de transmissão:-
As fontes de transmissão do Kinesis exigem
streamARN
,startingPosition
,inferSchema
eclassification
. -
As fontes de transmissão do Kafka exigem
connectionName
,topicName
,startingOffsets
,inferSchema
eclassification
.
-
transformationContext
: o contexto de transformação a ser usado (opcional).format
: uma especificação de formato (opcional). É usado para uma conexão do Amazon S3 ou do AWS Glue com suporte para vários formatos. Para obter informações sobre os formatos compatíveis, consulte Opções de formato de dados para entradas e saídas no AWS Glue para Spark.formatOptions
: as opções de formato para o formato especificado. Para obter informações sobre as opções de formato compatíveis, consulte Opções de formato de dados.
Exemplo para fonte de transmissão do Amazon Kinesis:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Exemplo para fonte de transmissão do Kafka:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Aplica a batch_function
transmitida para cada microlote lido a partir da fonte de transmissão.
-
frame
: o DataFrame que contém o microlote atual. -
batch_function
: uma função que será aplicada para cada microlote. -
options
: uma coleção de pares de chave-valor que contém informações sobre como processar microlotes. São necessárias as seguintes opções:-
windowSize
: a quantidade de tempo gasto no processamento de cada lote. -
checkpointLocation
: o local onde os pontos de verificação são armazenados para o trabalho de ETL de transmissão. -
batchMaxRetries
: o número máximo de novas tentativas deste lote em caso de falha. O valor padrão é 3. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior.
-
Exemplo:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Cria um DataSink que grava em um local especificado de uma tabela definida no Data Catalog.
database
: o nome do banco de dados no Data Catalog.tableName
: o nome da tabela no Data Catalog.redshiftTmpDir
: o diretório de preparação temporário a ser usado com determinados coletores de dados. Definido como vazio por padrão.transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.additionalOptions
: opções adicionais fornecidas ao AWS Glue.catalogId
: o ID do catálogo (ID da conta) do Data Catalog que está sendo acessado. Quando nulo, o ID da conta do chamador padrão é usado.
Retorna o DataSink
.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Cria um Característica do DataSource que lê dados em uma definição de tabela do Data Catalog.
database
: o nome do banco de dados no Data Catalog.tableName
: o nome da tabela no Data Catalog.redshiftTmpDir
: o diretório de preparação temporário a ser usado com determinados coletores de dados. Definido como vazio por padrão.transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.pushDownPredicate
: filtra partições sem a necessidade de listar e ler todos os arquivos no seu conjunto de dados. Para ter mais informações, consulte Pré-filtragem usando a aplicação de predicados.additionalOptions
– Uma coleção de pares nome-valor opcionais. As opções possíveis incluem as listadas em Tipos e opções de conexão para ETL no AWS Glue para Spark, exceto paraendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
edelimiter
. Outra opção suportada écatalogPartitionPredicate
:catalogPartitionPredicate
: você pode transmitir uma expressão de catálogo para filtrar com base nas colunas de índice. Isso leva a filtragem para o lado do servidor. Para obter mais informações, consulte Índices de partição do AWS Glue. Observe quepush_down_predicate
ecatalogPartitionPredicate
usam sintaxes diferentes. O primeiro usa a sintaxe padrão do Spark SQL e o outro usa o analisador JSQL.catalogId
: o ID do catálogo (ID da conta) do Data Catalog que está sendo acessado. Quando nulo, o ID da conta do chamador padrão é usado.
Retorna o DataSource
.
Exemplo de fonte de transmissão
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Cria um DataSink que grava em um banco de dados JDBC especificado em um objeto Connection
no Data Catalog. O objeto Connection
tem informações para se conectar a um depósito JDBC, incluindo URL, nome de usuário, senha, VPC, sub-rede e grupos de segurança.
catalogConnection
: o nome da conexão no Data Catalog que contém o URL do JDBC onde gravar.options
: uma string de pares de nome-valor JSON que fornecem as informações adicionais necessárias para gravar em um datastore do JDBC. Isso inclui:dbtable (obrigatório) — O nome da tabela JDBC. Para armazenamentos de dados JDBC que oferecem suporte a esquemas dentro de um banco de dados, especifique
schema.table-name
. Se um esquema não for fornecido, o esquema "público" padrão será usado. O exemplo a seguir mostra um parâmetro de opções que aponta para um esquema chamadotest
e uma tabela chamadatest_table
no banco de dadostest_db
.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (obrigatório) — O nome do banco de dados do JDBC.
Todas as opções adicionais transmitidas diretamente para gravador do JDBC de SparkSQL. Para obter mais informações, consulte Fonte de dados Redshift para Spark
.
redshiftTmpDir
: um diretório de preparação temporário a ser usado com determinados coletores de dados. Definido como vazio por padrão.transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.catalogId
: o ID do catálogo (ID da conta) do Data Catalog que está sendo acessado. Quando nulo, o ID da conta do chamador padrão é usado.
Código de exemplo:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Retorna o DataSink
.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Cria um DataSink que grava dados em um destino como Amazon Simple Storage Service (Amazon S3), JDBC, Catálogo de Dados do AWS Glue ou um fluxo de dados do Amazon Kinesis ou do Apache Kafka.
-
connectionType
— O tipo da conexão. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark. -
connectionOptions
– uma string de pares de nome/valor JSON que fornecem as informações adicionais para estabelecer a conexão com o depósito de dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark. -
transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.
Retorna o DataSink
.
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Cria um DataSink que grava dados em um destino como Amazon S3, JDBC, Catálogo de Dados ou um fluxo de dados do Amazon Kinesis ou do Apache Kafka. Também define o formato dos dados que serão gravados no destino.
connectionType
— O tipo da conexão. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.-
options
— Uma string de pares de nome/valor JSON que fornecem as informações adicionais para estabelecer a conexão com o depósito de dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark. transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.format
— O formato dos dados a serem gravados no destino.formatOptions
: uma string de pares de nome-valor JSON que fornecem opções adicionais para a formatação de dados no destino. Consulte Opções de formato de dados.
Retorna o DataSink
.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Cria um Característica do DataSource que lê dados em uma fonte, como o Amazon S3, JDBC ou AWS Glue Data Catalog. Também oferece suporte a origens de dados de transmissão do Kafka e Kinesis.
connectionType
: o tipo da origem dos dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.-
connectionOptions
– uma string de pares de nome/valor JSON que fornecem as informações adicionais para estabelecer uma conexão com a origem de dados. Para ter mais informações, consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.Uma fonte de transmissão do Kinesis requer as seguintes opções de conexão:
streamARN
,startingPosition
,inferSchema
eclassification
.Uma fonte de transmissão do Kafka requer as seguintes opções de conexão:
connectionName
,topicName
,startingOffsets
,inferSchema
eclassification
. transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.pushDownPredicate
: predicado em colunas de partição.
Retorna o DataSource
.
Exemplo para fonte de transmissão do Amazon Kinesis:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Exemplo para fonte de transmissão do Kafka:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Cria um Característica do DataSource que lê dados em uma fonte, como o Amazon S3, JDBC ou AWS Glue Data Catalog, e também define o formato dos dados armazenados na fonte.
connectionType
: o tipo da origem dos dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.-
options
: uma string de pares de nome-valor JSON que fornecem as informações adicionais para estabelecer uma conexão com a origem dos dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark. transformationContext
: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.format
: o formato dos dados armazenados na fonte. Quando oconnectionType
é "s3", você também pode especificarformat
. Pode ser um de “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet” ou “orc”.formatOptions
: uma string de pares de nome-valor JSON que fornecem opções adicionais para a análise dos dados na fonte. Consulte Opções de formato de dados.
Retorna o DataSource
.
Exemplos
Crie um DynamicFrame a partir de uma origem de dados que seja um arquivo com valores separados por vírgula (CSV) no Amazon S3:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Crie um DynamicFrame a partir de uma origem de dados que seja um PostgreSQL usando uma conexão JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Crie um DynamicFrame a partir de uma origem de dados que seja um MySQL usando uma conexão JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Obtém o objeto SparkSession
associado a este GlueContext. Use este objeto SparkSession para registrar tabelas e UDFs a serem usados com DataFrame
criado dos DynamicFrames.
Retorna o SparkSession.
def startTransaction
def startTransaction(readOnly: Boolean):String
Iniciar uma nova transação. Chama internamente a API startTransaction do Lake Formation.
readOnly
: (booleano) indica se esta transação deve ser somente de leitura ou de leitura e gravação. As gravações feitas usando um ID de transação somente de leitura serão rejeitadas. As transações somente de leitura não precisam ser confirmadas.
Retorna o ID da transação.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Tenta confirmar a transação especificada. commitTransaction
pode retornar antes que a transação tenha terminado de confirmar. Chama internamente a API commitTransaction do Lake Formation.
transactionId
: (string) a transação a ser confirmada.waitForCommit
: (booleano) Determina secommitTransaction
retorna imediatamente. O valor padrão é true. Se for falso,commitTransaction
sonda e aguarda até que a transação seja confirmada. A quantidade de tempo de espera é restrita a 1 minuto usando recuo exponencial com um máximo de 6 tentativas.
Retorna um booleano para indicar se a confirmação foi feita ou não.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Tenta cancelar a transação especificada. Chama internamente a API cancelTransaction do Lake Formation.
transactionId
: (string) a transação a ser cancelada.
Retorna uma exceção TransactionCommittedException
se a transação tiver sido confirmada anteriormente.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Cria um objeto GlueContext
usando o SparkContext
especificado, um mínimo de partições e partições de destino.
sc
— A ferramentaSparkContext
.minPartitions
– O número mínimo de partições.targetPartitions
— O número destino de partições.
Retorna o GlueContext
.
def this
def this( sc : SparkContext )
Cria um objeto GlueContext
com o SparkContext
fornecido. Define o mínimo de partições como 10 e as partições de destino como 20.
sc
— A ferramentaSparkContext
.
Retorna o GlueContext
.
def this
def this( sparkContext : JavaSparkContext )
Cria um objeto GlueContext
com o JavaSparkContext
fornecido. Define o mínimo de partições como 10 e as partições de destino como 20.
sparkContext
— A ferramentaJavaSparkContext
.
Retorna o GlueContext
.