Koneksi Amazon DocumentDB - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Koneksi Amazon DocumentDB

Anda dapat menggunakan AWS Glue for Spark untuk membaca dan menulis ke tabel di Amazon DocumentDB. Anda dapat terhubung ke Amazon DocumentDB menggunakan kredensil yang disimpan melalui koneksi Glue. AWS Secrets Manager AWS

Untuk informasi selengkapnya tentang Amazon DocumentDB, lihat dokumentasi Amazon DocumentDB.

catatan

Cluster elastis Amazon DocumentDB saat ini tidak didukung saat menggunakan konektor Glue. AWS Untuk informasi selengkapnya tentang cluster elastis, lihat Menggunakan cluster elastis Amazon DocumentDB.

Membaca dan menulis ke koleksi Amazon DocumentDB

catatan

Bila Anda membuat tugas ETL yang menghubungkan ke Amazon DocumentDB, untuk properti tugas Connections, Anda harus mengkhususkan sebuah objek koneksi yang menentukan virtual private cloud (VPC) di mana Amazon DocumentDB berjalan. Untuk objek koneksi, jenis koneksinya harus JDBC, dan JDBC URL harus mongo://<DocumentDB_host>:27017.

catatan

Sampel kode ini dikembangkan untuk AWS Glue 3.0. Untuk bermigrasi ke AWS Glue 4.0, konsultasikanMongoDB. uriParameter telah berubah.

catatan

Saat menggunakan Amazon DocumentDBretryWrites, harus disetel ke false dalam situasi tertentu, seperti saat dokumen yang ditulis menentukan. _id Untuk informasi lebih lanjut, lihat Perbedaan Fungsional dengan MongoDB di dokumentasi Amazon DocumentDB.

Skrip Python berikut menunjukkan menggunakan jenis koneksi dan opsi koneksi untuk membaca dan menulis ke Amazon DocumentDB.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } write_documentdb_options = { "retryWrites": "false", "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from DocumentDB dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()

Skrip Scala berikut menunjukkan menggunakan jenis koneksi dan opsi koneksi untuk membaca dan menulis ke Amazon DocumentDB.

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from DocumentDB val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to DocumentDB glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

Referensi opsi koneksi Amazon DocumentDB

Mengkhususkan koneksi ke Amazon DocumentDB (dengan kompatibilitas MongoDB).

Pilihan koneksi berbeda untuk koneksi sumber dan koneksi sink.

“ConnectionType”: “Documentdb” sebagai sumber

Gunakan opsi koneksi berikut dengan "connectionType": "documentdb" sebagai sumber:

  • "uri": (Wajib) Host Amazon DocumentDB tempat untuk membaca, diformat sebagai mongodb://<host>:<port>.

  • "database": (Wajib) Basis data Amazon DocumentDB tempat untuk membaca.

  • "collection": (Wajib) Koleksi Amazon DocumentDB tempat untuk membaca.

  • "username": (Wajib) Nama pengguna Amazon DocumentDB.

  • "password": (Wajib) Kata sandi Amazon DocumentDB.

  • "ssl": (Wajib jika menggunakan SSL) Jika koneksi Anda menggunakan SSL, maka Anda harus menyertakan opsi ini dengan nilai "true".

  • "ssl.domain_match": (Wajib jika menggunakan SSL) Jika koneksi Anda menggunakan SSL, maka Anda harus menyertakan opsi ini dengan nilai "false".

  • "batchSize": (Opsional): Jumlah dokumen yang akan dikembalikan per batch, digunakan dalam kursor batch internal.

  • "partitioner": (Opsional): Nama kelas pemartisi untuk membaca input data dari Amazon DocumentDB. Konektor menyediakan pemartisi berikut:

    • MongoDefaultPartitioner(default) (Tidak didukung di AWS Glue 4.0)

    • MongoSamplePartitioner(Tidak didukung di AWS Glue 4.0)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner(Tidak didukung di AWS Glue 4.0)

  • "partitionerOptions" (Opsional): Opsi untuk pemartisi yang ditunjuk. Opsi berikut didukung untuk setiap pemartisi:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    Untuk informasi lebih lanjut tentang opsi ini, lihat Konfigurasi Partisi dalam dokumentasi MongoDB.

“ConnectionType”: “Documentdb” sebagai wastafel

Gunakan opsi koneksi berikut dengan "connectionType": "documentdb" sebagai sink:

  • "uri": (Wajib) Host Amazon DocumentDB tempat untuk menulis, diformat sebagai mongodb://<host>:<port>.

  • "database": (Wajib) Basis data Amazon DocumentDB tempat untuk menulis.

  • "collection": (Wajib) Koleksi Amazon DocumentDB tempat untuk menulis.

  • "username": (Wajib) Nama pengguna Amazon DocumentDB.

  • "password": (Wajib) Kata sandi Amazon DocumentDB.

  • "extendedBsonTypes": (Opsional) Jika true, memungkinkan jenis BSON diperpanjang saat menulis data ke Amazon DocumentDB. Defaultnya adalah true.

  • "replaceDocument": (Opsional) Jika true, menggantikan seluruh dokumen ketika menyimpan set data yang berisi bidang _id. Jika false, hanya bidang dalam dokumen yang cocok dengan bidang dalam set data saja yang diperbarui. Defaultnya adalah true.

  • "maxBatchSize": (Opsional): Ukuran batch maksimum untuk operasi massal saat menyimpan data. Default-nya adalah 512.

  • "retryWrites": (Opsional): Secara otomatis mencoba kembali operasi penulisan tertentu satu kali jika AWS Glue menemukan kesalahan jaringan.