APIs GlueContext em Scala do AWS Glue - AWS Glue

APIs GlueContext em Scala do AWS Glue

Pacote:   com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext é o ponto de entrada para leitura e gravação de um DynamicFrame de e para o Amazon Simple Storage Service (Amazon S3), AWS Glue Data Catalog, JDBC e assim por diante. Essa classe fornece funções de utilitário para criar objetos Característica do DataSource e DataSink que, por sua vez, podem ser usados para ler e gravar DynamicFrames.

GlueContext também pode ser usado para definir um número de partições de destino (o padrão é 20) no DynamicFrame se o número de partições criado da origem é menor do que um limite mínimo para as partições (o padrão é 10).

def addIngestionTimeColumns

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

Acrescenta colunas de tempo de ingestão, como ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute, para o DataFrame de entrada. Essa função é gerada automaticamente no script gerado pelo AWS Glue, quando você especifica uma tabela do catálogo de dados com o Amazon S3 como destino. Essa função atualiza automaticamente a partição com colunas de tempo de ingestão na tabela de saída. Isso permite que os dados de saída sejam particionados automaticamente no tempo de ingestão sem exigir colunas de tempo de ingestão explícitas nos dados de entrada.

  • dataFrame: o dataFrame ao qual anexar as colunas de tempo de ingestão.

  • timeGranularity: o detalhamento das colunas de tempo. Os valores válidos são “day”, “hour” e “minute”. Por exemplo, se “hour” é transmitido para a função, o dataFrame original terá as colunas de tempo “ingest_year”, “ingest_month”, “ingest_day” e “ingest_hour” anexadas.

Retorna o quadro de dados após anexar as colunas de detealhamento de tempo.

Exemplo:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrameFromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Retorna um DataFrame criado com a conexão e o formato especificados. Use essa função apenas com fontes de transmissão do AWS Glue.

  • connectionType: o tipo de conexão de transmissão. Os valores válidos são kinesis e kafka.

  • connectionOptions: opções de conexão, a quais são diferentes para Kinesis e Kafka. Você pode encontrar a lista de todas as opções de conexão para cada origem dos dados de transmissão em Tipos e opções de conexão para ETL no AWS Glue para Spark. Observe as seguintes diferenças nas opções de conexão de transmissão:

    • As fontes de transmissão do Kinesis exigem streamARN, startingPosition, inferSchema e classification.

    • As fontes de transmissão do Kafka exigem connectionName, topicName, startingOffsets, inferSchema e classification.

  • transformationContext: o contexto de transformação a ser usado (opcional).

  • format: uma especificação de formato (opcional). É usado para uma conexão do Amazon S3 ou do AWS Glue com suporte para vários formatos. Para obter informações sobre os formatos compatíveis, consulte Opções de formato de dados para entradas e saídas no AWS Glue para Spark.

  • formatOptions: as opções de formato para o formato especificado. Para obter informações sobre as opções de formato compatíveis, consulte Opções de formato de dados.

Exemplo para fonte de transmissão do Amazon Kinesis:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Exemplo para fonte de transmissão do Kafka:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

Aplica a batch_function transmitida para cada microlote lido a partir da fonte de transmissão.

  • frame: o DataFrame que contém o microlote atual.

  • batch_function: uma função que será aplicada para cada microlote.

  • options: uma coleção de pares de chave-valor que contém informações sobre como processar microlotes. São necessárias as seguintes opções:

    • windowSize: a quantidade de tempo gasto no processamento de cada lote.

    • checkpointLocation: o local onde os pontos de verificação são armazenados para o trabalho de ETL de transmissão.

    • batchMaxRetries: o número máximo de novas tentativas deste lote em caso de falha. O valor padrão é 3. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior.

Exemplo:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Cria um DataSink que grava em um local especificado de uma tabela definida no Data Catalog.

  • database: o nome do banco de dados no Data Catalog.

  • tableName: o nome da tabela no Data Catalog.

  • redshiftTmpDir: o diretório de preparação temporário a ser usado com determinados coletores de dados. Definido como vazio por padrão.

  • transformationContext: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.

  • additionalOptions: opções adicionais fornecidas ao AWS Glue.

  • catalogId: o ID do catálogo (ID da conta) do Data Catalog que está sendo acessado. Quando nulo, o ID da conta do chamador padrão é usado.

