本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Amazon DocumentDB 連線
可以使用 AWS Glue for Spark 在 Amazon DocumentDB 中讀取和寫入資料表。可以使用透過 AWS Glue 連線儲存在 AWS Secrets Manager 中的憑證來連線至 Amazon DocumentDB。
如需 Amazon DocumentDB 的詳細資訊,請參閱 Amazon DocumentDB 文件。
注意
使用 AWS Glue 連接器時,目前不支援 Amazon DocumentDB 彈性叢集。如需有關彈性叢集的詳細資訊,請參閱 Using Amazon DocumentDB elastic clusters。
讀取和寫入 Amazon DocumentDB 集合
注意
建立連接至 Amazon DocumentDB 的 ETL 任務時,如為 Connections
任務屬性,您必須指定連線物件,以指定執行 Amazon DocumentDB 的 Virtual Private Cloud (VPC)。如為連線物件,連線類型必須為 JDBC
,而 JDBC URL
必須為 mongo://
。<DocumentDB_host>
:27017
注意
這些程式碼範例專為 AWS Glue 3.0 而開發。若要遷移至 AWS Glue 4.0,請參閱 MongoDB。uri
參數已變更。
注意
使用 Amazon DocumentDB 時,在某些情況下 retryWrites
必須設定為 false,例如當編寫的文件指定 _id
時。如需詳細資訊,請參閱 Amazon DocumentDB 文件中的 MongoDB 的功能差異。
下列 Python 指令碼示範使用連線類型和連線選項,以讀取和寫入至 Amazon DocumentDB。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } write_documentdb_options = { "retryWrites": "false", "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from DocumentDB dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()
下列 Scala 指令碼示範使用連線類型和連線選項,以讀取和寫入至 Amazon DocumentDB。
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from DocumentDB val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to DocumentDB glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }
Amazon DocumentDB 連線選項參考
指定 Amazon DocumentDB (with MongoDB compatibility) 的連線。
來源連線和接收器連線的連線選項不同。
"connectionType": "Documentdb" as source
使用下列有 "connectionType": "documentdb"
的連線選項作為來源:
-
"uri"
:(必要) 讀取的 Amazon DocumentDB 主機,格式為mongodb://<host>:<port>
。 -
"database"
:(必要) 要從中讀取的 Amazon DocumentDB 資料庫。 -
"collection"
:(必要) 要從中讀取的 Amazon DocumentDB 集合。 -
"username"
:(必要) Amazon DocumentDB 使用者名稱。 -
"password"
:(必要) Amazon DocumentDB 密碼。 -
"ssl"
:(如果使用 SSL,則為必要) 如果您的連線使用 SSL,則必須加入此選項並具備值"true"
。 -
"ssl.domain_match"
:(如果使用 SSL,則為必要) 如果您的連線使用 SSL,則必須加入此選項並具備值"false"
。 -
"batchSize"
:(選用):每個批次傳回的文件數目,於內部批次的游標內使用。 -
"partitioner"
:(選用):從 Amazon DocumentDB 讀取輸入資料的分割區類別名稱。連接器提供下列分割區:-
MongoDefaultPartitioner
(預設) (不支援 AWS Glue 4.0) -
MongoSamplePartitioner
(不支援 AWS Glue 4.0) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
(不支援 AWS Glue 4.0)
-
-
"partitionerOptions"
(選用):指定分割區的選項。每個分割區都支援下列選項:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
,partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
,partitionSizeMB
如需有關這些選項的詳細資訊,請參閱 MongoDB 文件中的分割區組態
。 -
"connectionType": "Documentdb" as sink
使用下列有 "connectionType": "documentdb"
的連線選項作為接收器:
-
"uri"
:(必要) 寫入的 Amazon DocumentDB 主機,格式為mongodb://<host>:<port>
。 -
"database"
:(必要) 要寫入的 Amazon DocumentDB 資料庫。 -
"collection"
:(必要) 要寫入的 Amazon DocumentDB 集合。 -
"username"
:(必要) Amazon DocumentDB 使用者名稱。 -
"password"
:(必要) Amazon DocumentDB 密碼。 -
"extendedBsonTypes"
:(選用) 如果true
,在寫入資料到 Amazon DocumentDB 時允許延伸的 BSON 類型。預設值為true
。 -
"replaceDocument"
:(選用) 如果true
,則在儲存包含_id
欄位的資料集時取代整個文件。若為false
,則文件中僅有與資料集中欄位相符的欄位會更新。預設值為true
。 -
"maxBatchSize"
:(選用):儲存資料時大量操作的批次大小上限。預設為 512。 -
"retryWrites"
:(選用) 如果 AWS Glue 發生網路錯誤,系統會自動重試特定寫入操作一次。