Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
AWS GlueAPI Scala GlueContext
Package : com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
Est le point d'entrée pour lire et écrire un DynamicFrame depuis et vers Amazon Simple Storage Service (Amazon S3), le catalogue de données AWS Glue, JDBC, etc. Cette classe fournit des fonctions d'utilitaire pour créer des objets Caractéristique DataSource et DataSink qui peuvent ensuite être utilisés pour lire et écrire les DynamicFrame
.
Vous pouvez également utiliser GlueContext
pour définir un nombre cible de partitions (par défaut 20) dans le DynamicFrame
si le nombre de partitions créées à partir de la source est inférieure à un seuil minimal pour les partitions (par défaut 10).
addIngestionTimeColonnes def
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Ajoute des colonnes de temps d'ingestion, telles que ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
à l'entrée DataFrame
. Cette fonction est automatiquement générée dans le script généré par AWS Glue lorsque vous spécifiez une table Data Catalog avec Amazon S3 comme cible. Cette fonction met automatiquement à jour la partition avec les colonnes de temps d'ingestion sur la table de sortie. Cela permet aux données de sortie d'être automatiquement partitionnées à l'heure d'ingestion sans nécessiter de colonnes d'heure d'ingestion explicites dans les données d'entrée.
-
dataFrame
–dataFrame
auquel ajouter les colonnes de temps d'ingestion. -
timeGranularity
— granularité des colonnes de temps. Les valeurs valides sont «day
», «hour
» et «minute
». Par exemple, si «hour
» est transmis à la fonction, les colonnes de temps «ingest_year
», «ingest_month
», «ingest_day
» et «ingest_hour
» seront ajoutées à l'original «dataFrame
».
Renvoie le bloc de données après l'ajout des colonnes de granularité temporelle.
Exemple :
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrame FromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Renvoie un DataFrame
créé avec la connexion et le format spécifiés. Utilisez cette fonction uniquement avec les sources AWS de streaming Glue.
connectionType
: type de connexion en streaming. Les valeurs valides sontkinesis
etkafka
.-
connectionOptions
: options de connexion, qui sont différentes pour Kinesis et Kafka. Vous trouverez la liste de toutes les options de connexion pour chaque source de données de streaming sur la page Types de connexion et options ETL pour AWS Glue pour Spark. Notez les différences suivantes dans les options de connexion en streaming :-
Les sources de streaming Kinesis nécessitent
streamARN
,startingPosition
,inferSchema
etclassification
. -
Les sources de streaming Kafka nécessitent
connectionName
,topicName
,startingOffsets
,inferSchema
etclassification
.
-
transformationContext
: contexte de transformation à utiliser (facultatif).format
: spécification de format (facultatif). Utilisée pour une connexion Amazon S3 ou AWS Glue prenant en charge plusieurs formats. Pour plus d'informations sur les formats pris en charge, veuillez consulter Options de format pour les entrées et sorties dans AWS Glue pour Spark.formatOptions
: options de format pour le format spécifié. Pour de plus amples informations sur les options de formats pris en charge, veuillez consulter Options de format de données.
Exemple pour la source de streaming Amazon Kinesis :
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Exemple pour la source de streaming Kafka :
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
S'applique à batch_function
transmis à chaque micro-lot lu à partir de la source de streaming.
-
frame
— Le DataFrame contenant le microlot actuel. -
batch_function
— fonction qui sera appliquée à chaque micro-lot. -
options
— collection de paires clé-valeur qui contient des informations sur le traitement de micro-lots. Les options suivantes sont requises :-
windowSize
— durée de traitement de chaque lot. -
checkpointLocation
— emplacement dans lequel les points de contrôle sont stockés pour la tâche ETL en streaming. -
batchMaxRetries
– nombre maximum de nouvelles tentatives pour ce lot en cas d'échec. La valeur par défaut est 3. Cette option n'est configurable que pour Glue version 2.0 et ultérieure.
-
Exemple :
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Crée un DataSink qui écrit dans un emplacement spécifié dans une table définie dans Data Catalog.
database
– Nom de base de données dans le catalogue de données.tableName
– Nom de la table dans le catalogue de données.redshiftTmpDir
– Répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.additionalOptions
— Options supplémentaires fournies à AWS Glue.catalogId
— ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.
Renvoie le DataSink
.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Crée un Caractéristique DataSource qui lit les données à partir d'une définition de table dans Data Catalog.
database
— nom de la base de données dans Data Catalog.tableName
– Nom de la table dans le catalogue de données.redshiftTmpDir
– Répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.pushDownPredicate
– Filtre les partitions sans avoir à répertorier ni lire tous les fichiers de votre jeu de données. Pour plus d'informations, consultez Préfiltrage à l'aide des prédicats pushdown.additionalOptions
— Ensemble de paires nom-valeur facultatives. Les options possibles comprennent celles répertoriées dans Types de connexion et options ETL pour AWS Glue pour Spark, sauf pourendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
etdelimiter
. Est une autre option prise en chargecatalogPartitionPredicate
:catalogPartitionPredicate
– Vous pouvez passer une expression de catalogue à filtrer en fonction des colonnes d'index. Cela envoie le filtrage du côté serveur. Pour en savoir plus, consultez AWS Glue Indexes de partition. Notez quepush_down_predicate
etcatalogPartitionPredicate
utilisent des syntaxes différentes. Le premier utilise la syntaxe standard SQL Spark et le dernier utilise l'analyseur JSQL.catalogId
— ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.
Renvoie le DataSource
.
Exemple pour la source de streaming
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Crée un DataSink qui écrit dans une base de données JDBC spécifiée d'un objet Connection
dans Data Catalog. L'objet Connection
comporte des informations de connexion pour se connecter à un récepteur JDBC incluant l'URL, le nom d'utilisateur, le mot de passe, le VPC, le sous-réseau et les groupes de sécurité.
catalogConnection
– Nom de la connexion dans le catalogue de données qui contient l'URL JDBC sur laquelle écrire.options
– Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires nécessaires pour écrire dans un magasin de données JDBC. Cela consiste notamment à :dbtable (obligatoire) — Nom de la table JDBC. Pour les magasins de données JDBC qui prennent en charge les schémas dans une base de données, spécifiez
schema.table-name
. Si aucun schéma n'est fourni, c'est le schéma « public » par défaut qui est utilisé. L'exemple suivant illustre un paramètre d'options qui pointe sur un schéma nommétest
et une table nomméetest_table
dans la base de donnéestest_db
.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (obligatoire) — Nom de la base de données JDBC.
Toutes les options supplémentaires transmises directement à l'enregistreur JDBC SparkSQL. Pour plus d'informations, consultez Redshift data source for Spark
.
redshiftTmpDir
— répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.catalogId
— ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.
Exemple de code :
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Renvoie le DataSink
.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Crée un DataSink fichier qui écrit des données vers une destination telle qu'Amazon Simple Storage Service (Amazon S3), JDBC ou le AWS Glue Data Catalog, ou un flux de données Apache Kafka ou Amazon Kinesis.
-
connectionType
— Type de connexion. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark. -
connectionOptions
— Chaîne JSON de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec le récepteur de données. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark. -
transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.
Renvoie le DataSink
.
getSinkWithFormat def
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Crée un DataSink fichier qui écrit des données vers une destination telle qu'Amazon S3, JDBC ou le catalogue de données, ou un flux de données Apache Kafka ou Amazon Kinesis. Définit également le format des données à écrire vers la destination.
connectionType
— Type de connexion. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark.-
options
— Chaîne JSON de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir une connexion avec le récepteur de données. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark. transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.format
— Format des données à écrire dans la destination.formatOptions
– Chaîne JSON de paires nom-valeur qui fournissent des options supplémentaires pour le formatage des données à la destination. veuillez consulter Options de format de données.
Renvoie le DataSink
.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Crée un Caractéristique DataSource fichier qui lit les données d'une source telle qu'Amazon S3, JDBC ou le AWS Glue Data Catalog. Prend également en charge les sources de données de streaming Kafka et Kinesis.
connectionType
– Type de données de la source de données. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark.-
connectionOptions
— Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec la source de données. Pour plus d’informations, consultez Types de connexion et options ETL pour AWS Glue pour Spark.Une source de streaming Kinesis nécessite les options de connexion suivantes :
streamARN
,startingPosition
,inferSchema
etclassification
.Une source de streaming Kafka nécessite les options de connexion suivantes :
connectionName
,topicName
,startingOffsets
,inferSchema
etclassification
. transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.pushDownPredicate
– Prédicat sur les colonnes de partition.
Renvoie le DataSource
.
Exemple pour la source de streaming Amazon Kinesis :
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Exemple pour la source de streaming Kafka :
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
getSourceWithFormat def
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Crée un fichier Caractéristique DataSource qui lit les données d'une source telle qu'Amazon S3, JDBC ou le AWS Glue Data Catalog, et définit également le format des données stockées dans la source.
connectionType
– Type de données de la source de données. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark.-
options
– Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec la source de données. veuillez consulter Types de connexion et options ETL pour AWS Glue pour Spark. transformationContext
– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.format
– Format des données stockées à la source. Lorsque leconnectionType
est « s3 », vous pouvez également spécifierformat
. Peut être « avro », « csv », « grokLog », « ion », « json », « xml », « parquet » ou « orc ».formatOptions
– Chaîne JSON de paires nom-valeur qui fournissent des options supplémentaires pour l'analyse des données à la source. veuillez consulter Options de format de données.
Renvoie le DataSource
.
Exemples
Créez un DynamicFrame à partir d'une source de données qui est un fichier de valeurs séparées par des virgules (CSV) sur Amazon S3 :
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Créez un DynamicFrame à partir d'une source de données PostgreSQL à l'aide d'une connexion JDBC :
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Créez un DynamicFrame à partir d'une source de données MySQL à l'aide d'une connexion JDBC :
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Permet d'obtenir l'objet SparkSession
associé à GlueContext. Utilisez cet SparkSession objet pour enregistrer des tables et des UDF à utiliser avec DataFrame
Created From DynamicFrames.
Renvoie le SparkSession.
def startTransaction
def startTransaction(readOnly: Boolean):String
Démarrer une nouvelle transaction. Appelle en interne l'API Démarrer la transaction Lake Formation.
readOnly
– Valeur booléenne indiquant si cette transaction doit être en lecture seule ou en lecture et en écriture. Les écritures effectuées à l'aide d'un ID de transaction en lecture seule seront rejetées. Les transactions en lecture seule n'ont pas besoin d'être validées.
Retourne l'ID de transaction.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Tentative de validation de la transaction spécifiée. commitTransaction
peut être renvoyé avant la fin de la validation de la transaction. Appelle en interne la Lake Formation commitTransaction API.
transactionId
– (Chaîne) La transaction à valider.waitForCommit
– (Booléen) Détermine si lecommitTransaction
retourne immédiatement. La valeur par défaut est True. Si elle est false,commitTransaction
interroge et attend que la transaction soit validée. Le temps d'attente est limité à 1 minute en utilisant le backoff exponentiel avec un maximum de 6 tentatives de nouvelle tentative.
Renvoie une valeur de type Booléen pour indiquer si la validation a été effectuée ou non.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Tentative d'annulation de la transaction spécifiée. Appelle en interne l'CancelTransactionAPI Lake Formation.
transactionId
– (Chaîne) La transaction à annuler.
Retourne une exception TransactionCommittedException
si la transaction a déjà été validée.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Crée un objet GlueContext
utilisant le SparkContext
spécifié, les partitions minimales et les partitions cible.
sc
— LeSparkContext
.minPartitions
— Nombre minimal de partitions.targetPartitions
— Nombre cible de partitions.
Renvoie le GlueContext
.
def this
def this( sc : SparkContext )
Crée un objet GlueContext
avec le SparkContext
fourni. Définit le nombre minimal de partitions à 10 et de partitions cibles à 20.
sc
— LeSparkContext
.
Renvoie le GlueContext
.
def this
def this( sparkContext : JavaSparkContext )
Crée un objet GlueContext
avec le JavaSparkContext
fourni. Définit le nombre minimal de partitions à 10 et de partitions cibles à 20.
sparkContext
— LeJavaSparkContext
.
Renvoie le GlueContext
.