AWS GlueAPI Scala GlueContext - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

AWS GlueAPI Scala GlueContext

Package : com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext Est le point d'entrée pour lire et écrire un DynamicFrame depuis et vers Amazon Simple Storage Service (Amazon S3), le catalogue de données AWS Glue, JDBC, etc. Cette classe fournit des fonctions d'utilitaire pour créer des objets Caractéristique DataSource et DataSink qui peuvent ensuite être utilisés pour lire et écrire les DynamicFrame.

Vous pouvez également utiliser GlueContext pour définir un nombre cible de partitions (par défaut 20) dans le DynamicFrame si le nombre de partitions créées à partir de la source est inférieure à un seuil minimal pour les partitions (par défaut 10).

addIngestionTimeColonnes def

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

Ajoute des colonnes de temps d'ingestion, telles que ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute à l'entrée DataFrame. Cette fonction est automatiquement générée dans le script généré par AWS Glue lorsque vous spécifiez une table Data Catalog avec Amazon S3 comme cible. Cette fonction met automatiquement à jour la partition avec les colonnes de temps d'ingestion sur la table de sortie. Cela permet aux données de sortie d'être automatiquement partitionnées à l'heure d'ingestion sans nécessiter de colonnes d'heure d'ingestion explicites dans les données d'entrée.

  • dataFramedataFrame auquel ajouter les colonnes de temps d'ingestion.

  • timeGranularity — granularité des colonnes de temps. Les valeurs valides sont « day », « hour » et « minute ». Par exemple, si « hour » est transmis à la fonction, les colonnes de temps « ingest_year », « ingest_month », « ingest_day » et « ingest_hour » seront ajoutées à l'original « dataFrame ».

Renvoie le bloc de données après l'ajout des colonnes de granularité temporelle.

Exemple :

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Renvoie un DataFrame créé avec la connexion et le format spécifiés. Utilisez cette fonction uniquement avec les sources AWS de streaming Glue.

  • connectionType : type de connexion en streaming. Les valeurs valides sont kinesis et kafka.

  • connectionOptions : options de connexion, qui sont différentes pour Kinesis et Kafka. Vous trouverez la liste de toutes les options de connexion pour chaque source de données de streaming sur la page Types et options de connexion pour ETL dans AWS Glue pour Spark. Notez les différences suivantes dans les options de connexion en streaming :

    • Les sources de streaming Kinesis nécessitent streamARN, startingPosition, inferSchema et classification.

    • Les sources de streaming Kafka nécessitent connectionName, topicName, startingOffsets, inferSchema et classification.

  • transformationContext : contexte de transformation à utiliser (facultatif).

  • format : spécification de format (facultatif). Utilisée pour une connexion Amazon S3 ou AWS Glue prenant en charge plusieurs formats. Pour plus d'informations sur les formats pris en charge, veuillez consulter Options de format pour les entrées et sorties dans AWS Glue pour Spark.

  • formatOptions : options de format pour le format spécifié. Pour de plus amples informations sur les options de formats pris en charge, veuillez consulter Options de format de données.

Exemple pour la source de streaming Amazon Kinesis :

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Exemple pour la source de streaming Kafka :

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

S'applique à batch_function transmis à chaque micro-lot lu à partir de la source de streaming.

  • frame— Le DataFrame contenant le microlot actuel.

  • batch_function — fonction qui sera appliquée à chaque micro-lot.

  • options — collection de paires clé-valeur qui contient des informations sur le traitement de micro-lots. Les options suivantes sont requises :

    • windowSize — durée de traitement de chaque lot.

    • checkpointLocation — emplacement dans lequel les points de contrôle sont stockés pour la tâche ETL en streaming.

    • batchMaxRetries – nombre maximum de nouvelles tentatives pour ce lot en cas d'échec. La valeur par défaut est 3. Cette option n'est configurable que pour Glue version 2.0 et ultérieure.

Exemple :

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Crée un DataSink qui écrit dans un emplacement spécifié dans une table définie dans Data Catalog.

  • database – Nom de base de données dans le catalogue de données.

  • tableName – Nom de la table dans le catalogue de données.

  • redshiftTmpDir – Répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.

  • transformationContext – Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.

  • additionalOptions — Options supplémentaires fournies à AWS Glue.

  • catalogId — ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.

