Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
AWS GlueScala-APIs GlueContext
Paket: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
ist der Eintrittspunkt für das Lesen und Schreiben eines DynamicFrame von und zu Amazon Simple Storage Service (Amazon S3), den AWS Glue Data Catalog, JDBC und so weiter. Diese Klasse bietet Hilfsfunktionen zum Erstellen von DataSource-Trait- und DataSink-Objekten, die wiederum zum Lesen und Schreiben von DynamicFrame
s verwendet werden können.
Sie können GlueContext
auch verwenden, um eine Zielanzahl an Partitionen (Standard 20) im DynamicFrame
festzulegen, wenn die Anzahl der von der Quelle erstellten Partitionen kleiner ist als ein Mindestwert für Partitionen (Standard 10).
Spalten definieren addIngestionTime
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Hängt die Erfassungszeitspalten wie ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
an die Eingabe DataFrame
an. Diese Funktion wird automatisch in dem von AWS Glue erzeugten Skript generiert, wenn Sie eine Data-Catalog-Tabelle mit Amazon S3 als Ziel angeben. Diese Funktion aktualisiert automatisch die Partition mit Erfassungszeitspalten in der Ausgabetabelle. So können die Ausgabedaten bei der Erfassung automatisch partitioniert werden, ohne dass explizite Erfassungszeitspalten in den Eingabedaten erforderlich sind.
-
dataFrame
– DerdataFrame
, um die Erfassungszeitspalten anzuhängen. -
timeGranularity
– Die Granularität der Zeitspalten. Gültige Werte sind „day
“, „hour
“ und „minute
“. Wenn zum Beispiel „hour
“ an die Funktion übergeben wird, werden im OriginaldataFrame
die Zeiten in den Spalten „ingest_year
“, „ingest_month
“, „ingest_day
“, und „ingest_hour
“ aktualisiert.
Gibt den Datenrahmen nach dem Anhängen der Zeitgranularitätsspalten zurück.
Beispiel:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrame FromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Gibt einen DataFrame
zurück, der mit der angegebenen Verbindung und dem Format erstellt wurde. Verwenden Sie diese Funktion nur mit AWS Glue-Streaming-Quellen.
connectionType
– Die Art der Streaming-Verbindung. Gültige Werte sindkinesis
undkafka
.-
connectionOptions
– Verbindungsoptionen, die für Kinesis und Kafka unterschiedlich sind. Die Liste aller Verbindungsoptionen für jede Streaming-Datenquelle finden Sie unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark. Beachten Sie die folgenden Unterschiede bei den Streaming-Verbindungsoptionen:-
Kinesis-Streaming-Quellen erfordern
streamARN
,startingPosition
,inferSchema
undclassification
. -
Kafka-Streaming-Quellen erfordern
connectionName
,topicName
,startingOffsets
,inferSchema
undclassification
.
-
transformationContext
– Der zu verwendende Transformationskontext (optional).format
– Eine Formatspezifikation (optional). Diese wird für eine Amazon-S3- oder eine AWS Glue-Verbindung verwendet, die mehrere Formate unterstützt. Informationen zu den unterstützten Formaten finden Sie unter Mögliche Formate für Eingaben und Ausgaben in AWS Glue für SparkformatOptions
– Formatierungsoptionen für das angegebene Format. Weitere Informationen zu unterstützten Formatoptionen finden Sie unter Pfad-Formatoptionen.
Beispiel für Amazon-Kinesis-Streaming-Quelle:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Beispiel für die Kafka-Streaming-Quelle:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Wendet die batch_function
auf jeden Mikrobatch an, der von der Streaming-Quelle gelesen wird.
-
frame
— Der DataFrame , der den aktuellen Mikrostapel enthält. -
batch_function
– Eine Funktion, die für jeden Mikrobatch angewendet wird. -
options
– Eine Sammlung von Schlüssel-Wert-Paaren, die Informationen zur Verarbeitung von Mikrobatches enthält. Die folgenden Optionen sind erforderlich:-
windowSize
– Die Zeitspanne für die Verarbeitung der einzelnen Batches. -
checkpointLocation
– Der Ort, an dem Checkpoints für den Streaming-ETL-Auftrag gespeichert werden. -
batchMaxRetries
– Die maximale Anzahl der Wiederholungsversuche für diesen Batch, wenn er fehlschlägt. Der Standardwert ist 3. Diese Option ist nur für Glue 2.0 und höher konfigurierbar.
-
Beispiel:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Erstellt eine DataSink, die in einen Speicherort schreibt, der in einer Tabelle angegeben ist, die im Data Catalog definiert ist.
database
– Der Datenbankname im Data Catalog.tableName
– Der Tabellenname im Data Catalog.redshiftTmpDir
– Das vorläufige Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.additionalOptions
– Zusätzliche Optionen für AWS Glue.catalogId
– Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.
Gibt den DataSink
zurück.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Erstellt einen DataSource-Trait, der Daten aus einer Tabellendefinition im Data Catalog liest.
database
– Der Datenbankname im Data Catalog.tableName
– Der Tabellenname im Data Catalog.redshiftTmpDir
– Das vorläufige Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.pushDownPredicate
– Filtert Partitionen, ohne alle Dateien in Ihrem Datensatz auflisten und lesen zu müssen. Weitere Informationen finden Sie unter Vorabfilterung mit Pushdown-Prädikaten.additionalOptions
– Eine Sammlung optionaler Name/Wert-Paare. Zu den möglichen Optionen gehören die unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark aufgeführten, außerendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
unddelimiter
. Eine weitere unterstützte Option istcatalogPartitionPredicate
:catalogPartitionPredicate
– Sie können einen Katalogausdruck basierend auf den Indexspalten an den Filter übergeben. Dies verlagert die Filterung auf die Serverseite. Weitere Informationen finden Sie unter AWS Glue-Partition-Indizes. Beachten Sie, dasspush_down_predicate
undcatalogPartitionPredicate
verschiedene Syntaxen verwenden. Erstere verwendet die Spark-SQL-Standardsyntax und letztere verwendet den JSQL-Parser.catalogId
– Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.
Gibt den DataSource
zurück.
Beispiel für eine Streaming-Quelle
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Erstellt eine DataSink, die in eine JDBC-Datenbank schreibt, die im Data Catalog in einem Connection
-Objekt angegeben ist. Das Connection
-Objekt enthält Informationen zum Herstellen einer Verbindung zu einer JDBC-Senke, einschließlich URL, Benutzername, Passwort, VPC, Subnetz und Sicherheitsgruppen.
catalogConnection
– Der Name der Verbindung im Data Catalog, die die JDBC-URL zum Schreiben enthält.options
– Eine Reihe von JSON-Namen-Wert-Paaren, die zusätzliche Informationen liefern, die zum Schreiben in einen JDBC-Datenspeicher erforderlich sind. Dies umfasst:dbtable (erforderlich) – Der Name der JDBC-Tabelle. Bei JDBC-Datenspeichern, die von Schemata innerhalb einer Datenbank unterstützen, geben Sie
schema.table-name
an. Wenn kein Schema angegeben ist, wird der Standardwert "öffentliches" Schema verwendet. Das folgende Beispiel zeigt einen Optionsparameter, der auf ein Schema mit Namentest
und einer Tabelle mit Namentest_table
in der Datenbanktest_db
verweist.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (erforderlich) – Der Name der JDBC-Datenbank.
Alle zusätzlichen Optionen werden direkt an den SparkSQL JDBC-Writer übergeben. Weitere Informationen finden Sie unter Redshift-Datenquelle für Spark
.
redshiftTmpDir
– Ein vorläufiges Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.catalogId
– Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.
Beispiel-Code:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Gibt den DataSink
zurück.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Erstellt eineDataSink, die Daten in ein Ziel wie Amazon Simple Storage Service (Amazon S3), JDBC oder den AWS Glue Data Catalog oder einen Apache Kafka- oder Amazon Kinesis Kinesis-Datenstream schreibt.
-
connectionType
– Typ der Verbindung. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. -
connectionOptions
– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datensenke bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. -
transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.
Gibt den DataSink
zurück.
def-Format getSinkWith
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Erstellt eineDataSink, die Daten in ein Ziel wie Amazon S3, JDBC oder den Datenkatalog oder einen Apache Kafka- oder Amazon Kinesis Kinesis-Datenstream schreibt. Legt auch das Format für die Daten fest, die an das Ziel geschrieben werden sollen.
connectionType
– Typ der Verbindung. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.-
options
– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datensenke bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.format
– Das Format der Daten, die in das Ziel geschrieben werden.formatOptions
– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Optionen zum Formatieren der Daten am Ziel bereitstellt. Siehe Pfad-Formatoptionen.
Gibt den DataSink
zurück.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Erstellt eineDataSource-Trait, die Daten aus einer Quelle wie Amazon S3, JDBC oder dem AWS Glue-Datenkatalog liest. Unterstützt auch Kafka- und Kinesis-Streaming-Datenquellen.
connectionType
– Der Typ der Datenquelle. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.-
connectionOptions
– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datenquelle bereitstellt. Weitere Informationen finden Sie unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark.Für eine Kinesis-Streaming-Quelle sind die folgenden Verbindungsoptionen erforderlich:
streamARN
,startingPosition
,inferSchema
undclassification
.Für eine Kafka-Streaming-Quelle sind die folgenden Verbindungsoptionen erforderlich:
connectionName
,topicName
,startingOffsets
,inferSchema
undclassification
. transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.pushDownPredicate
– Prädikat für Partitionsspalten.
Gibt den DataSource
zurück.
Beispiel für Amazon-Kinesis-Streaming-Quelle:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Beispiel für die Kafka-Streaming-Quelle:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def-Format getSourceWith
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Erstellt eineDataSource-Trait, die Daten aus einer Quelle wie Amazon S3, JDBC oder dem AWS Glue-Datenkatalog liest und auch das Format der in der Quelle gespeicherten Daten festlegt.
connectionType
– Der Typ der Datenquelle. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.-
options
– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datenquelle bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. transformationContext
– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.format
– Das Format der in der Quelle gespeicherten Daten. Wenn derconnectionType
„s3“ ist, können Sie auchformat
angeben. Mögliche Werte sind: „avro“, „csv“, „grokLog“, „ion“, „json“, „xml“, „parquet“ oder „orc“.formatOptions
– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Optionen zum Analysieren von Daten an der Quelle bereitstellt. Siehe Pfad-Formatoptionen.
Gibt den DataSource
zurück.
Beispiele
Erstellen Sie eine Datei DynamicFrame aus einer Datenquelle, bei der es sich um eine Datei mit kommagetrennten Werten (CSV) auf Amazon S3 handelt:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Erstellen Sie mithilfe einer JDBC-Verbindung DynamicFrame aus einer Datenquelle, bei der es sich um eine PostgreSQL-Datenquelle handelt:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Erstellen Sie mithilfe einer JDBC-Verbindung DynamicFrame aus einer Datenquelle, die ein MySQL ist:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Ruft das SparkSession
-Objekt ab, das mit diesem GlueContext verknüpft ist. Verwenden Sie dieses SparkSession Objekt, um Tabellen und UDFs für die Verwendung mit DataFrame
created from zu registrieren. DynamicFrames
Gibt den zurück. SparkSession
def startTransaction
def startTransaction(readOnly: Boolean):String
Starten Sie eine neue Transaktion. Ruft intern die startTransaction-API von Lake Formation auf.
readOnly
– (Boolean) Gibt an, ob diese Transaktion schreibgeschützt oder gelesen und geschrieben werden soll. Schreibvorgänge mit einer schreibgeschützten Transaktions-ID werden abgelehnt. Schreibgeschützte Transaktionen müssen nicht festgeschrieben werden.
Gibt die Transaktions-ID zurück.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Versucht, die angegebene Transaktion zu übernehmen. commitTransaction
kann zurückkehren, bevor die Transaktion abgeschlossen ist. Ruft intern die CommitTransaction-API von Lake Formation auf.
transactionId
– (String) Die zu verbindende Transaktion.waitForCommit
– (Boolean) Bestimmt, ob die Rückgabe voncommitTransaction
sofort erfolgt. Der Standardwert ist "True". Wenn false (falsch), befragtcommitTransaction
und wartet, bis die Transaktion übergeben wurde. Die Dauer der Wartezeit ist mit einem exponentiellen Backoff mit maximal 6 Wiederholungsversuchen auf 1 Minute beschränkt.
Gibt einen booleschen Wert zurück, um anzugeben, ob das Commit abgeschlossen ist oder nicht.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Versucht, die angegebene Transaktion abzubrechen. Ruft intern die Lake Formation CancelTransactionAPI auf.
transactionId
– (String) Die abzubrechende Transaktion.
Gibt eine TransactionCommittedException
-Ausnahme zurück, wenn die Transaktion zuvor festgeschrieben wurde.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Erzeugt ein GlueContext
-Objekt mit dem angegebenen SparkContext
, minimalen Partitionen und Zielpartitionen.
sc
— Das ToolSparkContext
.minPartitions
– Die Mindestanzahl an Partitionen.targetPartitions
– Die Zielanzahl an Partitionen.
Gibt den GlueContext
zurück.
def this
def this( sc : SparkContext )
Erstellt ein GlueContext
-Objekt mit dem mitgelieferten SparkContext
. Legt die Mindestpartitionen auf 10 und die Zielpartitionen auf 20 fest.
sc
— Das ToolSparkContext
.
Gibt den GlueContext
zurück.
def this
def this( sparkContext : JavaSparkContext )
Erstellt ein GlueContext
-Objekt mit dem mitgelieferten JavaSparkContext
. Legt die Mindestpartitionen auf 10 und die Zielpartitionen auf 20 fest.
sparkContext
— Das ToolJavaSparkContext
.
Gibt den GlueContext
zurück.