使用 Python 和 Boto3 对 Amazon DynamoDB 进行编程 - Amazon DynamoDB

使用 Python 和 Boto3 对 Amazon DynamoDB 进行编程

本指南为想要结合 Python 使用 Amazon DynamoDB 的程序员提供了指导。了解不同的抽象层、配置管理、错误处理、控制重试策略、管理 keep-alive 等。

Boto 简介

您可以使用官方适用于 Python 的 AWS SDK(通常称为 Boto3)从 Python 访问 DynamoDB。Boto 这个名字(发音为 boh-toh)来自一种原产于亚马逊河的淡水海豚。Boto3 库是该库的第三个主要版本,于 2015 年首次发布。Boto3 库非常大,因为它支持所有 AWS 服务,而不仅仅是 DynamoDB。此指导仅针对 Boto3 中与 DynamoDB 相关的部分。

Boto 由 AWS 作为 GitHub 上托管的开源项目进行维护和发布。它分为两个软件包:BotocoreBoto3

  • Botocore 提供了低级别功能。在 Botocore 中,您将找到客户端、会话、凭证、配置和异常类。

  • Boto3 基于 Botocore 构建。它提供了更高级别、更具 Python 风格的接口。具体而言,它将 DynamoDB 表作为资源公开,与较低级别的面向服务的客户端接口相比,它提供了更简单、更优雅的接口。

由于这些项目托管在 GitHub 上,因此您可以查看源代码、跟踪未解决的问题或提交自己的问题。

使用 Boto 文档

通过以下资源开始学习 Boto 文档:

  • 快速入门部分开始,该部分为软件包安装提供了可靠的起点。如果尚未安装 Boto3,请前往那里获取有关安装 Boto3 的说明(Boto3 通常会在诸如 AWS Lambda 的 AWS 服务中自动提供)。

  • 之后,请关注文档的 DynamoDB 指南。它向您展示了如何执行基本的 DynamoDB 活动:创建和删除表、操作项目、运行批量操作、运行查询和执行扫描。其示例使用资源接口。如果看到 boto3.resource('dynamodb'),表明您正在使用较高级别的资源接口。

  • 阅读完指南后,您可以查看 DynamoDB 参考。此登录页面提供了可供您使用的类和方法的详尽列表。在顶部,您将看到 DynamoDB.Client 类。这提供了对所有控制面板和数据面板操作的低级别访问。在底部,请看 DynamoDB.ServiceResource 类。这是较高级别的 Python 风格的接口。使用它,您可以创建表、对表执行批量操作或获取表特定操作的 DynamoDB.ServiceResource.Table 实例。

了解客户端和资源抽象层

您将使用的两个接口是客户端接口和资源接口。

  • 低级别客户端接口提供与底层服务 API 的一对一映射。DynamoDB 提供的每个 API 都可通过客户端获得。这意味着客户端接口可以提供完整的功能,但使用起来往往更加冗长且复杂。

  • 更高级别的资源接口不提供底层服务 API 的一对一映射。但是,它提供了一些方法,让您能够更方便地访问 batch_writer 等服务。

以下是使用客户端接口插入项目的示例。请注意所有值是如何以映射形式传递的,键表示它们的类型(“S”代表字符串,“N”代表数字),它们的值作为字符串。这被称为 DynamoDB JSON 格式。

