

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 DynamoDB 中使用大量資料操作的最佳實務
<a name="BestPractices_BulkDataOperations"></a>

DynamoDB 支援批次操作，例如使用 `BatchWriteItem`，可同時執行最多 25 個 `PutItem` 和 `DeleteItem` 請求。不過，`BatchWriteItem` 不支援 `UpdateItem` 操作。進行大量更新時，差異取決於需求與更新的性質。您可以使用其他 DynamoDB API，例如 `TransactWriteItems`，其批次大小上限為 100。當涉及更多項目時，您可以使用 Amazon EMR AWS Glue等服務， AWS Step Functions 或使用 DynamoDB-shell 等自訂指令碼和工具進行大量更新。

**Topics**
+ [條件式批次更新](BestPractices_ConditionalBatchUpdate.md)
+ [高效大量操作](BestPractices_EfficientBulkOperations.md)

# 條件式批次更新
<a name="BestPractices_ConditionalBatchUpdate"></a>

DynamoDB 支援批次操作，例如使用 `BatchWriteItem`，可在單一批次中同時執行最多 25 個 `PutItem` 與 `DeleteItem` 請求。不過，`BatchWriteItem` 不支援 `UpdateItem` 操作，也不支援條件運算式。可採用替代方案，例如使用其他 DynamoDB API (如 `TransactWriteItems`)，其批次大小上限為 100。

當涉及更多項目，且需要變更主要的資料區塊時，您可以使用 AWS Glue、Amazon EMR 等服務， AWS Step Functions 或使用自訂指令碼和 DynamoDB-shell 等工具來實現高效的大量更新。

**適用情境**
+ DynamoDB-shell 不支援用於生產環境。
+ `TransactWriteItems` – 最多可執行 100 個具或不具條件的個別更新，以全有或全無的 ACID 套件形式執行。若應用程式需要冪等性，可在 `TransactWriteItems` 呼叫中加入 `ClientRequestToken`，確保多次相同呼叫的結果與單次呼叫相同。這可確保不會重複執行相同交易，以避免產生錯誤的資料狀態。

  取捨 – 會消耗額外的輸送量。每 1 KB 寫入需耗用 2 個 WCU，而非標準的每 1 KB 寫入 1 個 WCU。
+ PartiQL `BatchExecuteStatement` – 最多可執行 25 個具或不具條件的更新。`BatchExecuteStatement` 會一律傳回整體請求的成功回應，並附上保留執行順序的個別操作回應清單。

  取捨 – 對於較大批次，需在用戶端實作額外邏輯，以每 25 個為一組分批發送請求。需考量個別錯誤回應，以決定重試策略。

## 程式碼範例
<a name="bp-conditional-code-examples"></a>

這些程式碼範例使用 boto3 程式庫，這是適用於 Python 的 AWS SDK。範例假設您已安裝 boto3，並已使用正確的 AWS 憑證完成設定。

假設一個電器供應商的庫存資料庫，該供應商在歐洲多個城市設有倉儲。由於夏季結束，廠商想要清空桌上風扇庫存，為其他商品騰出空間。廠商希望對義大利倉儲出貨的所有桌上風扇提供價格折扣，但僅在預留庫存達 20 台時適用。DynamoDB 資料表名稱為 **warehouse**，其金鑰結構包括分割區金鑰 **sku** (每個產品的唯一識別碼) 與排序索引鍵 **warehouse** (倉儲識別碼)。

下列 Python 程式碼示範如何透過 `BatchExecuteStatement` API 呼叫執行條件式批次更新。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.batch_execute_statement(
        Statements=[
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITTUR1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM5'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN3'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

執行後，範例資料將產生以下輸出：

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '33'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '30'}, 'sku': {'S': 'F123'}}]
```

由於此為內部系統的受限操作，因此未納入冪等性需求考量。可加入額外防護機制，例如僅在價格大於 35 且小於 40 時才允許更新，以提升更新的穩健性。

或者，在冪等性與 ACID 要求更嚴格的情況下，可使用 `TransactWriteItems` 執行相同的批次更新操作。需注意，交易套件中的所有操作要麼全部成功，要麼全部失敗。

假設義大利出現熱浪，導致桌上風扇需求急遽上升。廠商希望將義大利各倉儲出貨的桌上風扇成本提高 20 歐元，但監管機構僅在全庫存中目前成本低於 70 歐元時允許此調漲。價格必須在整個庫存中同時且僅更新一次，且僅當各倉儲的成本低於 70 歐元時才執行更新。

下列 Python 程式碼示範如何透過 `TransactWriteItems` API 呼叫執行此批次更新。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.transact_write_items(
        ClientRequestToken='UUIDAWS124',
        TransactItems=[
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITTUR1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM5'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN3'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

執行後，範例資料將產生以下輸出：

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '60'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '53'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '50'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '80'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '73'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '70'}, 'sku': {'S': 'F123'}}]
```

