DynamoDB examples using SDK for Python (Boto3)

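The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with DynamoDB.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or by combining the service with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code example shows how to get started using DynamoDB.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.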

import boto3

# Create a DynamoDB client using the default credentials and region
dynamodb = boto3.client("dynamodb")

# Initialize a paginator for the list_tables operation
paginator = dynamodb.get_paginator("list_tables")

# Create a PageIterator from the paginator
page_iterator = paginator.paginate(Limit=10)

# List the tables in the current AWS account
print("Here are the DynamoDB tables in your account:")

# Use pagination to list all tables
table_names = []
for page in page_iterator:
    for table_name in page.get("TableNames", []):
        print(f"- {table_name}")
        table_names.append(table_name)

if not table_names:
    print("You don't have any DynamoDB tables in your account.")
else:
    print(f"\nFound {len(table_names)} tables.")
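  • For API details, see ListTables in AWS SDK for Python (Boto3) API Reference.

Basics

The following code example shows how to:

  • Create a table that can hold movie data.

  • Put, get, and update a single movie in the table.

  • Write movie data to the table from a sample JSON file.

  • Query for movies that were released in a given year.

  • Scan for movies that were released in a range of years.

  • Delete a movie from the table, then delete the table.

SDK for Python (Boto3)

Create a class that encapsulates a DynamoDB table.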

from decimal import Decimal
from io import BytesIO
import json
import logging
import os
from pprint import pprint
import requests
from zipfile import ZipFile

import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError

from question import Question

logger = logging.getLogger(__name__)


class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def exists(self, table_name):
        """
        Determines whether a table exists. As a side effect, stores the table in
        a member variable.

        :param table_name: The name of the table to check.
        :return: True when the table exists; otherwise, False.
        """
        try:
            table = self.dyn_resource.Table(table_name)
            table.load()
            exists = True
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                exists = False
            else:
                logger.error(
                    "Couldn't check for existence of %s. Here's why: %s: %s",
                    table_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            self.table = table
        return exists

    def create_table(self, table_name):
        """
        Creates an Amazon DynamoDB table that can be used to store movie data.
        The table uses the release year of the movie as the partition key and the
        title as the sort key.

        :param table_name: The name of the table to create.
        :return: The newly created table.
        """
        try:
            self.table = self.dyn_resource.create_table(
                TableName=table_name,
                KeySchema=[
                    {"AttributeName": "year", "KeyType": "HASH"},  # Partition key
                    {"AttributeName": "title", "KeyType": "RANGE"},  # Sort key
                ],
                AttributeDefinitions=[
                    {"AttributeName": "year", "AttributeType": "N"},
                    {"AttributeName": "title", "AttributeType": "S"},
                ],
                ProvisionedThroughput={
                    "ReadCapacityUnits": 10,
                    "WriteCapacityUnits": 10,
                },
            )
            self.table.wait_until_exists()
        except ClientError as err:
            logger.error(
                "Couldn't create table %s. Here's why: %s: %s",
                table_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return self.table

    def list_tables(self):
        """
        Lists the Amazon DynamoDB tables for the current account.

        :return: The list of tables.
        """
        try:
            tables = []
            for table in self.dyn_resource.tables.all():
                print(table.name)
                tables.append(table)
        except ClientError as err:
            logger.error(
                "Couldn't list tables. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return tables

    def write_batch(self, movies):
        """
        Fills an Amazon DynamoDB table with the specified data, using the Boto3
        Table.batch_writer() function to put the items in the table.
        Inside the context manager, Table.batch_writer builds a list of
        requests. On exiting the context manager, Table.batch_writer starts sending
        batches of write requests to Amazon DynamoDB and automatically
        handles chunking, buffering, and retrying.

        :param movies: The data to put in the table. Each item must contain at least
                       the keys required by the schema that was specified when the
                       table was created.
        """
        try:
            with self.table.batch_writer() as writer:
                for movie in movies:
                    writer.put_item(Item=movie)
        except ClientError as err:
            logger.error(
                "Couldn't load data into table %s. Here's why: %s: %s",
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def add_movie(self, title, year, plot, rating):
        """
        Adds a movie to the table.

        :param title: The title of the movie.
        :param year: The release year of the movie.
        :param plot: The plot summary of the movie.
        :param rating: The quality rating of the movie.
        """
        try:
            self.table.put_item(
                Item={
                    "year": year,
                    "title": title,
                    "info": {"plot": plot, "rating": Decimal(str(rating))},
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't add movie %s to table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def get_movie(self, title, year):
        """
        Gets movie data from the table for a specific movie.

        :param title: The title of the movie.
        :param year: The release year of the movie.
        :return: The data about the requested movie.
        """
        try:
            response = self.table.get_item(Key={"year": year, "title": title})
        except ClientError as err:
            logger.error(
                "Couldn't get movie %s from table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Item"]

    def update_movie(self, title, year, rating, plot):
        """
        Updates rating and plot data for a movie in the table.

        :param title: The title of the movie to update.
        :param year: The release year of the movie to update.
        :param rating: The updated rating to give the movie.
        :param plot: The updated plot summary to give the movie.
        :return: The fields that were updated, with their new values.
        """
        try:
            response = self.table.update_item(
                Key={"year": year, "title": title},
                UpdateExpression="set info.rating=:r, info.plot=:p",
                ExpressionAttributeValues={":r": Decimal(str(rating)), ":p": plot},
                ReturnValues="UPDATED_NEW",
            )
        except ClientError as err:
            logger.error(
                "Couldn't update movie %s in table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Attributes"]

    def query_movies(self, year):
        """
        Queries for movies that were released in the specified year.

        :param year: The year to query.
        :return: The list of movies that were released in the specified year.
        """
        try:
            response = self.table.query(KeyConditionExpression=Key("year").eq(year))
        except ClientError as err:
            logger.error(
                "Couldn't query for movies released in %s. Here's why: %s: %s",
                year,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Items"]

    def scan_movies(self, year_range):
        """
        Scans for movies that were released in a range of years.
        Uses a projection expression to return a subset of data for each movie.

        :param year_range: The range of years to retrieve.
        :return: The list of movies released in the specified years.
        """
        movies = []
        scan_kwargs = {
            "FilterExpression": Key("year").between(
                year_range["first"], year_range["second"]
            ),
            "ProjectionExpression": "#yr, title, info.rating",
            "ExpressionAttributeNames": {"#yr": "year"},
        }
        try:
            done = False
            start_key = None
            while not done:
                if start_key:
                    scan_kwargs["ExclusiveStartKey"] = start_key
                response = self.table.scan(**scan_kwargs)
                movies.extend(response.get("Items", []))
                start_key = response.get("LastEvaluatedKey", None)
                done = start_key is None
        except ClientError as err:
            logger.error(
                "Couldn't scan for movies. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

        return movies

    def delete_movie(self, title, year):
        """
        Deletes a movie from the table.

        :param title: The title of the movie to delete.
        :param year: The release year of the movie to delete.
        """
        try:
            self.table.delete_item(Key={"year": year, "title": title})
        except ClientError as err:
            logger.error(
                "Couldn't delete movie %s. Here's why: %s: %s",
                title,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_table(self):
        """
        Deletes the table.
        """
        try:
            self.table.delete()
            self.table = None
        except ClientError as err:
            logger.error(
                "Couldn't delete table. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

Create a helper function to download and extract the sample JSON file.

def get_sample_movie_data(movie_file_name):
    """
    Gets sample movie data, either from a local file or by first downloading it from
    the Amazon DynamoDB developer guide.

    :param movie_file_name: The local file name where the movie data is stored in
                            JSON format.
    :return: The movie data as a list of dicts.
    """
    if not os.path.isfile(movie_file_name):
        print(f"Downloading {movie_file_name}...")
        movie_content = requests.get(
            "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/moviedata.zip"
        )
        movie_zip = ZipFile(BytesIO(movie_content.content))
        movie_zip.extractall()

    try:
        with open(movie_file_name) as movie_file:
            # Parse JSON floats as Decimal, the numeric type the Boto3 resource expects.
            movie_data = json.load(movie_file, parse_float=Decimal)
    except FileNotFoundError:
        print(
            f"File {movie_file_name} not found. You must first download the file to "
            "run this demo. See the README for instructions."
        )
        raise
    else:
        # The sample file lists over 4000 movies, return only the first 250.
        return movie_data[:250]
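The helper passes parse_float=Decimal to json.load because the Boto3 DynamoDB resource does not accept Python float values for number attributes. The short sketch below uses only the standard library to show what that option does, and why add_movie wraps a rating in Decimal(str(...)). The JSON string is a trimmed copy of the sample record from the class docstring.

```python
import json
from decimal import Decimal

raw = '{"year": 1999, "title": "For Love of the Game", "info": {"rating": 6.3}}'

# parse_float=Decimal makes json.loads build Decimal objects for JSON numbers
# that have a fractional part; integers such as "year" stay as int.
movie = json.loads(raw, parse_float=Decimal)

# For a float obtained elsewhere (for example, from user input), going through
# str() avoids carrying binary floating-point noise into the Decimal.
user_rating = Decimal(str(7.1))

print(type(movie["info"]["rating"]).__name__, movie["info"]["rating"])
print(type(movie["year"]).__name__, user_rating)
```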

Run an interactive scenario to create the table and perform actions on it.

def run_scenario(table_name, movie_file_name, dyn_resource):
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    print("-" * 88)
    print("Welcome to the Amazon DynamoDB getting started demo.")
    print("-" * 88)

    movies = Movies(dyn_resource)
    movies_exists = movies.exists(table_name)
    if not movies_exists:
        print(f"\nCreating table {table_name}...")
        movies.create_table(table_name)
        print(f"\nCreated table {movies.table.name}.")

    my_movie = Question.ask_questions(
        [
            Question(
                "title", "Enter the title of a movie you want to add to the table: "
            ),
            Question("year", "What year was it released? ", Question.is_int),
            Question(
                "rating",
                "On a scale of 1 - 10, how do you rate it? ",
                Question.is_float,
                Question.in_range(1, 10),
            ),
            Question("plot", "Summarize the plot for me: "),
        ]
    )
    movies.add_movie(**my_movie)
    print(f"\nAdded '{my_movie['title']}' to '{movies.table.name}'.")
    print("-" * 88)

    movie_update = Question.ask_questions(
        [
            Question(
                "rating",
                f"\nLet's update your movie.\nYou rated it {my_movie['rating']}, what new "
                f"rating would you give it? ",
                Question.is_float,
                Question.in_range(1, 10),
            ),
            Question(
                "plot",
                f"You summarized the plot as '{my_movie['plot']}'.\nWhat would you say now? ",
            ),
        ]
    )
    my_movie.update(movie_update)
    updated = movies.update_movie(**my_movie)
    print(f"\nUpdated '{my_movie['title']}' with new attributes:")
    pprint(updated)
    print("-" * 88)

    if not movies_exists:
        movie_data = get_sample_movie_data(movie_file_name)
        print(f"\nReading data from '{movie_file_name}' into your table.")
        movies.write_batch(movie_data)
        print(f"\nWrote {len(movie_data)} movies into {movies.table.name}.")
    print("-" * 88)

    title = "The Lord of the Rings: The Fellowship of the Ring"
    if Question.ask_question(
        f"Let's move on...do you want to get info about '{title}'? (y/n) ",
        Question.is_yesno,
    ):
        movie = movies.get_movie(title, 2001)
        print("\nHere's what I found:")
        pprint(movie)
    print("-" * 88)

    ask_for_year = True
    while ask_for_year:
        release_year = Question.ask_question(
            f"\nLet's get a list of movies released in a given year. Enter a year between "
            f"1972 and 2018: ",
            Question.is_int,
            Question.in_range(1972, 2018),
        )
        releases = movies.query_movies(release_year)
        if releases:
            print(f"There were {len(releases)} movies released in {release_year}:")
            for release in releases:
                print(f"\t{release['title']}")
            ask_for_year = False
        else:
            print(f"I don't know about any movies released in {release_year}!")
            ask_for_year = Question.ask_question(
                "Try another year? (y/n) ", Question.is_yesno
            )
    print("-" * 88)

    years = Question.ask_questions(
        [
            Question(
                "first",
                f"\nNow let's scan for movies released in a range of years. Enter a year: ",
                Question.is_int,
                Question.in_range(1972, 2018),
            ),
            Question(
                "second",
                "Now enter another year: ",
                Question.is_int,
                Question.in_range(1972, 2018),
            ),
        ]
    )
    releases = movies.scan_movies(years)
    if releases:
        count = Question.ask_question(
            f"\nFound {len(releases)} movies. How many do you want to see? ",
            Question.is_int,
            Question.in_range(1, len(releases)),
        )
        print(f"\nHere are your {count} movies:\n")
        pprint(releases[:count])
    else:
        print(
            f"I don't know about any movies released between {years['first']} "
            f"and {years['second']}."
        )
    print("-" * 88)

    if Question.ask_question(
        f"\nLet's remove your movie from the table. Do you want to remove "
        f"'{my_movie['title']}'? (y/n)",
        Question.is_yesno,
    ):
        movies.delete_movie(my_movie["title"], my_movie["year"])
        print(f"\nRemoved '{my_movie['title']}' from the table.")
    print("-" * 88)

    if Question.ask_question(f"\nDelete the table? (y/n) ", Question.is_yesno):
        movies.delete_table()
        print(f"Deleted {table_name}.")
    else:
        print(
            "Don't forget to delete the table when you're done or you might incur "
            "charges on your account."
        )

    print("\nThanks for watching!")
    print("-" * 88)


if __name__ == "__main__":
    try:
        run_scenario(
            "doc-example-table-movies", "moviedata.json", boto3.resource("dynamodb")
        )
    except Exception as e:
        print(f"Something went wrong with the demo! Here's what: {e}")

This scenario uses the following helper class to ask questions at a command prompt.

class Question:
    """
    A helper class to ask questions at a command prompt and validate and convert
    the answers.
    """

    def __init__(self, key, question, *validators):
        """
        :param key: The key that is used for storing the answer in a dict, when
                    multiple questions are asked in a set.
        :param question: The question to ask.
        :param validators: The answer is passed through the list of validators until
                           one fails or they all pass. Validators may also convert the
                           answer to another form, such as from a str to an int.
        """
        self.key = key
        self.question = question
        self.validators = Question.non_empty, *validators

    @staticmethod
    def ask_questions(questions):
        """
        Asks a set of questions and stores the answers in a dict.

        :param questions: The list of questions to ask.
        :return: A dict of answers.
        """
        answers = {}
        for question in questions:
            answers[question.key] = Question.ask_question(
                question.question, *question.validators
            )
        return answers

    @staticmethod
    def ask_question(question, *validators):
        """
        Asks a single question and validates it against a list of validators.
        When an answer fails validation, the complaint is printed and the question
        is asked again.

        :param question: The question to ask.
        :param validators: The list of validators that the answer must pass.
        :return: The answer, converted to its final form by the validators.
        """
        answer = None
        while answer is None:
            answer = input(question)
            for validator in validators:
                answer, complaint = validator(answer)
                if answer is None:
                    print(complaint)
                    break
        return answer

    @staticmethod
    def non_empty(answer):
        """
        Validates that the answer is not empty.

        :return: The non-empty answer, or None.
        """
        return answer if answer != "" else None, "I need an answer. Please?"

    @staticmethod
    def is_yesno(answer):
        """
        Validates a yes/no answer.

        :return: True when the answer is 'y'; otherwise, False.
        """
        return answer.lower() == "y", ""

    @staticmethod
    def is_int(answer):
        """
        Validates that the answer can be converted to an int.

        :return: The int answer; otherwise, None.
        """
        try:
            int_answer = int(answer)
        except ValueError:
            int_answer = None
        return int_answer, f"{answer} must be a valid integer."

    @staticmethod
    def is_letter(answer):
        """
        Validates that the answer is a letter.

        :return: The letter answer, converted to uppercase; otherwise, None.
        """
        return (
            answer.upper() if answer.isalpha() else None,
            f"{answer} must be a single letter.",
        )

    @staticmethod
    def is_float(answer):
        """
        Validates that the answer can be converted to a float.

        :return: The float answer; otherwise, None.
        """
        try:
            float_answer = float(answer)
        except ValueError:
            float_answer = None
        return float_answer, f"{answer} must be a valid float."

    @staticmethod
    def in_range(lower, upper):
        """
        Validates that the answer is within a range. The answer must be of a type that
        can be compared to the lower and upper bounds.

        :return: The answer, if it is within the range; otherwise, None.
        """

        def _validate(answer):
            return (
                answer if lower <= answer <= upper else None,
                f"{answer} must be between {lower} and {upper}.",
            )

        return _validate
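Each validator in the helper class returns a (value, complaint) pair, and a value of None marks a failed check. The sketch below follows that convention in a simplified, non-interactive form so the chaining can be seen without a prompt. The function run_validators is a hypothetical name introduced for this illustration and is not part of the example repository.

```python
def is_int(answer):
    """Returns (int value, complaint); the value is None when conversion fails."""
    try:
        return int(answer), ""
    except ValueError:
        return None, f"{answer} must be a valid integer."


def in_range(lower, upper):
    """Builds a validator that accepts values between lower and upper, inclusive."""

    def _validate(answer):
        ok = lower <= answer <= upper
        return (answer if ok else None), f"{answer} must be between {lower} and {upper}."

    return _validate


def run_validators(answer, *validators):
    """Passes the answer through each validator and stops at the first failure."""
    for validator in validators:
        answer, complaint = validator(answer)
        if answer is None:
            return None, complaint
    return answer, ""


print(run_validators("2001", is_int, in_range(1972, 2018)))
print(run_validators("1950", is_int, in_range(1972, 2018)))
print(run_validators("abc", is_int, in_range(1972, 2018)))
```

Order matters in the chain: is_int converts the string before in_range compares it with the numeric bounds.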

Actions

The following code example shows how to use BatchExecuteStatement.

SDK for Python (Boto3)

class PartiQLBatchWrapper:
    """
    Encapsulates a DynamoDB resource to run PartiQL statements.
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource

    def run_partiql(self, statements, param_list):
        """
        Runs a PartiQL statement. A Boto3 resource is used even though
        `execute_statement` is called on the underlying `client` object because the
        resource transforms input and output from plain old Python objects (POPOs) to
        the DynamoDB format. If you create the client directly, you must do these
        transforms yourself.

        :param statements: The batch of PartiQL statements.
        :param param_list: The batch of PartiQL parameters that are associated with
                           each statement. This list must be in the same order as the
                           statements.
        :return: The responses returned from running the statements, if any.
        """
        try:
            output = self.dyn_resource.meta.client.batch_execute_statement(
                Statements=[
                    {"Statement": statement, "Parameters": params}
                    for statement, params in zip(statements, param_list)
                ]
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Couldn't execute batch of PartiQL statements because the table "
                    "does not exist."
                )
            else:
                logger.error(
                    "Couldn't execute batch of PartiQL statements. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return output
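The run_partiql method pairs each statement with its parameters by position. The sketch below shows one way the two lists might be prepared and checked before the call; it makes no AWS request. The helper build_batch_request and the movie titles are hypothetical and exist only for illustration, and the table name is the one used by the scenario above.

```python
MAX_BATCH_STATEMENTS = 25  # Documented per-call limit for BatchExecuteStatement.


def build_batch_request(statements, param_list):
    """Pairs each PartiQL statement with its parameters, preserving order."""
    if len(statements) != len(param_list):
        raise ValueError("Each statement needs exactly one parameter list.")
    if len(statements) > MAX_BATCH_STATEMENTS:
        raise ValueError(f"A batch can hold at most {MAX_BATCH_STATEMENTS} statements.")
    return [
        {"Statement": statement, "Parameters": params}
        for statement, params in zip(statements, param_list)
    ]


table_name = "doc-example-table-movies"
# Placeholder data for the illustration, not taken from the sample file.
titles_and_years = [("Example Movie One", 2001), ("Example Movie Two", 2002)]

# One INSERT statement per item; each ? is filled from the matching parameter list.
statements = [
    f"INSERT INTO \"{table_name}\" VALUE {{'title': ?, 'year': ?}}"
] * len(titles_and_years)
param_list = [[title, year] for title, year in titles_and_years]

request = build_batch_request(statements, param_list)
print(request[0])
```

The resulting list has the same shape as the Statements argument that run_partiql builds with zip.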

The following code example shows how to use BatchGetItem.

SDK for Python (Boto3)

import decimal
import json
import logging
import os
import pprint
import time
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
dynamodb = boto3.resource("dynamodb")

MAX_GET_SIZE = 100  # Amazon DynamoDB rejects a get batch larger than 100 items.


def do_batch_get(batch_keys):
    """
    Gets a batch of items from Amazon DynamoDB. Batches can contain keys from
    more than one table.

    When Amazon DynamoDB cannot process all items in a batch, a set of unprocessed
    keys is returned. This function uses an exponential backoff algorithm to retry
    getting the unprocessed keys until all are retrieved or the specified
    number of tries is reached.

    :param batch_keys: The set of keys to retrieve. A batch can contain at most 100
                       keys. Otherwise, Amazon DynamoDB returns an error.
    :return: The dictionary of retrieved items grouped under their respective
             table names.
    """
    tries = 0
    max_tries = 5
    sleepy_time = 1  # Start with 1 second of sleep, then exponentially increase.
    retrieved = {key: [] for key in batch_keys}
    while tries < max_tries:
        response = dynamodb.batch_get_item(RequestItems=batch_keys)
        # Collect any retrieved items and retry unprocessed keys.
        for key in response.get("Responses", []):
            retrieved[key] += response["Responses"][key]
        unprocessed = response["UnprocessedKeys"]
        if len(unprocessed) > 0:
            batch_keys = unprocessed
            unprocessed_count = sum(
                [len(batch_key["Keys"]) for batch_key in batch_keys.values()]
            )
            logger.info(
                "%s unprocessed keys returned. Sleep, then retry.", unprocessed_count
            )
            tries += 1
            if tries < max_tries:
                logger.info("Sleeping for %s seconds.", sleepy_time)
                time.sleep(sleepy_time)
                sleepy_time = min(sleepy_time * 2, 32)
        else:
            break

    return retrieved
  • For API details, see BatchGetItem in AWS SDK for Python (Boto3) API Reference.
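Because a get batch is limited to 100 keys, a caller with a longer key list has to split it before calling do_batch_get. The sketch below shows one possible way to do that, together with the sleep durations that the retry loop above produces. The helpers chunk_keys and backoff_schedule are hypothetical names for this illustration, the keys are made-up placeholders, and nothing here calls AWS.

```python
MAX_GET_SIZE = 100  # Same limit that the example above documents.


def chunk_keys(keys, size=MAX_GET_SIZE):
    """Yields successive slices of keys, each no longer than size."""
    for start in range(0, len(keys), size):
        yield keys[start:start + size]


def backoff_schedule(max_tries=5, first_delay=1, cap=32):
    """Returns the sleep durations between tries: doubling, capped at cap."""
    delays = []
    delay = first_delay
    for _ in range(max_tries - 1):  # No sleep follows the final try.
        delays.append(delay)
        delay = min(delay * 2, cap)
    return delays


# Placeholder keys for a table keyed on year and title.
keys = [{"year": 2000, "title": f"Movie {i}"} for i in range(250)]
batches = [
    {"doc-example-table-movies": {"Keys": chunk}} for chunk in chunk_keys(keys)
]
print([len(batch["doc-example-table-movies"]["Keys"]) for batch in batches])
print(backoff_schedule())
```

Each element of batches has the RequestItems shape that do_batch_get expects, so the batches could be passed to it one at a time.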

The following code example shows how to use BatchWriteItem.

SDK for Python (Boto3)

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def write_batch(self, movies):
        """
        Fills an Amazon DynamoDB table with the specified data, using the Boto3
        Table.batch_writer() function to put the items in the table.
        Inside the context manager, Table.batch_writer builds a list of
        requests. On exiting the context manager, Table.batch_writer starts sending
        batches of write requests to Amazon DynamoDB and automatically
        handles chunking, buffering, and retrying.

        :param movies: The data to put in the table. Each item must contain at least
                       the keys required by the schema that was specified when the
                       table was created.
        """
        try:
            with self.table.batch_writer() as writer:
                for movie in movies:
                    writer.put_item(Item=movie)
        except ClientError as err:
            logger.error(
                "Couldn't load data into table %s. Here's why: %s: %s",
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see BatchWriteItem in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateTable.

SDK for Python (Boto3)

Create a table for storing movie data.

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def create_table(self, table_name):
        """
        Creates an Amazon DynamoDB table that can be used to store movie data.
        The table uses the release year of the movie as the partition key and the
        title as the sort key.

        :param table_name: The name of the table to create.
        :return: The newly created table.
        """
        try:
            self.table = self.dyn_resource.create_table(
                TableName=table_name,
                KeySchema=[
                    {"AttributeName": "year", "KeyType": "HASH"},  # Partition key
                    {"AttributeName": "title", "KeyType": "RANGE"},  # Sort key
                ],
                AttributeDefinitions=[
                    {"AttributeName": "year", "AttributeType": "N"},
                    {"AttributeName": "title", "AttributeType": "S"},
                ],
                ProvisionedThroughput={
                    "ReadCapacityUnits": 10,
                    "WriteCapacityUnits": 10,
                },
            )
            self.table.wait_until_exists()
        except ClientError as err:
            logger.error(
                "Couldn't create table %s. Here's why: %s: %s",
                table_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return self.table
  • For API details, see CreateTable in AWS SDK for Python (Boto3) API Reference.
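The example above provisions 10 read and 10 write capacity units. As an alternative, CreateTable also accepts BillingMode="PAY_PER_REQUEST" for on-demand capacity, in which case ProvisionedThroughput is left out. The sketch below only assembles the keyword arguments and does not call AWS; movie_table_params is a hypothetical helper introduced for this illustration, not part of the example repository.

```python
def movie_table_params(table_name, on_demand=False):
    """Builds keyword arguments for create_table with the movie key schema."""
    params = {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": "year", "KeyType": "HASH"},  # Partition key
            {"AttributeName": "title", "KeyType": "RANGE"},  # Sort key
        ],
        "AttributeDefinitions": [
            {"AttributeName": "year", "AttributeType": "N"},
            {"AttributeName": "title", "AttributeType": "S"},
        ],
    }
    if on_demand:
        # On-demand mode: no capacity units are declared up front.
        params["BillingMode"] = "PAY_PER_REQUEST"
    else:
        params["ProvisionedThroughput"] = {
            "ReadCapacityUnits": 10,
            "WriteCapacityUnits": 10,
        }

    # Every key attribute must also appear in AttributeDefinitions.
    defined = {d["AttributeName"] for d in params["AttributeDefinitions"]}
    missing = [k["AttributeName"] for k in params["KeySchema"] if k["AttributeName"] not in defined]
    if missing:
        raise ValueError(f"Key attributes without a definition: {missing}")
    return params


provisioned = movie_table_params("doc-example-table-movies")
on_demand = movie_table_params("doc-example-table-movies", on_demand=True)
print(sorted(provisioned), sorted(on_demand))
```

With a real resource, the result could be passed as dyn_resource.create_table(**params).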

The following code example shows how to use DeleteItem.

SDK for Python (Boto3)

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def delete_movie(self, title, year):
        """
        Deletes a movie from the table.

        :param title: The title of the movie to delete.
        :param year: The release year of the movie to delete.
        """
        try:
            self.table.delete_item(Key={"year": year, "title": title})
        except ClientError as err:
            logger.error(
                "Couldn't delete movie %s. Here's why: %s: %s",
                title,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

You can specify a condition so that an item is deleted only when it meets certain criteria.

class UpdateQueryWrapper:
    def __init__(self, table):
        self.table = table

    def delete_underrated_movie(self, title, year, rating):
        """
        Deletes a movie only if it is rated below a specified value. By using a
        condition expression in a delete operation, you can specify that an item is
        deleted only when it meets certain criteria.

        :param title: The title of the movie to delete.
        :param year: The release year of the movie to delete.
        :param rating: The rating threshold to check before deleting the movie.
        """
        try:
            self.table.delete_item(
                Key={"year": year, "title": title},
                ConditionExpression="info.rating <= :val",
                ExpressionAttributeValues={":val": Decimal(str(rating))},
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
                logger.warning(
                    "Didn't delete %s because its rating is greater than %s.",
                    title,
                    rating,
                )
            else:
                logger.error(
                    "Couldn't delete movie %s. Here's why: %s: %s",
                    title,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
  • For API details, see DeleteItem in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteTable.

SDK for Python (Boto3)

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def delete_table(self):
        """
        Deletes the table.
        """
        try:
            self.table.delete()
            self.table = None
        except ClientError as err:
            logger.error(
                "Couldn't delete table. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see DeleteTable in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeTable.

SDK for Python (Boto3)

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def exists(self, table_name):
        """
        Determines whether a table exists. As a side effect, stores the table in
        a member variable.

        :param table_name: The name of the table to check.
        :return: True when the table exists; otherwise, False.
        """
        try:
            table = self.dyn_resource.Table(table_name)
            table.load()
            exists = True
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                exists = False
            else:
                logger.error(
                    "Couldn't check for existence of %s. Here's why: %s: %s",
                    table_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            self.table = table
        return exists
  • For API details, see DescribeTable in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeTimeToLive.

SDK for Python (Boto3)
import boto3


def describe_ttl(table_name, region):
    """
    Describes the TTL settings of an existing table in the given Region.

    :param table_name: String representing the name of the table
    :param region: AWS Region of the table - example `us-east-1`
    :return: Time to live description.
    """
    try:
        # describe_time_to_live is a client operation; the service resource
        # object does not expose it.
        dynamodb = boto3.client("dynamodb", region_name=region)
        ttl_description = dynamodb.describe_time_to_live(TableName=table_name)
        print(
            f"TimeToLive for table {table_name} is status "
            f"{ttl_description['TimeToLiveDescription']['TimeToLiveStatus']}"
        )
        return ttl_description
    except Exception as e:
        print(f"Error describing table: {e}")
        raise


# Enter your own table name and AWS region
describe_ttl("your-table-name", "us-east-1")
  • For API details, see DescribeTimeToLive in AWS SDK for Python (Boto3) API Reference.
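The response from describe_ttl is a plain dict, so its status can be checked without further API calls. DynamoDB TTL compares the current time with a Number attribute that holds a Unix epoch timestamp in seconds. The sketch below assumes a response shaped like the one the example prints from. The helpers ttl_enabled and expiry_epoch, the attribute name expireAt, and the sample response are all hypothetical and used only for illustration.

```python
from datetime import datetime, timedelta, timezone


def ttl_enabled(ttl_description):
    """Returns True when a DescribeTimeToLive-style response reports ENABLED."""
    status = ttl_description.get("TimeToLiveDescription", {}).get("TimeToLiveStatus")
    return status == "ENABLED"


def expiry_epoch(days_from_now, now=None):
    """Returns the epoch-seconds value to store in a TTL attribute."""
    now = now or datetime.now(timezone.utc)
    return int((now + timedelta(days=days_from_now)).timestamp())


# Illustrative response only; a real one also carries ResponseMetadata.
sample_response = {
    "TimeToLiveDescription": {
        "TimeToLiveStatus": "ENABLED",
        "AttributeName": "expireAt",  # Hypothetical attribute name.
    }
}

fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(ttl_enabled(sample_response), expiry_epoch(90, now=fixed_now))
```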

The following code example shows how to use ExecuteStatement.

SDK for Python (Boto3)

class PartiQLWrapper:
    """
    Encapsulates a DynamoDB resource to run PartiQL statements.
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource

    def run_partiql(self, statement, params):
        """
        Runs a PartiQL statement. A Boto3 resource is used even though
        `execute_statement` is called on the underlying `client` object because the
        resource transforms input and output from plain old Python objects (POPOs) to
        the DynamoDB format. If you create the client directly, you must do these
        transforms yourself.

        :param statement: The PartiQL statement.
        :param params: The list of PartiQL parameters. These are applied to the
                       statement in the order they are listed.
        :return: The items returned from the statement, if any.
        """
        try:
            output = self.dyn_resource.meta.client.execute_statement(
                Statement=statement, Parameters=params
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Couldn't execute PartiQL '%s' because the table does not exist.",
                    statement,
                )
            else:
                logger.error(
                    "Couldn't execute PartiQL '%s'. Here's why: %s: %s",
                    statement,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return output
  • For API details, see ExecuteStatement in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetItem.

SDK for Python (Boto3)

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def get_movie(self, title, year):
        """
        Gets movie data from the table for a specific movie.

        :param title: The title of the movie.
        :param year: The release year of the movie.
        :return: The data about the requested movie.
        """
        try:
            response = self.table.get_item(Key={"year": year, "title": title})
        except ClientError as err:
            logger.error(
                "Couldn't get movie %s from table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Item"]
  • For API details, see GetItem in AWS SDK for Python (Boto3) API Reference.
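When no item matches the key, the `get_item` response contains no `Item` key, so indexing `response["Item"]` raises `KeyError`. The following minimal sketch shows one tolerant variant. The names `get_movie_or_none` and `FakeTable` are hypothetical and are not part of the original example; the fake table only imitates the response shape so that the sketch runs without AWS access.

```python
def get_movie_or_none(table, title, year):
    """Return the movie item, or None when no item has this key."""
    response = table.get_item(Key={"year": year, "title": title})
    # DynamoDB omits the "Item" key entirely when nothing matches.
    return response.get("Item")


class FakeTable:
    """Hypothetical in-memory stand-in for a Boto3 Table, for illustration only."""

    def __init__(self, items):
        self._items = {(item["year"], item["title"]): item for item in items}

    def get_item(self, Key):
        item = self._items.get((Key["year"], Key["title"]))
        return {"Item": item} if item is not None else {}


table = FakeTable([{"year": 1999, "title": "For Love of the Game"}])
found = get_movie_or_none(table, "For Love of the Game", 1999)
missing = get_movie_or_none(table, "No Such Movie", 1999)
print(found, missing)
```

Returning `None` is one design choice; raising a domain-specific exception would work equally well if callers should never see a missing movie.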

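The following code example shows how to use ListTables.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.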

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def list_tables(self):
        """
        Lists the Amazon DynamoDB tables for the current account.

        :return: The list of tables.
        """
        try:
            tables = []
            for table in self.dyn_resource.tables.all():
                print(table.name)
                tables.append(table)
        except ClientError as err:
            logger.error(
                "Couldn't list tables. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return tables
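  • For API details, see ListTables in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutItem.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.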

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def add_movie(self, title, year, plot, rating):
        """
        Adds a movie to the table.

        :param title: The title of the movie.
        :param year: The release year of the movie.
        :param plot: The plot summary of the movie.
        :param rating: The quality rating of the movie.
        """
        try:
            self.table.put_item(
                Item={
                    "year": year,
                    "title": title,
                    "info": {"plot": plot, "rating": Decimal(str(rating))},
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't add movie %s to table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
  • For API details, see PutItem in AWS SDK for Python (Boto3) API Reference.
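The example wraps the rating in `Decimal(str(rating))` because the Boto3 resource layer does not accept Python `float` values for number attributes. When an item comes from parsed JSON, floats can appear at any depth. The following is a minimal sketch of a recursive conversion; the helper name `floats_to_decimals` is hypothetical and not part of the original example.

```python
from decimal import Decimal


def floats_to_decimals(value):
    """Return a copy of `value` with every float replaced by a Decimal."""
    if isinstance(value, float):
        # Go through str() so that 6.3 becomes Decimal("6.3") rather than
        # the long binary expansion that Decimal(6.3) would produce.
        return Decimal(str(value))
    if isinstance(value, dict):
        return {key: floats_to_decimals(item) for key, item in value.items()}
    if isinstance(value, list):
        return [floats_to_decimals(item) for item in value]
    return value


movie = {
    "year": 1999,
    "title": "For Love of the Game",
    "info": {"rating": 6.3, "actors": ["Kevin Costner"]},
}
converted = floats_to_decimals(movie)
print(converted["info"]["rating"])
```

The converted dictionary can then be passed as the `Item` argument of `put_item`.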

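The following code example shows how to use Query.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Query items by using a key condition expression.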

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def query_movies(self, year):
        """
        Queries for movies that were released in the specified year.

        :param year: The year to query.
        :return: The list of movies that were released in the specified year.
        """
        try:
            response = self.table.query(KeyConditionExpression=Key("year").eq(year))
        except ClientError as err:
            logger.error(
                "Couldn't query for movies released in %s. Here's why: %s: %s",
                year,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Items"]

Query items and project them to return a subset of data.

class UpdateQueryWrapper:
    def __init__(self, table):
        self.table = table

    def query_and_project_movies(self, year, title_bounds):
        """
        Query for movies that were released in a specified year and that have titles
        that start within a range of letters. A projection expression is used
        to return a subset of data for each movie.

        :param year: The release year to query.
        :param title_bounds: The range of starting letters to query.
        :return: The list of movies.
        """
        try:
            response = self.table.query(
                ProjectionExpression="#yr, title, info.genres, info.actors[0]",
                ExpressionAttributeNames={"#yr": "year"},
                KeyConditionExpression=(
                    Key("year").eq(year)
                    & Key("title").between(
                        title_bounds["first"], title_bounds["second"]
                    )
                ),
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ValidationException":
                logger.warning(
                    "There's a validation error. Here's the message: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Couldn't query for movies. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return response["Items"]
  • For API details, see Query in AWS SDK for Python (Boto3) API Reference.
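The projection example aliases `year` as `#yr` because `year` is a DynamoDB reserved word. The following minimal sketch sidesteps the question of which names are reserved by aliasing every path segment. The helper name `build_projection` is hypothetical, and the sketch assumes that attribute names contain no dots or square brackets of their own.

```python
import re


def build_projection(paths):
    """Build a projection expression and its attribute-name map.

    Every name in every document path gets a "#nX" placeholder, while list
    indexes such as "[0]" are kept as they are.
    """
    names = {}
    aliases = {}

    def alias_for(name):
        if name not in aliases:
            aliases[name] = f"#n{len(aliases)}"
            names[aliases[name]] = name
        return aliases[name]

    projected = []
    for path in paths:
        parts = []
        for segment in path.split("."):
            match = re.fullmatch(r"([^\[\]]+)((?:\[\d+\])*)", segment)
            if match is None:
                raise ValueError(f"Unsupported path segment: {segment!r}")
            name, indexes = match.groups()
            parts.append(alias_for(name) + indexes)
        projected.append(".".join(parts))
    return ", ".join(projected), names


expression, attribute_names = build_projection(
    ["year", "title", "info.genres", "info.actors[0]"]
)
print(expression)
print(attribute_names)
```

The two return values are meant to be passed as `ProjectionExpression` and `ExpressionAttributeNames` in a `query` or `scan` call.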

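The following code example shows how to use Scan.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.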

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def scan_movies(self, year_range):
        """
        Scans for movies that were released in a range of years.
        Uses a projection expression to return a subset of data for each movie.

        :param year_range: The range of years to retrieve.
        :return: The list of movies released in the specified years.
        """
        movies = []
        scan_kwargs = {
            "FilterExpression": Key("year").between(
                year_range["first"], year_range["second"]
            ),
            "ProjectionExpression": "#yr, title, info.rating",
            "ExpressionAttributeNames": {"#yr": "year"},
        }
        try:
            done = False
            start_key = None
            while not done:
                if start_key:
                    scan_kwargs["ExclusiveStartKey"] = start_key
                response = self.table.scan(**scan_kwargs)
                movies.extend(response.get("Items", []))
                start_key = response.get("LastEvaluatedKey", None)
                done = start_key is None
        except ClientError as err:
            logger.error(
                "Couldn't scan for movies. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

        return movies
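  • For API details, see Scan in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateItem.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Update an item by using an update expression.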

class Movies:
    """Encapsulates an Amazon DynamoDB table of movie data.

    Example data structure for a movie record in this table:
        {
            "year": 1999,
            "title": "For Love of the Game",
            "info": {
                "directors": ["Sam Raimi"],
                "release_date": "1999-09-15T00:00:00Z",
                "rating": 6.3,
                "plot": "A washed up pitcher flashes through his career.",
                "rank": 4987,
                "running_time_secs": 8220,
                "actors": [
                    "Kevin Costner",
                    "Kelly Preston",
                    "John C. Reilly"
                ]
            }
        }
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource
        # The table variable is set during the scenario in the call to
        # 'exists' if the table exists. Otherwise, it is set by 'create_table'.
        self.table = None

    def update_movie(self, title, year, rating, plot):
        """
        Updates rating and plot data for a movie in the table.

        :param title: The title of the movie to update.
        :param year: The release year of the movie to update.
        :param rating: The updated rating to give the movie.
        :param plot: The updated plot summary to give the movie.
        :return: The fields that were updated, with their new values.
        """
        try:
            response = self.table.update_item(
                Key={"year": year, "title": title},
                UpdateExpression="set info.rating=:r, info.plot=:p",
                ExpressionAttributeValues={":r": Decimal(str(rating)), ":p": plot},
                ReturnValues="UPDATED_NEW",
            )
        except ClientError as err:
            logger.error(
                "Couldn't update movie %s in table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Attributes"]

Update an item by using an update expression that includes an arithmetic operation.

class UpdateQueryWrapper:
    def __init__(self, table):
        self.table = table

    def update_rating(self, title, year, rating_change):
        """
        Updates the quality rating of a movie in the table by using an arithmetic
        operation in the update expression. By specifying an arithmetic operation,
        you can adjust a value in a single request, rather than first getting its
        value and then setting its new value.

        :param title: The title of the movie to update.
        :param year: The release year of the movie to update.
        :param rating_change: The amount to add to the current rating for the movie.
        :return: The updated rating.
        """
        try:
            response = self.table.update_item(
                Key={"year": year, "title": title},
                UpdateExpression="set info.rating = info.rating + :val",
                ExpressionAttributeValues={":val": Decimal(str(rating_change))},
                ReturnValues="UPDATED_NEW",
            )
        except ClientError as err:
            logger.error(
                "Couldn't update movie %s in table %s. Here's why: %s: %s",
                title,
                self.table.name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Attributes"]

Update an item only when it meets certain conditions.

class UpdateQueryWrapper:
    def __init__(self, table):
        self.table = table

    def remove_actors(self, title, year, actor_threshold):
        """
        Removes an actor from a movie, but only when the number of actors is greater
        than a specified threshold. If the movie does not list more than the threshold,
        no actors are removed.

        :param title: The title of the movie to update.
        :param year: The release year of the movie to update.
        :param actor_threshold: The threshold of actors to check.
        :return: The movie data after the update.
        """
        try:
            response = self.table.update_item(
                Key={"year": year, "title": title},
                UpdateExpression="remove info.actors[0]",
                ConditionExpression="size(info.actors) > :num",
                ExpressionAttributeValues={":num": actor_threshold},
                ReturnValues="ALL_NEW",
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
                logger.warning(
                    "Didn't update %s because it has fewer than %s actors.",
                    title,
                    actor_threshold + 1,
                )
            else:
                logger.error(
                    "Couldn't update movie %s. Here's why: %s: %s",
                    title,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response["Attributes"]
  • For API details, see UpdateItem in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateTimeToLive.

SDK for Python (Boto3)

Enable TTL on an existing DynamoDB table.

import boto3


def enable_ttl(table_name, ttl_attribute_name):
    """
    Enables TTL on a DynamoDB table for a given attribute name.
    On success, returns the response, which carries an HTTP status code of 200.
    On error, raises an exception.

    :param table_name: Name of the DynamoDB table
    :param ttl_attribute_name: The name of the TTL attribute being provided to the table.
    """
    try:
        dynamodb = boto3.client('dynamodb')

        # Enable TTL on an existing DynamoDB table
        response = dynamodb.update_time_to_live(
            TableName=table_name,
            TimeToLiveSpecification={
                'Enabled': True,
                'AttributeName': ttl_attribute_name
            }
        )

        # In the returned response, check for a successful status code.
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print("TTL has been enabled successfully.")
        else:
            print(f"Failed to enable TTL, status code {response['ResponseMetadata']['HTTPStatusCode']}")
        return response
    except Exception as ex:
        print("Couldn't enable TTL in table %s. Here's why: %s" % (table_name, ex))
        raise


# your values
enable_ttl('your-table-name', 'expireAt')

Disable TTL on an existing DynamoDB table.

import boto3


def disable_ttl(table_name, ttl_attribute_name):
    """
    Disables TTL on a DynamoDB table for a given attribute name.
    On success, returns the response, which carries an HTTP status code of 200.
    On error, raises an exception.

    :param table_name: Name of the DynamoDB table being modified
    :param ttl_attribute_name: The name of the TTL attribute being provided to the table.
    """
    try:
        dynamodb = boto3.client('dynamodb')

        # Disable TTL on an existing DynamoDB table
        response = dynamodb.update_time_to_live(
            TableName=table_name,
            TimeToLiveSpecification={
                'Enabled': False,
                'AttributeName': ttl_attribute_name
            }
        )

        # In the returned response, check for a successful status code.
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print("TTL has been disabled successfully.")
        else:
            print(f"Failed to disable TTL, status code {response['ResponseMetadata']['HTTPStatusCode']}")
        return response
    except Exception as ex:
        print("Couldn't disable TTL in table %s. Here's why: %s" % (table_name, ex))
        raise


# your values
disable_ttl('your-table-name', 'expireAt')
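  • For API details, see UpdateTimeToLive in AWS SDK for Python (Boto3) API Reference.

Scenarios

The following code example shows how to:

  • Create and write data to a table with both the DAX and SDK clients.

  • Get, query, and scan the table with both clients and compare their performance.

For more information, see Developing with the DynamoDB Accelerator Client.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a table with either the DAX or Boto3 client.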

import boto3


def create_dax_table(dyn_resource=None):
    """
    Creates a DynamoDB table.

    :param dyn_resource: Either a Boto3 or DAX resource.
    :return: The newly created table.
    """
    if dyn_resource is None:
        dyn_resource = boto3.resource("dynamodb")

    table_name = "TryDaxTable"
    params = {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": "partition_key", "KeyType": "HASH"},
            {"AttributeName": "sort_key", "KeyType": "RANGE"},
        ],
        "AttributeDefinitions": [
            {"AttributeName": "partition_key", "AttributeType": "N"},
            {"AttributeName": "sort_key", "AttributeType": "N"},
        ],
        "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
    }
    table = dyn_resource.create_table(**params)
    print(f"Creating {table_name}...")
    table.wait_until_exists()
    return table


if __name__ == "__main__":
    dax_table = create_dax_table()
    print("Created table.")

Write test data to the table.

import boto3


def write_data_to_dax_table(key_count, item_size, dyn_resource=None):
    """
    Writes test data to the demonstration table.

    :param key_count: The number of partition and sort keys to use to populate the
                      table. The total number of items is key_count * key_count.
    :param item_size: The size of non-key data for each test item.
    :param dyn_resource: Either a Boto3 or DAX resource.
    """
    if dyn_resource is None:
        dyn_resource = boto3.resource("dynamodb")

    table = dyn_resource.Table("TryDaxTable")
    some_data = "X" * item_size

    for partition_key in range(1, key_count + 1):
        for sort_key in range(1, key_count + 1):
            table.put_item(
                Item={
                    "partition_key": partition_key,
                    "sort_key": sort_key,
                    "some_data": some_data,
                }
            )
            print(f"Put item ({partition_key}, {sort_key}) succeeded.")


if __name__ == "__main__":
    write_key_count = 10
    write_item_size = 1000
    print(
        f"Writing {write_key_count*write_key_count} items to the table. "
        f"Each item is {write_item_size} characters."
    )
    write_data_to_dax_table(write_key_count, write_item_size)

Get items for a number of iterations with both the DAX client and the Boto3 client, and report the time spent for each.

import argparse
import sys
import time
import amazondax
import boto3


def get_item_test(key_count, iterations, dyn_resource=None):
    """
    Gets items from the table a specified number of times. The time before the
    first iteration and the time after the last iteration are both captured
    and reported.

    :param key_count: The number of items to get from the table in each iteration.
    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource.
    :return: The start and end times of the test.
    """
    if dyn_resource is None:
        dyn_resource = boto3.resource("dynamodb")

    table = dyn_resource.Table("TryDaxTable")
    start = time.perf_counter()
    for _ in range(iterations):
        for partition_key in range(1, key_count + 1):
            for sort_key in range(1, key_count + 1):
                table.get_item(
                    Key={"partition_key": partition_key, "sort_key": sort_key}
                )
                print(".", end="")
                sys.stdout.flush()
    print()
    end = time.perf_counter()
    return start, end


if __name__ == "__main__":
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "endpoint_url",
        nargs="?",
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
    )
    args = parser.parse_args()

    test_key_count = 10
    test_iterations = 50
    if args.endpoint_url:
        print(
            f"Getting each item from the table {test_iterations} times, "
            f"using the DAX client."
        )
        # Use a with statement so the DAX client closes the cluster after completion.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = get_item_test(
                test_key_count, test_iterations, dyn_resource=dax
            )
    else:
        print(
            f"Getting each item from the table {test_iterations} times, "
            f"using the Boto3 client."
        )
        test_start, test_end = get_item_test(test_key_count, test_iterations)
    print(
        f"Total time: {test_end - test_start:.4f} sec. Average time: "
        f"{(test_end - test_start)/ test_iterations}."
    )

Query the table for a number of iterations with both the DAX client and the Boto3 client, and report the time spent for each.

import argparse
import time
import sys
import amazondax
import boto3
from boto3.dynamodb.conditions import Key


def query_test(partition_key, sort_keys, iterations, dyn_resource=None):
    """
    Queries the table a specified number of times. The time before the
    first iteration and the time after the last iteration are both captured
    and reported.

    :param partition_key: The partition key value to use in the query. The query
                          returns items that have partition keys equal to this value.
    :param sort_keys: The range of sort key values for the query. The query returns
                      items that have sort key values between these two values.
    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource.
    :return: The start and end times of the test.
    """
    if dyn_resource is None:
        dyn_resource = boto3.resource("dynamodb")

    table = dyn_resource.Table("TryDaxTable")
    key_condition_expression = Key("partition_key").eq(partition_key) & Key(
        "sort_key"
    ).between(*sort_keys)

    start = time.perf_counter()
    for _ in range(iterations):
        table.query(KeyConditionExpression=key_condition_expression)
        print(".", end="")
        sys.stdout.flush()
    print()
    end = time.perf_counter()
    return start, end


if __name__ == "__main__":
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "endpoint_url",
        nargs="?",
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
    )
    args = parser.parse_args()

    test_partition_key = 5
    test_sort_keys = (2, 9)
    test_iterations = 100
    if args.endpoint_url:
        print(f"Querying the table {test_iterations} times, using the DAX client.")
        # Use a with statement so the DAX client closes the cluster after completion.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = query_test(
                test_partition_key, test_sort_keys, test_iterations, dyn_resource=dax
            )
    else:
        print(f"Querying the table {test_iterations} times, using the Boto3 client.")
        test_start, test_end = query_test(
            test_partition_key, test_sort_keys, test_iterations
        )

    print(
        f"Total time: {test_end - test_start:.4f} sec. Average time: "
        f"{(test_end - test_start)/test_iterations}."
    )

Scan the table for a number of iterations with both the DAX client and the Boto3 client, and report the time spent for each.

import argparse
import time
import sys
import amazondax
import boto3


def scan_test(iterations, dyn_resource=None):
    """
    Scans the table a specified number of times. The time before the
    first iteration and the time after the last iteration are both captured
    and reported.

    :param iterations: The number of iterations to run.
    :param dyn_resource: Either a Boto3 or DAX resource.
    :return: The start and end times of the test.
    """
    if dyn_resource is None:
        dyn_resource = boto3.resource("dynamodb")

    table = dyn_resource.Table("TryDaxTable")
    start = time.perf_counter()
    for _ in range(iterations):
        table.scan()
        print(".", end="")
        sys.stdout.flush()
    print()
    end = time.perf_counter()
    return start, end


if __name__ == "__main__":
    # pylint: disable=not-context-manager
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "endpoint_url",
        nargs="?",
        help="When specified, the DAX cluster endpoint. Otherwise, DAX is not used.",
    )
    args = parser.parse_args()

    test_iterations = 100
    if args.endpoint_url:
        print(f"Scanning the table {test_iterations} times, using the DAX client.")
        # Use a with statement so the DAX client closes the cluster after completion.
        with amazondax.AmazonDaxClient.resource(endpoint_url=args.endpoint_url) as dax:
            test_start, test_end = scan_test(test_iterations, dyn_resource=dax)
    else:
        print(f"Scanning the table {test_iterations} times, using the Boto3 client.")
        test_start, test_end = scan_test(test_iterations)
    print(
        f"Total time: {test_end - test_start:.4f} sec. Average time: "
        f"{(test_end - test_start)/test_iterations}."
    )

Delete the table.

import boto3


def delete_dax_table(dyn_resource=None):
    """
    Deletes the demonstration table.

    :param dyn_resource: Either a Boto3 or DAX resource.
    """
    if dyn_resource is None:
        dyn_resource = boto3.resource("dynamodb")

    table = dyn_resource.Table("TryDaxTable")
    table.delete()

    print(f"Deleting {table.name}...")
    table.wait_until_not_exists()


if __name__ == "__main__":
    delete_dax_table()
    print("Table deleted!")

The following code example shows how to conditionally update an item's TTL.

SDK for Python (Boto3)
import boto3
from datetime import datetime
from botocore.exceptions import ClientError


def update_dynamodb_item(table_name, region, primary_key, sort_key, ttl_attribute):
    """
    Updates an existing record in a DynamoDB table with a new or updated TTL attribute.

    :param table_name: Name of the DynamoDB table
    :param region: AWS Region of the table - example `us-east-1`
    :param primary_key: one attribute known as the partition key.
    :param sort_key: Also known as a range attribute.
    :param ttl_attribute: name of the TTL attribute in the target DynamoDB table
    :return: The HTTP status code of the update on success; otherwise, None.
    """
    try:
        dynamodb = boto3.resource('dynamodb', region_name=region)
        table = dynamodb.Table(table_name)

        # Get the current time in epoch second format
        current_time = int(datetime.now().timestamp())

        # Define the update expression for adding/updating a new attribute
        update_expression = "SET newAttribute = :val1"

        # Define the condition expression for checking if 'expireAt' is not expired
        condition_expression = "expireAt > :val2"

        # Define the expression attribute values
        expression_attribute_values = {
            ':val1': ttl_attribute,
            ':val2': current_time
        }

        response = table.update_item(
            Key={
                'primaryKey': primary_key,
                'sortKey': sort_key
            },
            UpdateExpression=update_expression,
            ConditionExpression=condition_expression,
            ExpressionAttributeValues=expression_attribute_values
        )

        print("Item updated successfully.")
        return response['ResponseMetadata']['HTTPStatusCode']  # Ideally a 200 OK
    except ClientError as e:
        if e.response['Error']['Code'] == "ConditionalCheckFailedException":
            print("Condition check failed: Item's 'expireAt' is expired.")
        else:
            print(f"Error updating item: {e}")
    except Exception as e:
        print(f"Error updating item: {e}")


# replace with your values
update_dynamodb_item('your-table-name', 'us-east-1', 'your-partition-key-value', 'your-sort-key-value', 'your-ttl-attribute-value')
  • For API details, see UpdateItem in AWS SDK for Python (Boto3) API Reference.
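To make the "not yet expired" condition easy to test, the arguments for `update_item` can be built by a separate function that takes the clock as a parameter. The following is a minimal sketch under that assumption. The function name `build_conditional_update` is hypothetical, and the key and attribute names (`primaryKey`, `sortKey`, `newAttribute`, `expireAt`) are taken from the example above.

```python
from datetime import datetime, timezone


def build_conditional_update(primary_key, sort_key, new_value, now=None):
    """Build update_item keyword arguments that apply only to unexpired items."""
    if now is None:
        now = datetime.now(timezone.utc)
    return {
        "Key": {"primaryKey": primary_key, "sortKey": sort_key},
        "UpdateExpression": "SET newAttribute = :val1",
        # The update applies only while expireAt is later than the current time.
        "ConditionExpression": "expireAt > :now",
        "ExpressionAttributeValues": {
            ":val1": new_value,
            ":now": int(now.timestamp()),
        },
    }


fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
update_kwargs = build_conditional_update("pk-1", "sk-1", "hello", now=fixed_now)
print(update_kwargs["ExpressionAttributeValues"])
```

The result would be passed to a table as `table.update_item(**update_kwargs)`.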

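The following code example shows how to create a REST API that simulates a system to track daily cases of COVID-19 in the United States, using fictional data.

SDK for Python (Boto3)

Shows how to use AWS Chalice with the AWS SDK for Python (Boto3) to create a serverless REST API that uses Amazon API Gateway, AWS Lambda, and Amazon DynamoDB. The REST API simulates a system that tracks daily cases of COVID-19 in the United States, using fictional data. Learn how to:

  • Use AWS Chalice to define routes in Lambda functions that are called to handle REST requests that come through API Gateway.

  • Use Lambda functions to retrieve and store data in a DynamoDB table to serve REST requests.

  • Define table structure and security role resources in an AWS CloudFormation template.

  • Use AWS Chalice and CloudFormation to package and deploy all necessary resources.

  • Use CloudFormation to clean up all created resources.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • API Gateway

  • AWS CloudFormation

  • DynamoDB

  • Lambda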

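The following code example shows how to create an AWS Step Functions messenger application that retrieves message records from a database table.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) with AWS Step Functions to create a messenger application that retrieves message records from an Amazon DynamoDB table and sends them with Amazon Simple Queue Service (Amazon SQS). The state machine integrates with an AWS Lambda function to scan the database for unsent messages.

  • Create a state machine that retrieves and updates message records from an Amazon DynamoDB table.

  • Update the state machine definition to also send messages to Amazon Simple Queue Service (Amazon SQS).

  • Start and stop state machine runs.

  • Connect to Lambda, DynamoDB, and Amazon SQS from a state machine by using service integrations.

For complete source code and instructions on how to set up and run, see the full example on GitHub.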
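As a rough illustration of how such a state machine might be laid out, the following sketch builds an Amazon States Language definition as a Python dictionary. It is not the definition used by the full example. The function name `build_messenger_definition`, the state names, the item attributes (`message_id`, `message`, `sent`), and the sample ARN, table name, and queue URL are all assumptions, and the sketch has not been validated against the Step Functions service.

```python
import json


def build_messenger_definition(scan_function_arn, table_name, queue_url):
    """Build a hypothetical state machine definition for the messenger flow."""
    return {
        "Comment": "Hypothetical messenger state machine sketch.",
        "StartAt": "ScanForMessages",
        "States": {
            # A Lambda function is assumed to return a list of unsent messages.
            "ScanForMessages": {
                "Type": "Task",
                "Resource": scan_function_arn,
                "ResultPath": "$.messages",
                "Next": "SendEachMessage",
            },
            "SendEachMessage": {
                "Type": "Map",
                "ItemsPath": "$.messages",
                "Iterator": {
                    "StartAt": "SendToQueue",
                    "States": {
                        # Amazon SQS service integration.
                        "SendToQueue": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::sqs:sendMessage",
                            "Parameters": {
                                "QueueUrl": queue_url,
                                "MessageBody.$": "$.message",
                            },
                            # JSON null: discard the result, keep the input.
                            "ResultPath": None,
                            "Next": "MarkAsSent",
                        },
                        # DynamoDB service integration.
                        "MarkAsSent": {
                            "Type": "Task",
                            "Resource": "arn:aws:states:::dynamodb:updateItem",
                            "Parameters": {
                                "TableName": table_name,
                                "Key": {"message_id": {"S.$": "$.message_id"}},
                                "UpdateExpression": "SET #sent = :s",
                                "ExpressionAttributeNames": {"#sent": "sent"},
                                "ExpressionAttributeValues": {":s": {"BOOL": True}},
                            },
                            "End": True,
                        },
                    },
                },
                "End": True,
            },
        },
    }


definition = build_messenger_definition(
    "arn:aws:lambda:us-east-1:111122223333:function:scan-messages",
    "doc-example-messages",
    "https://sqs.us-east-1.amazonaws.com/111122223333/doc-example-queue",
)
as_json = json.dumps(definition, indent=2)
print(as_json)
```

The JSON string is the form that the `definition` parameter of the Step Functions `create_state_machine` call expects.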

Services used in this example
  • DynamoDB

  • Lambda

  • Amazon SQS

  • Step Functions

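The following code example shows how to create a web application that tracks work items in an Amazon DynamoDB table and uses Amazon Simple Email Service (Amazon SES) to send reports.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) to create a REST service that tracks work items in Amazon DynamoDB and emails reports by using Amazon Simple Email Service (Amazon SES). This example uses the Flask web framework to handle HTTP routing and integrates with a React webpage to present a fully functional web application.

  • Build a Flask REST service that integrates with AWS services.

  • Read, write, and update work items that are stored in a DynamoDB table.

  • Use Amazon SES to send email reports of work items.

For complete source code and instructions on how to set up and run, see the full example in the AWS Code Examples Repository on GitHub.

Services used in this example
  • DynamoDB

  • Amazon SES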

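The following code example shows how to create a chat application that is served by a websocket API built on Amazon API Gateway.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) with Amazon API Gateway V2 to create a websocket API that integrates with AWS Lambda and Amazon DynamoDB.

  • Create a websocket API served by API Gateway.

  • Define a Lambda handler that stores connections in DynamoDB and posts messages to other chat participants.

  • Connect to the websocket chat application and send messages by using the Websockets package.

For complete source code and instructions on how to set up and run, see the full example on GitHub.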
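The connection-tracking part of such a handler can be sketched as follows. This is not the handler from the full example: the function name `handle_websocket_event`, the `connection_id` key attribute, and the `FakeConnectionTable` class are hypothetical, and posting messages to other participants is left out. The sketch relies only on the `routeKey` and `connectionId` fields that API Gateway places in `requestContext` for websocket events.

```python
def handle_websocket_event(event, table):
    """Store or remove a websocket connection based on the route key."""
    context = event["requestContext"]
    route_key = context["routeKey"]
    connection_id = context["connectionId"]
    if route_key == "$connect":
        table.put_item(Item={"connection_id": connection_id})
    elif route_key == "$disconnect":
        table.delete_item(Key={"connection_id": connection_id})
    else:
        # Other routes, such as sending a chat message, are outside this sketch.
        return {"statusCode": 400}
    return {"statusCode": 200}


class FakeConnectionTable:
    """Hypothetical in-memory stand-in for a Boto3 Table, for illustration only."""

    def __init__(self):
        self.connections = set()

    def put_item(self, Item):
        self.connections.add(Item["connection_id"])

    def delete_item(self, Key):
        self.connections.discard(Key["connection_id"])


def make_event(route_key, connection_id):
    return {"requestContext": {"routeKey": route_key, "connectionId": connection_id}}


table = FakeConnectionTable()
connect_result = handle_websocket_event(make_event("$connect", "abc123"), table)
after_connect = set(table.connections)
disconnect_result = handle_websocket_event(make_event("$disconnect", "abc123"), table)
other_result = handle_websocket_event(make_event("sendmessage", "abc123"), table)
print(connect_result, after_connect, disconnect_result, other_result)
```

In a deployed Lambda function, `table` would be a Boto3 `Table` resource created outside the handler rather than the in-memory fake.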

Services used in this example
  • API Gateway

  • DynamoDB

  • Lambda

The following code example shows how to create an item with TTL.

SDK for Python (Boto3)
import boto3
from datetime import datetime, timedelta


def create_dynamodb_item(table_name, region, primary_key, sort_key):
    """
    Creates a DynamoDB item with an attached expiry attribute.

    :param table_name: Table name for the boto3 resource to target when creating an item
    :param region: string representing the AWS region. Example: `us-east-1`
    :param primary_key: one attribute known as the partition key.
    :param sort_key: Also known as a range attribute.
    :return: Void (nothing)
    """
    try:
        dynamodb = boto3.resource('dynamodb', region_name=region)
        table = dynamodb.Table(table_name)

        # Get the current time in epoch second format
        current_time = int(datetime.now().timestamp())

        # Calculate the expiration time (90 days from now) in epoch second format
        expiration_time = int((datetime.now() + timedelta(days=90)).timestamp())

        item = {
            'primaryKey': primary_key,
            'sortKey': sort_key,
            'creationDate': current_time,
            'expireAt': expiration_time
        }

        table.put_item(Item=item)

        print("Item created successfully.")
    except Exception as e:
        print(f"Error creating item: {e}")
        raise


# Use your own values
create_dynamodb_item('your-table-name', 'us-west-2', 'your-partition-key-value', 'your-sort-key-value')
  • For API details, see PutItem in AWS SDK for Python (Boto3) API Reference.
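DynamoDB TTL reads the expiry attribute as a number of seconds since the Unix epoch, and expired items can remain in the table for some time before DynamoDB deletes them. The following minimal sketch shows one way to compute the expiry value and to filter out items that have expired but are still returned by reads. The helper names `ttl_epoch_seconds` and `is_expired` are hypothetical; the `expireAt` attribute name follows the example above.

```python
from datetime import datetime, timedelta, timezone


def ttl_epoch_seconds(days, now=None):
    """Return the epoch-second timestamp that lies `days` after `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    return int((now + timedelta(days=days)).timestamp())


def is_expired(item, now=None, ttl_attribute="expireAt"):
    """Return True when the item's TTL attribute is at or before `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Items without the TTL attribute never expire.
    expire_at = item.get(ttl_attribute)
    return expire_at is not None and int(expire_at) <= int(now.timestamp())


fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
expire_at = ttl_epoch_seconds(90, now=fixed_now)
fresh_item = {"primaryKey": "a", "expireAt": expire_at}
stale_item = {"primaryKey": "b", "expireAt": ttl_epoch_seconds(-1, now=fixed_now)}
live_items = [
    item for item in (fresh_item, stale_item) if not is_expired(item, now=fixed_now)
]
print(expire_at, live_items)
```

Passing the clock in as `now` keeps both helpers deterministic in tests.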

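The following code example shows how to:

  • Get a batch of items by running multiple SELECT statements.

  • Add a batch of items by running multiple INSERT statements.

  • Update a batch of items by running multiple UPDATE statements.

  • Delete a batch of items by running multiple DELETE statements.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a class that can run batches of PartiQL statements.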

from datetime import datetime
from decimal import Decimal
import logging
from pprint import pprint

import boto3
from botocore.exceptions import ClientError

from scaffold import Scaffold

logger = logging.getLogger(__name__)


class PartiQLBatchWrapper:
    """
    Encapsulates a DynamoDB resource to run PartiQL statements.
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource

    def run_partiql(self, statements, param_list):
        """
        Runs a batch of PartiQL statements. A Boto3 resource is used even though
        `batch_execute_statement` is called on the underlying `client` object because
        the resource transforms input and output from plain old Python objects (POPOs)
        to the DynamoDB format. If you create the client directly, you must do these
        transforms yourself.

        :param statements: The batch of PartiQL statements.
        :param param_list: The batch of PartiQL parameters that are associated with
                           each statement. This list must be in the same order as the
                           statements.
        :return: The responses returned from running the statements, if any.
        """
        try:
            output = self.dyn_resource.meta.client.batch_execute_statement(
                Statements=[
                    {"Statement": statement, "Parameters": params}
                    for statement, params in zip(statements, param_list)
                ]
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Couldn't execute batch of PartiQL statements because the table "
                    "does not exist."
                )
            else:
                logger.error(
                    "Couldn't execute batch of PartiQL statements. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return output

Run a scenario that creates a table and runs PartiQL queries in batches.

def run_scenario(scaffold, wrapper, table_name):
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    print("-" * 88)
    print("Welcome to the Amazon DynamoDB PartiQL batch statement demo.")
    print("-" * 88)

    print(f"Creating table '{table_name}' for the demo...")
    scaffold.create_table(table_name)
    print("-" * 88)

    movie_data = [
        {
            "title": "House PartiQL",
            "year": datetime.now().year - 5,
            "info": {
                "plot": "Wacky high jinks result from querying a mysterious database.",
                "rating": Decimal("8.5"),
            },
        },
        {
            "title": "House PartiQL 2",
            "year": datetime.now().year - 3,
            "info": {
                "plot": "Moderate high jinks result from querying another mysterious database.",
                "rating": Decimal("6.5"),
            },
        },
        {
            "title": "House PartiQL 3",
            "year": datetime.now().year - 1,
            "info": {
                "plot": "Tepid high jinks result from querying yet another mysterious database.",
                "rating": Decimal("2.5"),
            },
        },
    ]

    print(f"Inserting a batch of movies into table '{table_name}'.")
    statements = [
        f'INSERT INTO "{table_name}" ' f"VALUE {{'title': ?, 'year': ?, 'info': ?}}"
    ] * len(movie_data)
    params = [list(movie.values()) for movie in movie_data]
    wrapper.run_partiql(statements, params)
    print("Success!")
    print("-" * 88)

    print("Getting data for a batch of movies.")
    statements = [f'SELECT * FROM "{table_name}" WHERE title=? AND year=?'] * len(
        movie_data
    )
    params = [[movie["title"], movie["year"]] for movie in movie_data]
    output = wrapper.run_partiql(statements, params)
    for item in output["Responses"]:
        print(f"\n{item['Item']['title']}, {item['Item']['year']}")
        pprint(item["Item"])
    print("-" * 88)

    ratings = [Decimal("7.7"), Decimal("5.5"), Decimal("1.3")]
    print("Updating a batch of movies with new ratings.")
    statements = [
        f'UPDATE "{table_name}" SET info.rating=? ' f"WHERE title=? AND year=?"
    ] * len(movie_data)
    params = [
        [rating, movie["title"], movie["year"]]
        for rating, movie in zip(ratings, movie_data)
    ]
    wrapper.run_partiql(statements, params)
    print("Success!")
    print("-" * 88)

    print("Getting projected data from the table to verify our update.")
    output = wrapper.dyn_resource.meta.client.execute_statement(
        Statement=f'SELECT title, info.rating FROM "{table_name}"'
    )
    pprint(output["Items"])
    print("-" * 88)

    print("Deleting a batch of movies from the table.")
    statements = [f'DELETE FROM "{table_name}" WHERE title=? AND year=?'] * len(
        movie_data
    )
    params = [[movie["title"], movie["year"]] for movie in movie_data]
    wrapper.run_partiql(statements, params)
    print("Success!")
    print("-" * 88)

    print(f"Deleting table '{table_name}'...")
    scaffold.delete_table()
    print("-" * 88)

    print("\nThanks for watching!")
    print("-" * 88)


if __name__ == "__main__":
    try:
        dyn_res = boto3.resource("dynamodb")
        scaffold = Scaffold(dyn_res)
        movies = PartiQLBatchWrapper(dyn_res)
        run_scenario(scaffold, movies, "doc-example-table-partiql-movies")
    except Exception as e:
        print(f"Something went wrong with the demo! Here's what: {e}")
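The scenario sends three statements in one call. For larger workloads, note that `BatchExecuteStatement` accepts a limited number of statements per request (25 at the time of writing), so a longer list must be split. The following minimal sketch shows one way to do that; the helper name `chunk_statements` is hypothetical and is not part of the original example.

```python
def chunk_statements(statements, param_list, batch_size=25):
    """Yield request-sized batches of paired statements and parameters."""
    if len(statements) != len(param_list):
        raise ValueError("statements and param_list must have the same length")
    for start in range(0, len(statements), batch_size):
        yield [
            {"Statement": statement, "Parameters": params}
            for statement, params in zip(
                statements[start : start + batch_size],
                param_list[start : start + batch_size],
            )
        ]


statements = ['SELECT * FROM "movies" WHERE title=? AND year=?'] * 60
param_list = [[f"Movie {index}", 2000 + index] for index in range(60)]
batches = list(chunk_statements(statements, param_list))
print([len(batch) for batch in batches])
```

Each yielded batch is already in the shape of the `Statements` argument, so it could be passed to `batch_execute_statement(Statements=batch)` one request at a time.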

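The following code example shows how to:

  • Get an item by running a SELECT statement.

  • Add an item by running an INSERT statement.

  • Update an item by running an UPDATE statement.

  • Delete an item by running a DELETE statement.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a class that can run PartiQL statements.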

from datetime import datetime
from decimal import Decimal
import logging
from pprint import pprint

import boto3
from botocore.exceptions import ClientError

from scaffold import Scaffold

logger = logging.getLogger(__name__)


class PartiQLWrapper:
    """
    Encapsulates a DynamoDB resource to run PartiQL statements.
    """

    def __init__(self, dyn_resource):
        """
        :param dyn_resource: A Boto3 DynamoDB resource.
        """
        self.dyn_resource = dyn_resource

    def run_partiql(self, statement, params):
        """
        Runs a PartiQL statement. A Boto3 resource is used even though
        `execute_statement` is called on the underlying `client` object because the
        resource transforms input and output from plain old Python objects (POPOs) to
        the DynamoDB format. If you create the client directly, you must do these
        transforms yourself.

        :param statement: The PartiQL statement.
        :param params: The list of PartiQL parameters. These are applied to the
                       statement in the order they are listed.
        :return: The items returned from the statement, if any.
        """
        try:
            output = self.dyn_resource.meta.client.execute_statement(
                Statement=statement, Parameters=params
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Couldn't execute PartiQL '%s' because the table does not exist.",
                    statement,
                )
            else:
                logger.error(
                    "Couldn't execute PartiQL '%s'. Here's why: %s: %s",
                    statement,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return output

Run a scenario that creates a table and runs PartiQL queries.

def run_scenario(scaffold, wrapper, table_name):
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    print("-" * 88)
    print("Welcome to the Amazon DynamoDB PartiQL single statement demo.")
    print("-" * 88)

    print(f"Creating table '{table_name}' for the demo...")
    scaffold.create_table(table_name)
    print("-" * 88)

    title = "24 Hour PartiQL People"
    year = datetime.now().year
    plot = "A group of data developers discover a new query language they can't stop using."
    rating = Decimal("9.9")

    print(f"Inserting movie '{title}' released in {year}.")
    wrapper.run_partiql(
        f"INSERT INTO \"{table_name}\" VALUE {{'title': ?, 'year': ?, 'info': ?}}",
        [title, year, {"plot": plot, "rating": rating}],
    )
    print("Success!")
    print("-" * 88)

    print(f"Getting data for movie '{title}' released in {year}.")
    output = wrapper.run_partiql(
        f'SELECT * FROM "{table_name}" WHERE title=? AND year=?', [title, year]
    )
    for item in output["Items"]:
        print(f"\n{item['title']}, {item['year']}")
        pprint(output["Items"])
    print("-" * 88)

    rating = Decimal("2.4")
    print(f"Updating movie '{title}' with a rating of {float(rating)}.")
    wrapper.run_partiql(
        f'UPDATE "{table_name}" SET info.rating=? WHERE title=? AND year=?',
        [rating, title, year],
    )
    print("Success!")
    print("-" * 88)

    print(f"Getting data again to verify our update.")
    output = wrapper.run_partiql(
        f'SELECT * FROM "{table_name}" WHERE title=? AND year=?', [title, year]
    )
    for item in output["Items"]:
        print(f"\n{item['title']}, {item['year']}")
        pprint(output["Items"])
    print("-" * 88)

    print(f"Deleting movie '{title}' released in {year}.")
    wrapper.run_partiql(
        f'DELETE FROM "{table_name}" WHERE title=? AND year=?', [title, year]
    )
    print("Success!")
    print("-" * 88)

    print(f"Deleting table '{table_name}'...")
    scaffold.delete_table()
    print("-" * 88)

    print("\nThanks for watching!")
    print("-" * 88)


if __name__ == "__main__":
    try:
        dyn_res = boto3.resource("dynamodb")
        scaffold = Scaffold(dyn_res)
        movies = PartiQLWrapper(dyn_res)
        run_scenario(scaffold, movies, "doc-example-table-partiql-movies")
    except Exception as e:
        print(f"Something went wrong with the demo! Here's what: {e}")
  • For API details, see ExecuteStatement in AWS SDK for Python (Boto3) API Reference.
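
The wrapper above returns the output of a single `execute_statement` call. A SELECT that matches many items can come back in pages, with the response carrying a `NextToken` to pass to the next call. The following is a minimal sketch of collecting every page. The function name `run_partiql_all_pages` is hypothetical, and `client` is assumed to be any object exposing `execute_statement`, such as `dyn_resource.meta.client`.

```python
def run_partiql_all_pages(client, statement, params=None):
    """Run a PartiQL statement and collect the items from every page."""
    items = []
    kwargs = {"Statement": statement}
    if params:
        # Only send Parameters when there are some; the API documents a
        # minimum of one entry for this list.
        kwargs["Parameters"] = params
    while True:
        output = client.execute_statement(**kwargs)
        items.extend(output.get("Items", []))
        next_token = output.get("NextToken")
        if not next_token:
            # No token means this was the last page.
            return items
        kwargs["NextToken"] = next_token
```

Passing the client in as an argument keeps the loop testable with a stub object and leaves error handling to the caller, as in the wrapper class.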

The following code example shows how to query for TTL items.

SDK for Python (Boto3)
import boto3
from boto3.dynamodb.conditions import Attr, Key
from datetime import datetime


def query_dynamodb_items(table_name, partition_key):
    """
    Query a DynamoDB table and print the items that have not yet expired.

    :param table_name: Name of the DynamoDB table
    :param partition_key: Partition key value of the items to query
    :return: None
    """
    try:
        # Initialize a DynamoDB resource
        dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

        # Specify your table
        table = dynamodb.Table(table_name)

        # Get the current time in epoch format
        current_time = int(datetime.now().timestamp())

        # Perform the query operation with a filter expression to exclude expired items.
        # Key and Attr come from boto3.dynamodb.conditions; the resource object
        # itself has no `conditions` attribute.
        response = table.query(
            KeyConditionExpression=Key('partitionKey').eq(partition_key),
            FilterExpression=Attr('expireAt').gt(current_time)
        )

        # Print the items that are not expired
        for item in response['Items']:
            print(item)

    except Exception as e:
        print(f"Error querying items: {e}")


# Call the function with your values
query_dynamodb_items('Music', 'your-partition-key-value')
  • For API details, see Query in AWS SDK for Python (Boto3) API Reference.
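
The filter in the example exists because TTL deletion is not immediate: an item whose `expireAt` time has passed can still be returned until DynamoDB removes it. The same check can be applied client-side to items that are already in memory. The following is a minimal sketch under that assumption. The function name `drop_expired` is hypothetical, and unlike the filter expression above it keeps items that have no TTL attribute at all.

```python
import time
from decimal import Decimal


def drop_expired(items, ttl_attribute="expireAt", now=None):
    """Return only the items whose TTL attribute is still in the future."""
    current = int(time.time()) if now is None else int(now)
    live = []
    for item in items:
        expires = item.get(ttl_attribute)
        # Items without the TTL attribute never expire, so keep them.
        if expires is None or int(expires) > current:
            live.append(item)
    return live


# The Boto3 resource returns numbers as Decimal, so the sample data uses it too.
sample = [
    {"id": 1, "expireAt": Decimal("100")},
    {"id": 2, "expireAt": Decimal("300")},
    {"id": 3},
]
print(drop_expired(sample, now=200))
```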

The following code example shows how to update an item's TTL.

SDK for Python (Boto3)
import boto3
from datetime import datetime, timedelta


def update_dynamodb_item(table_name, region, primary_key, sort_key):
    """
    Update an existing DynamoDB item with a TTL.

    :param table_name: Name of the DynamoDB table
    :param region: AWS Region of the table - example `us-east-1`
    :param primary_key: one attribute known as the partition key.
    :param sort_key: Also known as a range attribute.
    :return: Void (nothing)
    """
    try:
        # Create the DynamoDB resource.
        dynamodb = boto3.resource('dynamodb', region_name=region)
        table = dynamodb.Table(table_name)

        # Get the current time in epoch second format
        current_time = int(datetime.now().timestamp())

        # Calculate the expireAt time (90 days from now) in epoch second format
        expire_at = int((datetime.now() + timedelta(days=90)).timestamp())

        table.update_item(
            Key={
                'partitionKey': primary_key,
                'sortKey': sort_key
            },
            UpdateExpression="set updatedAt=:c, expireAt=:e",
            ExpressionAttributeValues={
                ':c': current_time,
                ':e': expire_at
            },
        )

        print("Item updated successfully.")
    except Exception as e:
        print(f"Error updating item: {e}")


# Replace with your own values
update_dynamodb_item('your-table-name', 'us-west-2', 'your-partition-key-value', 'your-sort-key-value')
  • For API details, see UpdateItem in AWS SDK for Python (Boto3) API Reference.
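
DynamoDB expects the TTL attribute to be a number holding Unix epoch time in seconds, which is what the `expire_at` calculation above produces. The following is a minimal sketch that isolates that calculation so it can be checked against a fixed time. The function name `expire_at_epoch` is hypothetical, and it uses a timezone-aware UTC time rather than the naive `datetime.now()` in the example.

```python
from datetime import datetime, timedelta, timezone


def expire_at_epoch(days, now=None):
    """Return the epoch-seconds timestamp that lies `days` days after `now`."""
    # Default to the current UTC time; accept an explicit value for testing.
    base = now if now is not None else datetime.now(timezone.utc)
    return int((base + timedelta(days=days)).timestamp())


# Example with a fixed starting point: 90 days after 2024-01-01 00:00 UTC.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(expire_at_epoch(90, now=start))
```

The returned integer could be passed as the `:e` value in `ExpressionAttributeValues` in place of the inline calculation.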

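Serverless examples

The following code example shows how to implement a Lambda function that receives an event triggered by receiving records from a DynamoDB stream. The function retrieves the DynamoDB payload and logs the record contents.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a DynamoDB event with Lambda using Python.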

import json


def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)


def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
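
The `dynamodb` payload logged above carries item images in the DynamoDB typed format, for example `{"S": "text"}` for a string and `{"N": "42"}` for a number. The following is a minimal sketch of converting a record's `NewImage` into plain Python values. The function names are hypothetical, only a subset of type tags is handled, and whether `NewImage` is present depends on the stream view type. For full coverage, Boto3 provides `TypeDeserializer` in `boto3.dynamodb.types`.

```python
from decimal import Decimal


def from_dynamodb_json(value):
    """Convert one DynamoDB-typed value into a plain Python value."""
    # Each typed value is a single-entry dict such as {"S": "text"}.
    type_tag, raw = next(iter(value.items()))
    if type_tag == "S":
        return raw
    if type_tag == "N":
        # Numbers arrive as strings; Decimal preserves their precision.
        return Decimal(raw)
    if type_tag == "BOOL":
        return raw
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [from_dynamodb_json(v) for v in raw]
    if type_tag == "M":
        return {k: from_dynamodb_json(v) for k, v in raw.items()}
    raise ValueError(f"Unsupported DynamoDB type tag: {type_tag}")


def new_image(record):
    """Return the record's NewImage as a plain dict, or {} if it has none."""
    image = record.get("dynamodb", {}).get("NewImage", {})
    return {name: from_dynamodb_json(v) for name, v in image.items()}
```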

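The following code example shows how to implement partial batch response for Lambda functions that receive events from a DynamoDB stream. The function reports the batch item failures in the response, signaling to Lambda to retry those messages later.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Reporting DynamoDB batch item failures with Lambda using Python.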

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""

    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures": [{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures": []}
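
The handler above leaves the processing step as a placeholder. The following is a minimal sketch of one way to fill it in, with the processing logic passed in as a function so the handler can be exercised locally with a sample event. The names `make_handler` and `process_record` are hypothetical. The sketch reads the sequence number before processing, so that a failure is attributed to the record being processed, and it assumes the event source mapping has batch item failure reporting enabled.

```python
def make_handler(process_record):
    """Build a stream handler that reports the first record that fails."""

    def handler(event, context):
        for record in event.get("Records", []):
            # Capture the identifier first so a failure points at this record.
            sequence_number = record["dynamodb"]["SequenceNumber"]
            try:
                process_record(record)
            except Exception:
                # Stop at the first failure and report its sequence number.
                return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
        # An empty list tells Lambda that the whole batch succeeded.
        return {"batchItemFailures": []}

    return handler


def fail_on_second(record):
    """Hypothetical processor that rejects the record with sequence number 2."""
    if record["dynamodb"]["SequenceNumber"] == "2":
        raise RuntimeError("could not process record")


sample_event = {
    "Records": [{"dynamodb": {"SequenceNumber": str(n)}} for n in (1, 2, 3)]
}
print(make_handler(fail_on_second)(sample_event, None))
```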