

# 在 DynamoDB 中使用批量数据操作的最佳实践
<a name="BestPractices_BulkDataOperations"></a>

DynamoDB 支持批量操作，例如 `BatchWriteItem`，使用该参数，您可以同时执行多达 25 个 `PutItem` 和 `DeleteItem` 请求。但是，`BatchWriteItem` 不支持 `UpdateItem` 操作。对于批量更新，区别在于更新的要求和性质。您可以使用其他 DynamoDB API（例如 `TransactWriteItems`）来处理不超过 100 的批次大小。当涉及到更多项目时，您可以使用 AWS Glue、Amazon EMR、AWS Step Functions 等服务，也可以使用 DynamoDB-Shell 等自定义脚本和工具进行批量更新。

**Topics**
+ [有条件批量更新](BestPractices_ConditionalBatchUpdate.md)
+ [高效执行批量操作](BestPractices_EfficientBulkOperations.md)

# 有条件批量更新
<a name="BestPractices_ConditionalBatchUpdate"></a>

DynamoDB 支持批量操作，例如 `BatchWriteItem`，使用该参数，您可以在单个批次中执行多达 25 个 `PutItem` 和 `DeleteItem` 请求。但是，`BatchWriteItem` 不支持 `UpdateItem` 操作，也不支持条件表达式。作为解决方法，您可以使用其他 DynamoDB API（例如 `TransactWriteItems`）来处理不超过 100 的批次大小。

在涉及到更多项目并且需要更改主要的数据块时，您可以使用 AWS Glue、Amazon EMR、AWS Step Functions 等服务，也可以使用 DynamoDB-Shell 等自定义脚本和工具高效地进行批量更新。

**使用此模式的时机**
+ 生产环境应用场景不支持 DynamoDB-shell。
+ `TransactWriteItems` – 上限为 100 个单独的更新，可以有条件或无条件，执行方式为全有或全无 ACID 捆绑。如果应用程序需要幂等性，则也可以提供带有 `ClientRequestToken` 的 `TransactWriteItems` 调用，这意味着多个相同的调用与单个调用具有相同的效果。这种方法可以确保您不会多次执行同一个事务，最终导致数据状态不正确。

  权衡 – 会使用额外的吞吐量。每 1KB 写入 2 个 WCU，而不是标准的每 1 KB 写入 1 个 WGU。
+ PartiQL `BatchExecuteStatement` – 最多 25 个更新，可以有条件或无条件。`BatchExecuteStatement` 始终返回对整个请求的成功响应，还会返回保留了顺序的单独操作响应的列表。

  权衡 – 对于较大的批次，需要额外的客户端逻辑，以便按照 25 的批次大小分发请求。在确定重试策略时，需要考虑到单独错误的响应。

## 代码示例
<a name="bp-conditional-code-examples"></a>

这些代码示例使用 boto3 库，即适用于 Python 的 AWS SDK。示例假设您已安装 boto3 并配置了相应的 AWS 凭证。

假设一家电气供应商在欧洲多个城市建有多个仓库，供应商有一个库存数据库。由于夏天快要结束了，供应商想要甩卖台式风扇，以便为其他库存腾出空间。供应商希望对所有从意大利仓库供货的台式风扇的价格打折，但前提是要有 20 个台式风扇的储备库存。DynamoDB 表名为 **inventory**，在其键架构中，分区键为 **sku**，这是每个产品的唯一标识符，排序键为 **warehouse**，这是数据仓库的标识符。

以下 Python 代码演示如何使用 `BatchExecuteStatement` API 调用，执行此有条件的批量更新。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.batch_execute_statement(
        Statements=[
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITTUR1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM5'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN3'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

在示例数据上，执行会生成以下输出：

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '33'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '30'}, 'sku': {'S': 'F123'}}]
```

由于这是内部系统的限定操作，因此没有考虑幂等性要求。这里可以设置额外的护栏机制，例如只有在价格大于 35 且小于 40 时才应更新价格，以可靠地进行更新靠。

或者，如果存在更严格的幂等性和 ACID 要求，我们可以使用 `TransactWriteItems` 执行相同的批量更新操作。但是，请务必记住，事务捆绑中的所有操作需要都要完成，否则整个捆绑失败。

我们假设意大利出现热浪，对台扇的需求急剧增长。供应商希望将意大利所有仓库的台式风扇价格提高 20 欧元，但监管机构要求，只有当所有库存的当前价格低于 70 欧元时才能够提价。这里的关键之处在于，只有当每个仓库中的价格都低于 70 欧元时，才能一次性更新对所有库存的价格，而且只能更新一次。

以下 Python 代码演示如何使用 `TransactWriteItems` API 调用，执行此批量更新。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.transact_write_items(
        ClientRequestToken='UUIDAWS124',
        TransactItems=[
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITTUR1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM5'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN3'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

在示例数据上，执行会生成以下输出：

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '60'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '53'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '50'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '80'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '73'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '70'}, 'sku': {'S': 'F123'}}]
```

