AWS GlueAPI Scala GlueContext - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

AWS GlueAPI Scala GlueContext

Package: com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext adalah titik masuk untuk membaca dan menulis DynamicFrame dari dan ke Amazon Simple Storage Service (Amazon S3), Katalog Data Glue AWS , JDBC, dan sebagainya. Kelas ini menyediakan fungsi utilitas untuk membuat objek DataSource sifat dan DataSink yang pada gilirannya dapat digunakan untuk membaca dan menulis DynamicFrame.

Anda juga dapat menggunakan GlueContext untuk menetapkan target jumlah partisi (default 20) di DynamicFrame jika jumlah partisi yang dibuat dari sumber kurang dari ambang batas minimum untuk partisi (default 10).

Kolom def addIngestionTime

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

Menambahkan kolom waktu penyerapan seperti ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute ke input DataFrame. Fungsi ini secara otomatis dihasilkan dalam skrip yang dihasilkan oleh AWS Glue saat Anda menentukan tabel Katalog Data dengan Amazon S3 sebagai target. Fungsi ini secara otomatis memperbarui partisi dengan kolom waktu penyerapan pada tabel output. Hal ini memungkinkan data output dipartisi secara otomatis pada waktu penyerapan tanpa memerlukan kolom waktu penyerapan eksplisit dalam data input.

  • dataFramedataFrame yang akan ditambahi dengan kolom waktu penyerapan.

  • timeGranularity — Kedetailan dari kolom waktu. Nilai yang benar adalah "day", "hour" dan "minute". Misalnya, jika "hour" diberikan dalam fungsi, maka dataFrame asli akan memiliki kolom waktu "ingest_year", "ingest_month", "ingest_day", dan "ingest_hour" yang ditambahkan.

Mengembalikan bingkai data setelah menambahkan kolom kedetailan waktu.

Contoh:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Mengembalikan sebuah DataFrame dibuat dengan koneksi dan format yang ditentukan. Gunakan fungsi ini hanya dengan sumber streaming AWS Glue.

  • connectionType— Jenis koneksi streaming. Nilai yang valid mencakup kinesis dan kafka.

  • connectionOptions— Opsi koneksi, yang berbeda untuk Kinesis dan Kafka. Anda dapat menemukan daftar semua opsi koneksi untuk setiap sumber data streaming di Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark. Perhatikan perbedaan berikut dalam pilihan koneksi streaming:

    • Sumber streaming Kinesis memerlukan streamARN, startingPosition, inferSchema, dan classification.

    • Sumber streaming Kafka membutuhkan connectionName, topicName, startingOffsets, inferSchema, dan classification.

  • transformationContext— Konteks transformasi yang akan digunakan (opsional).

  • format- Spesifikasi format (opsional). Ini digunakan untuk Amazon S3 atau AWS Glue koneksi yang mendukung berbagai format. Untuk informasi tentang format yang didukung, lihat Opsi format data untuk input dan output untuk Spark AWS Glue

  • formatOptions— Opsi format untuk format yang ditentukan. Untuk informasi tentang pilihan format yang didukung, lihat Opsi format data.

Contoh untuk sumber streaming Amazon Kinesis:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Contoh untuk sumber streaming Kafka:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

Menerapkan batch_function yang diberikan ke setiap batch mikro yang dibaca dari sumber Streaming.

  • frame— Yang DataFrame berisi batch mikro saat ini.

  • batch_function — Sebuah fungsi yang akan diterapkan untuk setiap batch mikro.

  • options — Kumpulan pasangan kunci-nilai yang menyimpan informasi tentang cara memproses batch mikro. Opsi-opsi berikut diperlukan:

    • windowSize — Jumlah waktu yang diperlukan untuk pemrosesan setiap batch.

    • checkpointLocation — Lokasi di mana pos pemeriksaan disimpan untuk tugas ETL streaming.

    • batchMaxRetries — Jumlah waktu maksimum untuk mengulang mencoba batch sekali lagi jika gagal. Nilai default-nya adalah 3. Opsi ini hanya dapat dikonfigurasi untuk Glue versi 2.0 dan di atasnya.

