Wählen Sie Ihre Cookie-Einstellungen aus

Wir verwenden essentielle Cookies und ähnliche Tools, die für die Bereitstellung unserer Website und Services erforderlich sind. Wir verwenden Performance-Cookies, um anonyme Statistiken zu sammeln, damit wir verstehen können, wie Kunden unsere Website nutzen, und Verbesserungen vornehmen können. Essentielle Cookies können nicht deaktiviert werden, aber Sie können auf „Anpassen“ oder „Ablehnen“ klicken, um Performance-Cookies abzulehnen.

Wenn Sie damit einverstanden sind, verwenden AWS und zugelassene Drittanbieter auch Cookies, um nützliche Features der Website bereitzustellen, Ihre Präferenzen zu speichern und relevante Inhalte, einschließlich relevanter Werbung, anzuzeigen. Um alle nicht notwendigen Cookies zu akzeptieren oder abzulehnen, klicken Sie auf „Akzeptieren“ oder „Ablehnen“. Um detailliertere Entscheidungen zu treffen, klicken Sie auf „Anpassen“.

Beschleunigen Sie DynamoDB-Lesevorgänge mit DAX mithilfe eines SDK AWS

Fokusmodus
Beschleunigen Sie DynamoDB-Lesevorgänge mit DAX mithilfe eines SDK AWS - Amazon-DynamoDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Wie das aussehen kann, sehen Sie am nachfolgenden Beispielcode:

  • Erstellen und Schreiben von Daten in eine Tabelle mit sowohl den DAX- als auch den SDK-Clients.

  • Abrufen, Abfragen und Scannen der Tabelle mit beiden Clients und Vergleichen ihrer Leistung.

Weitere Informationen finden Sie unter Entwickeln mit dem DynamoDB-Accelerator-Client.

Python
SDK für Python (Boto3)
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

Erstellen Sie eine Tabelle mit dem DAX- oder Boto3-Client.

import boto3 def create_dax_table(dyn_resource=None): """ Creates a DynamoDB table. :param dyn_resource: Either a Boto3 or DAX resource. :return: The newly created table. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table_name = "TryDaxTable" params = { "TableName": table_name, "KeySchema": [ {"AttributeName": "partition_key", "KeyType": "HASH"}, {"AttributeName": "sort_key", "KeyType": "RANGE"}, ], "AttributeDefinitions": [ {"AttributeName": "partition_key", "AttributeType": "N"}, {"AttributeName": "sort_key", "AttributeType": "N"}, ], "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}, } table = dyn_resource.create_table(**params) print(f"Creating {table_name}...") table.wait_until_exists() return table if __name__ == "__main__": dax_table = create_dax_table() print(f"Created table.")

Schreiben Sie Testdaten in die Tabelle.

import boto3 def write_data_to_dax_table(key_count, item_size, dyn_resource=None): """ Writes test data to the demonstration table. :param key_count: The number of partition and sort keys to use to populate the table. The total number of items is key_count * key_count. :param item_size: The size of non-key data for each test item. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") some_data = "X" * item_size for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.put_item( Item={ "partition_key": partition_key, "sort_key": sort_key, "some_data": some_data, } ) print(f"Put item ({partition_key}, {sort_key}) succeeded.") if __name__ == "__main__": write_key_count = 10 write_item_size = 1000 print( f"Writing {write_key_count*write_key_count} items to the table. " f"Each item is {write_item_size} characters." ) write_data_to_dax_table(write_key_count, write_item_size)

Rufen Sie Elemente für eine Reihe von Iterationen sowohl für den DAX-Client als auch für den Boto3-Client ab und melden Sie die jeweils aufgewendete Zeit.