DynamoDB 提供多種執行批次更新的方法。適合的方法取決於 ACID 與/或冪等性需求、更新項目數量，以及對 API 的熟悉程度。

# 高效大量操作
<a name="BestPractices_EfficientBulkOperations"></a>

**適用情境**

這些模式可協助在 DynamoDB 項目上高效執行大量更新。
+ DynamoDB-shell 不支援用於生產環境。
+ `TransactWriteItems` – 最多可執行 100 個有或無條件的個別更新，並以「全有或全無」的 ACID 套件方式運行 

  取捨 – 額外消耗輸送量，每 1 KB 寫入需耗用 2 WCUs。
+ PartiQL `BatchExecuteStatement` – 最多可執行 25 個有或無條件的更新。

  取捨 – 需額外邏輯以 25 筆為一批分配請求。
+ AWS Step Functions – 開發人員熟悉的速率限制大量操作 AWS Lambda。

  取捨 – 執行時期與速率限制成反比。受限於 Lambda 函式的最大逾時時間。此功能意味著讀取與寫入之間的資料變更可能被覆寫。如需更多資訊，請參閱[使用 Amazon EMR 回填 Amazon DynamoDB 存留時間屬性：第 2 部分](https://aws.amazon.com/blogs/database/part-2-backfilling-an-amazon-dynamodb-time-to-live-attribute-using-amazon-emr/)。
+ AWS Glue 和 Amazon EMR – 具有受管平行處理的速率限制大量操作。對於非即時的應用程式或更新，這些選項可在背景執行，僅耗用少量輸送量。兩項服務皆透過 emr-dynamodb-connector 執行 DynamoDB 操作。這些服務會先進行大量讀取，再進行大量寫入更新項目，並可設定速率限制。

  取捨 – 執行時期與速率限制成反比。功能包括可覆寫讀取與寫入期間發生的資料變更。您無法從全域次要索引 (GSIs) 讀取資料。請參閱[使用 Amazon EMR 回填 Amazon DynamoDB 存留時間屬性：第 2 部分](https://aws.amazon.com/blogs/database/part-2-backfilling-an-amazon-dynamodb-time-to-live-attribute-using-amazon-emr/)。
+ DynamoDB Shell – 透過類似 SQL 的查詢執行具速率限制的大量操作。您可從 GSIs 讀取資料以提升效率。

  取捨 – 執行時期與速率限制成反比。請參閱[在 DynamoDB Shell 中進行速率限制大量操作](https://aws.amazon.com/blogs/database/rate-limited-bulk-operations-in-dynamodb-shell/)。

## 使用此模式
<a name="BestPractices_EfficientBulkOperations_UsingThePattern"></a>

大量更新可能顯著影響成本，特別是在使用隨需輸送量模式時。使用佈建輸送量模式時，速度與成本之間需要權衡。將 rate-limit 參數設定得過於嚴格，可能會造成處理時間過長。您可以根據平均項目大小和速率限制，大致估算更新速度。

或者，您可以根據更新程序的預期持續時間與平均項目大小，估算所需的輸送量。每個模式附帶的部落格參考說明使用該模式的策略、實作方式與限制。如需更多資訊，請參閱[使用 Amazon DynamoDB 進行具成本效益的大量處理](https://aws.amazon.com/blogs/database/cost-effective-bulk-processing-with-amazon-dynamodb/)。

可使用多種方法對即時 DynamoDB 資料表執行大量更新。適合的方法取決於 ACID 與/或冪等性需求、更新項目數量，以及對 API 的熟悉程度。必須同時考量成本與時間的權衡，上述多數方法皆可設定速率限制以控制大量更新任務的輸送量。