Conexiones a Amazon DocumentDB
Puede utilizar AWS Glue para Spark para leer y escribir en tablas en bases de datos de Amazon DocumentDB. Puede conectar Amazon DocumentDB con las credenciales almacenadas en AWS Secrets Manager a través de una conexión de AWS Glue.
Para obtener más información sobre Amazon DocumentDB, consulte la documentación de Amazon DocumentDB.
nota
Los clústeres elásticos de Amazon DocumentDB no se admiten actualmente cuando se utiliza el conector Glue AWS. Para obtener más información sobre los clústeres elásticos, consulte Uso de clústeres elásticos de Amazon DocumentDB.
Lectura y escritura en las colecciones de Amazon DocumentDB
nota
Cuando crea un trabajo de ETL que se conecta a Amazon DocumentDB, para la propiedad del trabajo Connections
, debe designar un objeto de conexión que especifique la nube privada virtual (VPC) en la que se ejecuta Amazon DocumentDB. Para el objeto de conexión, el tipo de conexión debe ser JDBC
y el JDBC URL
debe ser mongo://
.<DocumentDB_host>
:27017
nota
Estos ejemplos de código fueron desarrollados para AWS Glue 3.0. Para migrar a AWS Glue 4.0, consulte MongoDB. El parámetro uri
ha cambiado.
nota
Cuando se utiliza Amazon DocumentDB, retryWrites
debe configurarse en falso en determinadas situaciones, como cuando el documento escrito lo especifica _id
. Para obtener más información, consulte Diferencias funcionales con MongoDB en la documentación de Amazon DocumentDB.
El siguiente script de Python muestra el uso de tipos de conexión y opciones de conexión para leer y escribir en Amazon DocumentDB.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } write_documentdb_options = { "retryWrites": "false", "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from DocumentDB dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()
El siguiente script de Scala muestra el uso de tipos de conexión y opciones de conexión para leer y escribir en Amazon DocumentDB.
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from DocumentDB val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to DocumentDB glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }
Referencia de opción de conexión de Amazon DocumentDB
Designa una conexión a Amazon DocumentDB (con compatibilidad con MongoDB).
Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.
"connectionType": "Documentdb" como origen
Utilice las siguientes opciones de conexión con "connectionType": "documentdb"
como origen:
-
"uri"
: (obligatorio) el host de Amazon DocumentDB del que se va a leer, con formatomongodb://<host>:<port>
. -
"database"
: (obligatorio) la base de datos de Amazon DocumentDB de la que se va a leer. -
"collection"
: (obligatorio) la recopilación de Amazon DocumentDB de la que se va a leer. -
"username"
: (obligatorio) nombre de usuario de Amazon DocumentDB. -
"password"
: (obligatorio) la contraseña de Amazon DocumentDB. -
"ssl"
: (obligatorio si usa SSL) si su conexión usa SSL, debe incluir esta opción con el valor"true"
. -
"ssl.domain_match"
: (obligatorio si usa SSL) si su conexión usa SSL, debe incluir esta opción con el valor"false"
. -
"batchSize"
: (opcional): el número de documentos que se deben devolver por lote, que se utilizan dentro del cursor de lotes internos. -
"partitioner"
: (opcional): el nombre de la clase del particionador para leer los datos de entrada de Amazon DocumentDB. El conector proporciona los siguientes particionadores:-
MongoDefaultPartitioner
(predeterminado) (No compatible con AWS Glue 4.0) -
MongoSamplePartitioner
(No es compatible con AWS Glue 4.0) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
(No es compatible con AWS Glue 4.0)
-
-
"partitionerOptions"
( opcional): opciones para el particionador designado. Se admiten las siguientes opciones para cada particionador:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
, partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
, partitionSizeMB
Para obtener más información acerca de estas opciones, consulte Partitioner Configuration
en la documentación de MongoDB. -
"connectionType": "Documentdb" como receptor
Utilice las siguientes opciones de conexión con "connectionType": "documentdb"
como receptor:
-
"uri"
: (obligatorio) el host de Amazon DocumentDB al que se va a escribir, con formatomongodb://<host>:<port>
. -
"database"
: (obligatorio) la base de datos de Amazon DocumentDB a la que se va a escribir. -
"collection"
: (obligatorio) la recopilación de Amazon DocumentDB a la que se va a escribir. -
"username"
: (obligatorio) nombre de usuario de Amazon DocumentDB. -
"password"
: (obligatorio) la contraseña de Amazon DocumentDB. -
"extendedBsonTypes"
: (opcional) si se establece entrue
, permite los tipos de BSON extendidos al escribir datos en Amazon DocumentDB. El valor predeterminado estrue
. -
"replaceDocument"
: (opcional) si estrue
, reemplaza todo el documento al guardar conjuntos de datos que contienen un campo_id
. Si esfalse
, solo se actualizan los campos del documento que coinciden con los campos del conjunto de datos. El valor predeterminado estrue
. -
"maxBatchSize"
: (opcional): el tamaño máximo del lote para operaciones en bloque al guardar datos. El valor predeterminado es 512. -
"retryWrites"
: (Opcional): reintenta automáticamente determinadas operaciones de escritura una sola vez si AWS Glue detecta un error de red.