import argparse import sys import time import amazondax import boto3 def get_item_test(key_count, iterations, dyn_resource=None): """ Gets items from the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param key_count: The number of items to get from the table in each iteration. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") start = time.perf_counter() for _ in range(iterations): for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.get_item( Key={"partition_key": partition_key, "sort_key": sort_key} ) print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_key_count = 10 test_iterations = 50 if args.endpoint_url: print( f"Getting each item from the table {test_iterations} times, " f"using the DAX client." ) # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = get_item_test( test_key_count, test_iterations, dyn_resource=dax ) else: print( f"Getting each item from the table {test_iterations} times, " f"using the Boto3 client." ) test_start, test_end = get_item_test(test_key_count, test_iterations) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/ test_iterations}." )

Fragen Sie die Tabelle im Hinblick auf eine Reihe von Iterationen sowohl für den DAX-Client als auch für den Boto3-Client ab und melden Sie die jeweils aufgewendete Zeit.

import argparse import time import sys import amazondax import boto3 from boto3.dynamodb.conditions import Key def query_test(partition_key, sort_keys, iterations, dyn_resource=None): """ Queries the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param partition_key: The partition key value to use in the query. The query returns items that have partition keys equal to this value. :param sort_keys: The range of sort key values for the query. The query returns items that have sort key values between these two values. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") key_condition_expression = Key("partition_key").eq(partition_key) & Key( "sort_key" ).between(*sort_keys) start = time.perf_counter() for _ in range(iterations): table.query(KeyConditionExpression=key_condition_expression) print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_partition_key = 5 test_sort_keys = (2, 9) test_iterations = 100 if args.endpoint_url: print(f"Querying the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax ) else: print(f"Querying the table {test_iterations} times, using the Boto3 client.") test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations ) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}." )

Scannen Sie die Tabelle auf eine Reihe von Iterationen sowohl für den DAX-Client als auch für den Boto3-Client und melden Sie die jeweils aufgewendete Zeit.

import argparse import time import sys import amazondax import boto3 def scan_test(iterations, dyn_resource=None): """ Scans the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") start = time.perf_counter() for _ in range(iterations): table.scan() print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_iterations = 100 if args.endpoint_url: print(f"Scanning the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = scan_test(test_iterations, dyn_resource=dax) else: print(f"Scanning the table {test_iterations} times, using the Boto3 client.") test_start, test_end = scan_test(test_iterations) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}." )

Löschen Sie die Tabelle.

import boto3 def delete_dax_table(dyn_resource=None): """ Deletes the demonstration table. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") table.delete() print(f"Deleting {table.name}...") table.wait_until_not_exists() if __name__ == "__main__": delete_dax_table() print("Table deleted!")
SDK für Python (Boto3)
Anmerkung

Es gibt noch mehr dazu GitHub. Sie sehen das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel-Repository einrichten und ausführen.

Erstellen Sie eine Tabelle mit dem DAX- oder Boto3-Client.

import boto3 def create_dax_table(dyn_resource=None): """ Creates a DynamoDB table. :param dyn_resource: Either a Boto3 or DAX resource. :return: The newly created table. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table_name = "TryDaxTable" params = { "TableName": table_name, "KeySchema": [ {"AttributeName": "partition_key", "KeyType": "HASH"}, {"AttributeName": "sort_key", "KeyType": "RANGE"}, ], "AttributeDefinitions": [ {"AttributeName": "partition_key", "AttributeType": "N"}, {"AttributeName": "sort_key", "AttributeType": "N"}, ], "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}, } table = dyn_resource.create_table(**params) print(f"Creating {table_name}...") table.wait_until_exists() return table if __name__ == "__main__": dax_table = create_dax_table() print(f"Created table.")

Schreiben Sie Testdaten in die Tabelle.

import boto3 def write_data_to_dax_table(key_count, item_size, dyn_resource=None): """ Writes test data to the demonstration table. :param key_count: The number of partition and sort keys to use to populate the table. The total number of items is key_count * key_count. :param item_size: The size of non-key data for each test item. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") some_data = "X" * item_size for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.put_item( Item={ "partition_key": partition_key, "sort_key": sort_key, "some_data": some_data, } ) print(f"Put item ({partition_key}, {sort_key}) succeeded.") if __name__ == "__main__": write_key_count = 10 write_item_size = 1000 print( f"Writing {write_key_count*write_key_count} items to the table. " f"Each item is {write_item_size} characters." ) write_data_to_dax_table(write_key_count, write_item_size)

Rufen Sie Elemente für eine Reihe von Iterationen sowohl für den DAX-Client als auch für den Boto3-Client ab und melden Sie die jeweils aufgewendete Zeit.