Renvoie le DataSink.

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Crée un Caractéristique DataSource qui lit les données à partir d'une définition de table dans Data Catalog.

  • database — nom de la base de données dans Data Catalog.

  • tableName – Nom de la table dans le catalogue de données.

  • redshiftTmpDir – Répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.

  • transformationContext – Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.

  • pushDownPredicate – Filtre les partitions sans avoir à répertorier ni lire tous les fichiers de votre jeu de données. Pour plus d'informations, consultez Préfiltrage à l'aide des prédicats pushdown.

  • additionalOptions — Ensemble de paires nom-valeur facultatives. Les options possibles comprennent celles répertoriées dans Types et options de connexion pour ETL dans AWS Glue pour Spark, sauf pour endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification et delimiter. Est une autre option prise en charge  catalogPartitionPredicate:

    catalogPartitionPredicate – Vous pouvez passer une expression de catalogue à filtrer en fonction des colonnes d'index. Cela envoie le filtrage du côté serveur. Pour en savoir plus, consultez AWS Glue Indexes de partition. Notez que push_down_predicate et catalogPartitionPredicate utilisent des syntaxes différentes. Le premier utilise la syntaxe standard SQL Spark et le dernier utilise l'analyseur JSQL.

  • catalogId — ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.

Renvoie le DataSource.

Exemple pour la source de streaming

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Crée un DataSink qui écrit dans une base de données JDBC spécifiée d'un objet Connection dans Data Catalog. L'objet Connection comporte des informations de connexion pour se connecter à un récepteur JDBC incluant l'URL, le nom d'utilisateur, le mot de passe, le VPC, le sous-réseau et les groupes de sécurité.

  • catalogConnection – Nom de la connexion dans le catalogue de données qui contient l'URL JDBC sur laquelle écrire.

  • options – Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires nécessaires pour écrire dans un magasin de données JDBC. Cela consiste notamment à :

    • dbtable (obligatoire) — Nom de la table JDBC. Pour les magasins de données JDBC qui prennent en charge les schémas dans une base de données, spécifiez schema.table-name. Si aucun schéma n'est fourni, c'est le schéma « public » par défaut qui est utilisé. L'exemple suivant illustre un paramètre d'options qui pointe sur un schéma nommé test et une table nommée test_table dans la base de données test_db.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (obligatoire) — Nom de la base de données JDBC.

    • Toutes les options supplémentaires transmises directement à l'enregistreur JDBC SparkSQL. Pour plus d'informations, consultez Redshift data source for Spark.

  • redshiftTmpDir — répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.

  • transformationContext – Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.

  • catalogId — ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.

Exemple de code :

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

Renvoie le DataSink.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Crée un DataSink fichier qui écrit des données vers une destination telle qu'Amazon Simple Storage Service (Amazon S3), JDBC ou le AWS Glue Data Catalog, ou un flux de données Apache Kafka ou Amazon Kinesis.

Renvoie le DataSink.

getSinkWithFormat def

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Crée un DataSink fichier qui écrit des données vers une destination telle qu'Amazon S3, JDBC ou le catalogue de données, ou un flux de données Apache Kafka ou Amazon Kinesis. Définit également le format des données à écrire vers la destination.

  • connectionType — Type de connexion. veuillez consulter Types et options de connexion pour ETL dans AWS Glue pour Spark.

  • options — Chaîne JSON de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir une connexion avec le récepteur de données. veuillez consulter Types et options de connexion pour ETL dans AWS Glue pour Spark.

  • transformationContext – Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.

  • format — Format des données à écrire dans la destination.

  • formatOptions – Chaîne JSON de paires nom-valeur qui fournissent des options supplémentaires pour le formatage des données à la destination. veuillez consulter Options de format de données.

Renvoie le DataSink.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Crée un Caractéristique DataSource fichier qui lit les données d'une source telle qu'Amazon S3, JDBC ou le AWS Glue Data Catalog. Prend également en charge les sources de données de streaming Kafka et Kinesis.

  • connectionType – Type de données de la source de données. veuillez consulter Types et options de connexion pour ETL dans AWS Glue pour Spark.

  • connectionOptions — Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec la source de données. Pour plus d’informations, consultez Types et options de connexion pour ETL dans AWS Glue pour Spark.

    Une source de streaming Kinesis nécessite les options de connexion suivantes : streamARN, startingPosition, inferSchema et classification.

    Une source de streaming Kafka nécessite les options de connexion suivantes : connectionName, topicName, startingOffsets, inferSchema et classification.

  • transformationContext – Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.

  • pushDownPredicate – Prédicat sur les colonnes de partition.

