Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
AWS Glue GlueContext API Scala
Pacchetto: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
è il punto di ingresso per la lettura e la scrittura di un DynamicFrame da e verso Amazon Simple Storage Service (Amazon S3), il catalogo dati AWS Glue, JDBC e così via. Questa classe fornisce funzioni di utilità per creare oggetti Proprietà DataSource e DataSink che possono in cambio essere utilizzati per leggere e scrivere DynamicFrame
.
GlueContext
può anche essere usato per impostare un numero target di partizioni (per impostazione predefinita 20) nel DynamicFrame
se il numero di partizioni create dalla sorgente è inferiore alla soglia minima delle partizioni (per impostazione predefinita 10).
def Colonne addIngestionTime
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Aggiunge colonne del tempo di importazione dati come ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
al DataFrame
di input. Questa funzione viene generata automaticamente nello script generato da AWS Glue quando si specifica una tabella del catalogo dati con Amazon S3 come destinazione. Questa funzione aggiorna automaticamente la partizione con le colonne del tempo di importazione dati nella tabella di output. Ciò consente ai dati di output di venire partizionati automaticamente nel tempo di importazione dati senza necessitare di colonne di tempo di inserimento esplicite nei dati di input.
-
dataFrame
: ildataFrame
al quale aggiungere le colonne del tempo di importazione dati. -
timeGranularity
: la granularità delle colonne temporali. I valori validi sono "day
", "hour
" e "minute
". Ad esempio, se "hour
" viene passato alla funzione, ildataFrame
originale avrà "ingest_year
", "ingest_month
", "ingest_day
" e "ingest_hour
" colonne temporali aggiunte.
Restituisce il frame di dati dopo l'aggiunta di colonne di granularità di tempo.
Esempio:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrame FromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Restituisce un DataFrame
creato con la connessione e il formato specificati. Usa questa funzione solo con le sorgenti di streaming AWS Glue.
connectionType
: il tipo di connessione streaming. I valori validi includonokinesis
ekafka
.-
connectionOptions
: opzioni di connessione, che sono diverse per Kinesis e Kafka. È possibile trovare l'elenco di tutte le opzioni di connessione per ogni origine dati di streaming all'indirizzo Tipi e opzioni di connessione per ETL in AWS Glue per Spark. Di seguito vengono illustrate le differenze delle opzioni di connessione di streaming:-
Le origini di streaming di Kinesis richiedono
streamARN
,startingPosition
,inferSchema
eclassification
. -
Le origini di streaming di Kafka richiedono
connectionName
,topicName
,startingOffsets
,inferSchema
eclassification
.
-
transformationContext
: il contesto di trasformazione da utilizzare (facoltativo).format
: una specifica del formato (facoltativo). Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Per ulteriori informazioni sui formati supportati, consulta Opzioni del formato dati per input e output in AWS Glue per Spark.formatOptions
: opzioni di formattazione per il formato specificato. Per ulteriori informazioni sulle opzioni di formato supportate, consulta Opzioni del formato dei dati.
Esempio per l'origine di streaming Amazon Kinesis:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Esempio per l'origine di streaming Kafka:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Applica il batch_function
passato a ogni micro batch che viene letto dall'origine di streaming.
-
frame
— Il file DataFrame contenente il microbatch corrente. -
batch_function
: una funzione che verrà applicata per ogni micro batch. -
options
: una raccolta di coppie chiave-valore che contiene informazioni su come elaborare micro batch. Sono richieste le seguenti opzioni:-
windowSize
: la quantità di tempo da dedicare all'elaborazione di ciascun batch. -
checkpointLocation
: la posizione in cui sono archiviati i checkpoint per il processo ETL di streaming. -
batchMaxRetries
: numero massimo di tentativi per riprovare il processo se il batch ha esito negativo. Il valore predefinito è 3. Questa opzione è configurabile solo per Glue versione 2.0 e successive.
-
Esempio:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
- def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Crea un DataSink che scrive in una posizione specificata di una tabella definita nel catalogo dati.
database
— Il nome del database nel catalogo di dati.tableName
— Il nome della tabella nel catalogo dati.redshiftTmpDir
— La directory di gestione temporanea da utilizzare con alcuni sink di dati. Impostato su per impostazione predefinita.transformationContext
— Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.additionalOptions
: opzioni aggiuntive fornite a AWS Glue.catalogId
— L'ID catalogo (ID account) relativo al catalogo dati a cui si accede. Se null, viene utilizzato l'ID account predefinito del chiamante.
Restituisce il DataSink
.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Crea un Proprietà DataSource che legge dati da una definizione di tabella nel catalogo di dati.
database
— Il nome del database nel catalogo di dati.tableName
— Il nome della tabella nel catalogo dati.redshiftTmpDir
— La directory di gestione temporanea da utilizzare con alcuni sink di dati. Impostato su per impostazione predefinita.transformationContext
— Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.pushDownPredicate
: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.additionalOptions
: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi e opzioni di connessione per ETL in AWS Glue per Spark ad eccezione diendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
edelimiter
. Un'altra opzione supportata ècatalogPartitionPredicate
:catalogPartitionPredicate
— È possibile passare un'espressione di catalogo per filtrare in base alle colonne di indice. Questo esegue il push down del filtro sul lato server. Per ulteriori informazioni, consulta la pagina relativa agli indici di partizionamento di AWS Glue. Tieni presente chepush_down_predicate
ecatalogPartitionPredicate
usano sintassi diverse. Il primo utilizza la sintassi standard Spark SQL e il secondo utilizza il parser JSQL.catalogId
— L'ID catalogo (ID account) relativo al catalogo dati a cui si accede. Se null, viene utilizzato l'ID account predefinito del chiamante.
Restituisce il DataSource
.
Esempio di origine di streaming
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Crea un oggetto DataSink che scrive in un database JDBC specificato in un oggetto Connection
nel catalogo dati. L'oggetto Connection
dispone di informazioni per connettersi a un sink JDBC, compresi URL, nome utente, password, VPC, sottorete e gruppi di sicurezza.
catalogConnection
— Il nome della connessione nel catalogo dati contenente l'URL JDBC su cui scrivere.options
: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive necessarie per scrivere su un datastore JDBC. Questo include:dbtable (obbligatorio): il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specifica
schema.table-name
. Se non viene fornito alcuno schema, viene usato lo schema "pubblico" predefinito. L'esempio seguente mostra un parametro options che punta a uno schema denominatotest
e a una tabella denominatatest_table
nel databasetest_db
.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (obbligatorio): il nome del database JDBC.
Le eventuali opzioni aggiuntive trasmesse direttamente allo scrittore SparkSQL JDBC. Per ulteriori informazioni, consulta la pagina relativa all'origine dati Redshift per Spark
.
redshiftTmpDir
— Una directory di gestione temporanea da utilizzare con alcuni sink di dati. Impostato su per impostazione predefinita.transformationContext
— Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.catalogId
— L'ID catalogo (ID account) relativo al catalogo dati a cui si accede. Se null, viene utilizzato l'ID account predefinito del chiamante.
Codice di esempio:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Restituisce il DataSink
.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Crea un file DataSink che scrive dati su una destinazione come Amazon Simple Storage Service (Amazon S3), JDBC o Glue Data Catalog o AWS un flusso di dati Apache Kafka o Amazon Kinesis.
-
connectionType
— Il tipo di connessione. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark. -
connectionOptions
: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per stabilire la connessione con il sink dei dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark. -
transformationContext
— Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.
Restituisce il DataSink
.
Formato def getSinkWith
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Crea un file DataSink che scrive dati su una destinazione come Amazon S3, JDBC o Data Catalog o un flusso di dati Apache Kafka o Amazon Kinesis. Imposta anche il formato per i dati da scrivere nella destinazione.
connectionType
— Il tipo di connessione. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.-
options
: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per stabilire connessioni con il sink dei dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark. transformationContext
— Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.format
— Il formato dei dati da scrivere sulla destinazione.formatOptions
: una stringa di coppie nome-valore JSON che forniscono opzioni aggiuntive per la formattazione dei dati nella destinazione. Per informazioni, consulta Opzioni del formato dei dati.
Restituisce il DataSink
.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Crea un file Proprietà DataSource che legge i dati da una fonte come Amazon S3, JDBC o Glue Data Catalog. AWS Supporta anche origini dati di streaming Kafka e Kinesis.
connectionType
— Il tipo di origine dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.-
connectionOptions
: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per stabilire una connessione con l'origine dati. Per ulteriori informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.Un'origine di streaming Kinesis richiede le seguenti opzioni di connessione:
streamARN
,startingPosition
,inferSchema
eclassification
.Un'origine di streaming Kinesis richiede le seguenti opzioni di connessione:
connectionName
,topicName
,startingOffsets
,inferSchema
eclassification
. transformationContext
— Il contesto di trasformazione associato al sink che i segnalibri di processo utilizzano. Impostato su per impostazione predefinita.pushDownPredicate
— Predicato sulle colonne delle partizioni.
Restituisce il DataSource
.
Esempio per l'origine di streaming Amazon Kinesis:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Esempio per l'origine di streaming Kafka:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
formato def getSourceWith
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Crea un file Proprietà DataSource che legge i dati da una fonte come Amazon S3, JDBC o AWS Glue Data Catalog e imposta anche il formato dei dati archiviati nell'origine.
connectionType
– Il tipo dell'origine dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.-
options
: una stringa di coppie nome/valore JSON che forniscono informazioni aggiuntive per stabilire una connessione all'origine dati. Per informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark. transformationContext
– Il contesto di trasformazione associato al sink che deve essere usato dai segnalibri dei processi. Impostato su per impostazione predefinita.format
– Il formato dei dati archiviati nell'origine. Quando ilconnectionType
è "s3", è anche possibile specificareformat
. Le possibilità sono “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet” oppure “orc”.formatOptions
: una stringa di coppie nome/valore JSON che forniscono opzioni aggiuntive per l'analisi dei dati nell'origine. Per informazioni, consulta Opzioni del formato dei dati.
Restituisce il DataSource
.
Examples (Esempi)
Crea un file DynamicFrame da un'origine dati che sia un file con valori separati da virgole (CSV) su Amazon S3:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Crea un file DynamicFrame da un'origine dati che sia PostgreSQL utilizzando una connessione JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Crea un file DynamicFrame da un'origine dati che è MySQL utilizzando una connessione JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Ottiene l'oggetto SparkSession
associato a questo GlueContext. Utilizzate questo SparkSession oggetto per registrare tabelle e UDF da utilizzare con DataFrame
created from. DynamicFrames
Restituisce il. SparkSession
def startTransaction
def startTransaction(readOnly: Boolean):String
Avvia una nuova transazione. Chiama internamente l'API Lake Formation startTransaction.
readOnly
: (booleano) indica se questa transazione debba essere di sola lettura o lettura e scrittura. Le scritture effettuate utilizzando un ID transazione di sola lettura verranno rifiutate. Il commit delle transazioni di sola lettura non deve essere eseguito.
Restituisce l'ID transazione.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Tenta di eseguire il commit della transazione specificata. commitTransaction
può restituire prima che la transazione abbia terminato il commit. Chiama internamente l'API Lake Formation commitTransaction.
transactionId
: (stringa) la transazione di cui eseguire il commit.waitForCommit
: (booleano) determina se ilcommitTransaction
restituisce immediatamente. Il valore di default è true. Se false,commitTransaction
effettua il polling e aspetta che sia stato eseguito il commit della transazione. Il tempo di attesa è limitato a 1 minuto utilizzando il backoff esponenziale con un massimo di 6 tentativi.
Restituisce un valore booleano per indicare se il commit sia stato eseguito o meno.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Tenta di annullare la transazione specificata. Richiama internamente l'CancelTransactionAPI Lake Formation.
transactionId
: (stringa) la transazione da annullare.
Restituisce un'eccezione TransactionCommittedException
se è stato precedentemente eseguito il commit della transazione.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Crea un oggetto GlueContext
utilizzando lo SparkContext
specificato, le partizioni minime e quelle target.
sc
— Il carattereSparkContext
.minPartitions
— Il numero minimo di partizioni.targetPartitions
— Il numero target di partizioni.
Restituisce il GlueContext
.
def this
def this( sc : SparkContext )
Crea un oggetto GlueContext
con il SparkContext
fornito. Imposta il numero minimo di partizioni a 10 e a le partizioni target a 20.
sc
— Il carattereSparkContext
.
Restituisce il GlueContext
.
def this
def this( sparkContext : JavaSparkContext )
Crea un oggetto GlueContext
con il JavaSparkContext
fornito. Imposta il numero minimo di partizioni a 10 e a le partizioni target a 20.
sparkContext
— Il carattereJavaSparkContext
.
Restituisce il GlueContext
.