で TryDaxTable
import argparse
import time
import sys
import amazondax
import boto3
from boto3.dynamodb.conditions import Key
def query_test(partition_key, sort_keys, iterations, dyn_resource=None):
Queries the table a specified number of times. The time before the
first iteration and the time after the last iteration are both captured
and reported.
:param partition_key: The partition key value to use in the query. The query
returns items that have partition keys equal to this value.
:param sort_keys: The range of sort key values for the query. The query returns
items that have sort key values between these two values.
:param iterations: The number of iterations to run.
:param dyn_resource: Either a Boto3 or DAX resource.
:return: The start and end times of the test.
if dyn_resource is None:
dyn_resource = boto3.resource("dynamodb")
table = dyn_resource.Table("TryDaxTable")
key_condition_expression = Key("partition_key").eq(partition_key) & Key(
start = time.perf_counter()
for _ in range(iterations):
print(".", end="")
end = time.perf_counter()
return start, end
if __name__ == "__main__":
# pylint: disable=not-context-manager
parser = argparse.ArgumentParser()
help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
args = parser.parse_args()
test_partition_key = 5
test_sort_keys = (2, 9)
test_iterations = 100
if args.endpoint_url:
print(f"Querying the table {test_iterations} times, using the DAX client.")
# Use a with statement so the DAX client closes the cluster after completion.
with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
test_start, test_end = query_test(
test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax
print(f"Querying the table {test_iterations} times, using the Boto3 client.")
test_start, test_end = query_test(
test_partition_key, test_sort_keys, test_iterations
f"Total time: {test_end - test_start:.4f} sec. Average time: "
f"{(test_end - test_start)/test_iterations}."