Contoh:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Membuat sebuah DataSink yang menulis ke lokasi yang ditentukan dalam tabel yang didefinisikan dalam Katalog Data.

  • database — Nama basis data dalam Katalog Data.

  • tableName — Nama tabel dalam Katalog Data.

  • redshiftTmpDir — Direktori pentahapan sementara yang akan digunakan dengan data sink tertentu. Diatur ke kosong secara default.

  • transformationContext — Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.

  • additionalOptions— Opsi tambahan yang disediakan untukAWS Glue.

  • catalogId — ID katalog (ID akun) dari Katalog Data yang sedang diakses. Bila nol, maka ID akun default pemanggil yang akan digunakan.

Mengembalikan DataSink.

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Membuat sebuah DataSource sifat yang membaca data dari tabel definisi dalam Katalog Data.

  • database — Nama basis data dalam Katalog Data.

  • tableName — Nama tabel dalam Katalog Data.

  • redshiftTmpDir — Direktori pentahapan sementara yang akan digunakan dengan data sink tertentu. Diatur ke kosong secara default.

  • transformationContext — Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.

  • pushDownPredicate — Memfilter partisi tanpa harus mencantumkan dan membaca semua file dalam set data Anda. Untuk informasi selengkapnya, lihat Pra-penyaringan menggunakan predikat pushdown.

  • additionalOptions — Kumpulan pasangan nama-nilai opsional. Opsi yang mungkin adalah opsi-opsi yang tercantum dalam Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark kecuali endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification, dan delimiter. Opsi lain yang didukung adalahcatalogPartitionPredicate:

    catalogPartitionPredicate— Anda dapat meneruskan ekspresi katalog untuk memfilter berdasarkan kolom indeks. Ini mendorong penyaringan ke sisi server. Untuk informasi selengkapnya, lihat Indeks AWS Glue Partisi. Perhatikan itu push_down_predicate dan catalogPartitionPredicate gunakan sintaks yang berbeda. Yang pertama menggunakan sintaks standar Spark SQL dan yang kemudian menggunakan parser JSQL.

  • catalogId — ID katalog (ID akun) dari Katalog Data yang sedang diakses. Bila nol, maka ID akun default pemanggil yang akan digunakan.

Mengembalikan DataSource.

Contoh untuk sumber streaming

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Membuat sebuah DataSink yang menulis ke basis data JDBC yang ditentukan dalam objek Connection dalam Katalog Data. Objek Connection memiliki informasi untuk terhubung ke sbuah sink JDBC, termasuk URL, nama pengguna, kata sandi, VPC, subnet, dan grup keamanan.

  • catalogConnection — Nama koneksi dalam Katalog Data yang berisi URL JDBC yang akan ditulisi.

  • options — Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan yang diperlukan untuk menulis ke penyimpanan data JDBC. Hal ini mencakup:

    • dbtable (wajib) — Nama tabel JDBC. Untuk penyimpanan data JDBC yang mendukung skema dalam basis data, tentukan schema.table-name. Jika skema tidak disediakan, maka skema "publik" default digunakan. Contoh berikut menunjukkan parameter pilihan yang mengarahkan ke skema bernama test dan sebuah tabel bernama test_table dalam basis data test_db.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (wajib) — Nama basis data JDBC.

    • Setiap pilihan tambahan diberikan langsung ke penulis JDBC SparkSQL. Untuk informasi selengkapnya, lihat Sumber data Redshift untuk Spark.

  • redshiftTmpDir — Sebuah direktori pentahapan sementara yang akan digunakan dengan data sink tertentu. Diatur ke kosong secara default.

  • transformationContext — Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.

  • catalogId — ID katalog (ID akun) dari Katalog Data yang sedang diakses. Bila nol, maka ID akun default pemanggil yang akan digunakan.

Kode contoh:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

Mengembalikan DataSink.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Membuat file DataSink yang menulis data ke tujuan seperti Amazon Simple Storage Service (Amazon S3), JDBC, atau Glue Data Catalog, atau AWS aliran data Apache Kafka atau Amazon Kinesis.

Mengembalikan DataSink.

Format def getSinkWith

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Membuat DataSink yang menulis data ke tujuan seperti Amazon S3, JDBC, atau Katalog Data, atau aliran data Apache Kafka atau Amazon Kinesis. Juga menetapkan format untuk data yang akan ditulis ke tujuan.