import boto3 dynamodb = boto3.client('dynamodb') dynamodb.put_item( TableName='YourTableName', Item={ 'pk': {'S': 'id#1'}, 'sk': {'S': 'cart#123'}, 'name': {'S': 'SomeName'}, 'inventory': {'N': '500'}, # ... more attributes ... } )

以下是使用资源接口的相同 PutItem 操作。数据输入是隐式的:

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': 'id#1', 'sk': 'cart#123', 'name': 'SomeName', 'inventory': 500, # ... more attributes ... } )

如果需要,您可以使用随 boto3 提供的 TypeSerializerTypeDeserializer 类在常规 JSON 和 DynamoDB JSON 之间进行转换:

def dynamo_to_python(dynamo_object: dict) -> dict: deserializer = TypeDeserializer() return { k: deserializer.deserialize(v) for k, v in dynamo_object.items() } def python_to_dynamo(python_object: dict) -> dict: serializer = TypeSerializer() return { k: serializer.serialize(v) for k, v in python_object.items() }

下面展示了如何使用客户端接口执行查询。它将查询表示为 JSON 构造。它使用 KeyConditionExpression 字符串,该字符串需要变量替换来处理任何潜在的关键字冲突:

import boto3 client = boto3.client('dynamodb') # Construct the query response = client.query( TableName='YourTableName', KeyConditionExpression='pk = :pk_val AND begins_with(sk, :sk_val)', FilterExpression='#name = :name_val', ExpressionAttributeValues={ ':pk_val': {'S': 'id#1'}, ':sk_val': {'S': 'cart#'}, ':name_val': {'S': 'SomeName'}, }, ExpressionAttributeNames={ '#name': 'name', } )

使用资源接口的相同查询操作可以缩短和简化:

import boto3 from boto3.dynamodb.conditions import Key, Attr dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') response = table.query( KeyConditionExpression=Key('pk').eq('id#1') & Key('sk').begins_with('cart#'), FilterExpression=Attr('name').eq('SomeName') )

最后一个例子,假设您想得到一个表的大致大小(这是保存在表中的元数据,大约每 6 小时更新一次)。使用客户端接口,您可以执行 describe_table() 操作并从返回的 JSON 结构中提取答案:

import boto3 dynamodb = boto3.client('dynamodb') response = dynamodb.describe_table(TableName='YourTableName') size = response['Table']['TableSizeBytes']

通过资源接口,表隐式执行描述操作,并将数据直接作为属性呈现:

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') size = table.table_size_bytes
注意

在考虑使用客户端接口还是资源接口进行开发时,请注意,根据以下资源文档,不会向资源接口添加新特征:“AWS Python SDK 团队不打算向 boto3 的资源接口中添加新特征。现有接口将在 boto3 的生命周期内继续使用。客户可以通过客户端接口访问更新的服务特征。”

使用表资源 batch_writer

只有较高级别的表资源才有的一种便利就是 batch_writer。DynamoDB 支持批量写入操作,允许在一个网络请求中执行多达 25 个放置或删除操作。像这样的批处理可以最大限度地减少网络往返行程,从而提高效率。

使用低级别客户端库,您可以使用 client.batch_write_item() 操作来运行批处理。您必须手动将工作分成 25 个批次。每次操作后,您还必须请求接收未处理的项目列表(有些写入操作可能成功,而有些可能失败)。然后,您必须将这些未处理的项目再次传递到以后的 batch_write_item() 操作中。有大量的样板代码。

Table.batch_writer 方法创建了一个上下文管理器,用于批量写入对象。它提供了一个接口,在这个接口中,您好像是一次写入一个项目,但在内部,它是缓冲并批量发送项目。此外,它还会隐式处理未处理的项目重试。

dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') movies = # long list of movies in {'pk': 'val', 'sk': 'val', etc} format with table.batch_writer() as writer: for movie in movies: writer.put_item(Item=movie)

探索客户端和资源层的其它代码示例

您还可以参考以下代码示例存储库,这些存储库使用客户端和资源探索各种函数的用法:

了解客户端和资源对象如何与会话和线程交互

资源对象并非线程安全的,不应跨线程或进程共享。有关更多详细信息,请参阅资源指南

相比之下,客户端对象通常是线程安全的,但特定的高级特征除外。有关更多详细信息,请参阅客户端指南

会话对象不是线程安全的。因此,每次在多线程环境中创建客户端或资源时,都应先创建一个新会话,然后从会话中创建客户端或资源。有关更多详细信息,请参阅会话指南

如果调用 boto3.resource(),则隐式使用默认会话。这便于编写单线程代码。在编写多线程代码时,您需要先为每个线程构造一个新的会话,然后从该会话中检索资源:

# Explicitly create a new Session for this thread session = boto3.Session() dynamodb = session.resource('dynamodb')

自定义配置对象

在构造客户端或资源对象时,您可以传递可选的命名参数来自定义行为。名为 config 的参数可以解锁各种功能。它是 botocore.client.Config 的一个实例,配置参考文档展示了它公开供您控制的所有内容。配置指南提供了一个很好的概述。

注意

您可以在 AWS 配置文件内的会话级别,或作为环境变量修改其中的许多行为设置。

超时配置

自定义配置的一个用途是调整网络行为:

  • connect_timeout(浮点或整数)– 尝试建立连接时引发超时异常之前的时间(以秒为单位)。默认值为 60 秒。

  • read_timeout(浮点或整数)– 尝试从连接中读取时引发超时异常之前的时间(以秒为单位)。默认值为 60 秒。

对于 DynamoDB 来说,60 秒的超时时间过长。这意味着暂时性网络故障会导致客户端延迟一分钟,然后才能重试。以下代码将超时时间缩短到一秒:

import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0 ) dynamodb = boto3.resource('dynamodb', config=my_config)

有关超时的更多讨论,请参阅调整延迟感知型 DynamoDB 应用程序的 AWS Java SDK HTTP 请求设置。注意,Java SDK 的超时配置比 Python 多。

keep-alive 配置

如果您使用的是 botocore 1.27.84 或更高版本,您也可以控制 TCP Keep-Alive

  • tcp_keepalive(布尔值)- 如果设置为 True(默认为 False),则启用创建新连接时使用的 TCP Keep-Alive 套接字选项。这仅支持 botocore 1.27.84 及更高版本。

将 TCP Keep-Alive 设置为 True 可以减少平均延迟。以下是当您拥有合适的 botocore 版本时,有条件地将 TCP Keep-Alive 设置为 true 的示例代码:

import botocore import boto3 from botocore.config import Config from distutils.version import LooseVersion required_version = "1.27.84" current_version = botocore.__version__ my_config = Config( connect_timeout = 0.5, read_timeout = 0.5 ) if LooseVersion(current_version) > LooseVersion(required_version): my_config = my_config.merge(Config(tcp_keepalive = True)) dynamodb = boto3.resource('dynamodb', config=my_config)
注意

TCP Keep-Alive 与 HTTP Keep-Alive 不同。使用 TCP Keep-Alive,底层操作系统通过套接字连接发送小数据包,以保持连接处于活动状态并立即检测到任何丢包。使用 HTTP Keep-Alive,在底层套接字上构建的 Web 连接可以重复使用。使用 boto3 时,HTTP Keep-Alive 始终处于启用状态。

空闲连接可以保持活动状态的时间是有限制的。如果您有一个空闲的连接,但希望下次请求使用已经建立的连接,可以考虑定期发送请求(比如每分钟发送一次)。

重试配置

该配置还接受名为 retries 的字典,您可以在其中指定所需的重试行为。当 SDK 收到错误且错误属于临时类型时,SDK 中会发生重试。如果在内部重试错误(并且重试最终生成成功响应),则从调用代码的角度来看,没有错误,只是延迟略有增加。以下是您可以指定的值:

  • max_attempts – 一个整数,表示单个请求将进行的最大重试次数。例如,将此值设置为 2 将导致请求在初始请求之后最多重试两次。将此值设置为 0 将导致在初始请求之后不会尝试任何重试。

  • total_max_attempts – 一个整数,表示单个请求将进行的最大总尝试次数。这包括初始请求,因此值为 1 表示不会重试任何请求。如果同时提供 total_max_attemptsmax_attempts,则 total_max_attempts 优先。total_max_attempts 之所以优先于 max_attempts,是因为它映射到 AWS_MAX_ATTEMPTS 环境变量和 max_attempts 配置文件值。

  • mode – 一个字符串,表示 botocore 应使用的重试模式类型。有效值为:

    • legacy - 默认模式。首次重试等待 50 毫秒,然后使用基准因子为 2 的指数回退。对于 DynamoDB,除非使用上述值覆盖,否则它最多总共可执行 10 次尝试。

      注意

      使用指数回退,最后一次尝试将等待几乎 13 秒。

    • standard – 之所以命名为标准版,是因为它与其它 AWS SDK 更加一致。对于首次重试,随机等待 0 毫秒到 1000 毫秒不等。如果需要再次重试,它会从 0 毫秒到 1000 毫秒之间随机选择另一个时间,然后将其乘以 2。如果需要更多重试,它会进行相同的随机选择,然后乘以 4,依此类推。每次等待的上限为 20 秒。与 legacy 模式相比,此模式将对更多检测到的故障条件执行重试。对于 DynamoDB,除非使用上述值覆盖,否则它最多总共可执行 3 次尝试。

    • adaptive - 一种实验性重试模式,包括标准模式的所有功能,但添加了自动客户端限制。通过自适应速率限制,SDK 可以降低请求的发送速率,以便更好地容纳 AWS 服务的容量。这是一种临时模式,其行为可能会发生变化。

这些重试模式的扩展定义可以在重试指南中找到,也可以在 SDK 参考的“重试行为”主题中找到。

以下示例明确使用 legacy 重试策略,请求总数最多为 3 次(2 次重试):

import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0, retries = { 'mode': 'legacy', 'total_max_attempts': 3 } ) dynamodb = boto3.resource('dynamodb', config=my_config)