Retorna o DataSink.

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Cria um Característica do DataSource que lê dados em uma definição de tabela do Data Catalog.

  • database: o nome do banco de dados no Data Catalog.

  • tableName: o nome da tabela no Data Catalog.

  • redshiftTmpDir: o diretório de preparação temporário a ser usado com determinados coletores de dados. Definido como vazio por padrão.

  • transformationContext: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.

  • pushDownPredicate: filtra partições sem a necessidade de listar e ler todos os arquivos no seu conjunto de dados. Para ter mais informações, consulte Pré-filtragem usando a aplicação de predicados.

  • additionalOptions – Uma coleção de pares nome-valor opcionais. As opções possíveis incluem as listadas em Tipos e opções de conexão para ETL no AWS Glue para Spark, exceto para endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification e delimiter. Outra opção suportada é catalogPartitionPredicate:

    catalogPartitionPredicate: você pode transmitir uma expressão de catálogo para filtrar com base nas colunas de índice. Isso leva a filtragem para o lado do servidor. Para obter mais informações, consulte Índices de partição do AWS Glue. Observe que push_down_predicate e catalogPartitionPredicate usam sintaxes diferentes. O primeiro usa a sintaxe padrão do Spark SQL e o outro usa o analisador JSQL.

  • catalogId: o ID do catálogo (ID da conta) do Data Catalog que está sendo acessado. Quando nulo, o ID da conta do chamador padrão é usado.

Retorna o DataSource.

Exemplo de fonte de transmissão

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Cria um DataSink que grava em um banco de dados JDBC especificado em um objeto Connection no Data Catalog. O objeto Connection tem informações para se conectar a um depósito JDBC, incluindo URL, nome de usuário, senha, VPC, sub-rede e grupos de segurança.

  • catalogConnection: o nome da conexão no Data Catalog que contém o URL do JDBC onde gravar.

  • options: uma string de pares de nome-valor JSON que fornecem as informações adicionais necessárias para gravar em um datastore do JDBC. Isso inclui:

    • dbtable (obrigatório) — O nome da tabela JDBC. Para armazenamentos de dados JDBC que oferecem suporte a esquemas dentro de um banco de dados, especifique schema.table-name. Se um esquema não for fornecido, o esquema "público" padrão será usado. O exemplo a seguir mostra um parâmetro de opções que aponta para um esquema chamado test e uma tabela chamada test_table no banco de dados test_db.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (obrigatório) — O nome do banco de dados do JDBC.

    • Todas as opções adicionais transmitidas diretamente para gravador do JDBC de SparkSQL. Para obter mais informações, consulte Fonte de dados Redshift para Spark.

  • redshiftTmpDir: um diretório de preparação temporário a ser usado com determinados coletores de dados. Definido como vazio por padrão.

  • transformationContext: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.

  • catalogId: o ID do catálogo (ID da conta) do Data Catalog que está sendo acessado. Quando nulo, o ID da conta do chamador padrão é usado.

Código de exemplo:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

Retorna o DataSink.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Cria um DataSink que grava dados em um destino como Amazon Simple Storage Service (Amazon S3), JDBC, Catálogo de Dados do AWS Glue ou um fluxo de dados do Amazon Kinesis ou do Apache Kafka.

Retorna o DataSink.

def getSinkWithFormat

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Cria um DataSink que grava dados em um destino como Amazon S3, JDBC, Catálogo de Dados ou um fluxo de dados do Amazon Kinesis ou do Apache Kafka. Também define o formato dos dados que serão gravados no destino.