Mengembalikan DataSink.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Membuat DataSource sifat yang membaca data dari sumber seperti Amazon S3, JDBC, atau Glue AWS Data Catalog. Juga mendukung sumber data streaming Kafka dan Kinesis.

  • connectionType — Jenis sumber data. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.

  • connectionOptions — Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk membangun sebuah koneksi dengan sumber data. Untuk informasi selengkapnya, lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.

    Sumber streaming Kinesis memerlukan opsi koneksi berikut: streamARN, startingPosition, inferSchema, dan classification.

    Sumber streaming Kafka membutuhkan pilihan koneksi berikut: connectionName, topicName, startingOffsets, inferSchema, dan classification.

  • transformationContext — Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.

  • pushDownPredicate — Predikat pada kolom partisi.

Mengembalikan DataSource.

Contoh untuk sumber streaming Amazon Kinesis:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Contoh untuk sumber streaming Kafka:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Format def getSourceWith

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Membuat DataSource sifat yang membaca data dari sumber seperti Amazon S3, JDBC, atau AWS Glue Data Catalog, dan juga menetapkan format data yang disimpan dalam sumber.

  • connectionType — Jenis sumber data. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.

  • options — Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk membangun sebuah koneksi dengan sumber data. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.

  • transformationContext — Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.

  • format — Format data yang disimpan pada sumber. Saat connectionType adalah "s3", Anda juga dapat menentukan format. Bisa berupa “avro”, “csv”, “groklog”, “ion”, “json”, “xml”, “parquet”, atau “orc”, salah satunya.

  • formatOptions — Sebuah string pasangan nama-nilai JSON yang menyediakan opsi tambahan untuk mengurai data di sumber. Lihat Opsi format data.

Mengembalikan DataSource.

Contoh

Buat DynamicFrame dari sumber data yang merupakan file nilai yang dipisahkan koma (CSV) di Amazon S3:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

Buat DynamicFrame dari sumber data yang merupakan PostgreSQL menggunakan koneksi JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

Buat DynamicFrame dari sumber data yang merupakan MySQL menggunakan koneksi JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

Mendapat SparkSession objek yang terkait dengan ini GlueContext. Gunakan SparkSession objek ini untuk mendaftarkan tabel dan UDF untuk digunakan dengan DataFrame dibuat dari DynamicFrames.

Mengembalikan SparkSession.

def StartTransaksi

def startTransaction(readOnly: Boolean):String

Mulai transaksi baru. Secara internal memanggil Lake Formation StartTransaction API.

  • readOnly— (Boolean) Menunjukkan apakah transaksi ini harus dibaca saja atau dibaca dan ditulis. Penulisan yang dibuat menggunakan ID transaksi hanya-baca akan ditolak. Transaksi read-only tidak perlu dilakukan.

Mengembalikan ID transaksi.

def CommitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

Upaya untuk melakukan transaksi yang ditentukan. commitTransactiondapat kembali sebelum transaksi selesai dilakukan. Secara internal memanggil Lake Formation CommitTransaction API.

  • transactionId— (String) Transaksi untuk melakukan.

  • waitForCommit— (Boolean) Menentukan apakah commitTransaction pengembalian segera. Nilai default-nya adalah betul. Jika salah, commitTransaction polling dan menunggu sampai transaksi dilakukan. Jumlah waktu tunggu dibatasi hingga 1 menit menggunakan backoff eksponensial dengan maksimal 6 upaya coba lagi.

Mengembalikan Boolean untuk menunjukkan apakah komit dilakukan atau tidak.

def batalkan Transaksi

def cancelTransaction(transactionId: String): Unit

Upaya untuk membatalkan transaksi yang ditentukan. Secara internal memanggil Lake Formation CancelTransactionAPI.

  • transactionId— (String) Transaksi untuk membatalkan.

Mengembalikan TransactionCommittedException pengecualian jika transaksi sebelumnya dilakukan.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

Membuat sebuah objek GlueContext menggunakan SparkContext yang ditentukan, partisi minimum, dan partisi target.

  • scSparkContext.

  • minPartitions — Jumlah partisi minimum.

  • targetPartitions — Jumlah partisi target.

Mengembalikan GlueContext.

def this

def this( sc : SparkContext )

Membuat sebuah objek GlueContext dengan SparkContext yang disediakan. Menetapkan partisi minimum ke 10 dan partisi target ke 20.

  • scSparkContext.

Mengembalikan GlueContext.

def this

def this( sparkContext : JavaSparkContext )

Membuat sebuah objek GlueContext dengan JavaSparkContext yang disediakan. Menetapkan partisi minimum ke 10 dan partisi target ke 20.

  • sparkContextJavaSparkContext.

Mengembalikan GlueContext.