由于 DynamoDB 是一个高可用、低延迟的系统,因此您可能希望在重试速度上比内置重试策略所允许的更激进。您可以实现自己的重试策略,方法是将最大尝试次数设置为 0,自己捕获异常,然后根据自己的代码酌情重试,而不是依赖 boto3 进行隐式重试。

如果您管理自己的重试策略,则需要区分节流和错误:

  • 节流(由 ProvisionedThroughputExceededExceptionThrottlingException 表示)表示服务运行正常,它会通知您已超出 DynamoDB 表或分区的读取或写入容量。每过一毫秒,就会有多一点的读取或写入容量可用,因此您可以快速重试(例如每 50 毫秒重试一次),尝试访问新释放的容量。使用节流,您并不特别需要指数回退,因为节流属于轻量级,DynamoDB 可以返回,而且不会向您收取每次请求的费用。指数回退会将更长的延迟分配给已经等待最长时间的客户端线程,从统计学上讲,将超越 p50 和 p99。

  • 错误(由 InternalServerErrorServiceUnavailable 等表示)表示服务存在暂时性问题。这可以是针对整个表,也可以只是您正在读取或写入的分区。使用错误,您可以在重试前暂停更长时间(例如 250 毫秒或 500 毫秒),并使用抖动来错开重试。

