使用 AWS SDK 通过 DAX 加快 DynamoDB 读取速度
以下代码示例展示了如何:
同时使用 DAX 和 SDK 客户端创建数据并将其写入表。
同时使用两个客户端获取、查询和扫描表,并比较其性能。
有关更多信息,请参阅使用 DynamoDB Accelerator 客户端进行开发。
- Python
-
- SDK for Python (Boto3)
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。
使用 DAX 或 Boto3 客户端创建一个表。
import boto3


def create_dax_table(dyn_resource=None):
    """
    Create the demonstration table and wait until it exists.

    The table is named "TryDaxTable" and has a composite primary key made of a
    numeric partition key and a numeric sort key.

    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The newly created table.
    """
    resource = boto3.resource("dynamodb") if dyn_resource is None else dyn_resource

    table_name = "TryDaxTable"
    key_schema = [
        {"AttributeName": "partition_key", "KeyType": "HASH"},
        {"AttributeName": "sort_key", "KeyType": "RANGE"},
    ]
    attribute_definitions = [
        {"AttributeName": "partition_key", "AttributeType": "N"},
        {"AttributeName": "sort_key", "AttributeType": "N"},
    ]
    throughput = {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}

    table = resource.create_table(
        TableName=table_name,
        KeySchema=key_schema,
        AttributeDefinitions=attribute_definitions,
        ProvisionedThroughput=throughput,
    )
    print(f"Creating {table_name}...")
    # create_table has already returned at this point; block here until the
    # table reports that it exists.
    table.wait_until_exists()
    return table


if __name__ == "__main__":
    dax_table = create_dax_table()
    print("Created table.")
将测试数据写入表中。
import boto3


def write_data_to_dax_table(key_count, item_size, dyn_resource=None):
    """
    Fill the demonstration table with test items.

    One item is written for every (partition_key, sort_key) pair where both
    keys run from 1 through key_count, so key_count * key_count items are put.

    :param key_count: The number of partition and sort keys to use to populate
                      the table.
    :param item_size: The size, in characters, of the non-key data stored in
                      each test item.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    """
    resource = boto3.resource("dynamodb") if dyn_resource is None else dyn_resource
    table = resource.Table("TryDaxTable")

    # Every item carries the same filler payload; only the keys differ.
    payload = "X" * item_size
    key_values = range(1, key_count + 1)

    for partition_key in key_values:
        for sort_key in key_values:
            item = {
                "partition_key": partition_key,
                "sort_key": sort_key,
                "some_data": payload,
            }
            table.put_item(Item=item)
            print(f"Put item ({partition_key}, {sort_key}) succeeded.")


if __name__ == "__main__":
    write_key_count = 10
    write_item_size = 1000
    total_items = write_key_count * write_key_count
    print(
        f"Writing {total_items} items to the table. "
        f"Each item is {write_item_size} characters."
    )
    write_data_to_dax_table(write_key_count, write_item_size)
分别使用 DAX 客户端和 Boto3 客户端多次迭代获取项目,并报告每个客户端花费的时间。
import argparse
import sys
import time

import amazondax
import boto3


def get_item_test(key_count, iterations, dyn_resource=None):
    """
    Get every test item from the table, repeated for a number of iterations.

    A timestamp is captured just before the first iteration and just after the
    last one. A dot is printed for each item retrieved as a progress indicator.

    :param key_count: The number of partition and sort key values to request.
                      Each iteration gets key_count * key_count items.
    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The start and end times of the test.
    """
    resource = boto3.resource("dynamodb") if dyn_resource is None else dyn_resource
    table = resource.Table("TryDaxTable")
    key_values = range(1, key_count + 1)

    start = time.perf_counter()
    for _ in range(iterations):
        for partition_key in key_values:
            for sort_key in key_values:
                item_key = {"partition_key": partition_key, "sort_key": sort_key}
                table.get_item(Key=item_key)
                print(".", end="")
                sys.stdout.flush()
    print()
    end = time.perf_counter()
    return start, end


if __name__ == "__main__":
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "endpoint_url",
        nargs="?",
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
    )
    args = parser.parse_args()

    test_key_count = 10
    test_iterations = 50

    client_name = "DAX" if args.endpoint_url else "Boto3"
    print(
        f"Getting each item from the table {test_iterations} times, "
        f"using the {client_name} client."
    )

    if args.endpoint_url:
        # The with statement lets the DAX resource clean up after the test.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = get_item_test(
                test_key_count, test_iterations, dyn_resource=dax
            )
    else:
        test_start, test_end = get_item_test(test_key_count, test_iterations)

    elapsed = test_end - test_start
    print(
        f"Total time: {elapsed:.4f} sec. Average time: "
        f"{elapsed / test_iterations}."
    )
