AWS GlueScala-APIs GlueContext - AWS Glue

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

AWS GlueScala-APIs GlueContext

Paket: com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext ist der Eintrittspunkt für das Lesen und Schreiben eines DynamicFrame von und zu Amazon Simple Storage Service (Amazon S3), den AWS Glue Data Catalog, JDBC und so weiter. Diese Klasse bietet Hilfsfunktionen zum Erstellen von DataSource-Trait- und DataSink-Objekten, die wiederum zum Lesen und Schreiben von DynamicFrames verwendet werden können.

Sie können GlueContext auch verwenden, um eine Zielanzahl an Partitionen (Standard 20) im DynamicFrame festzulegen, wenn die Anzahl der von der Quelle erstellten Partitionen kleiner ist als ein Mindestwert für Partitionen (Standard 10).

Spalten definieren addIngestionTime

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

Hängt die Erfassungszeitspalten wie ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute an die Eingabe DataFrame an. Diese Funktion wird automatisch in dem von AWS Glue erzeugten Skript generiert, wenn Sie eine Data-Catalog-Tabelle mit Amazon S3 als Ziel angeben. Diese Funktion aktualisiert automatisch die Partition mit Erfassungszeitspalten in der Ausgabetabelle. So können die Ausgabedaten bei der Erfassung automatisch partitioniert werden, ohne dass explizite Erfassungszeitspalten in den Eingabedaten erforderlich sind.

  • dataFrame – Der dataFrame, um die Erfassungszeitspalten anzuhängen.

  • timeGranularity – Die Granularität der Zeitspalten. Gültige Werte sind „day“, „hour“ und „minute“. Wenn zum Beispiel „hour“ an die Funktion übergeben wird, werden im Original dataFrame die Zeiten in den Spalten „ingest_year“, „ingest_month“, „ingest_day“, und „ingest_hour“ aktualisiert.

Gibt den Datenrahmen nach dem Anhängen der Zeitgranularitätsspalten zurück.

Beispiel:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Gibt einen DataFrame zurück, der mit der angegebenen Verbindung und dem Format erstellt wurde. Verwenden Sie diese Funktion nur mit AWS Glue-Streaming-Quellen.

  • connectionType – Die Art der Streaming-Verbindung. Gültige Werte sind kinesis und kafka.

  • connectionOptions – Verbindungsoptionen, die für Kinesis und Kafka unterschiedlich sind. Die Liste aller Verbindungsoptionen für jede Streaming-Datenquelle finden Sie unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark. Beachten Sie die folgenden Unterschiede bei den Streaming-Verbindungsoptionen:

    • Kinesis-Streaming-Quellen erfordern streamARN, startingPosition, inferSchema und classification.

    • Kafka-Streaming-Quellen erfordern connectionName, topicName, startingOffsets, inferSchema und classification.

  • transformationContext – Der zu verwendende Transformationskontext (optional).

  • format – Eine Formatspezifikation (optional). Diese wird für eine Amazon-S3- oder eine AWS Glue-Verbindung verwendet, die mehrere Formate unterstützt. Informationen zu den unterstützten Formaten finden Sie unter Mögliche Formate für Eingaben und Ausgaben in AWS Glue für Spark

  • formatOptions – Formatierungsoptionen für das angegebene Format. Weitere Informationen zu unterstützten Formatoptionen finden Sie unter Pfad-Formatoptionen.

Beispiel für Amazon-Kinesis-Streaming-Quelle:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Beispiel für die Kafka-Streaming-Quelle:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

Wendet die batch_function auf jeden Mikrobatch an, der von der Streaming-Quelle gelesen wird.

  • frame— Der DataFrame , der den aktuellen Mikrostapel enthält.

  • batch_function – Eine Funktion, die für jeden Mikrobatch angewendet wird.

  • options – Eine Sammlung von Schlüssel-Wert-Paaren, die Informationen zur Verarbeitung von Mikrobatches enthält. Die folgenden Optionen sind erforderlich:

    • windowSize – Die Zeitspanne für die Verarbeitung der einzelnen Batches.

    • checkpointLocation – Der Ort, an dem Checkpoints für den Streaming-ETL-Auftrag gespeichert werden.

    • batchMaxRetries – Die maximale Anzahl der Wiederholungsversuche für diesen Batch, wenn er fehlschlägt. Der Standardwert ist 3. Diese Option ist nur für Glue 2.0 und höher konfigurierbar.

Beispiel:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Erstellt eine DataSink, die in einen Speicherort schreibt, der in einer Tabelle angegeben ist, die im Data Catalog definiert ist.

  • database – Der Datenbankname im Data Catalog.

  • tableName – Der Tabellenname im Data Catalog.

  • redshiftTmpDir – Das vorläufige Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.

  • transformationContext – Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.

  • additionalOptions – Zusätzliche Optionen für AWS Glue.

  • catalogId – Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.