Retorna o DataSink.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Cria um Característica do DataSource que lê dados em uma fonte, como o Amazon S3, JDBC ou AWS Glue Data Catalog. Também oferece suporte a origens de dados de transmissão do Kafka e Kinesis.

  • connectionType: o tipo da origem dos dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.

  • connectionOptions – uma string de pares de nome/valor JSON que fornecem as informações adicionais para estabelecer uma conexão com a origem de dados. Para ter mais informações, consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.

    Uma fonte de transmissão do Kinesis requer as seguintes opções de conexão: streamARN, startingPosition, inferSchema e classification.

    Uma fonte de transmissão do Kafka requer as seguintes opções de conexão: connectionName, topicName, startingOffsets, inferSchema e classification.

  • transformationContext: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.

  • pushDownPredicate: predicado em colunas de partição.

Retorna o DataSource.

Exemplo para fonte de transmissão do Amazon Kinesis:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Exemplo para fonte de transmissão do Kafka:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

def getSourceWithFormat

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Cria um Característica do DataSource que lê dados em uma fonte, como o Amazon S3, JDBC ou AWS Glue Data Catalog, e também define o formato dos dados armazenados na fonte.

  • connectionType: o tipo da origem dos dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.

  • options: uma string de pares de nome-valor JSON que fornecem as informações adicionais para estabelecer uma conexão com a origem dos dados. Consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.

  • transformationContext: o contexto de transformação associado ao coletor a ser usado pelos marcadores de trabalho. Definido como vazio por padrão.

  • format: o formato dos dados armazenados na fonte. Quando o connectionType é "s3", você também pode especificar format. Pode ser um de “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet” ou “orc”.

  • formatOptions: uma string de pares de nome-valor JSON que fornecem opções adicionais para a análise dos dados na fonte. Consulte Opções de formato de dados.

Retorna o DataSource.

Exemplos

Crie um DynamicFrame a partir de uma origem de dados que seja um arquivo com valores separados por vírgula (CSV) no Amazon S3:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

Crie um DynamicFrame a partir de uma origem de dados que seja um PostgreSQL usando uma conexão JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

Crie um DynamicFrame a partir de uma origem de dados que seja um MySQL usando uma conexão JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

Obtém o objeto SparkSession associado a este GlueContext. Use este objeto SparkSession para registrar tabelas e UDFs a serem usados com DataFrame criado dos DynamicFrames.

Retorna o SparkSession.

def startTransaction

def startTransaction(readOnly: Boolean):String

Iniciar uma nova transação. Chama internamente a API startTransaction do Lake Formation.

  • readOnly: (booleano) indica se esta transação deve ser somente de leitura ou de leitura e gravação. As gravações feitas usando um ID de transação somente de leitura serão rejeitadas. As transações somente de leitura não precisam ser confirmadas.

Retorna o ID da transação.

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

Tenta confirmar a transação especificada. commitTransaction pode retornar antes que a transação tenha terminado de confirmar. Chama internamente a API commitTransaction do Lake Formation.

  • transactionId: (string) a transação a ser confirmada.

  • waitForCommit: (booleano) Determina se commitTransaction retorna imediatamente. O valor padrão é true. Se for falso, commitTransaction sonda e aguarda até que a transação seja confirmada. A quantidade de tempo de espera é restrita a 1 minuto usando recuo exponencial com um máximo de 6 tentativas.

Retorna um booleano para indicar se a confirmação foi feita ou não.

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

Tenta cancelar a transação especificada. Chama internamente a API cancelTransaction do Lake Formation.

  • transactionId: (string) a transação a ser cancelada.

Retorna uma exceção TransactionCommittedException se a transação tiver sido confirmada anteriormente.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

Cria um objeto GlueContext usando o SparkContext especificado, um mínimo de partições e partições de destino.

  • sc — A ferramenta SparkContext.

  • minPartitions – O número mínimo de partições.

  • targetPartitions — O número destino de partições.

Retorna o GlueContext.

def this

def this( sc : SparkContext )

Cria um objeto GlueContext com o SparkContext fornecido. Define o mínimo de partições como 10 e as partições de destino como 20.

  • sc — A ferramenta SparkContext.

Retorna o GlueContext.

def this

def this( sparkContext : JavaSparkContext )

Cria um objeto GlueContext com o JavaSparkContext fornecido. Define o mínimo de partições como 10 e as partições de destino como 20.

  • sparkContext — A ferramenta JavaSparkContext.

Retorna o GlueContext.