最大池连接数配置

最后,配置允许您控制连接池大小:

  • max_pool_connections (int) – 连接池中要保留的最大连接数。如果未指定值,则使用默认值 10。

此选项控制要保留在池中以供重复使用的最大 HTTP 连接数。每个会话保留一个不同的池。如果您预计会有超过 10 个线程使用基于同一会话构建的客户端或资源,则应考虑提高该值,这样线程就不必等待使用池连接的其它线程。

import boto3 from botocore.config import Config my_config = Config( max_pool_connections = 20 ) # Setup a single session holding up to 20 pooled connections session = boto3.Session(my_config) # Create up to 20 resources against that session for handing to threads # Notice the single-threaded access to the Session and each Resource resource1 = session.resource('dynamodb') resource2 = session.resource('dynamodb') # etc

错误处理

Boto3 中并未全部静态定义 AWS 服务异常。这是因为 AWS 服务的错误和异常差异很大,并且可能会发生变化。Boto3 将所有服务异常封装为 ClientError,并以结构化 JSON 公开详细信息。例如,错误响应的结构可能如下所示:

{ 'Error': { 'Code': 'SomeServiceException', 'Message': 'Details/context around the exception or error' }, 'ResponseMetadata': { 'RequestId': '1234567890ABCDEF', 'HostId': 'host ID data will appear here as a hash', 'HTTPStatusCode': 400, 'HTTPHeaders': {'header metadata key/values will appear here'}, 'RetryAttempts': 0 } }

以下代码会捕获任何 ClientError 异常,并查看 ErrorCode 的字符串值来确定要采取的操作:

