

# 有条件批量更新
<a name="BestPractices_ConditionalBatchUpdate"></a>

DynamoDB 支持批量操作，例如 `BatchWriteItem`，使用该参数，您可以在单个批次中执行多达 25 个 `PutItem` 和 `DeleteItem` 请求。但是，`BatchWriteItem` 不支持 `UpdateItem` 操作，也不支持条件表达式。作为解决方法，您可以使用其他 DynamoDB API（例如 `TransactWriteItems`）来处理不超过 100 的批次大小。

在涉及到更多项目并且需要更改主要的数据块时，您可以使用 AWS Glue、Amazon EMR、AWS Step Functions 等服务，也可以使用 DynamoDB-Shell 等自定义脚本和工具高效地进行批量更新。

**使用此模式的时机**
+ 生产环境应用场景不支持 DynamoDB-shell。
+ `TransactWriteItems` – 上限为 100 个单独的更新，可以有条件或无条件，执行方式为全有或全无 ACID 捆绑。如果应用程序需要幂等性，则也可以提供带有 `ClientRequestToken` 的 `TransactWriteItems` 调用，这意味着多个相同的调用与单个调用具有相同的效果。这种方法可以确保您不会多次执行同一个事务，最终导致数据状态不正确。

  权衡 – 会使用额外的吞吐量。每 1KB 写入 2 个 WCU，而不是标准的每 1 KB 写入 1 个 WGU。
+ PartiQL `BatchExecuteStatement` – 最多 25 个更新，可以有条件或无条件。`BatchExecuteStatement` 始终返回对整个请求的成功响应，还会返回保留了顺序的单独操作响应的列表。

  权衡 – 对于较大的批次，需要额外的客户端逻辑，以便按照 25 的批次大小分发请求。在确定重试策略时，需要考虑到单独错误的响应。

## 代码示例
<a name="bp-conditional-code-examples"></a>

这些代码示例使用 boto3 库，即适用于 Python 的 AWS SDK。示例假设您已安装 boto3 并配置了相应的 AWS 凭证。

假设一家电气供应商在欧洲多个城市建有多个仓库，供应商有一个库存数据库。由于夏天快要结束了，供应商想要甩卖台式风扇，以便为其他库存腾出空间。供应商希望对所有从意大利仓库供货的台式风扇的价格打折，但前提是要有 20 个台式风扇的储备库存。DynamoDB 表名为 **inventory**，在其键架构中，分区键为 **sku**，这是每个产品的唯一标识符，排序键为 **warehouse**，这是数据仓库的标识符。

以下 Python 代码演示如何使用 `BatchExecuteStatement` API 调用，执行此有条件的批量更新。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.batch_execute_statement(
        Statements=[
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITTUR1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM5'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN3'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

在示例数据上，执行会生成以下输出：

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '33'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '30'}, 'sku': {'S': 'F123'}}]
```

由于这是内部系统的限定操作，因此没有考虑幂等性要求。这里可以设置额外的护栏机制，例如只有在价格大于 35 且小于 40 时才应更新价格，以可靠地进行更新靠。

或者，如果存在更严格的幂等性和 ACID 要求，我们可以使用 `TransactWriteItems` 执行相同的批量更新操作。但是，请务必记住，事务捆绑中的所有操作需要都要完成，否则整个捆绑失败。

我们假设意大利出现热浪，对台扇的需求急剧增长。供应商希望将意大利所有仓库的台式风扇价格提高 20 欧元，但监管机构要求，只有当所有库存的当前价格低于 70 欧元时才能够提价。这里的关键之处在于，只有当每个仓库中的价格都低于 70 欧元时，才能一次性更新对所有库存的价格，而且只能更新一次。

以下 Python 代码演示如何使用 `TransactWriteItems` API 调用，执行此批量更新。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.transact_write_items(
        ClientRequestToken='UUIDAWS124',
        TransactItems=[
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITTUR1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM5'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN3'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

在示例数据上，执行会生成以下输出：

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '60'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '53'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '50'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '80'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '73'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '70'}, 'sku': {'S': 'F123'}}]
```

在 DynamoDB 中有多种执行批量更新的方法。哪种方法合适取决于多种因素，例如 ACID 和/或幂等性等要求、要更新的项目数量以及对 API 的熟悉程度等。