import argparse import sys import time import amazondax import boto3 def get_item_test(key_count, iterations, dyn_resource=None): """ Gets items from the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param key_count: The number of items to get from the table in each iteration. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") start = time.perf_counter() for _ in range(iterations): for partition_key in range(1, key_count + 1): for sort_key in range(1, key_count + 1): table.get_item( Key={"partition_key": partition_key, "sort_key": sort_key} ) print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_key_count = 10 test_iterations = 50 if args.endpoint_url: print( f"Getting each item from the table {test_iterations} times, " f"using the DAX client." ) # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = get_item_test( test_key_count, test_iterations, dyn_resource=dax ) else: print( f"Getting each item from the table {test_iterations} times, " f"using the Boto3 client." ) test_start, test_end = get_item_test(test_key_count, test_iterations) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/ test_iterations}." )

Fragen Sie die Tabelle im Hinblick auf eine Reihe von Iterationen sowohl für den DAX-Client als auch für den Boto3-Client ab und melden Sie die jeweils aufgewendete Zeit.

import argparse import time import sys import amazondax import boto3 from boto3.dynamodb.conditions import Key def query_test(partition_key, sort_keys, iterations, dyn_resource=None): """ Queries the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param partition_key: The partition key value to use in the query. The query returns items that have partition keys equal to this value. :param sort_keys: The range of sort key values for the query. The query returns items that have sort key values between these two values. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") key_condition_expression = Key("partition_key").eq(partition_key) & Key( "sort_key" ).between(*sort_keys) start = time.perf_counter() for _ in range(iterations): table.query(KeyConditionExpression=key_condition_expression) print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_partition_key = 5 test_sort_keys = (2, 9) test_iterations = 100 if args.endpoint_url: print(f"Querying the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax ) else: print(f"Querying the table {test_iterations} times, using the Boto3 client.") test_start, test_end = query_test( test_partition_key, test_sort_keys, test_iterations ) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}." )

Scannen Sie die Tabelle auf eine Reihe von Iterationen sowohl für den DAX-Client als auch für den Boto3-Client und melden Sie die jeweils aufgewendete Zeit.

import argparse import time import sys import amazondax import boto3 def scan_test(iterations, dyn_resource=None): """ Scans the table a specified number of times. The time before the first iteration and the time after the last iteration are both captured and reported. :param iterations: The number of iterations to run. :param dyn_resource: Either a Boto3 or DAX resource. :return: The start and end times of the test. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") start = time.perf_counter() for _ in range(iterations): table.scan() print(".", end="") sys.stdout.flush() print() end = time.perf_counter() return start, end if __name__ == "__main__": # pylint: disable=not-context-manager parser = argparse.ArgumentParser() parser.add_argument( "endpoint_url", nargs="?", help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.", ) args = parser.parse_args() test_iterations = 100 if args.endpoint_url: print(f"Scanning the table {test_iterations} times, using the DAX client.") # Use a with statement so the DAX client closes the cluster after completion. with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax: test_start, test_end = scan_test(test_iterations, dyn_resource=dax) else: print(f"Scanning the table {test_iterations} times, using the Boto3 client.") test_start, test_end = scan_test(test_iterations) print( f"Total time: {test_end - test_start:.4f} sec. Average time: " f"{(test_end - test_start)/test_iterations}." )

Löschen Sie die Tabelle.

import boto3 def delete_dax_table(dyn_resource=None): """ Deletes the demonstration table. :param dyn_resource: Either a Boto3 or DAX resource. """ if dyn_resource is None: dyn_resource = boto3.resource("dynamodb") table = dyn_resource.Table("TryDaxTable") table.delete() print(f"Deleting {table.name}...") table.wait_until_not_exists() if __name__ == "__main__": delete_dax_table() print("Table deleted!")

Eine vollständige Liste der AWS SDK-Entwicklerhandbücher und Codebeispiele finden Sie unterDynamoDB mit einem SDK verwenden AWS. Dieses Thema enthält auch Informationen zu den ersten Schritten und Details zu früheren SDK-Versionen.

DatenschutzNutzungsbedingungen für die WebsiteCookie-Einstellungen
© 2025, Amazon Web Services, Inc. oder Tochtergesellschaften. Alle Rechte vorbehalten.