Gibt den DataSink zurück.

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Erstellt einen DataSource-Trait, der Daten aus einer Tabellendefinition im Data Catalog liest.

  • database – Der Datenbankname im Data Catalog.

  • tableName – Der Tabellenname im Data Catalog.

  • redshiftTmpDir – Das vorläufige Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.

  • transformationContext – Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.

  • pushDownPredicate – Filtert Partitionen, ohne alle Dateien in Ihrem Datensatz auflisten und lesen zu müssen. Weitere Informationen finden Sie unter Vorabfilterung mit Pushdown-Prädikaten.

  • additionalOptions – Eine Sammlung optionaler Name/Wert-Paare. Zu den möglichen Optionen gehören die unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark aufgeführten, außer endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification und delimiter. Eine weitere unterstützte Option ist catalogPartitionPredicate:

    catalogPartitionPredicate – Sie können einen Katalogausdruck basierend auf den Indexspalten an den Filter übergeben. Dies verlagert die Filterung auf die Serverseite. Weitere Informationen finden Sie unter AWS Glue-Partition-Indizes. Beachten Sie, dass push_down_predicate und catalogPartitionPredicate verschiedene Syntaxen verwenden. Erstere verwendet die Spark-SQL-Standardsyntax und letztere verwendet den JSQL-Parser.

  • catalogId – Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.

Gibt den DataSource zurück.

Beispiel für eine Streaming-Quelle

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Erstellt eine DataSink, die in eine JDBC-Datenbank schreibt, die im Data Catalog in einem Connection-Objekt angegeben ist. Das Connection-Objekt enthält Informationen zum Herstellen einer Verbindung zu einer JDBC-Senke, einschließlich URL, Benutzername, Passwort, VPC, Subnetz und Sicherheitsgruppen.

  • catalogConnection – Der Name der Verbindung im Data Catalog, die die JDBC-URL zum Schreiben enthält.

  • options – Eine Reihe von JSON-Namen-Wert-Paaren, die zusätzliche Informationen liefern, die zum Schreiben in einen JDBC-Datenspeicher erforderlich sind. Dies umfasst:

    • dbtable (erforderlich) – Der Name der JDBC-Tabelle. Bei JDBC-Datenspeichern, die von Schemata innerhalb einer Datenbank unterstützen, geben Sie schema.table-name an. Wenn kein Schema angegeben ist, wird der Standardwert "öffentliches" Schema verwendet. Das folgende Beispiel zeigt einen Optionsparameter, der auf ein Schema mit Namen test und einer Tabelle mit Namen test_table in der Datenbank test_db verweist.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (erforderlich) – Der Name der JDBC-Datenbank.

    • Alle zusätzlichen Optionen werden direkt an den SparkSQL JDBC-Writer übergeben. Weitere Informationen finden Sie unter Redshift-Datenquelle für Spark.

  • redshiftTmpDir – Ein vorläufiges Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.

  • transformationContext – Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.

  • catalogId – Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.

Beispiel-Code:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

Gibt den DataSink zurück.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Erstellt eineDataSink, die Daten in ein Ziel wie Amazon Simple Storage Service (Amazon S3), JDBC oder den AWS Glue Data Catalog oder einen Apache Kafka- oder Amazon Kinesis Kinesis-Datenstream schreibt.

Gibt den DataSink zurück.

def-Format getSinkWith

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Erstellt eineDataSink, die Daten in ein Ziel wie Amazon S3, JDBC oder den Datenkatalog oder einen Apache Kafka- oder Amazon Kinesis Kinesis-Datenstream schreibt. Legt auch das Format für die Daten fest, die an das Ziel geschrieben werden sollen.

  • connectionType – Typ der Verbindung. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.

  • options – Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datensenke bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.

  • transformationContext – Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.

  • format – Das Format der Daten, die in das Ziel geschrieben werden.

  • formatOptions – Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Optionen zum Formatieren der Daten am Ziel bereitstellt. Siehe Pfad-Formatoptionen.

Gibt den DataSink zurück.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Erstellt eineDataSource-Trait, die Daten aus einer Quelle wie Amazon S3, JDBC oder dem AWS Glue-Datenkatalog liest. Unterstützt auch Kafka- und Kinesis-Streaming-Datenquellen.

  • connectionType – Der Typ der Datenquelle. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.

  • connectionOptions – Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datenquelle bereitstellt. Weitere Informationen finden Sie unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark.

    Für eine Kinesis-Streaming-Quelle sind die folgenden Verbindungsoptionen erforderlich: streamARN, startingPosition, inferSchema und classification.

    Für eine Kafka-Streaming-Quelle sind die folgenden Verbindungsoptionen erforderlich: connectionName, topicName, startingOffsets, inferSchema und classification.

  • transformationContext – Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.

  • pushDownPredicate – Prädikat für Partitionsspalten.

