Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Transacciones en Amazon DocumentDB
Amazon DocumentDB (con compatibilidad con MongoDB) admite ahora la compatibilidad con MongoDB 4.0, incluidas las transacciones. Puede realizar transacciones en varios documentos, estados de cuenta, colecciones y bases de datos. Las transacciones simplifican el desarrollo de aplicaciones al permitirle realizar operaciones atómicas, consistentes, aisladas y duraderas (ACID) en uno o más documentos dentro de un clúster de Amazon DocumentDB. Los casos de uso más comunes de las transacciones incluyen el procesamiento financiero, el cumplimiento y la gestión de pedidos y la creación de juegos para varios jugadores.
No hay ningún costo adicional para las transacciones. Solo paga por los dispositivos iOS de lectura y escritura que consuma como parte de las transacciones.
Requisitos
Para utilizar la función de transacciones, debe cumplir los siguientes requisitos:
Prácticas recomendadas
Estas son algunas de las prácticas recomendadas para que pueda aprovechar al máximo las transacciones con Amazon DocumentDB.
-
Confirme o cancele siempre la transacción una vez que se haya completado. Dejar una transacción incompleta agota los recursos de la base de datos y puede provocar conflictos de escritura.
-
Se recomienda limitar las transacciones al menor número de comandos necesario. Si tiene transacciones con varios estados de cuenta que se pueden dividir en varias transacciones más pequeñas, es recomendable hacerlo para reducir la probabilidad de que se agote el tiempo de espera. Procure siempre de crear transacciones cortas, no lecturas prolongadas.
Limitaciones
-
Amazon DocumentDB no admite cursores en una transacción.
-
Amazon DocumentDB no puede crear nuevas colecciones en una transacción ni realizar consultas ni actualizar colecciones no existentes.
-
Los bloqueos de escritura a nivel de documento están sujetos a un tiempo de espera de 1 minuto, que el usuario no puede configurar.
-
Los comandos de escritura reintentable, confirmación reintentable y cancelación reintentables no son compatibles con Amazon DocumentDB. Si utiliza el intérprete de comandos de mongo heredado (no mongosh), no incluya el comando retryWrites=false
en ninguna cadena de código. El reintento de las escrituras está desactivado de forma predeterminada. Incluir retryWrites=false
podría provocar un error en comandos de lectura normales.
-
Cada instancia de Amazon DocumentDB tiene un límite superior en el número de transacciones simultáneas abiertas en la instancia a la vez. Para conocer los límites, consulte Límites de instancias.
-
Para una transacción determinada, el tamaño del registro de transacciones debe ser inferior a 32 MB.
-
Amazon DocumentDB admite count()
dentro de transacciones, pero no todos los controladores admiten esta capacidad. Una alternativa es utilizar la API countDocuments()
, que convierte la consulta de recuento en una consulta de agregación en el lado del cliente.
-
Las transacciones tienen un límite de ejecución de un minuto y las sesiones tienen un tiempo de espera de 30 minutos. Si se agota el tiempo de espera de una transacción, se cancelará y cualquier comando posterior que se ejecute dentro de la sesión para la transacción existente generará el siguiente error:
WriteCommandError({
"ok" : 0,
"operationTime" : Timestamp(1603491424, 627726),
"code" : 251,
"errmsg" : "Given transaction number 0 does not match any in-progress transactions."
}
Supervisión y diagnóstico
Con la compatibilidad con las transacciones en Amazon DocumentDB 4.0, se agregaron métricas de CloudWatch adicionales para ayudarle a monitorear sus transacciones.
Nuevas métricas de CloudWatch
-
DatabaseTransactions
: el número de transacciones abiertas realizadas en un período de un minuto.
-
DatabaseTransactionsAborted
: el número de transacciones canceladas realizadas en un período de un minuto.
-
DatabaseTransactionsMax
: el número máximo de transacciones abiertas realizadas en un período de un minuto.
-
TransactionsAborted
: el número de transacciones canceladas en una instancia en un período de un minuto.
-
TransactionsCommitted
: el número de transacciones confirmadas en una instancia en un período de un minuto.
-
TransactionsOpen
: el número de transacciones abiertas en una instancia en un período de un minuto.
-
TransactionsOpenMax
: el número máximo de transacciones abiertas en una instancia en un período de un minuto.
-
TransactionsStarted
: el número de transacciones canceladas en una instancia en un período de un minuto.
Además, se agregaron nuevos campos a currentOp
lsid
, transactionThreadId
, y un nuevo estado para “idle transaction
” y las transacciones serverStatus
: currentActive
, currentInactive
, currentOpen
, totalAborted
, totalCommitted
y totalStarted
.
Nivel de aislamiento de transacciones
Al iniciar una transacción, puede especificar tanto readConcern
como writeConcern
, de la forma como se muestra en el siguiente ejemplo:
mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});
Para readConcern
, Amazon DocumentDB admite el aislamiento de instantáneas de forma predeterminada. Si se especifica una readConcern
local, disponible o mayoritario, Amazon DocumentDB actualizará el nivel de readConcern
a instantánea. Amazon DocumentDB no admite el readConcern
linealizable y, si se especifica una preocupación de lectura de este tipo, se producirá un error.
Para writeConcern
, Amazon DocumentDB admite la mayoría de forma predeterminada y se logra un quórum de escritura cuando se conservan cuatro copias de los datos en tres zonas de disponibilidad. Si se especifica una writeConcern
inferior, Amazon DocumentDB la actualizará la writeConcern
a la mayoría. Además, todas las escrituras de Amazon DocumentDB se registran en un diario y no se puede deshabilitar.
Casos de uso
En esta sección, analizaremos dos casos de uso de las transacciones: varias declaraciones y varias recopilaciones.
Transacciones con varios estados de cuenta
Las transacciones de Amazon DocumentDB son de varios estados, lo que significa que puede escribir una transacción que abarque varios estados con una confirmación o reversión explícita. Puede agrupar acciones de insert
, update
, delete
y findAndModify
como una sola operación atómica.
Un caso de uso común para las transacciones con varios estados de cuenta es una transacción de débito-crédito. Por ejemplo: le debe dinero a un amigo de unas prendas de ropa. Por lo tanto, debe cargar (retirar) $500 de su cuenta y acreditar $500 (depósito) en la cuenta de su amigo. Para realizar esa operación, debe realizar las operaciones de deuda y crédito en una sola transacción para garantizar la atomicidad. Si lo hace, evitará que se debiten $500 de su cuenta, pero no se acrediten en la cuenta de su amigo. Este es el aspecto que tendría este caso de uso:
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
session.commitTransaction();
accountColl.find();
// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
session.abortTransaction();
Transacciones de varias recopilaciones
Nuestras transacciones también son de cobro múltiple, lo que significa que se pueden utilizar para realizar múltiples operaciones dentro de una sola transacción y en varios cobros. Esto proporciona una visión coherente de los datos y mantiene la integridad de los mismos. Cuando se ejecutan los comandos como un solo <>
, las transacciones son ejecuciones de todo o nada, es decir, todas se ejecutarán correctamente o todas fallarán.
A continuación, se muestra un ejemplo de transacciones de varios cobros, en las que se utiliza el mismo escenario y los mismos datos del ejemplo para las transacciones con varios estados de cuenta.
// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var amountToTransfer = 500;
var collectionName = "account";
var session = db.getMongo().startSession({causalConsistency: false});
var accountCollInBankA = session.getDatabase("bankA")[collectionName];
var accountCollInBankB = session.getDatabase("bankB")[collectionName];
accountCollInBankA.drop();
accountCollInBankB.drop();
accountCollInBankA.insert({name: "Alice", balance: 1000});
accountCollInBankB.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
session.commitTransaction();
accountCollInBankA.find(); // Alice holds $500 in bankA
accountCollInBankB.find(); // Bob holds $1500 in bankB
// *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var accountCollInBankA = session.getDatabase("bankA")[collectionName];
var accountCollInBankB = session.getDatabase("bankB")[collectionName];
accountCollInBankA.drop();
accountCollInBankB.drop();
accountCollInBankA.insert({name: "Alice", balance: 1000});
accountCollInBankB.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
var newAliceBalance = aliceBalance - amountToTransfer;
accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance;
// add $500 to Bob's account
var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance;
session.abortTransaction();
accountCollInBankA.find(); // Alice holds $1000 in bankA
accountCollInBankB.find(); // Bob holds $1000 in bankB
Ejemplos de API de transacciones para la API de devolución de llamada
La API de devolución de llamada solo está disponible para controladores 4,2 o más.
- Javascript
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Javascript.
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);
session.commitTransaction();
accountColl.find();
- Node.js
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Node.js.
// Node.js callback API:
const bankDB = await mongoclient.db("bank");
var accountColl = await bankDB.createCollection("account");
var amountToTransfer = 500;
const session = mongoclient.startSession({causalConsistency: false});
await accountColl.drop();
await accountColl.insertOne({name: "Alice", balance: 1000}, { session });
await accountColl.insertOne({name: "Bob", balance: 1000}, { session });
const transactionOptions = {
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
};
// deduct $500 from Alice's account
var aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
assert(aliceBalance.balance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
session.startTransaction(transactionOptions);
await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session });
await session.commitTransaction();
aliceBalance = await accountColl.findOne({name: "Alice"}, {session});
assert(newAliceBalance == aliceBalance.balance);
// add $500 to Bob's account
var bobBalance = await accountColl.findOne({name: "Bob"}, {session});
var newBobBalance = bobBalance.balance + amountToTransfer;
session.startTransaction(transactionOptions);
await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session });
await session.commitTransaction();
bobBalance = await accountColl.findOne({name: "Bob"}, {session});
assert(newBobBalance == bobBalance.balance);
- C#
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con C#.
// C# Callback API
var dbName = "bank";
var collName = "account";
var amountToTransfer = 500;
using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
{
var bankDB = client.GetDatabase(dbName);
var accountColl = bankDB.GetCollection<BsonDocument>(collName);
bankDB.DropCollection(collName);
accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
// start transaction
var transactionOptions = new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority);
var result = session.WithTransaction(
(sess, cancellationtoken) =>
{
// deduct $500 from Alice's account
var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"),
Builders<BsonDocument>.Update.Set("balance", newAliceBalance));
aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance == newAliceBalance);
// add $500 from Bob's account
var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"),
Builders<BsonDocument>.Update.Set("balance", newBobBalance));
bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(bobBalance == newBobBalance);
return "Transaction committed";
}, transactionOptions);
// check values outside of transaction
var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceNewBalance == 500);
Debug.Assert(bobNewBalance == 1500);
}
- Ruby
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Ruby.
// Ruby Callback API
dbName = "bank"
collName = "account"
amountToTransfer = 500
session = client.start_session(:causal_consistency=> false)
bankDB = Mongo::Database.new(client, dbName)
accountColl = bankDB[collName]
accountColl.drop()
accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
# start transaction
session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do
# deduct $500 from Alice's account
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert aliceBalance >= amountToTransfer
newAliceBalance = aliceBalance - amountToTransfer
accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
aliceBalance = accountColl.find({"name"=>>"Alice"}, :session=> session).first['balance']
assert_equal(newAliceBalance, aliceBalance)
# add $500 from Bob's account
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
newBobBalance = bobBalance + amountToTransfer
accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
assert_equal(newBobBalance, bobBalance)
end
# check results outside of transaction
aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
assert_equal(aliceBalance, 500)
assert_equal(bobBalance, 1500)
session.end_session
- Go
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Go.
// Go - Callback API
type Account struct {
Name string
Balance int
}
ctx := context.TODO()
dbName := "bank"
collName := "account"
amountToTransfer := 500
session, err := client.StartSession(options.Session().SetCausalConsistency(false))
assert.NilError(t, err)
defer session.EndSession(ctx)
bankDB := client.Database(dbName)
accountColl := bankDB.Collection(collName)
accountColl.Drop(ctx)
_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000})
_, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000})
transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
_, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) {
var result Account
// deduct $500 from Alice's account
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
newAliceBalance := aliceBalance - amountToTransfer
_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance = result.Balance
assert.Equal(t, aliceBalance, newAliceBalance)
// add $500 to Bob's account
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
newBobBalance := bobBalance + amountToTransfer
_, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance = result.Balance
assert.Equal(t, bobBalance, newBobBalance)
if err != nil {
return nil, err
}
return "transaction committed", err
}, transactionOptions)
// check results outside of transaction
var result Account
err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result)
aliceNewBalance := result.Balance
err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
bobNewBalance := result.Balance
assert.Equal(t, aliceNewBalance, 500)
assert.Equal(t, bobNewBalance, 1500)
// Go - Core API
type Account struct {
Name string
Balance int
}
func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error {
amountToTransfer := 500
transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
if err := sessionContext.StartTransaction(transactionOptions); err != nil {
panic(err)
}
var result Account
// deduct $500 from Alice's account
err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance := result.Balance
newAliceBalance := aliceBalance - amountToTransfer
_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}})
if err != nil {
sessionContext.AbortTransaction(sessionContext)
}
err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result)
aliceBalance = result.Balance
assert.Equal(t, aliceBalance, newAliceBalance)
// add $500 to Bob's account
err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
newBobBalance := bobBalance + amountToTransfer
_, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}})
if err != nil {
sessionContext.AbortTransaction(sessionContext)
}
err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result)
bobBalance = result.Balance
assert.Equal(t, bobBalance, newBobBalance)
err = sessionContext.CommitTransaction(sessionContext)
return err
}
func doTransactionWithRetry(t *testing.T) {
ctx := context.TODO()
dbName := "bank"
collName := "account"
bankDB := client.Database(dbName)
accountColl := bankDB.Collection(collName)
client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error {
accountColl.Drop(ctx)
accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000})
accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000})
for {
err := transferMoneyWithRetry(sessionContext, accountColl, t)
if err == nil {
println("transaction committed")
return nil
}
if mongoErr := err.(mongo.CommandError); mongoErr.HasErrorLabel("TransientTransactionError") {
continue
}
println("transaction failed")
return err
}
})
// check results outside of transaction
var result Account
accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&esult)
aliceBalance := result.Balance
assert.Equal(t, aliceBalance, 500)
accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result)
bobBalance := result.Balance
assert.Equal(t, bobBalance, 1500)
}
- Java
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Java.
// Java (sync) - Callback API
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
accountColl.drop();
int amountToTransfer = 500;
// add sample data
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
clientSession.withTransaction(new TransactionBody<Void>() {
@Override
public Void execute() {
// deduct $500 from Alice's account
List<Document> documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int aliceBalance = (int) documentList.get(0).get("balance");
int newAliceBalance = aliceBalance - amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
// check Alice's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
int bobBalance = (int) documentList.get(0).get("balance");
int newBobBalance = bobBalance + amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
// check Bob's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
return null;
}
}, txnOptions);
}
- C
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con C.
// Sample Code for C with Callback
#include <bson.h>
#include <mongoc.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
typedef struct {
int64_t balance;
bson_t *account;
bson_t *opts;
mongoc_collection_t *collection;
} ctx_t;
bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
bool r = true;
ctx_t *data = (ctx_t *) ctx;
bson_t local_reply;
bson_t *selector = data->account;
bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}");
mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error);
*reply = bson_copy (&local_reply);
bson_destroy (&local_reply);
bson_destroy (update);
return r;
}
void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
bson_t reply;
bool r = true;
const bson_t *doc;
bson_iter_t iter;
ctx_t alice_ctx;
ctx_t bob_ctx;
bson_error_t error;
// find query
bson_t *alice_query = bson_new ();
BSON_APPEND_UTF8(alice_query, "name", "Alice");
bson_t *bob_query = bson_new ();
BSON_APPEND_UTF8(bob_query, "name", "Bob");
// create session
// set causal consistency to false
mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
// start the session
mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
// add session to options
bson_t *opts = bson_new();
mongoc_client_session_append (client_session, opts, &error);
// deduct 500 from Alice
// find account balance of Alice
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance >= amount_to_transfer);
int64_t new_alice_balance = alice_balance - amount_to_transfer;
// set variables which will be used by callback function
alice_ctx.collection = collection;
alice_ctx.opts = opts;
alice_ctx.balance = new_alice_balance;
alice_ctx.account = alice_query;
// callback
r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error);
assert(r);
// find account balance of Alice after transaction
cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance == new_alice_balance);
assert(alice_balance == 500);
// add 500 to bob's balance
// find account balance of Bob
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
int64_t new_bob_balance = bob_balance + amount_to_transfer;
bob_ctx.collection = collection;
bob_ctx.opts = opts;
bob_ctx.balance = new_bob_balance;
bob_ctx.account = bob_query;
// set read & write concern
mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
// callback
r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error);
assert(r);
// find account balance of Bob after transaction
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
bob_balance = (bson_iter_value (&iter))->value.v_int64;
assert(bob_balance == new_bob_balance);
assert(bob_balance == 1500);
// cleanup
bson_destroy(alice_query);
bson_destroy(bob_query);
mongoc_client_session_destroy(client_session);
bson_destroy(opts);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_read_concern_destroy(read_concern);
mongoc_write_concern_destroy(write_concern);
mongoc_cursor_destroy(cursor);
bson_destroy(doc);
}
int main(int argc, char* argv[]) {
mongoc_init ();
mongoc_client_t* client = mongoc_client_new (<connection uri>);
bson_error_t error;
// connect to bank db
mongoc_database_t *database = mongoc_client_get_database (client, "bank");
// access account collection
mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
// set amount to transfer
int64_t amount_to_transfer = 500;
// delete the collection if already existing
mongoc_collection_drop(collection, &error);
// open Alice account
bson_t *alice_account = bson_new ();
BSON_APPEND_UTF8(alice_account, "name", "Alice");
BSON_APPEND_INT64(alice_account, "balance", 1000);
// open Bob account
bson_t *bob_account = bson_new ();
BSON_APPEND_UTF8(bob_account, "name", "Bob");
BSON_APPEND_INT64(bob_account, "balance", 1000);
bool r = true;
r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
test_callback_money_transfer(client, collection, amount_to_transfer);
}
- Python
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Python.
// Sample Python code with callback api
import pymongo
def callback(session, balance, query):
collection.update_one(query, {'$set': {"balance": balance}}, session=session)
client = pymongo.MongoClient(<connection uri>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')
# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500
# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance
# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority)
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
Sample Python code with Core api
import pymongo
client = pymongo.MongoClient(<connection_string>)
rc_snapshot = pymongo.read_concern.ReadConcern('snapshot')
wc_majority = pymongo.write_concern.WriteConcern('majority')
# To start, drop and create an account collection and insert balances for both Alice and Bob
collection = client.get_database("bank").get_collection("account")
collection.drop()
collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000})
collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000})
amount_to_transfer = 500
# deduct 500 from Alice's account
alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert alice_balance >= amount_to_transfer
new_alice_balance = alice_balance - amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session)
session.commit_transaction()
updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance")
assert updated_alice_balance == new_alice_balance
# add 500 to Bob's account
bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert bob_balance >= amount_to_transfer
new_bob_balance = bob_balance + amount_to_transfer
with client.start_session({'causalConsistency':False}) as session:
session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority)
collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session)
session.commit_transaction()
updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance")
assert updated_bob_balance == new_bob_balance
Ejemplos de API de transacciones para la API de Core
- Javascript
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Javascript.
// *** Transfer $500 from Alice to Bob inside a transaction: Success ***
// Setup bank account for Alice and Bob. Each have $1000 in their account
var databaseName = "bank";
var collectionName = "account";
var amountToTransfer = 500;
var session = db.getMongo().startSession({causalConsistency: false});
var bankDB = session.getDatabase(databaseName);
var accountColl = bankDB[collectionName];
accountColl.drop();
accountColl.insert({name: "Alice", balance: 1000});
accountColl.insert({name: "Bob", balance: 1000});
session.startTransaction();
// deduct $500 from Alice's account
var aliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance - amountToTransfer;
accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}});
var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance;
assert.eq(newAliceBalance, findAliceBalance);
// add $500 to Bob's account
var bobBalance = accountColl.find({"name": "Bob"}).next().balance;
var newBobBalance = bobBalance + amountToTransfer;
accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}});
var findBobBalance = accountColl.find({"name": "Bob"}).next().balance;
assert.eq(newBobBalance, findBobBalance);
session.commitTransaction();
accountColl.find();
- C#
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con C#.
// C# Core API
public void TransferMoneyWithRetry(IMongoCollection<bSondocument> accountColl, IClientSessionHandle session)
{
var amountToTransfer = 500;
// start transaction
var transactionOptions = new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority);
session.StartTransaction(transactionOptions);
try
{
// deduct $500 from Alice's account
var aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance >= amountToTransfer);
var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer;
accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Alice"),
Builders<bSondocument>.Update.Set("balance", newAliceBalance));
aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceBalance == newAliceBalance);
// add $500 from Bob's account
var bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
var newBobBalance = bobBalance.AsInt32 + amountToTransfer;
accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Bob"),
Builders<bSondocument>.Update.Set("balance", newBobBalance));
bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(bobBalance == newBobBalance);
}
catch (Exception e)
{
session.AbortTransaction();
throw;
}
session.CommitTransaction();
}
}
public void DoTransactionWithRetry(MongoClient client)
{
var dbName = "bank";
var collName = "account";
using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false}))
{
try
{
var bankDB = client.GetDatabase(dbName);
var accountColl = bankDB.GetCollection<bSondocument>(collName);
bankDB.DropCollection(collName);
accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } });
accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } });
while(true) {
try
{
TransferMoneyWithRetry(accountColl, session);
break;
}
catch (MongoException e)
{
if(e.HasErrorLabel("TransientTransactionError"))
{
continue;
}
else
{
throw;
}
}
}
// check values outside of transaction
var aliceNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance");
var bobNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance");
Debug.Assert(aliceNewBalance == 500);
Debug.Assert(bobNewBalance == 1500);
}
catch (Exception e)
{
Console.WriteLine("Error running transaction: " + e.Message);
}
}
}
- Ruby
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Ruby.
# Ruby Core API
def transfer_money_w_retry(session, accountColl)
amountToTransfer = 500
session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority})
# deduct $500 from Alice's account
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert aliceBalance >= amountToTransfer
newAliceBalance = aliceBalance - amountToTransfer
accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session)
aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance']
assert_equal(newAliceBalance, aliceBalance)
# add $500 to Bob's account
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
newBobBalance = bobBalance + amountToTransfer
accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session)
bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance']
assert_equal(newBobBalance, bobBalance)
session.commit_transaction
end
def do_txn_w_retry(client)
dbName = "bank"
collName = "account"
session = client.start_session(:causal_consistency=> false)
bankDB = Mongo::Database.new(client, dbName)
accountColl = bankDB[collName]
accountColl.drop()
accountColl.insert_one({"name"=>"Alice", "balance"=>1000})
accountColl.insert_one({"name"=>"Bob", "balance"=>1000})
begin
transferMoneyWithRetry(session, accountColl)
puts "transaction committed"
rescue Mongo::Error => e
if e.label?('TransientTransactionError')
retry
else
puts "transaction failed"
raise
end
end
# check results outside of transaction
aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance']
bobBalance = accountColl.find({"name"=>"Bob"}).first['balance']
assert_equal(aliceBalance, 500)
assert_equal(bobBalance, 1500)
end
- Java
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Java.
// Java (sync) - Core API
public void transferMoneyWithRetry() {
// connect to server
MongoClientURI mongoURI = new MongoClientURI(uri);
MongoClient mongoClient = new MongoClient(mongoURI);
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
accountColl.drop();
// insert some sample data
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000));
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000));
while (true) {
try {
doTransferMoneyWithRetry(accountColl, mongoClient);
break;
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
continue;
} else {
throw e;
}
}
}
}
public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
int amountToTransfer = 500;
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) {
clientSession.startTransaction(txnOptions);
// deduct $500 from Alice's account
List<Document> documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int aliceBalance = (int) documentList.get(0).get("balance");
Assert.assertTrue(aliceBalance >= amountToTransfer);
int newAliceBalance = aliceBalance - amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance)));
// check Alice's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Alice")).into(documentList);
int updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
int bobBalance = (int) documentList.get(0).get("balance");
int newBobBalance = bobBalance + amountToTransfer;
accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance)));
// check Bob's new balance
documentList = new ArrayList<>();
accountColl.find(clientSession, new Document("name", "Bob")).into(documentList);
updatedBalance = (int) documentList.get(0).get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
// commit transaction
clientSession.commitTransaction();
}
}
// Java (async) -- Core API
public void transferMoneyWithRetry() {
// connect to the server
MongoClient mongoClient = MongoClients.create(uri);
MongoDatabase bankDB = mongoClient.getDatabase("bank");
MongoCollection accountColl = bankDB.getCollection("account");
SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>();
mongoClient.getDatabase("bank").drop().subscribe(dropCallback);
dropCallback.await();
// insert some sample data
SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>();
accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback);
insertionCallback.await();
insertionCallback = new SubscriberLatchWrapper<>();
accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);;
insertionCallback.await();
while (true) {
try {
doTransferMoneyWithRetry(accountColl, mongoClient);
break;
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
continue;
} else {
throw e;
}
}
}
}
public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) {
int amountToTransfer = 500;
// start the transaction
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build();
ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build();
SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>();
mongoClient.startSession(sessionOptions).subscribe(sessionCallback);
ClientSession session = sessionCallback.get().get(0);
session.startTransaction(txnOptions);
// deduct $500 from Alice's account
SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
Document documentFound = findCallback.get().get(0);
int aliceBalance = (int) documentFound.get("balance");
int newAliceBalance = aliceBalance - amountToTransfer;
SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>();
accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback);
updateCallback.await();
// check Alice's new balance
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
int updatedBalance = (int) documentFound.get("balance");
Assert.assertEquals(updatedBalance, newAliceBalance);
// add $500 to Bob's account
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
int bobBalance = (int) documentFound.get("balance");
int newBobBalance = bobBalance + amountToTransfer;
updateCallback = new SubscriberLatchWrapper<>();
accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback);
updateCallback.await();
// check Bob's new balance
findCallback = new SubscriberLatchWrapper<>();
accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback);
documentFound = findCallback.get().get(0);
updatedBalance = (int) documentFound.get("balance");
Assert.assertEquals(updatedBalance, newBobBalance);
// commit the transaction
SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>();
session.commitTransaction().subscribe(transactionCallback);
transactionCallback.await();
}
public class SubscriberLatchWrapper<T> implements Subscriber<T> {
/**
* A Subscriber that stores the publishers results and provides a latch so can block on completion.
*
* @param <T> The publishers result type
*/
private final List<T> received;
private final List<RuntimeException> errors;
private final CountDownLatch latch;
private volatile Subscription subscription;
private volatile boolean completed;
/**
* Construct an instance
*/
public SubscriberLatchWrapper() {
this.received = new ArrayList<>();
this.errors = new ArrayList<>();
this.latch = new CountDownLatch(1);
}
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(final T t) {
received.add(t);
}
@Override
public void onError(final Throwable t) {
if (t instanceof RuntimeException) {
errors.add((RuntimeException) t);
} else {
errors.add(new RuntimeException("Unexpected exception", t));
}
onComplete();
}
@Override
public void onComplete() {
completed = true;
subscription.cancel();
latch.countDown();
}
/**
* Get received elements
*
* @return the list of received elements
*/
public List<T> getReceived() {
return received;
}
/**
* Get received elements.
*
* @return the list of receive elements
*/
public List<T> get() {
return await().getReceived();
}
/**
* Await completion or error
*
* @return this
*/
public SubscriberLatchWrapper<T> await() {
subscription.request(Integer.MAX_VALUE);
try {
if (!latch.await(300, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds");
}
} catch (InterruptedException e) {
throw new MongoInterruptedException("Interrupted waiting for observeration", e);
}
if (!errors.isEmpty()) {
throw errors.get(0);
}
return this;
}
public boolean getCompleted() {
return this.completed;
}
public void close() {
subscription.cancel();
received.clear();
}
}
- C
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con C.
// Sample C code with core session
bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){
bool r = true;
bson_error_t error;
bson_t *opts = bson_new();
bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}");
// set read & write concern
mongoc_read_concern_t *read_concern = mongoc_read_concern_new ();
mongoc_write_concern_t *write_concern = mongoc_write_concern_new ();
mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new ();
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY);
mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
mongoc_transaction_opts_set_write_concern (txn_opts, write_concern);
mongoc_transaction_opts_set_read_concern (txn_opts, read_concern);
mongoc_client_session_start_transaction (client_session, txn_opts, &error);
mongoc_client_session_append (client_session, opts, &error);
r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error);
mongoc_client_session_commit_transaction (client_session, NULL, &error);
bson_destroy (opts);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_read_concern_destroy(read_concern);
mongoc_write_concern_destroy(write_concern);
bson_destroy (update);
return r;
}
void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){
bson_t reply;
bool r = true;
const bson_t *doc;
bson_iter_t iter;
bson_error_t error;
// find query
bson_t *alice_query = bson_new ();
BSON_APPEND_UTF8(alice_query, "name", "Alice");
bson_t *bob_query = bson_new ();
BSON_APPEND_UTF8(bob_query, "name", "Bob");
// create session
// set causal consistency to false
mongoc_session_opt_t *session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
// start the session
mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error);
// add session to options
bson_t *opts = bson_new();
mongoc_client_session_append (client_session, opts, &error);
// deduct 500 from Alice
// find account balance of Alice
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance >= amount_to_transfer);
int64_t new_alice_balance = alice_balance - amount_to_transfer;
// core
r = core_session (client_session, collection, alice_query, new_alice_balance);
assert(r);
// find account balance of Alice after transaction
cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
alice_balance = (bson_iter_value (&iter))->value.v_int64;
assert(alice_balance == new_alice_balance);
assert(alice_balance == 500);
// add 500 to Bob's balance
// find account balance of Bob
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64;
int64_t new_bob_balance = bob_balance + amount_to_transfer;
//core
r = core_session (client_session, collection, bob_query, new_bob_balance);
assert(r);
// find account balance of Bob after transaction
cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL);
mongoc_cursor_next (cursor, &doc);
bson_iter_init (&iter, doc);
bson_iter_find (&iter, "balance");
bob_balance = (bson_iter_value (&iter))->value.v_int64;
assert(bob_balance == new_bob_balance);
assert(bob_balance == 1500);
// cleanup
bson_destroy(alice_query);
bson_destroy(bob_query);
mongoc_client_session_destroy(client_session);
bson_destroy(opts);
mongoc_cursor_destroy(cursor);
bson_destroy(doc);
}
int main(int argc, char* argv[]) {
mongoc_init ();
mongoc_client_t* client = mongoc_client_new (<connection uri>);
bson_error_t error;
// connect to bank db
mongoc_database_t *database = mongoc_client_get_database (client, "bank");
// access account collection
mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account");
// set amount to transfer
int64_t amount_to_transfer = 500;
// delete the collection if already existing
mongoc_collection_drop(collection, &error);
// open Alice account
bson_t *alice_account = bson_new ();
BSON_APPEND_UTF8(alice_account, "name", "Alice");
BSON_APPEND_INT64(alice_account, "balance", 1000);
// open Bob account
bson_t *bob_account = bson_new ();
BSON_APPEND_UTF8(bob_account, "name", "Bob");
BSON_APPEND_INT64(bob_account, "balance", 1000);
bool r = true;
r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error);
if (!r) {printf("Error encountered:%s", error.message);}
test_core_money_transfer(client, collection, amount_to_transfer);
}
- Scala
-
El siguiente código muestra cómo utilizar la API de transacciones de Amazon DocumentDB con Scala.
// Scala Core API
def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = {
val accountColl = database.getCollection("account")
var amountToTransfer = 500
var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => {
clientSession.startTransaction()
// deduct $500 from Alice's account
var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
assert(aliceBalance >= amountToTransfer)
var newAliceBalance = aliceBalance - amountToTransfer
accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await()
aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance")
assert(aliceBalance == newAliceBalance)
// add $500 to Bob's account
var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
var newBobBalance = bobBalance + amountToTransfer
accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await()
bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance")
assert(bobBalance == newBobBalance)
clientSession
})
transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await()
}
def doTransactionWithRetry(): Unit = {
val client: MongoClient = MongoClientWrapper.getMongoClient()
val database: MongoDatabase = client.getDatabase("bank")
val accountColl = database.getCollection("account")
accountColl.drop().await()
val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build()
var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions)
accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await()
accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await()
var retry = true
while (retry) {
try {
transferMoneyWithRetry(sessionObservable, database)
println("transaction committed")
retry = false
}
catch {
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("retrying transaction")
}
case other: Throwable => {
println("transaction failed")
retry = false
throw other
}
}
}
// check results outside of transaction
assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500)
assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500)
accountColl.drop().await()
}
Comandos de la admitidos
Comando |
Compatible |
abortTransaction
|
Sí
|
commitTransaction
|
Sí
|
endSessions
|
Sí
|
killSession
|
Sí
|
killAllSession
|
Sí
|
killAllSessionsByPattern
|
No
|
refreshSessions
|
No
|
startSession
|
Sí
|
Capacidades no compatibles
Métodos |
Etapas o comandos |
db.collection.aggregate()
|
$collStats
$currentOp
$indexStats
$listSessions
$out
|
db.collection.count()
db.collection.countDocuments()
|
$where
$near
$nearSphere
|
db.collection.insert()
|
insert no se admite si no se ejecuta en una colección existente. Este método es compatible si se dirige a una colección preexistente.
|
Sesiones
Las sesiones de MongoDB son un marco que se utiliza para admitir escrituras reintentables, coherencia causal, transacciones y administrar operaciones en bases de datos. Cuando se crea una sesión, el cliente genera un identificador de sesión lógico (lsid) que se utiliza para etiquetar todas las operaciones de esa sesión al enviar comandos al servidor.
Amazon DocumentDB admite el uso de sesiones para habilitar las transacciones, pero no admite la coherencia causal ni las escrituras reintentables.
Al utilizar transacciones en Amazon DocumentDB, una transacción se iniciará desde una sesión mediante la API session.startTransaction()
y una sesión admite una sola transacción a la vez. Del mismo modo, las transacciones se completan mediante las API commit (session.commitTransaction()
) o abort (session.abortTransaction()
).
Coherencia causal
La coherencia causal garantiza que, en una sola sesión de cliente, el cliente observe la coherencia de lectura tras escritura, las lecturas/escrituras monoatómicas y las escrituras sigan a las lecturas, y estas garantías se aplican a todas las instancias de un clúster, no solo a las principales. Amazon DocumentDB no admite la coherencia causal y la siguiente afirmación generará un error.
var mySession = db.getMongo().startSession();
var mySessionObject = mySession.getDatabase('test').getCollection('account');
mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
mySessionObject.find()
//Error: error: {
// "ok" : 0,
// "code" : 303,
// "errmsg" : "Feature not supported: 'causal consistency'",
// "operationTime" : Timestamp(1603461817, 493214)
//}
mySession.endSession()
Puede deshabilitar la coherencia causal dentro de una sesión. Tenga en cuenta que si lo hace, podrá utilizar el marco de sesiones, pero no ofrecerá garantías de coherencia causal en las lecturas. Al usar Amazon DocumentDB, las lecturas de la instancia principal serán consistentes de lectura tras escritura y las lecturas de las instancias de réplica serán consistentes eventualmente. Las transacciones son el principal caso de uso para utilizar las sesiones.
var mySession = db.getMongo().startSession({causalConsistency: false});
var mySessionObject = mySession.getDatabase('test').getCollection('account');
mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}});
//Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
mySessionObject.find()
//{ "_id" : 1, "name" : "Bob", "balance" : 100 }
//{ "_id" : 2, "name" : "Alice", "balance" : 1700 }
Reintento de las escrituras
Las escrituras reintentables son una capacidad en la que el cliente intentará volver a intentar las operaciones de escritura una vez cuando se produzcan errores de red o si el cliente no puede encontrar la principal. En Amazon DocumentDB, las escrituras que se pueden volver a intentar no se admiten y deben deshabilitarse. Puede deshabilitarla con el comando (retryWrites=false
) en la cadena de conexión.
Si utiliza el intérprete de comandos de mongo heredado (no mongosh), no incluya el comando retryWrites=false
en ninguna cadena de código. El reintento de las escrituras está desactivado de forma predeterminada. Incluir retryWrites=false
podría provocar un error en comandos de lectura normales.
Errores de la transacción
Cuando se utilizan transacciones, hay situaciones en las que se puede producir un error que indique que el número de transacción no coincide con ninguna transacción en curso.
El error se puede generar en al menos dos escenarios diferentes:
Una vez transcurrido el tiempo de espera de la transacción de un minuto.
Tras el reinicio de la instancia (debido a una aplicación de un parche, una recuperación tras un error, etc.), es posible que se produzca este error incluso en los casos en que la transacción se haya hecho correctamente. Durante el reinicio de una instancia, la base de datos no puede diferenciar entre una transacción que se completó correctamente y una transacción que se canceló. En otras palabras, el estado de finalización de la transacción es ambiguo.
La mejor forma de solucionar este error es hacer que las actualizaciones transaccionales sean idempotentes, por ejemplo, utilizando el mutador $set
en lugar de una operación de incremento/decremento. Consulte a continuación:
{ "ok" : 0,
"operationTime" : Timestamp(1603938167, 1),
"code" : 251,
"errmsg" : "Given transaction number 1 does not match any in-progress transactions."
}