

# DynamoDB での一括データオペレーション使用についてのベストプラクティス
<a name="BestPractices_BulkDataOperations"></a>

DynamoDB は、最大 25 の `PutItem` リクエストと `DeleteItem` リクエストを同時に実行できる、`BatchWriteItem` などのバッチオペレーションをサポートしています。ただし、`BatchWriteItem` は、`UpdateItem` オペレーションをサポートしていません。一括更新の場合、更新の要件と内容によって異なります。最大 100 のバッチサイズには、`TransactWriteItems` などの他の DynamoDB API を使用できます。これ以上の項目が含まれる場合は、AWS Glue、Amazon EMR AWS Step Functions などのサービスを使用するか、DynamoDB シェルなどのカスタムスクリプトやツールを使用して一括更新できます。

**Topics**
+ [条件付きバッチ更新](BestPractices_ConditionalBatchUpdate.md)
+ [効率的な一括オペレーション](BestPractices_EfficientBulkOperations.md)

# 条件付きバッチ更新
<a name="BestPractices_ConditionalBatchUpdate"></a>

DynamoDB は、最大 25 の `PutItem` リクエストと `DeleteItem` リクエストを単一のバッチで実行できる、`BatchWriteItem` などのバッチオペレーションをサポートしています。ただし、`BatchWriteItem` は `UpdateItem` オペレーションはサポートしておらず、条件式もサポートしていません。この回避策として、最大 100 のバッチサイズには、`TransactWriteItems` などのその他の DynamoDB API を使用できます。

これ以上の項目が含まれており、大量のデータの変更が必要な場合は、AWS Glue、Amazon EMR AWS Step Functions などのサービスを使用するか、DynamoDB シェルなどのカスタムスクリプトやツールを使用すると、効率的に一括更新できます。

**このパターンを使用すべきケース**
+ DynamoDB シェルは、本番稼働のユースケースではサポートされていません。
+ `TransactWriteItems` – 条件付きまたは条件なしの最大 100 の個別の更新。all-or-nothing の ACID バンドルとして実行されます。アプリケーションでべき等性が必要な場合、つまり複数の同一の呼び出しが 1 回の呼び出しと同じ効果を持つような場合は、`TransactWriteItems` コールに `ClientRequestToken` を指定することもできます。これにより、同じトランザクションを複数回実行してもデータの状態が正しくないという状況を避けることができます。

  トレードオフ – 追加のスループットが使用されます。標準の 1 KB の書き込みあたり 1 WGU ではなく、1 KB の書き込みあたり 2 WCU になります。
+ PartiQL `BatchExecuteStatement` – 条件付きまたは条件なしで、最大 25 件の更新。`BatchExecuteStatement` は常にリクエスト全体に成功応答を返し、順序を維持する個別のオペレーション応答のリストも返します。

  トレードオフ – バッチが大きい場合、リクエストを 25 のバッチで分散するために、クライアント側で追加のロジックが必要です。再試行戦略を決定するには、個別のエラーレスポンスを考慮する必要があります。

## コードの例
<a name="bp-conditional-code-examples"></a>

これらのコード例では、AWS SDK for Python である boto3 ライブラリを使用しています。これらの例では、boto3 がインストール済みで、適切な AWS 認証情報で設定されていることを前提としています。

ヨーロッパの都市に複数の倉庫がある電気機器ベンダーの在庫データベースがあるとします。現在は夏の終わりであるため、ベンダーはデスク用ファンを処分して他の製品の在庫スペースを確保したいと考えています。ベンダーは、イタリアの倉庫から供給されたすべてのデスク用ファンについて、デスク用ファンの予備在庫が 20 ある場合に限り、価格の割引を提供することを検討しています。DynamoDB テーブル名は、**inventory** であり、このテーブルには、各製品の一意の識別子であるパーティションキー **sku** のキースキーマと、倉庫の識別子であるソートキー **warehouse** があります。

以下の Python コードは、`BatchExecuteStatement` API コールを使用して、この条件付きバッチ更新を実行する方法を説明しています。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.batch_execute_statement(
        Statements=[
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITTUR1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM5'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
            {'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN3'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

実行すると、サンプルデータに以下の出力が生成されます。

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '33'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '30'}, 'sku': {'S': 'F123'}}]
```

これは内部システムの境界オペレーションであるため、べき等性の要件は考慮されていません。価格更新のような追加のガードレールを配置できるのは、価格が 35 より大きく 40 より小さい場合のみとするなど、更新をより堅牢にすることができます。

または、べき等性や ACID 要件をより厳密にする要件の場合は、`TransactWriteItems` を使用して同じバッチ更新オペレーションを実行できます。ただし、トランザクションバンドル内のすべてのオペレーションが完了するか、バンドル全体が失敗する、という点に注意する必要があります。

イタリアで熱波が発生し、デスク用ファンの需要が急増したと仮定します。ベンダーは、イタリアのすべての倉庫から出荷されるデスク用ファンのコストを 20 ユーロ引き上げたいと考えていますが、規制機関がこのコスト上昇を許可するのは、現在のコストが在庫全体で 70 ユーロ未満の場合にのみです。在庫全体で価格が一度に一度のみ更新され、更新されるのは各倉庫のコストが 70 ユーロ未満の場合にのみである天が重要です。

次の Python コードは、`TransactWriteItems` API コールを使用して、このバッチ更新を実行する方法を説明しています。

```
import boto3