Gibt den DataSource zurück.

Beispiel für Amazon-Kinesis-Streaming-Quelle:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Beispiel für die Kafka-Streaming-Quelle:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

def-Format getSourceWith

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Erstellt eineDataSource-Trait, die Daten aus einer Quelle wie Amazon S3, JDBC oder dem AWS Glue-Datenkatalog liest und auch das Format der in der Quelle gespeicherten Daten festlegt.

  • connectionType – Der Typ der Datenquelle. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.

  • options – Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datenquelle bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.

  • transformationContext – Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.

  • format – Das Format der in der Quelle gespeicherten Daten. Wenn der connectionType „s3“ ist, können Sie auch format angeben. Mögliche Werte sind: „avro“, „csv“, „grokLog“, „ion“, „json“, „xml“, „parquet“ oder „orc“.

  • formatOptions – Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Optionen zum Analysieren von Daten an der Quelle bereitstellt. Siehe Pfad-Formatoptionen.

Gibt den DataSource zurück.

Beispiele

Erstellen Sie eine Datei DynamicFrame aus einer Datenquelle, bei der es sich um eine Datei mit kommagetrennten Werten (CSV) auf Amazon S3 handelt:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

Erstellen Sie mithilfe einer JDBC-Verbindung DynamicFrame aus einer Datenquelle, bei der es sich um eine PostgreSQL-Datenquelle handelt:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

Erstellen Sie mithilfe einer JDBC-Verbindung DynamicFrame aus einer Datenquelle, die ein MySQL ist:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

Ruft das SparkSession-Objekt ab, das mit diesem GlueContext verknüpft ist. Verwenden Sie dieses SparkSession Objekt, um Tabellen und UDFs für die Verwendung mit DataFrame created from zu registrieren. DynamicFrames

Gibt den zurück. SparkSession

def startTransaction

def startTransaction(readOnly: Boolean):String

Starten Sie eine neue Transaktion. Ruft intern die startTransaction-API von Lake Formation auf.

  • readOnly – (Boolean) Gibt an, ob diese Transaktion schreibgeschützt oder gelesen und geschrieben werden soll. Schreibvorgänge mit einer schreibgeschützten Transaktions-ID werden abgelehnt. Schreibgeschützte Transaktionen müssen nicht festgeschrieben werden.

Gibt die Transaktions-ID zurück.

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

Versucht, die angegebene Transaktion zu übernehmen. commitTransaction kann zurückkehren, bevor die Transaktion abgeschlossen ist. Ruft intern die CommitTransaction-API von Lake Formation auf.

  • transactionId – (String) Die zu verbindende Transaktion.

  • waitForCommit – (Boolean) Bestimmt, ob die Rückgabe von commitTransaction sofort erfolgt. Der Standardwert ist "True". Wenn false (falsch), befragt commitTransaction und wartet, bis die Transaktion übergeben wurde. Die Dauer der Wartezeit ist mit einem exponentiellen Backoff mit maximal 6 Wiederholungsversuchen auf 1 Minute beschränkt.

Gibt einen booleschen Wert zurück, um anzugeben, ob das Commit abgeschlossen ist oder nicht.

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

Versucht, die angegebene Transaktion abzubrechen. Ruft intern die Lake Formation CancelTransactionAPI auf.

  • transactionId – (String) Die abzubrechende Transaktion.

Gibt eine TransactionCommittedException-Ausnahme zurück, wenn die Transaktion zuvor festgeschrieben wurde.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

Erzeugt ein GlueContext-Objekt mit dem angegebenen SparkContext, minimalen Partitionen und Zielpartitionen.

  • sc — Das Tool SparkContext.

  • minPartitions – Die Mindestanzahl an Partitionen.

  • targetPartitions – Die Zielanzahl an Partitionen.

Gibt den GlueContext zurück.

def this

def this( sc : SparkContext )

Erstellt ein GlueContext-Objekt mit dem mitgelieferten SparkContext. Legt die Mindestpartitionen auf 10 und die Zielpartitionen auf 20 fest.

  • sc — Das Tool SparkContext.

Gibt den GlueContext zurück.

def this

def this( sparkContext : JavaSparkContext )

Erstellt ein GlueContext-Objekt mit dem mitgelieferten JavaSparkContext. Legt die Mindestpartitionen auf 10 und die Zielpartitionen auf 20 fest.

  • sparkContext — Das Tool JavaSparkContext.

Gibt den GlueContext zurück.