在 DynamoDB 中有多种执行批量更新的方法。哪种方法合适取决于多种因素，例如 ACID 和/或幂等性等要求、要更新的项目数量以及对 API 的熟悉程度等。

# 高效执行批量操作
<a name="BestPractices_EfficientBulkOperations"></a>

**使用此模式的时机**

这些模式对于在 DynamoDB 项目上高效地执行批量更新非常有用。
+ 生产环境应用场景不支持 DynamoDB-shell。
+ `TransactWriteItems` – 最高 100 个单独的更新，可以有条件或无条件，以全有或全无的 ACID 捆绑的形式执行 

  权衡 – 会消耗额外的吞吐量，每 1 KB 写入 2 个 WCU。
+ PartiQL `BatchExecuteStatement` – 最高 25 个更新，可以有条件或无条件

  权衡 – 需要额外的逻辑，以便按照 25 的批次大小分发请求。
+ AWS Step Functions – 为熟悉 AWS Lambda 的开发人员提供速率受限的批量操作。

  权衡 – 执行时间与速率限制成反比。受最大 Lambda 函数超时时间的限制。该功能会使得在读取和写入之间发生的数据更改可能会被覆盖。有关更多信息，请参阅 [Backfilling an Amazon DynamoDB Time to Live attribute using Amazon EMR: Part 2](https://aws.amazon.com/blogs/database/part-2-backfilling-an-amazon-dynamodb-time-to-live-attribute-using-amazon-emr/)。
+ AWS Glue 和 Amazon EMR – 速率受限的批量操作，采用托管式并行度。对于不注重时效性的应用程序或更新，这些选项可以在后台运行，只消耗一小部分吞吐量。这两项服务都使用 emr-dynamodb-connector 来执行 DynamoDB 操作。这些服务会执行大量读取，然后大量写入更新的项目，并有速率限制选项。

  权衡 – 执行时间与速率限制成反比。使用该功能时，所包含的在读取和写入之间发生的数据更改可能会被覆盖。您无法从全局二级索引（GSI）中读取。请参阅 [Backfilling an Amazon DynamoDB Time to Live attribute using Amazon EMR: Part 2](https://aws.amazon.com/blogs/database/part-2-backfilling-an-amazon-dynamodb-time-to-live-attribute-using-amazon-emr/)。
+ DynamoDB Shell – 使用类似 SQL 的查询执行速率受限的批量操作。您可以从 GSI 中读取以提高效率。

  权衡 – 执行时间与速率限制成反比。请参阅 [Rate limited bulk operations in DynamoDB Shell](https://aws.amazon.com/blogs/database/rate-limited-bulk-operations-in-dynamodb-shell/)。

## 使用模式
<a name="BestPractices_EfficientBulkOperations_UsingThePattern"></a>

批量更新会对成本造成巨大的影响，尤其是当您使用按需吞吐量模式时。如果您使用预置吞吐量模式，则需要在速度和成本之间进行权衡。设置极为严格的速率限制参数可能会导致极长的处理时间。您可以使用平均项目大小和速率限制来大致确定更新速度。

或者，您可以根据更新过程的预期持续时间以及平均项目大小，来确定该过程所需的吞吐量。随各模式提供的博客引用详细介绍了使用该模式的策略、实施和限制。有关更多信息，请参阅 [Cost-effective bulk processing with Amazon DynamoDB](https://aws.amazon.com/blogs/database/cost-effective-bulk-processing-with-amazon-dynamodb/)。

在对活动 DynamoDB 表执行批量更新时，可以采取多种方法。哪种方法合适取决于多种因素，例如 ACID 和/或幂等性等要求、要更新的项目数量以及对 API 的熟悉程度等。此处非常重要的一点是要权衡成本与时间，上文讨论的大多数方法都提供了选项，可对更新作业使用的吞吐量限制速率。