import botocore import boto3 dynamodb = boto3.client('dynamodb') try: response = dynamodb.put_item(...) except botocore.exceptions.ClientError as err: print('Error Code: {}'.format(err.response['Error']['Code'])) print('Error Message: {}'.format(err.response['Error']['Message'])) print('Http Code: {}'.format(err.response['ResponseMetadata']['HTTPStatusCode'])) print('Request ID: {}'.format(err.response['ResponseMetadata']['RequestId'])) if err.response['Error']['Code'] in ('ProvisionedThroughputExceededException', 'ThrottlingException'): print("Received a throttle") elif err.response['Error']['Code'] == 'InternalServerError': print("Received a server error") else: raise err

一些(但不是全部)异常代码已被具体化为顶级类。您可以选择直接处理这些类。使用客户端接口时,这些异常会在您的客户端上动态填充,您可以使用客户端实例捕获这些异常,如下所示:

except ddb_client.exceptions.ProvisionedThroughputExceededException:

使用资源接口时,必须使用 .meta.client 从资源遍历到底层客户端才能访问异常,如下所示:

except ddb_resource.meta.client.exceptions.ProvisionedThroughputExceededException:

要查看具体化异常类型的列表,可以动态生成列表:

ddb = boto3.client("dynamodb") print([e for e in dir(ddb.exceptions) if e.endswith('Exception') or e.endswith('Error')])

使用条件表达式执行写入操作时,您可以请求:如果表达式失败,则应在错误响应中返回该项目的值。

try: response = table.put_item( Item=item, ConditionExpression='attribute_not_exists(pk)', ReturnValuesOnConditionCheckFailure='ALL_OLD' ) except table.meta.client.exceptions.ConditionalCheckFailedException as e: print('Item already exists:', e.response['Item'])

要进一步了解错误处理和异常,请执行以下操作:

日志记录

Boto3 库与 Python 的内置日志记录模块集成,用于跟踪会话期间发生的情况。要控制日志记录级别,可以配置日志记录模块:

import logging logging.basicConfig(level=logging.INFO)

这会将根记录器配置为记录 INFO 及更高级别的消息。严重程度低于这些级别的日志消息将被忽略。日志记录级别包括 DEBUGINFOWARNINGERRORCRITICAL。默认为 WARNING

Boto3 中的记录器是分层的。该库使用几个不同的记录器,每个记录器对应库的不同部分。您可以分别控制每个记录器的行为:

  • boto3:boto3 模块的主记录器。

  • botocore:botocore 软件包的主记录器。

  • botocore.auth:用于记录请求的 AWS 签名创建过程。

  • botocore.credentials:用于记录凭证获取和刷新的过程。

  • botocore.endpoint:用于在请求通过网络发送之前记录请求的创建过程。

  • botocore.hooks:用于记录库中触发的事件。

  • botocore.loaders:用于记录加载部分 AWS 服务模型的情况。

  • botocore.parsers:用于在解析 AWS 服务响应之前对其进行记录。

  • botocore.retryhandler:用于记录 AWS 服务请求重试的处理情况(传统模式)。

  • botocore.retries.standard:用于记录 AWS 服务请求重试的处理情况(标准模式或自适应模式)。

  • botocore.utils:用于记录库中的其它活动。

  • botocore.waiter:用于记录 Waiter 的功能,后者轮询 AWS 服务直到达到特定状态。

其它库也记录。在内部,boto3 使用第三方 urllib3 进行 HTTP 连接处理。如果延迟很重要,您可以通过查看 urllib3 何时建立新连接或关闭空闲连接来查看其日志,以确保您的池得到充分利用。

  • urllib3.connectionpool:用于记录连接池处理事件。

以下代码段将端点和连接池活动的大部分记录设置为 INFODEBUG

import logging logging.getLogger('boto3').setLevel(logging.INFO) logging.getLogger('botocore').setLevel(logging.INFO) logging.getLogger('botocore.endpoint').setLevel(logging.DEBUG) logging.getLogger('urllib3.connectionpool').setLevel(logging.DEBUG)

事件钩子

Botocore 在其执行的各个阶段都会发出事件。您可以为这些事件注册处理程序,这样无论何时发出事件,您的处理程序都会被调用。这使您无需修改内部结构即可扩展 botocore 的行为。

