AWS Glue GlueContext API Scala - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

AWS Glue GlueContext API Scala

Pacchetto: com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext è il punto di ingresso per la lettura e la scrittura di un DynamicFrame da e verso Amazon Simple Storage Service (Amazon S3), il catalogo dati AWS Glue, JDBC e così via. Questa classe fornisce funzioni di utilità per creare oggetti Proprietà DataSource e DataSink che possono in cambio essere utilizzati per leggere e scrivere DynamicFrame.

GlueContext può anche essere usato per impostare un numero target di partizioni (per impostazione predefinita 20) nel DynamicFrame se il numero di partizioni create dalla sorgente è inferiore alla soglia minima delle partizioni (per impostazione predefinita 10).

def Colonne addIngestionTime

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

Aggiunge colonne del tempo di importazione dati come ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute al DataFrame di input. Questa funzione viene generata automaticamente nello script generato da AWS Glue quando si specifica una tabella del catalogo dati con Amazon S3 come destinazione. Questa funzione aggiorna automaticamente la partizione con le colonne del tempo di importazione dati nella tabella di output. Ciò consente ai dati di output di venire partizionati automaticamente nel tempo di importazione dati senza necessitare di colonne di tempo di inserimento esplicite nei dati di input.

  • dataFrame: il dataFrame al quale aggiungere le colonne del tempo di importazione dati.

  • timeGranularity: la granularità delle colonne temporali. I valori validi sono "day", "hour" e "minute". Ad esempio, se "hour" viene passato alla funzione, il dataFrame originale avrà "ingest_year", "ingest_month", "ingest_day" e "ingest_hour" colonne temporali aggiunte.

Restituisce il frame di dati dopo l'aggiunta di colonne di granularità di tempo.

Esempio:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Restituisce un DataFrame creato con la connessione e il formato specificati. Usa questa funzione solo con le sorgenti di streaming AWS Glue.

  • connectionType: il tipo di connessione streaming. I valori validi includono kinesis e kafka.

  • connectionOptions: opzioni di connessione, che sono diverse per Kinesis e Kafka. È possibile trovare l'elenco di tutte le opzioni di connessione per ogni origine dati di streaming all'indirizzo Tipi e opzioni di connessione per ETL in AWS Glue per Spark. Di seguito vengono illustrate le differenze delle opzioni di connessione di streaming:

    • Le origini di streaming di Kinesis richiedono streamARN, startingPosition, inferSchema e classification.

    • Le origini di streaming di Kafka richiedono connectionName, topicName, startingOffsets, inferSchema e classification.

  • transformationContext: il contesto di trasformazione da utilizzare (facoltativo).

  • format: una specifica del formato (facoltativo). Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Per ulteriori informazioni sui formati supportati, consulta Opzioni del formato dati per input e output in AWS Glue per Spark.

  • formatOptions: opzioni di formattazione per il formato specificato. Per ulteriori informazioni sulle opzioni di formato supportate, consulta Opzioni del formato dei dati.

Esempio per l'origine di streaming Amazon Kinesis:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Esempio per l'origine di streaming Kafka:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

Applica il batch_function passato a ogni micro batch che viene letto dall'origine di streaming.

  • frame— Il file DataFrame contenente il microbatch corrente.

  • batch_function: una funzione che verrà applicata per ogni micro batch.

  • options: una raccolta di coppie chiave-valore che contiene informazioni su come elaborare micro batch. Sono richieste le seguenti opzioni:

    • windowSize: la quantità di tempo da dedicare all'elaborazione di ciascun batch.

    • checkpointLocation: la posizione in cui sono archiviati i checkpoint per il processo ETL di streaming.

    • batchMaxRetries: numero massimo di tentativi per riprovare il processo se il batch ha esito negativo. Il valore predefinito è 3. Questa opzione è configurabile solo per Glue versione 2.0 e successive.

Esempio:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

- def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Crea un DataSink che scrive in una posizione specificata di una tabella definita nel catalogo dati.

  • database — Il nome del database nel catalogo di dati.

  • tableName — Il nome della tabella nel catalogo dati.

  • redshiftTmpDir — La directory di gestione temporanea da utilizzare con alcuni sink di dati. Impostato su per impostazione predefinita.

  • transformationContext — Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.

  • additionalOptions: opzioni aggiuntive fornite a AWS Glue.

  • catalogId — L'ID catalogo (ID account) relativo al catalogo dati a cui si accede. Se null, viene utilizzato l'ID account predefinito del chiamante.

