Memperbarui skema, dan menambahkan partisi baru di Katalog Data menggunakan AWS Glue ETLLowongan - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memperbarui skema, dan menambahkan partisi baru di Katalog Data menggunakan AWS Glue ETLLowongan

Pekerjaan ekstrak, transformasi, dan load (ETL) Anda mungkin membuat partisi tabel baru di penyimpanan data target. Skema dataset Anda dapat berkembang dan menyimpang dari AWS Glue Skema Katalog Data dari waktu ke waktu. AWS Glue ETLpekerjaan sekarang menyediakan beberapa fitur yang dapat Anda gunakan dalam ETL skrip Anda untuk memperbarui skema dan partisi Anda di Katalog Data. Fitur-fitur ini memungkinkan Anda untuk melihat hasil ETL pekerjaan Anda di Katalog Data, tanpa harus menjalankan ulang crawler.

Partisi baru

Jika Anda ingin melihat partisi baru di AWS Glue Data Catalog, Anda dapat melakukan salah satu hal berikut:

  • Setelah tugas selesai, jalankan kembali crawler, dan lihat partisi baru di konsol tersebut saat crawler selesai.

  • Setelah tugas selesai, segera lihat partisi baru di konsol tersebut, tanpa harus menjalankan ulang crawler. Anda dapat mengaktifkan fitur ini dengan menambahkan beberapa baris kode ke ETL skrip Anda, seperti yang ditunjukkan pada contoh berikut. Kode menggunakan argumen enableUpdateCatalog untuk menunjukkan bahwa Katalog Data akan diperbarui selama eksekusi tugas saat partisi baru dibuat.

Metode 1

Berikan enableUpdateCatalog dan partitionKeys dalam sebuah argumen pilihan.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
Metode 2

Berikan enableUpdateCatalog dan partitionKeys dalam getSink(), dan panggil setCatalogInfo() di objek DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

Sekarang, Anda dapat membuat tabel katalog baru, memperbarui tabel yang ada dengan skema yang dimodifikasi, dan menambahkan partisi tabel baru di Katalog Data menggunakan AWS Glue ETLpekerjaan itu sendiri, tanpa perlu menjalankan kembali crawler.

Memperbarui skema tabel

Jika Anda ingin menimpa skema tabel Katalog Data Anda, Anda dapat melakukan salah satu hal berikut:

  • Setelah tugas selesai, jalankan kembali crawler dan pastikan crawler dikonfigurasi untuk memperbarui definisi tabel juga. Lihat partisi baru di konsol tersebut beserta pembaruan skema apa pun, saat crawler selesai. Untuk informasi selengkapnya, lihat Mengonfigurasi Crawler Menggunakan. API

  • Setelah tugas selesai, segera lihat skema yang sudah dimodifikasi di konsol tersebut, tanpa harus menjalankan ulang crawler. Anda dapat mengaktifkan fitur ini dengan menambahkan beberapa baris kode ke ETL skrip Anda, seperti yang ditunjukkan pada contoh berikut. Kode menggunakan enableUpdateCatalog yang diatur ke BETUL, dan juga updateBehavior yang diatur ke UPDATE_IN_DATABASE, yang menunjukkan untuk menimpa skema dan menambahkan partisi baru dalam Katalog Data selama eksekusi tugas.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

Anda juga dapat mengatur nilai updateBehavior ke LOG jika Anda ingin mencegah skema tabel Anda agar tidak ditimpa, tapi masih ingin menambahkan partisi baru. Nilai default dari updateBehavior adalah UPDATE_IN_DATABASE, jadi jika Anda tidak secara eksplisit mendefinisikannya, maka skema tabel akan ditimpa.

Jika tidak enableUpdateCatalog disetel ke true, terlepas dari opsi mana pun yang dipilihupdateBehavior, ETL pekerjaan tidak akan memperbarui tabel di Katalog Data.

Membuat tabel baru

Anda juga dapat menggunakan opsi yang sama untuk membuat sebuah tabel baru di Katalog Data. Anda dapat menentukan basis data dan nama tabel baru dengan menggunakan setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Pembatasan

Perhatikan pembatasan-pembatasan berikut ini:

  • Hanya target Amazon Simple Storage Service (Amazon S3) saja yang didukung.

  • enableUpdateCatalogFitur ini tidak didukung untuk tabel yang diatur.

  • Hanya format berikut ini didukung: json, csv, avro, dan parquet.

  • Untuk membuat atau memperbarui tabel dengan parquet klasifikasi, Anda harus menggunakan AWS Glue penulis parket yang dioptimalkan untuk DynamicFrames. Ini dapat dicapai dengan salah satu dari yang berikut:

    • Jika Anda memperbarui tabel yang ada dalam katalog dengan parquet klasifikasi, tabel harus memiliki properti "useGlueParquetWriter" tabel yang disetel ke true sebelum Anda memperbaruinya. Anda dapat mengatur properti ini melalui AWS Glue APIs/SDK, melalui konsol atau melalui pernyataan AthenaDDL.

      Bidang edit properti tabel katalog di AWS Glue konsol.

      Setelah properti tabel katalog diatur, Anda dapat menggunakan cuplikan kode berikut untuk memperbarui tabel katalog dengan data baru:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • Jika tabel belum ada dalam katalog, Anda dapat menggunakan getSink() metode dalam skrip Anda connection_type="s3" untuk menambahkan tabel dan partisi ke katalog, bersama dengan menulis data ke Amazon S3. Berikan yang sesuai partitionKeys dan compression untuk alur kerja Anda.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=True) s3sink.writeFrame(frameToWrite)
    • Nilai glueparquet format adalah metode warisan yang memungkinkan penulis AWS Glue parket.

  • Saat updateBehavior diatur ke LOG, partisi baru akan ditambahkan hanya jika skema DynamicFrame setara dengan atau berisi sebuah subset dari kolom yang didefinisikan dalam skema tabel Katalog Data.

  • Pembaruan skema tidak didukung untuk tabel yang tidak dipartisi (tidak menggunakan opsi "”partitionKeys).

  • Anda partitionKeys harus setara, dan dalam urutan yang sama, antara parameter yang diteruskan dalam ETL skrip Anda dan skema tabel Katalog Data Anda. partitionKeys

  • Fitur ini saat ini belum men-support pembaruan/pembuatan tabel di mana skema yang memperbarui bersarang (misalnya, array dalam struct).

Untuk informasi selengkapnya, lihat Pemrograman skrip Spark.