Renvoie le DataSource.

Exemple pour la source de streaming Amazon Kinesis :

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Exemple pour la source de streaming Kafka :

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

getSourceWithFormat def

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Crée un fichier Caractéristique DataSource qui lit les données d'une source telle qu'Amazon S3, JDBC ou le AWS Glue Data Catalog, et définit également le format des données stockées dans la source.

  • connectionType – Type de données de la source de données. veuillez consulter Types et options de connexion pour ETL dans AWS Glue pour Spark.

  • options – Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec la source de données. veuillez consulter Types et options de connexion pour ETL dans AWS Glue pour Spark.

  • transformationContext – Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.

  • format – Format des données stockées à la source. Lorsque le connectionType est « s3 », vous pouvez également spécifier format. Peut être « avro », « csv », « grokLog », « ion », « json », « xml », « parquet » ou « orc ».

  • formatOptions – Chaîne JSON de paires nom-valeur qui fournissent des options supplémentaires pour l'analyse des données à la source. veuillez consulter Options de format de données.

Renvoie le DataSource.

Exemples

Créez un DynamicFrame à partir d'une source de données qui est un fichier de valeurs séparées par des virgules (CSV) sur Amazon S3 :

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

Créez un DynamicFrame à partir d'une source de données PostgreSQL à l'aide d'une connexion JDBC :

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

Créez un DynamicFrame à partir d'une source de données MySQL à l'aide d'une connexion JDBC :

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

Permet d'obtenir l'objet SparkSession associé à GlueContext. Utilisez cet SparkSession objet pour enregistrer des tables et des UDF à utiliser avec DataFrame Created From DynamicFrames.

Renvoie le SparkSession.

def startTransaction

def startTransaction(readOnly: Boolean):String

Démarrer une nouvelle transaction. Appelle en interne l'API Démarrer la transaction Lake Formation.

  • readOnly – Valeur booléenne indiquant si cette transaction doit être en lecture seule ou en lecture et en écriture. Les écritures effectuées à l'aide d'un ID de transaction en lecture seule seront rejetées. Les transactions en lecture seule n'ont pas besoin d'être validées.

Retourne l'ID de transaction.

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

Tentative de validation de la transaction spécifiée. commitTransaction peut être renvoyé avant la fin de la validation de la transaction. Appelle en interne la Lake Formation commitTransaction API.

  • transactionId – (Chaîne) La transaction à valider.

  • waitForCommit – (Booléen) Détermine si le commitTransaction retourne immédiatement. La valeur par défaut est True. Si elle est false,commitTransaction interroge et attend que la transaction soit validée. Le temps d'attente est limité à 1 minute en utilisant le backoff exponentiel avec un maximum de 6 tentatives de nouvelle tentative.

Renvoie une valeur de type Booléen pour indiquer si la validation a été effectuée ou non.

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

Tentative d'annulation de la transaction spécifiée. Appelle en interne l'CancelTransactionAPI Lake Formation.

  • transactionId – (Chaîne) La transaction à annuler.

Retourne une exception TransactionCommittedException si la transaction a déjà été validée.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

Crée un objet GlueContext utilisant le SparkContext spécifié, les partitions minimales et les partitions cible.

  • sc — Le SparkContext.

  • minPartitions — Nombre minimal de partitions.

  • targetPartitions — Nombre cible de partitions.

Renvoie le GlueContext.

def this

def this( sc : SparkContext )

Crée un objet GlueContext avec le SparkContext fourni. Définit le nombre minimal de partitions à 10 et de partitions cibles à 20.

  • sc — Le SparkContext.

Renvoie le GlueContext.

def this

def this( sparkContext : JavaSparkContext )

Crée un objet GlueContext avec le JavaSparkContext fourni. Définit le nombre minimal de partitions à 10 et de partitions cibles à 20.

  • sparkContext — Le JavaSparkContext.

Renvoie le GlueContext.