AWS SDK を使用して DAX で DynamoDB の読み取りを高速化する
次のコードサンプルは、以下の操作方法を示しています。
DAX クライアントと SDK クライアントの両方でデータを作成してテーブルに書き込みます。
両方のクライアントでテーブルを取得、クエリ、スキャンし、パフォーマンスを比較します。
詳細については、「DynamoDB Accelerator クライアントで開発する」を参照してください。
- Python
-
- SDK for Python (Boto3)
-
注記
GitHub には、その他のリソースもあります。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 DAX または Boto3 クライアントでテーブルを作成します。
import boto3 def create_dax_table(dyn_resource=None): """ Creates a DynamoDB table. :param dyn_resource: Either a Boto3 or DAX resource. :return: The newly created table. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table_name = "TryDaxTable" params = { "TableName": table_name, "KeySchema": [ {"AttributeName": "partition_key", "KeyType": "HASH"}, {"AttributeName": "sort_key", "KeyType": "RANGE"}, ], "AttributeDefinitions": [ {"AttributeName": "partition_key", "AttributeType": "N"}, {"AttributeName": "sort_key", "AttributeType": "N"}, ], "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}, } table = dyn_resource.create_table(**params) print(f"Creating {table_name}...") table.wait_until_exists() return table if __name__ == "__main__": dax_table = create_dax_table() print(f"Created table.")
テーブルにテストデータを書き込みます。
import boto3 def write_data_to_dax_table(key_count, item_size, dyn_resource=None): """ Writes test data to the demonstration table. :param key_count: The number of partition and sort keys to use to populate the table. The total number of items is key_count * key_count. :param item_size: The size of non-key data for each test item. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") some_data = "X" * item_size for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.put_item( Item={ "partition_key": partition_key, "sort_key": sort_key, "some_data": some_data, } ) print(f"Put item ({partition_key}, {sort_key}) succeeded.") if __name__ == "__main__": write_key_count = 10 write_item_size = 1000 print( f"Writing {write_key_count*write_key_count} items to the table. " f"Each item is {write_item_size} characters." ) write_data_to_dax_table(write_key_count, write_item_size)
DAX クライアントと Boto3 クライアントの両方について、いくつかのイテレーションの項目を取得し、それぞれに費やした時間を報告します。
import argparse import sys import time import amazondax import boto3 def get_item_test(key_count, iterations, dyn_resource=None): """ Gets items from the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param key_count: The number of items to get from the table in each iteration. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") start = time.perf_counter() for _ in range(iterations): for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.get_item( Key={"partition_key": partition_key, "sort_key": sort_key} ) print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_key_count = 10 test_iterations = 50 if args.endpoint_url: print( f"Getting each item from the table {test_iterations} times, " f"using the DAX client." ) # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = get_item_test( test_key_count, test_iterations, dyn_resource=dax ) else: print( f"Getting each item from the table {test_iterations} times, " f"using the Boto3 client." ) test_start, test_end = get_item_test(test_key_count, test_iterations) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/ test_iterations}." )
DAX クライアントと Boto3 クライアントの両方について、いくつかのイテレーションのテーブルに対してクエリを実行し、それぞれに費やした時間を報告します。
import argparse import time import sys import amazondax import boto3 from boto3.dynamodb.conditions import Key def query_test(partition_key, sort_keys, iterations, dyn_resource=None): """ Queries the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param partition_key: The partition key value to use in the query. The query returns items that have partition keys equal to this value. :param sort_keys: The range of sort key values for the query. The query returns items that have sort key values between these two values. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") key_condition_expression = Key("partition_key").eq(partition_key) & Key( "sort_key" ).between(*sort_keys) start = time.perf_counter() for _ in range(iterations): table.query(KeyConditionExpression=key_condition_expression) print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_partition_key = 5 test_sort_keys = (2, 9) test_iterations = 100 if args.endpoint_url: print(f"Querying the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax ) else: print(f"Querying the table {test_iterations} times, using the Boto3 client.") test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations ) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}." )
DAX クライアントと Boto3 クライアントの両方について、いくつかのイテレーションのテーブルをスキャンし、それぞれに費やした時間を報告します。
import argparse import time import sys import amazondax import boto3 def scan_test(iterations, dyn_resource=None): """ Scans the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") start = time.perf_counter() for _ in range(iterations): table.scan() print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_iterations = 100 if args.endpoint_url: print(f"Scanning the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = scan_test(test_iterations, dyn_resource=dax) else: print(f"Scanning the table {test_iterations} times, using the Boto3 client.") test_start, test_end = scan_test(test_iterations) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}." )
テーブルを削除する。
import boto3 def delete_dax_table(dyn_resource=None): """ Deletes the demonstration table. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") table.delete() print(f"Deleting {table.name}...") table.wait_until_not_exists() if __name__ == "__main__": delete_dax_table() print("Table deleted!")
-
API の詳細については、「AWS SDK for Python (Boto3) API リファレンス」の以下のトピックを参照してください。
-
AWS SDK デベロッパーガイドとコード例の完全なリストについては、「AWS SDK で DynamoDB を使用する」を参照してください。このトピックには、使用開始方法に関する情報と、以前の SDK バージョンの詳細も含まれています。
シナリオ
DynamoDB テーブルにデータを送信するアプリケーションを構築する