client=boto3.client("dynamodb")

before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])

response=client.transact_write_items(
        ClientRequestToken='UUIDAWS124',
        TransactItems=[
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITTUR1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM5'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
            {'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN3'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
        ],
        ReturnConsumedCapacity='TOTAL'
    )

after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
```

実行すると、サンプルデータに以下の出力が生成されます。

```
Before update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '60'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '53'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '50'}, 'sku': {'S': 'F123'}}]
After update:  [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '80'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '73'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '70'}, 'sku': {'S': 'F123'}}]
```

DynamoDB でバッチ更新を実行する方法は、複数あります。適切なアプローチは、ACID やべき等性の要件、更新する項目数、API の知識などの要因によって異なります。

# 効率的な一括オペレーション
<a name="BestPractices_EfficientBulkOperations"></a>

**このパターンを使用すべきケース**

これらのパターンは、DynamoDB 項目の一括更新を効率的に実行するうえで役立ちます。
+ DynamoDB シェルは、本番稼働のユースケースではサポートされていません。
+ `TransactWriteItems` – 条件付きまたは条件なしの最大 100 の個別の更新。all-or-nothing の ACID バンドルとして実行されます。

  トレードオフ — 追加のスループットが使用され、1 KB の書き込みあたり 2 WCU が使用されます。
+ PartiQL `BatchExecuteStatement` – 条件付きまたは条件なしで、最大 25 件の更新。

  トレードオフ – リクエストを 25 のバッチで分散するために、追加のロジックが必要です。
+ AWS Step Functions – AWS Lambda に精通しているデベロッパー向けのレート制限付き一括オペレーション。

  トレードオフ — 実行時間はレート制限に反比例します。Lambda 関数の最長タイムアウトによって制限されます。この機能では、読み取りと書き込みの間に発生するデータ変更が上書きされる可能性があります。詳細については、「[Backfilling an Amazon DynamoDB Time to Live attribute using Amazon EMR: Part 2](https://aws.amazon.com/blogs/database/part-2-backfilling-an-amazon-dynamodb-time-to-live-attribute-using-amazon-emr/)」を参照してください。
+ AWS Glue および Amazon EMR – マネージド並列処理によるレート制限付き一括オペレーション。時間的制約のないアプリケーションや更新の場合、これらのオプションはスループットのごく一部しか使用せずにバックグラウンドで実行できます。どちらのサービスも、emr-dynamodb-connector を使用して DynamoDB オペレーションを実行します。これらのサービスは、大規模な読み取りを実行した後、レート制限のオプションを使用して更新済みの項目を大量に書き込みます。

  トレードオフ — 実行時間はレート制限に反比例します。この機能では、読み取りと書き込みの間に発生するデータ変更が上書きされる可能性があります。グローバルセカンダリインデックス (GSI) からの読み取りはできません。詳細については、「[Backfilling an Amazon DynamoDB Time to Live attribute using Amazon EMR: Part 2](https://aws.amazon.com/blogs/database/part-2-backfilling-an-amazon-dynamodb-time-to-live-attribute-using-amazon-emr/)」を参照してください。
+ DynamoDB シェル – SQL のようなクエリを使用したレート制限付き一括オペレーション。効率を向上させるために GSI から読み取ることができます。

  トレードオフ — 実行時間はレート制限に反比例します。「[Rate limited bulk operations in DynamoDB Shell](https://aws.amazon.com/blogs/database/rate-limited-bulk-operations-in-dynamodb-shell/)」を参照してください。

## パターンの使用
<a name="BestPractices_EfficientBulkOperations_UsingThePattern"></a>

オンデマンドスループットモードを使用する場合、特に一括更新については、コストに多大な影響が及ぶ可能性があります。プロビジョンドスループットモードを使用する場合、速度とコストの間にトレードオフが発生します。レート制限パラメータを厳密に設定すると、処理時間が非常に長くなる可能性があります。平均項目サイズとレート制限を使用すると、更新の速度を大まかに判断できます。

別の方法として、更新プロセスの予想期間と平均項目サイズに基づいて、プロセスに必要なスループットの量を決定することもできます。各パターンで共有されているブログのリファレンスには、パターン使用の戦略、実装、制限に関する詳細が記載されています。詳細については、「[Cost-effective bulk processing with Amazon DynamoDB](https://aws.amazon.com/blogs/database/cost-effective-bulk-processing-with-amazon-dynamodb/)」を参照してください。

ライブ DynamoDB テーブルに対して一括更新を実行するには、複数の方法があります。適切なアプローチは、ACID やべき等性の要件、更新する項目数、API の知識などの要因によって異なります。コストと時間の間のトレードオフを考慮することが重要となります。上記のほとんどのアプローチでは、一括更新ジョブで使用されるスループットをレート制限するオプションが提供されています。