例如,假设您要跟踪应用程序中的任何 DynamoDB 表上每次调用 PutItem 操作的情况。每次在关联的会话上调用 PutItem 操作时,您都可以在 'provide-client-params.dynamodb.PutItem' 事件上注册以捕获并记录。示例如下:

import boto3 import botocore import logging def log_put_params(params, **kwargs): if 'TableName' in params and 'Item' in params: logging.info(f"PutItem on table {params['TableName']}: {params['Item']}") logging.basicConfig(level=logging.INFO) session = boto3.Session() event_system = session.events # Register our interest in hooking in when the parameters are provided to PutItem event_system.register('provide-client-params.dynamodb.PutItem', log_put_params) # Now, every time you use this session to put an item in DynamoDB, # it will log the table name and item data. dynamodb = session.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': '123', 'sk': 'cart#123', 'item_data': 'YourItemData', # ... more attributes ... } )

在处理程序中,您甚至可以通过编程方式操作参数来改变行为:

params['TableName'] = "NewTableName"

有关事件的更多信息,请参阅事件的 botocore 文档事件的 boto3 文档

分页和分页工具

某些请求(例如查询和扫描)会限制针对单个请求返回的数据大小,并要求您重复请求才能显示后续页面。

您可以使用 limit 参数控制每页可读取的最大项目数。例如,如果您想要读取最后 10 个项目,则可以使用 limit 仅检索最后 10 个项目。请注意,限制是在应用任何筛选条件之前应从表中读取多少项目。筛选后无法确切指定想要检索 10 个项目;只有在切实检索到 10 个项目后,您才能控制预先筛选的数量并检查客户端。不管限制如何,每个响应始终有 1 MB 的最大大小。

如果响应包含 LastEvaluatedKey,则表示响应已结束,因为它达到了计数或大小限制。此密钥是针对该响应评估的最后一个密钥。您可以检索此 LastEvaluatedKey 并将其作为 ExclusiveStartKey 传递给后续调用,以便从该起点读取下一个数据块。如果未返回 LastEvaluatedKey,则表示没有其它与查询或扫描匹配的项目。

以下是一个简单的示例(使用资源接口,但客户端接口具有相同的模式),它每页最多读取 100 个项目,并循环操作,直到读取完所有项目。

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') query_params = { 'KeyConditionExpression': Key('pk').eq('123') & Key('sk').gt(1000), 'Limit': 100 } while True: response = table.query(**query_params) # Process the items however you like for item in response['Items']: print(item) # No LastEvaluatedKey means no more items to retrieve if 'LastEvaluatedKey' not in response: break # If there are possibly more items, update the start key for the next page query_params['ExclusiveStartKey'] = response['LastEvaluatedKey']

为方便起见,boto3 可以用分页工具为您执行此操作。但是,它仅适用于客户端接口。以下是为使用分页工具而重写的代码:

import boto3 dynamodb = boto3.client('dynamodb') paginator = dynamodb.get_paginator('query') query_params = { 'TableName': 'YourTableName', 'KeyConditionExpression': 'pk = :pk_val AND sk > :sk_val', 'ExpressionAttributeValues': { ':pk_val': {'S': '123'}, ':sk_val': {'N': '1000'}, }, 'Limit': 100 } page_iterator = paginator.paginate(**query_params) for page in page_iterator: # Process the items however you like for item in page['Items']: print(item)

有关更多信息,请参阅分页工具指南DynamoDB.Paginator.Query 的 API 参考

注意

分页工具也有自己的配置设置,名为 MaxItemsStartingTokenPageSize。要使用 DynamoDB 进行分页,则应忽略这些设置。

Waiter

通过 Waiter 可以等待某件事完成后再继续操作。目前,它们仅支持等待创建或删除表。在后台,Waiter 操作每 20 秒为您检查一次,最多 25 次。您可以自己做,但是在编写自动化时使用 Waiter 会很舒服。

以下代码演示了如何等待创建特定表:

# Create a table, wait until it exists, and print its ARN response = client.create_table(...) waiter = client.get_waiter('table_exists') waiter.wait(TableName='YourTableName') print('Table created:', response['TableDescription']['TableArn']

有关更多信息,请参阅 Waiter 指南Waiter 参考