

# 在 DynamoDB 中处理并发更新的最佳实践
<a name="BestPractices_ImplementingVersionControl"></a>

在分布式系统中，多个进程或用户可能会尝试同时修改相同的数据。如果没有并发控制，这些并发写入可能会导致更新丢失、数据不一致或争用情况。DynamoDB 提供了多种机制来帮助您管理并发访问和维护数据完整性。

**注意**  
`UpdateItem` 等单独的写入操作是原子操作，无论并发度如何，都始终对项目的最新版本进行操作。当您的应用程序必须读取某个项目，然后根据读取的值将其写回时（读取-修改-写入周期），就需要使用锁定策略，因为其他进程可能会在读取和写入之间修改该项目。

处理并发更新的主要策略有两种：
+ **乐观锁**：假设冲突情况很少。此策略允许并发访问，并在写入时使用条件写入检测冲突。如果检测到冲突，则写入失败，应用程序可以重试。
+ **悲观锁**：假设很可能出现冲突。此策略在修改资源之前获取对资源的独占访问权限，以这种方式来防止并发访问。其他进程必须等待直至解锁。

下表汇总了 DynamoDB 中提供的方法：


| 方法 | 机制 | 适用于 | 
| --- | --- | --- | 
| 乐观锁 | 版本属性 \$1 条件写入 | 争用率低，重试成本低 | 
| 悲观锁（事务） | TransactWriteItems | 多项目原子性，争用率中等 | 
| 悲观锁（锁客户端） | 带租约和检测信号的专用锁定表 | 长时间运行的工作流，分布式协调 | 

# 乐观锁（使用版本号）
<a name="BestPractices_OptimisticLocking"></a>

乐观锁是一种在写入时检测冲突而不是防止冲突的策略。每个项目都包含一个版本属性，其值会随着每次更新而增加。更新项目时，您需要包含一个[条件表达式](Expressions.ConditionExpressions.md)，用于检查版本号是否与应用程序上次读取的值匹配。如果在这个期间其他进程修改了该项目，则条件失败并且 DynamoDB 返回 `ConditionalCheckFailedException`。

## 使用乐观锁的情况
<a name="BestPractices_OptimisticLocking_WhenToUse"></a>

乐观锁非常适合用于以下情况：
+ 多个用户或进程可能会更新同一个项目，但冲突并不频繁。
+ 对于应用程序来说，重试失败写入的成本较低。
+ 您需要避免管理分布式锁的开销和复杂性。

常见的示例包括电子商务库存更新、协作编辑平台和金融交易记录。

## 权衡
<a name="BestPractices_OptimisticLocking_Tradeoffs"></a>

**高争用率下的重试开销**  
在高并发环境中，发生冲突的可能性会增加，进而可能导致更高的重试次数和写入成本。