分别使用 DAX 客户端和 Boto3 客户端多次迭代查询表,并报告每个客户端花费的时间。
import argparse
import time
import sys

import amazondax
import boto3
from boto3.dynamodb.conditions import Key


def query_test(partition_key, sort_keys, iterations, dyn_resource=None):
    """
    Run the same query against the table a number of times.

    A timestamp is captured just before the first iteration and just after the
    last one. A dot is printed for each query as a progress indicator.

    :param partition_key: The partition key value to use in the query. The query
                          matches items whose partition key equals this value.
    :param sort_keys: The pair of sort key bounds for the query. The pair is
                      unpacked into a "between" condition on the sort key.
    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The start and end times of the test.
    """
    resource = boto3.resource("dynamodb") if dyn_resource is None else dyn_resource
    table = resource.Table("TryDaxTable")

    # Build the condition once; it is the same for every iteration.
    partition_matches = Key("partition_key").eq(partition_key)
    sort_in_range = Key("sort_key").between(*sort_keys)
    key_condition = partition_matches & sort_in_range

    start = time.perf_counter()
    for _ in range(iterations):
        table.query(KeyConditionExpression=key_condition)
        print(".", end="")
        sys.stdout.flush()
    print()
    end = time.perf_counter()
    return start, end


if __name__ == "__main__":
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "endpoint_url",
        nargs="?",
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
    )
    args = parser.parse_args()

    test_partition_key = 5
    test_sort_keys = (2, 9)
    test_iterations = 100

    client_name = "DAX" if args.endpoint_url else "Boto3"
    print(f"Querying the table {test_iterations} times, using the {client_name} client.")

    if args.endpoint_url:
        # The with statement lets the DAX resource clean up after the test.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = query_test(
                test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax
            )
    else:
        test_start, test_end = query_test(
            test_partition_key, test_sort_keys, test_iterations
        )

    elapsed = test_end - test_start
    print(
        f"Total time: {elapsed:.4f} sec. Average time: "
        f"{elapsed / test_iterations}."
    )
分别使用 DAX 客户端和 Boto3 客户端多次迭代扫描表,并报告每个客户端花费的时间。
import argparse
import time
import sys

import amazondax
import boto3


def scan_test(iterations, dyn_resource=None):
    """
    Scan the table a number of times.

    A timestamp is captured just before the first iteration and just after the
    last one. A dot is printed for each scan as a progress indicator.

    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    :return: The start and end times of the test.
    """
    resource = boto3.resource("dynamodb") if dyn_resource is None else dyn_resource
    table = resource.Table("TryDaxTable")

    start = time.perf_counter()
    for _ in range(iterations):
        # Each iteration issues a single scan call; the result is discarded.
        table.scan()
        print(".", end="")
        sys.stdout.flush()
    print()
    end = time.perf_counter()
    return start, end


if __name__ == "__main__":
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "endpoint_url",
        nargs="?",
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
    )
    args = parser.parse_args()

    test_iterations = 100

    client_name = "DAX" if args.endpoint_url else "Boto3"
    print(f"Scanning the table {test_iterations} times, using the {client_name} client.")

    if args.endpoint_url:
        # The with statement lets the DAX resource clean up after the test.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = scan_test(test_iterations, dyn_resource=dax)
    else:
        test_start, test_end = scan_test(test_iterations)

    elapsed = test_end - test_start
    print(
        f"Total time: {elapsed:.4f} sec. Average time: "
        f"{elapsed / test_iterations}."
    )
删除表。
import boto3


def delete_dax_table(dyn_resource=None):
    """
    Delete the demonstration table and wait until it no longer exists.

    :param dyn_resource: Either a Boto3 or DAX resource. When omitted, a default
                         Boto3 DynamoDB resource is created.
    """
    resource = boto3.resource("dynamodb") if dyn_resource is None else dyn_resource
    demo_table = resource.Table("TryDaxTable")

    demo_table.delete()
    print(f"Deleting {demo_table.name}...")
    # The delete request has already been issued; block until the table is gone.
    demo_table.wait_until_not_exists()


if __name__ == "__main__":
    delete_dax_table()
    print("Table deleted!")
-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的以下主题。
-
有关 AWS SDK 开发人员指南和代码示例的完整列表,请参阅 结合使用 DynamoDB 与 AWS SDK。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。
场景
构建应用程序以将数据提交到 DynamoDB 表