Conexões do Amazon DocumentDB
É possível usar o AWS Glue para Spark para ler e gravar em tabelas no Amazon DocumentDB. Você pode se conectar ao Amazon DocumentDB usando credenciais armazenadas no AWS Secrets Manager por meio de uma conexão do AWS Glue.
Para obter mais informações sobre o Amazon DocumentDB, consulte a Documentação do Amazon DocumentDB.
nota
No momento, os clusters elásticos do Amazon DocumentDB não são compatíveis com o uso do conector do AWS Glue. Para obter mais informações sobre clusters elásticos, consulte Using Amazon DocumentDB elastic clusters.
Ler e gravar em coleções do Amazon DocumentDB
nota
Ao criar um trabalho de ETL que se conecta ao Amazon DocumentDB, para a propriedade de trabalho Connections
, é necessário designar um objeto de conexão que especifica a nuvem privada virtual (VPC) em que o Amazon DocumentDB é executado. Para o objeto de conexão, o tipo de conexão deve ser JDBC
, e o JDBC URL
deve ser mongo://
.<DocumentDB_host>
:27017
nota
Estes exemplos de código foram desenvolvidos para o AWS Glue 3.0. Para migrar para o AWS Glue 4.0, consulte MongoDB. O parâmetro uri
foi alterado.
nota
Ao usar o Amazon DocumentDB, retryWrites
deve ser definido como falso em determinadas situações, como quando o documento escrito especifica _id
. Para obter mais informações, consulte Diferenças funcionais com o MongoDB na documentação do Amazon DocumentDB.
O script Python a seguir demonstra como usar tipos e opções de conexão para ler e gravar no Amazon DocumentDB.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } write_documentdb_options = { "retryWrites": "false", "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from DocumentDB dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()
O script Scala a seguir demonstra como usar tipos e opções de conexão para ler e gravar no Amazon DocumentDB.
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from DocumentDB val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to DocumentDB glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }
Referência de opções de conexão do Amazon DocumentDB
Designa uma conexão com o Amazon DocumentDB (compatível com MongoDB).
As opções de conexão diferem para uma conexão da fonte e uma conexão do coletor.
“connectionType”: “documentdb” como fonte
Use as seguintes opções de conexão com "connectionType": "documentdb"
como fonte:
-
"uri"
: (obrigatório) o host do Amazon DocumentDB do qual fazer a leitura, formatado comomongodb://<host>:<port>
. -
"database"
: (obrigatório) o banco de dados do Amazon DocumentDB do qual fazer a leitura. -
"collection"
: (obrigatório) a coleção do Amazon DocumentDB da qual fazer a leitura. -
"username"
: (obrigatório) o nome de usuário do Amazon DocumentDB. -
"password"
: (obrigatório) a senha do Amazon DocumentDB. -
"ssl"
: (obrigatório se estiver usando SSL) se sua conexão usar SSL, você deve incluir essa opção com o valor"true"
. -
"ssl.domain_match"
: (obrigatório se estiver usando SSL) se sua conexão usar SSL, você deve incluir essa opção com o valor"false"
. -
"batchSize"
: (opcional) o número de documentos que devem ser retornados por lote, usado no cursor de lotes internos. -
"partitioner"
: (opcional) o nome da classe do particionador do qual deve ser feita a leitura dos dados de entrada do Amazon DocumentDB. O conector fornece os seguintes particionadores:-
MongoDefaultPartitioner
(padrão) (sem suporte no AWS Glue 4.0) -
MongoSamplePartitioner
(sem suporte no AWS Glue 4.0) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
(sem suporte no AWS Glue 4.0)
-
-
"partitionerOptions"
: (opcional) opções para o particionador designado. As seguintes opções são compatíveis com cada particionador:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
, partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
, partitionSizeMB
Para obter mais informações sobre essas opções, consulte Partitioner Configuration
na documentação do MongoDB. -
“connectionType”: “documentdb” como coletor
Use as seguintes opções de conexão com "connectionType": "documentdb"
como coletor:
-
"uri"
: (obrigatório) o host do Amazon DocumentDB no qual fazer a gravação, formatado comomongodb://<host>:<port>
. -
"database"
: (obrigatório) o banco de dados do Amazon DocumentDB no qual fazer a gravação. -
"collection"
: (obrigatório) a coleção do Amazon DocumentDB na qual fazer a gravação. -
"username"
: (obrigatório) o nome de usuário do Amazon DocumentDB. -
"password"
: (obrigatório) a senha do Amazon DocumentDB. -
"extendedBsonTypes"
: (opcional) setrue
, permite tipos BSON estendidos ao gravar dados no Amazon DocumentDB. O padrão étrue
. -
"replaceDocument"
: (opcional) se fortrue
, substituirá todo o documento ao salvar conjuntos de dados que contêm um campo_id
. Se forfalse
, serão atualizados somente os campos do documento que correspondam aos campos do conjunto de dados. O padrão étrue
. -
"maxBatchSize"
: (opcional) o tamanho máximo do lote para operações em massa ao salvar dados. O padrão é 512. -
"retryWrites"
: (Opcional): repita automaticamente determinadas operações de escrita uma única vez se o AWS Glue encontrar um erro de rede.