**实施复杂性**  
向项目添加版本控制和处理有条件检查，会增加应用程序逻辑的复杂性。适用于 Java 的 AWS SDK v2 增强版客户端通过 [https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/ddb-en-client-extensions.html#ddb-en-client-extensions-VRE](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/ddb-en-client-extensions.html#ddb-en-client-extensions-VRE) 注解提供内置支持，这可以自动为您管理版本号。

## 模式设计
<a name="BestPractices_OptimisticLocking_PatternDesign"></a>

每个项目中包含一个版本属性。以下提供了一个简单的架构设计：
+ 分区键：每个项目的唯一标识符（例如 `ItemId`）。
+ 属性：
  + `ItemId` – 项目的唯一标识符。
  + `Version` – 表示项目版本号的整数。
  + `QuantityLeft` – 项目的剩余库存。

首次创建项目时，`Version` 属性设置为 1。每次进行更新时，版本号增加 1。


| ItemID（分区键） | 版本 | QuantityLeft | 
| --- | --- | --- | 
| Bananas | 1 | 10 | 
| Apples | 1 | 5 | 
| Oranges | 1 | 7 | 

## 实现
<a name="BestPractices_OptimisticLocking_Implementation"></a>

要实施乐观锁，请按照以下步骤操作：

1. 读取项目的当前版本。

   ```
   def get_item(item_id):
       response = table.get_item(Key={'ItemID': item_id})
       return response['Item']
   
   item = get_item('Bananas')
   current_version = item['Version']
   ```

1. 使用检查版本号的条件表达式更新项目。

   ```
   def update_item(item_id, qty_bought, current_version):
       try:
           response = table.update_item(
               Key={'ItemID': item_id},
               UpdateExpression="SET QuantityLeft = QuantityLeft - :qty, Version = :new_v",
               ConditionExpression="Version = :expected_v",
               ExpressionAttributeValues={
                   ':qty': qty_bought,
                   ':new_v': current_version + 1,
                   ':expected_v': current_version
               },
               ReturnValues="UPDATED_NEW"
           )
           return response
       except ClientError as e:
           if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
               print("Version conflict: another process updated this item.")
           raise
   ```

1. 使用新的读取进行重试来处理冲突。

   每次重试都需要进行额外的读取操作，因此请限制重试的总次数。

   ```
   def update_with_retry(item_id, qty_bought, max_retries=3):
       for attempt in range(max_retries):
           item = get_item(item_id)
           try:
               return update_item(item_id, qty_bought, item['Version'])
           except ClientError as e:
               if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
                   raise
               print(f"Retry {attempt + 1}/{max_retries}")
       raise Exception("Update failed after maximum retries.")
   ```

对于 Java 应用程序，适用于 Java 的 AWS SDK v2 增强版客户端通过 [https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/ddb-en-client-extensions.html#ddb-en-client-extensions-VRE](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/ddb-en-client-extensions.html#ddb-en-client-extensions-VRE) 注解提供内置乐观锁支持，这可以自动为您管理版本号。

有关条件表达式的更多信息，请参阅 [DynamoDB 条件表达式 CLI 示例](Expressions.ConditionExpressions.md)。

# DynamoDB 事务的悲观锁
<a name="BestPractices_PessimisticLocking"></a>

DynamoDB [事务](transactions.md)为分组操作提供了一种全有或全无的方法。使用 `TransactWriteItems` 时，DynamoDB 监控事务中的所有项目。在事务处理期间，如果任何项目被其他操作修改，则整个事务将被取消，DynamoDB 将返回 `TransactionCanceledException`。此行为提供了一种悲观并发控制，因为这样可以防止出现冲突的并发修改，而不是在事后检测。

## 使用事务进行锁定的情况
<a name="BestPractices_PessimisticLocking_WhenToUse"></a>

事务非常适合下列情况：
+ 您需要以原子方式更新多个项目，无论是在同一个表还是多个表中。
+ 您的业务逻辑需要全有或全无语义，即要么所有更改都成功，要么不应用任何更改。

常见的例子包括账户之间的资金转移、同时更新库存和订单表的下订单行为，以及在游戏中玩家之间交换物品。

## 权衡
<a name="BestPractices_PessimisticLocking_Tradeoffs"></a>

**写入成本更高**  
对于不超过 1 KB 的项目，每个项目的事务会消耗 2 WCU（一个用于准备，一个用于提交），而标准写入消耗 1 WCU。

**项目限制**  
一个事务可以包含对一个或多个表的最多 100 个操作。

**冲突敏感性**  
如果事务中的任何项目被其他操作修改，则整个事务失败。在高争用率的场景中，这可能会导致频繁取消。

## 实现
<a name="BestPractices_PessimisticLocking_Implementation"></a>

以下示例使用 `TransactWriteItems` 以原子方式在两个项目之间转移库存。如果其他进程在事务处理期间修改了任一项目，则整个操作都将回退。

```
import boto3

client = boto3.client('dynamodb')

def transfer_inventory(source_id, target_id, quantity):
    try:
        client.transact_write_items(
            TransactItems=[
                {
                    'Update': {
                        'TableName': 'Inventory',
                        'Key': {'ItemID': {'S': source_id}},
                        'UpdateExpression': 'SET QuantityLeft = QuantityLeft - :qty',
                        'ConditionExpression': 'QuantityLeft >= :qty',
                        'ExpressionAttributeValues': {
                            ':qty': {'N': str(quantity)}
                        }
                    }
                },
                {
                    'Update': {
                        'TableName': 'Inventory',
                        'Key': {'ItemID': {'S': target_id}},
                        'UpdateExpression': 'SET QuantityLeft = QuantityLeft + :qty',
                        'ExpressionAttributeValues': {
                            ':qty': {'N': str(quantity)}
                        }
                    }
                }
            ]
        )
        return True
    except client.exceptions.TransactionCanceledException as e:
        print(f"Transaction canceled: {e}")
        return False
```

在此示例中，条件表达式检查是否存在足够的库存，但不需要版本属性。在准备阶段和提交阶段之间，如果事务中的任何项目被其他操作修改，DynamoDB 会自动取消事务。这种机制提供了悲观并发控制，事务本身可以防止相互冲突的并发修改。

**注意**  
您可以通过添加版本检查作为附加的条件表达式，将事务与乐观锁结合起来。这可以提供额外的保护措施，但并不是事务检测冲突所必需的。

有关更多信息，请参阅 [使用 DynamoDB 事务管理复杂工作流](transactions.md)。

# 使用 DynamoDB 锁客户端进行分布式锁定
<a name="BestPractices_DistributedLocking"></a>

DynamoDB 锁客户端是一个开源库，对于需要传统“锁定-获取-释放”语义的应用程序，该客户端使用 DynamoDB 表作为锁定存储来实施分布式锁定。当您需要对多个应用程序实例协调对外部资源（例如 S3 对象或共享配置）的访问时，此方法非常有用。

锁客户端以开源 [Java 库](https://github.com/awslabs/amazon-dynamodb-lock-client)的形式提供。

## 工作原理
<a name="BestPractices_DistributedLocking_HowItWorks"></a>

锁客户端使用专用 DynamoDB 表来跟踪锁定。每个锁定都表示为具有以下关键属性的项目：
+ 分区键，标识锁定的资源。
+ 租赁持续时间，指定锁的有效期限。如果锁持有程序崩溃或无响应，则该锁将在租赁持续时间过后自动过期。
+ 检测信号，由锁持有程序定期发送，用于延长租赁。这样可以防止持有程序仍在活跃地进行处理时，锁定变得过期。

锁客户端使用条件写入来确保一次只有一个进程可以获取某个锁。如果锁已经被持有，则调用方可以选择等待并重试或者立即失败。

## 使用锁客户端的情况
<a name="BestPractices_DistributedLocking_WhenToUse"></a>

锁客户端非常适合以下情况：
+ 需要跨多个应用程序实例或微服务协调对共享资源的访问。
+ 关键环节的运行时间很长（几秒到几分钟），出现冲突时重试整个操作成本很高。
+ 您需要自动锁定到期来妥善处理进程故障。

常见的示例包括编排分布式工作流、跨多个实例协调 cron 作业以及管理对共享外部资源的访问。

## 权衡
<a name="BestPractices_DistributedLocking_Tradeoffs"></a>

**其他基础设施**  
需要专用 DynamoDB 表用于锁定管理，并且需要额外的读取和写入容量用于锁定操作和检测信号。

**时钟依赖性**  
锁定到期取决于时间戳。客户端之间的严重时钟偏差可能会导致意外行为，尤其是对于较短的租赁持续时间。

**死锁风险**  
如果应用程序在多个资源上获取锁定，则必须按一致的顺序进行获取以避免死锁。租赁持续时间可以自动从未响应的持有程序上释放锁定，提供了一层安全保护机制。

## 实现
<a name="BestPractices_DistributedLocking_Implementation"></a>

以下示例说明如何使用 DynamoDB 锁客户端获取和释放锁定：

```
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

final DynamoDbClient dynamoDB = DynamoDbClient.builder()
    .region(Region.US_WEST_2)
    .build();

final AmazonDynamoDBLockClient lockClient = new AmazonDynamoDBLockClient(
    AmazonDynamoDBLockClientOptions.builder(dynamoDB, "Locks")
        .withTimeUnit(TimeUnit.SECONDS)
        .withLeaseDuration(10L)
        .withHeartbeatPeriod(3L)
        .withCreateHeartbeatBackgroundThread(true)
        .build());

// Try to acquire a lock on a resource
final Optional<LockItem> lock =
    lockClient.tryAcquireLock(AcquireLockOptions.builder("my-shared-resource").build());

if (lock.isPresent()) {
    try {
        // Perform operations that require exclusive access
        processSharedResource();
    } finally {
        // Always release the lock when done
        lockClient.releaseLock(lock.get());
    }
} else {
    System.out.println("Failed to acquire lock.");
}

lockClient.close();
```

**重要**  
始终释放 `finally` 块中的锁，以确保即使处理逻辑引发异常也会释放锁定。未释放的锁定会阻塞其他进程，直到租赁到期。

您还可以直接使用条件写入，来实施无需锁客户端库的简单锁定机制。以下示例使用 `UpdateItem` 与条件表达式来获取锁定，使用 `DeleteItem` 将其释放：

```
from datetime import datetime, timedelta
from boto3.dynamodb.conditions import Attr

def acquire_lock(table, resource_name, owner_id, ttl_seconds):
    """Attempt to acquire a lock. Returns True if successful."""
    expiry = (datetime.now() + timedelta(seconds=ttl_seconds)).isoformat()
    now = datetime.now().isoformat()
    try:
        table.update_item(
            Key={'LockID': resource_name},
            UpdateExpression='SET #owner = :owner, #expiry = :expiry',
            ConditionExpression=Attr('LockID').not_exists() | Attr('ExpiresAt').lt(now),
            ExpressionAttributeNames={'#owner': 'OwnerID', '#expiry': 'ExpiresAt'},
            ExpressionAttributeValues={':owner': owner_id, ':expiry': expiry}
        )
        return True
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        return False

def release_lock(table, resource_name, owner_id):
    """Release a lock. Only succeeds if the caller is the lock owner."""
    try:
        table.delete_item(
            Key={'LockID': resource_name},
            ConditionExpression=Attr('OwnerID').eq(owner_id)
        )
        return True
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        return False
```

此方法使用条件表达式来确保只能获取尚不存在的锁或者已过期的锁，并且只能由获取该锁的进程释放锁定。请考虑在锁定表上启用[生存时间（TTL）](TTL.md)来自动清理过期的锁定项目。

## 选择并发控制策略
<a name="BestPractices_ChoosingLockingStrategy"></a>

按照以下准则来为工作负载选择正确的方法：

在下列情况下，**使用乐观锁**：  
+ 很少出现冲突。
+ 重试失败写入的成本较低。
+ 一次更新一个项目。

在下列情况下，**使用事务**：  
+ 需要以原子方式更新多个项目。
+ 需要跨项目或表的全有或全无语义。
+ 需要在单个操作中将条件检查和写入操作结合起来。

在下列情况下，**使用锁客户端**：  
+ 需要跨分布式进程协调对外部资源的访问。
+ 关键环节运行时间长，出现冲突时重试的成本很高。
+ 需要自动锁定到期来处理进程故障。

**注意**  
如果您使用 [DynamoDB 全局表](GlobalTables.md)，请注意对于并发更新，全局表使用“以最后一次写入者为准”的协调策略。在需要跨区域时，使用版本号实施乐观锁将无法正常工作，因为在一个区域进行写入操作可能会覆盖另一个区域的并发写入而没有版本检查。使用全局表时，请将应用程序设计为在应用程序级处理冲突。