Restituisce il DataSink.

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Crea un Proprietà DataSource che legge dati da una definizione di tabella nel catalogo di dati.

  • database — Il nome del database nel catalogo di dati.

  • tableName — Il nome della tabella nel catalogo dati.

  • redshiftTmpDir — La directory di gestione temporanea da utilizzare con alcuni sink di dati. Impostato su per impostazione predefinita.

  • transformationContext — Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.

  • pushDownPredicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.

  • additionalOptions: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi e opzioni di connessione per ETL in AWS Glue per Spark ad eccezione di endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification e delimiter. Un'altra opzione supportata è catalogPartitionPredicate:

    catalogPartitionPredicate — È possibile passare un'espressione di catalogo per filtrare in base alle colonne di indice. Questo esegue il push down del filtro sul lato server. Per ulteriori informazioni, consulta la pagina relativa agli indici di partizionamento di AWS Glue. Tieni presente che push_down_predicate e catalogPartitionPredicate usano sintassi diverse. Il primo utilizza la sintassi standard Spark SQL e il secondo utilizza il parser JSQL.

  • catalogId — L'ID catalogo (ID account) relativo al catalogo dati a cui si accede. Se null, viene utilizzato l'ID account predefinito del chiamante.

Restituisce il DataSource.

Esempio di origine di streaming

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Crea un oggetto DataSink che scrive in un database JDBC specificato in un oggetto Connection nel catalogo dati. L'oggetto Connection dispone di informazioni per connettersi a un sink JDBC, compresi URL, nome utente, password, VPC, sottorete e gruppi di sicurezza.

  • catalogConnection — Il nome della connessione nel catalogo dati contenente l'URL JDBC su cui scrivere.

  • options: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive necessarie per scrivere su un datastore JDBC. Questo include:

    • dbtable (obbligatorio): il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specifica schema.table-name. Se non viene fornito alcuno schema, viene usato lo schema "pubblico" predefinito. L'esempio seguente mostra un parametro options che punta a uno schema denominato test e a una tabella denominata test_table nel database test_db.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (obbligatorio): il nome del database JDBC.

    • Le eventuali opzioni aggiuntive trasmesse direttamente allo scrittore SparkSQL JDBC. Per ulteriori informazioni, consulta la pagina relativa all'origine dati Redshift per Spark.

  • redshiftTmpDir — Una directory di gestione temporanea da utilizzare con alcuni sink di dati. Impostato su per impostazione predefinita.

  • transformationContext — Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.

  • catalogId — L'ID catalogo (ID account) relativo al catalogo dati a cui si accede. Se null, viene utilizzato l'ID account predefinito del chiamante.

Codice di esempio:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

Restituisce il DataSink.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Crea un file DataSink che scrive dati su una destinazione come Amazon Simple Storage Service (Amazon S3), JDBC o Glue Data Catalog o AWS un flusso di dati Apache Kafka o Amazon Kinesis.

Restituisce il DataSink.

Formato def getSinkWith

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Crea un file DataSink che scrive dati su una destinazione come Amazon S3, JDBC o Data Catalog o un flusso di dati Apache Kafka o Amazon Kinesis. Imposta anche il formato per i dati da scrivere nella destinazione.

  • connectionType — Il tipo di connessione. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.

  • options: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per stabilire connessioni con il sink dei dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.

  • transformationContext — Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.

  • format — Il formato dei dati da scrivere sulla destinazione.

  • formatOptions: una stringa di coppie nome-valore JSON che forniscono opzioni aggiuntive per la formattazione dei dati nella destinazione. Per informazioni, consulta Opzioni del formato dei dati.

Restituisce il DataSink.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Crea un file Proprietà DataSource che legge i dati da una fonte come Amazon S3, JDBC o Glue Data Catalog. AWS Supporta anche origini dati di streaming Kafka e Kinesis.

  • connectionType — Il tipo di origine dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.

  • connectionOptions: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per stabilire una connessione con l'origine dati. Per ulteriori informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.

    Un'origine di streaming Kinesis richiede le seguenti opzioni di connessione: streamARN, startingPosition, inferSchema e classification.

    Un'origine di streaming Kinesis richiede le seguenti opzioni di connessione: connectionName, topicName, startingOffsets, inferSchema e classification.

  • transformationContext — Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.

  • pushDownPredicate — Predicato sulle colonne delle partizioni.

