Amazon DocumentDB の接続
Spark 用の AWS Glue を使用して Amazon DocumentDB 内のテーブルに対する読み込みと書き込みを行うことができます。AWS Glue 接続を介して AWS Secrets Manager で保存されている認証情報を使用して Amazon DocumentDB に接続できます。
Amazon DocumentDB の詳細については、「Amazon DocumentDB のドキュメント」を参照してください。
注記
Amazon DocumentDB エラスティッククラスターは、現在 AWS Glue コネクタを使用する場合にはサポートされていません。エラスティッククラスターの詳細については、「Amazon DocumentDB エラスティッククラスターの使用」を参照してください。
Amazon DocumentDB コレクションへの読み取りと書き込み
注記
Amazon DocumentDB に接続する ETL ジョブを作成する際、Connections
ジョブのプロパティにおいて、Amazon DocumentDB が実行されている Virtual Private Cloud (VPC) を特定するための、接続オブジェクトを指定する必要があります。接続オブジェクトの場合、接続タイプは JDBC
で、JDBC URL
は mongo://
である必要があります。<DocumentDB_host>
:27017
注記
これらのコードサンプルは AWS Glue 3.0 用に開発されました。AWS Glue 4.0 への移行については、MongoDB を参照してください。uri
パラメータが変更されました。
注記
Amazon DocumentDB を使用する場合、書かれた文書に _id
が指定されている場合など、特定の状況では、retryWrites
は false に設定する必要があります。詳細については、Amazon DocumentDB ドキュメントの「MongoDB との機能の違い」を参照してください。
次の Python スクリプトでは、Amazon DocumentDB に対する読み取りと書き込みのための、接続タイプと接続オプションの使用方法を示しています。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } write_documentdb_options = { "retryWrites": "false", "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from DocumentDB dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()
次の Scala スクリプトでは、Amazon DocumentDB に対する読み取りと書き込みのための、接続タイプと接続オプションの使用方法を示しています。
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from DocumentDB val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to DocumentDB glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }
Amazon DocumentDB 接続のオプションのリファレンス
Amazon DocumentDB (MongoDB 互換) への接続を指定します。
接続オプションは、ソース接続とシンク接続とで異なります。
"connectionType": "Documentdb" ソースとする
"connectionType": "documentdb"
をソースとして、次の接続オプションを使用します。
-
"uri"
: (必須) 読み取り元の Amazon DocumentDB ホスト (mongodb://<host>:<port>
形式)。 -
"database"
: (必須) 読み取り元の Amazon DocumentDB データベース。 -
"collection"
: (必須) 読み取り元の Amazon DocumentDB コレクション。 -
"username"
: (必須) Amazon DocumentDB のユーザー名。 -
"password"
: (必須) Amazon DocumentDB のパスワード。 -
"ssl"
: (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を"true"
に設定する必要があります。 -
"ssl.domain_match"
: (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を"false"
に設定する必要があります。 -
"batchSize"
: (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。 -
"partitioner"
: (オプション) Amazon DocumentDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。-
MongoDefaultPartitioner
(デフォルト) (AWS Glue 4.0 ではサポートされていません) -
MongoSamplePartitioner
(AWS Glue 4.0 ではサポートされていません) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
(AWS Glue 4.0 ではサポートされていません)
-
-
"partitionerOptions"
(オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
、partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
、partitionSizeMB
これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration
」を参照してください。 -
"connectionType": "documentdb" as Sink
"connectionType": "documentdb"
をシンクとして、次の接続オプションを使用します。
-
"uri"
: (必須) 書き込み先の Amazon DocumentDB ホスト (mongodb://<host>:<port>
形式)。 -
"database"
: (必須) 書き込み先の Amazon DocumentDB データベース。 -
"collection"
: (必須) 書き込み先の Amazon DocumentDB コレクション。 -
"username"
: (必須) Amazon DocumentDB のユーザー名。 -
"password"
: (必須) Amazon DocumentDB のパスワード。 -
"extendedBsonTypes"
: (オプション)true
が指定されている場合、Amazon DocumentDB へのデータ書き込み時に拡張 BSON 型を使用できます。デフォルト:true
。 -
"replaceDocument"
: (オプション)true
の場合、_id
フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false
の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト:true
。 -
"maxBatchSize"
: (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。 -
"retryWrites"
: (オプション): AWS Glue でネットワークエラーが発生した場合、特定の書き込みオペレーションを 1 回自動的に再試行します。