Restituisce il DataSource.

Esempio per l'origine di streaming Amazon Kinesis:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Esempio per l'origine di streaming Kafka:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

formato def getSourceWith

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Crea un file Proprietà DataSource che legge i dati da una fonte come Amazon S3, JDBC o AWS Glue Data Catalog e imposta anche il formato dei dati archiviati nell'origine.

  • connectionType – Il tipo dell'origine dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.

  • options: una stringa di coppie nome/valore JSON che forniscono informazioni aggiuntive per stabilire una connessione all'origine dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.

  • transformationContext – Il contesto di trasformazione associato al sink che deve essere usato dai segnalibri dei processi. Impostato su per impostazione predefinita.

  • format – Il formato dei dati archiviati nell'origine. Quando il connectionType è "s3", è anche possibile specificare format. Le possibilità sono “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet” oppure “orc”.

  • formatOptions: una stringa di coppie nome/valore JSON che forniscono opzioni aggiuntive per l'analisi dei dati nell'origine. Per informazioni, consulta Opzioni del formato dei dati.

Restituisce il DataSource.

Examples (Esempi)

Crea un file DynamicFrame da un'origine dati che sia un file con valori separati da virgole (CSV) su Amazon S3:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

Crea un file DynamicFrame da un'origine dati che sia PostgreSQL utilizzando una connessione JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

Crea un file DynamicFrame da un'origine dati che è MySQL utilizzando una connessione JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

Ottiene l'oggetto SparkSession associato a questo GlueContext. Utilizzate questo SparkSession oggetto per registrare tabelle e UDF da utilizzare con DataFrame created from. DynamicFrames

Restituisce il. SparkSession

def startTransaction

def startTransaction(readOnly: Boolean):String

Avvia una nuova transazione. Chiama internamente l'API Lake Formation startTransaction.

  • readOnly: (booleano) indica se questa transazione debba essere di sola lettura o lettura e scrittura. Le scritture effettuate utilizzando un ID transazione di sola lettura verranno rifiutate. Il commit delle transazioni di sola lettura non deve essere eseguito.

Restituisce l'ID transazione.

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

Tenta di eseguire il commit della transazione specificata. commitTransaction può restituire prima che la transazione abbia terminato il commit. Chiama internamente l'API Lake Formation commitTransaction.

  • transactionId: (stringa) la transazione di cui eseguire il commit.

  • waitForCommit: (booleano) determina se il commitTransaction restituisce immediatamente. Il valore di default è true. Se false, commitTransaction effettua il polling e aspetta che sia stato eseguito il commit della transazione. Il tempo di attesa è limitato a 1 minuto utilizzando il backoff esponenziale con un massimo di 6 tentativi.

Restituisce un valore booleano per indicare se il commit sia stato eseguito o meno.

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

Tenta di annullare la transazione specificata. Richiama internamente l'CancelTransactionAPI Lake Formation.

  • transactionId: (stringa) la transazione da annullare.

Restituisce un'eccezione TransactionCommittedException se è stato precedentemente eseguito il commit della transazione.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

Crea un oggetto GlueContext utilizzando lo SparkContext specificato, le partizioni minime e quelle target.

  • sc — Il carattere SparkContext.

  • minPartitions — Il numero minimo di partizioni.

  • targetPartitions — Il numero target di partizioni.

Restituisce il GlueContext.

def this

def this( sc : SparkContext )

Crea un oggetto GlueContext con il SparkContext fornito. Imposta il numero minimo di partizioni a 10 e a le partizioni target a 20.

  • sc — Il carattere SparkContext.

Restituisce il GlueContext.

def this

def this( sparkContext : JavaSparkContext )

Crea un oggetto GlueContext con il JavaSparkContext fornito. Imposta il numero minimo di partizioni a 10 e a le partizioni target a 20.

  • sparkContext — Il carattere JavaSparkContext.

Restituisce il GlueContext.