使用 Python 和 Boto3 对 Amazon DynamoDB 进行编程
本指南为想要结合 Python 使用 Amazon DynamoDB 的程序员提供了指导。了解不同的抽象层、配置管理、错误处理、控制重试策略、管理 keep-alive 等。
主题
Boto 简介
您可以使用官方适用于 Python 的 AWS SDK(通常称为 Boto3)从 Python 访问 DynamoDB。Boto 这个名字(发音为 boh-toh)来自一种原产于亚马逊河的淡水海豚。Boto3 库是该库的第三个主要版本,于 2015 年首次发布。Boto3 库非常大,因为它支持所有 AWS 服务,而不仅仅是 DynamoDB。此指导仅针对 Boto3 中与 DynamoDB 相关的部分。
Boto 由 AWS 作为 GitHub 上托管的开源项目进行维护和发布。它分为两个软件包:Botocore
-
Botocore 提供了低级别功能。在 Botocore 中,您将找到客户端、会话、凭证、配置和异常类。
-
Boto3 基于 Botocore 构建。它提供了更高级别、更具 Python 风格的接口。具体而言,它将 DynamoDB 表作为资源公开,与较低级别的面向服务的客户端接口相比,它提供了更简单、更优雅的接口。
由于这些项目托管在 GitHub 上,因此您可以查看源代码、跟踪未解决的问题或提交自己的问题。
使用 Boto 文档
通过以下资源开始学习 Boto 文档:
-
从快速入门部分
开始,该部分为软件包安装提供了可靠的起点。如果尚未安装 Boto3,请前往那里获取有关安装 Boto3 的说明(Boto3 通常会在诸如 AWS Lambda 的 AWS 服务中自动提供)。 -
之后,请关注文档的 DynamoDB 指南
。它向您展示了如何执行基本的 DynamoDB 活动:创建和删除表、操作项目、运行批量操作、运行查询和执行扫描。其示例使用资源接口。如果看到 boto3.resource('dynamodb')
,表明您正在使用较高级别的资源接口。 -
阅读完指南后,您可以查看 DynamoDB 参考
。此登录页面提供了可供您使用的类和方法的详尽列表。在顶部,您将看到 DynamoDB.Client
类。这提供了对所有控制面板和数据面板操作的低级别访问。在底部,请看DynamoDB.ServiceResource
类。这是较高级别的 Python 风格的接口。使用它,您可以创建表、对表执行批量操作或获取表特定操作的DynamoDB.ServiceResource.Table
实例。
了解客户端和资源抽象层
您将使用的两个接口是客户端接口和资源接口。
-
低级别客户端接口提供与底层服务 API 的一对一映射。DynamoDB 提供的每个 API 都可通过客户端获得。这意味着客户端接口可以提供完整的功能,但使用起来往往更加冗长且复杂。
-
更高级别的资源接口不提供底层服务 API 的一对一映射。但是,它提供了一些方法,让您能够更方便地访问
batch_writer
等服务。
以下是使用客户端接口插入项目的示例。请注意所有值是如何以映射形式传递的,键表示它们的类型(“S”代表字符串,“N”代表数字),它们的值作为字符串。这被称为 DynamoDB JSON 格式。
import boto3 dynamodb = boto3.client('dynamodb') dynamodb.put_item( TableName='YourTableName', Item={ 'pk': {'S': 'id#1'}, 'sk': {'S': 'cart#123'}, 'name': {'S': 'SomeName'}, 'inventory': {'N': '500'}, # ... more attributes ... } )
以下是使用资源接口的相同 PutItem
操作。数据输入是隐式的:
import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': 'id#1', 'sk': 'cart#123', 'name': 'SomeName', 'inventory': 500, # ... more attributes ... } )
如果需要,您可以使用随 boto3 提供的 TypeSerializer
和 TypeDeserializer
类在常规 JSON 和 DynamoDB JSON 之间进行转换:
def dynamo_to_python(dynamo_object: dict) -> dict: deserializer = TypeDeserializer() return { k: deserializer.deserialize(v) for k, v in dynamo_object.items() } def python_to_dynamo(python_object: dict) -> dict: serializer = TypeSerializer() return { k: serializer.serialize(v) for k, v in python_object.items() }
下面展示了如何使用客户端接口执行查询。它将查询表示为 JSON 构造。它使用 KeyConditionExpression
字符串,该字符串需要变量替换来处理任何潜在的关键字冲突:
import boto3 client = boto3.client('dynamodb') # Construct the query response = client.query( TableName='YourTableName', KeyConditionExpression='pk = :pk_val AND begins_with(sk, :sk_val)', FilterExpression='#name = :name_val', ExpressionAttributeValues={ ':pk_val': {'S': 'id#1'}, ':sk_val': {'S': 'cart#'}, ':name_val': {'S': 'SomeName'}, }, ExpressionAttributeNames={ '#name': 'name', } )
使用资源接口的相同查询操作可以缩短和简化:
import boto3 from boto3.dynamodb.conditions import Key, Attr dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') response = table.query( KeyConditionExpression=Key('pk').eq('id#1') & Key('sk').begins_with('cart#'), FilterExpression=Attr('name').eq('SomeName') )
最后一个例子,假设您想得到一个表的大致大小(这是保存在表中的元数据,大约每 6 小时更新一次)。使用客户端接口,您可以执行 describe_table()
操作并从返回的 JSON 结构中提取答案:
import boto3 dynamodb = boto3.client('dynamodb') response = dynamodb.describe_table(TableName='YourTableName') size = response['Table']['TableSizeBytes']
通过资源接口,表隐式执行描述操作,并将数据直接作为属性呈现:
import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') size = table.table_size_bytes
注意
在考虑使用客户端接口还是资源接口进行开发时,请注意,根据以下资源文档
使用表资源 batch_writer
只有较高级别的表资源才有的一种便利就是 batch_writer
。DynamoDB 支持批量写入操作,允许在一个网络请求中执行多达 25 个放置或删除操作。像这样的批处理可以最大限度地减少网络往返行程,从而提高效率。
使用低级别客户端库,您可以使用 client.batch_write_item()
操作来运行批处理。您必须手动将工作分成 25 个批次。每次操作后,您还必须请求接收未处理的项目列表(有些写入操作可能成功,而有些可能失败)。然后,您必须将这些未处理的项目再次传递到以后的 batch_write_item()
操作中。有大量的样板代码。
Table.batch_writer
dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') movies = # long list of movies in {'pk': 'val', 'sk': 'val', etc} format with table.batch_writer() as writer: for movie in movies: writer.put_item(Item=movie)
探索客户端和资源层的其它代码示例
您还可以参考以下代码示例存储库,这些存储库使用客户端和资源探索各种函数的用法:
了解客户端和资源对象如何与会话和线程交互
资源对象并非线程安全的,不应跨线程或进程共享。有关更多详细信息,请参阅资源指南
相比之下,客户端对象通常是线程安全的,但特定的高级特征除外。有关更多详细信息,请参阅客户端指南
会话对象不是线程安全的。因此,每次在多线程环境中创建客户端或资源时,都应先创建一个新会话,然后从会话中创建客户端或资源。有关更多详细信息,请参阅会话指南
如果调用 boto3.resource()
,则隐式使用默认会话。这便于编写单线程代码。在编写多线程代码时,您需要先为每个线程构造一个新的会话,然后从该会话中检索资源:
# Explicitly create a new Session for this thread session = boto3.Session() dynamodb = session.resource('dynamodb')
自定义配置对象
在构造客户端或资源对象时,您可以传递可选的命名参数来自定义行为。名为 config
的参数可以解锁各种功能。它是 botocore.client.Config
的一个实例,配置参考文档
注意
您可以在 AWS 配置文件内的会话级别,或作为环境变量修改其中的许多行为设置。
超时配置
自定义配置的一个用途是调整网络行为:
-
connect_timeout(浮点或整数)– 尝试建立连接时引发超时异常之前的时间(以秒为单位)。默认值为 60 秒。
-
read_timeout(浮点或整数)– 尝试从连接中读取时引发超时异常之前的时间(以秒为单位)。默认值为 60 秒。
对于 DynamoDB 来说,60 秒的超时时间过长。这意味着暂时性网络故障会导致客户端延迟一分钟,然后才能重试。以下代码将超时时间缩短到一秒:
import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0 ) dynamodb = boto3.resource('dynamodb', config=my_config)
有关超时的更多讨论,请参阅调整延迟感知型 DynamoDB 应用程序的 AWS Java SDK HTTP 请求设置
keep-alive 配置
如果您使用的是 botocore 1.27.84 或更高版本,您也可以控制 TCP Keep-Alive:
-
tcp_keepalive(布尔值)- 如果设置为
True
(默认为False
),则启用创建新连接时使用的 TCP Keep-Alive 套接字选项。这仅支持 botocore 1.27.84 及更高版本。
将 TCP Keep-Alive 设置为 True
可以减少平均延迟。以下是当您拥有合适的 botocore 版本时,有条件地将 TCP Keep-Alive 设置为 true 的示例代码:
import botocore import boto3 from botocore.config import Config from distutils.version import LooseVersion required_version = "1.27.84" current_version = botocore.__version__ my_config = Config( connect_timeout = 0.5, read_timeout = 0.5 ) if LooseVersion(current_version) > LooseVersion(required_version): my_config = my_config.merge(Config(tcp_keepalive = True)) dynamodb = boto3.resource('dynamodb', config=my_config)
注意
TCP Keep-Alive 与 HTTP Keep-Alive 不同。使用 TCP Keep-Alive,底层操作系统通过套接字连接发送小数据包,以保持连接处于活动状态并立即检测到任何丢包。使用 HTTP Keep-Alive,在底层套接字上构建的 Web 连接可以重复使用。使用 boto3 时,HTTP Keep-Alive 始终处于启用状态。
空闲连接可以保持活动状态的时间是有限制的。如果您有一个空闲的连接,但希望下次请求使用已经建立的连接,可以考虑定期发送请求(比如每分钟发送一次)。
重试配置
该配置还接受名为 retries 的字典,您可以在其中指定所需的重试行为。当 SDK 收到错误且错误属于临时类型时,SDK 中会发生重试。如果在内部重试错误(并且重试最终生成成功响应),则从调用代码的角度来看,没有错误,只是延迟略有增加。以下是您可以指定的值:
-
max_attempts – 一个整数,表示单个请求将进行的最大重试次数。例如,将此值设置为 2 将导致请求在初始请求之后最多重试两次。将此值设置为 0 将导致在初始请求之后不会尝试任何重试。
-
total_max_attempts – 一个整数,表示单个请求将进行的最大总尝试次数。这包括初始请求,因此值为 1 表示不会重试任何请求。如果同时提供
total_max_attempts
和max_attempts
,则total_max_attempts
优先。total_max_attempts
之所以优先于max_attempts
,是因为它映射到AWS_MAX_ATTEMPTS
环境变量和max_attempts
配置文件值。 -
mode – 一个字符串,表示 botocore 应使用的重试模式类型。有效值为:
-
legacy - 默认模式。首次重试等待 50 毫秒,然后使用基准因子为 2 的指数回退。对于 DynamoDB,除非使用上述值覆盖,否则它最多总共可执行 10 次尝试。
注意
使用指数回退,最后一次尝试将等待几乎 13 秒。
-
standard – 之所以命名为标准版,是因为它与其它 AWS SDK 更加一致。对于首次重试,随机等待 0 毫秒到 1000 毫秒不等。如果需要再次重试,它会从 0 毫秒到 1000 毫秒之间随机选择另一个时间,然后将其乘以 2。如果需要更多重试,它会进行相同的随机选择,然后乘以 4,依此类推。每次等待的上限为 20 秒。与
legacy
模式相比,此模式将对更多检测到的故障条件执行重试。对于 DynamoDB,除非使用上述值覆盖,否则它最多总共可执行 3 次尝试。 -
adaptive - 一种实验性重试模式,包括标准模式的所有功能,但添加了自动客户端限制。通过自适应速率限制,SDK 可以降低请求的发送速率,以便更好地容纳 AWS 服务的容量。这是一种临时模式,其行为可能会发生变化。
-
这些重试模式的扩展定义可以在重试指南
以下示例明确使用 legacy
重试策略,请求总数最多为 3 次(2 次重试):
import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0, retries = { 'mode': 'legacy', 'total_max_attempts': 3 } ) dynamodb = boto3.resource('dynamodb', config=my_config)
由于 DynamoDB 是一个高可用、低延迟的系统,因此您可能希望在重试速度上比内置重试策略所允许的更激进。您可以实现自己的重试策略,方法是将最大尝试次数设置为 0,自己捕获异常,然后根据自己的代码酌情重试,而不是依赖 boto3 进行隐式重试。
如果您管理自己的重试策略,则需要区分节流和错误:
-
节流(由
ProvisionedThroughputExceededException
或ThrottlingException
表示)表示服务运行正常,它会通知您已超出 DynamoDB 表或分区的读取或写入容量。每过一毫秒,就会有多一点的读取或写入容量可用,因此您可以快速重试(例如每 50 毫秒重试一次),尝试访问新释放的容量。使用节流,您并不特别需要指数回退,因为节流属于轻量级,DynamoDB 可以返回,而且不会向您收取每次请求的费用。指数回退会将更长的延迟分配给已经等待最长时间的客户端线程,从统计学上讲,将超越 p50 和 p99。 -
错误(由
InternalServerError
或ServiceUnavailable
等表示)表示服务存在暂时性问题。这可以是针对整个表,也可以只是您正在读取或写入的分区。使用错误,您可以在重试前暂停更长时间(例如 250 毫秒或 500 毫秒),并使用抖动来错开重试。
最大池连接数配置
最后,配置允许您控制连接池大小:
-
max_pool_connections (int) – 连接池中要保留的最大连接数。如果未指定值,则使用默认值 10。
此选项控制要保留在池中以供重复使用的最大 HTTP 连接数。每个会话保留一个不同的池。如果您预计会有超过 10 个线程使用基于同一会话构建的客户端或资源,则应考虑提高该值,这样线程就不必等待使用池连接的其它线程。
import boto3 from botocore.config import Config my_config = Config( max_pool_connections = 20 ) # Setup a single session holding up to 20 pooled connections session = boto3.Session(my_config) # Create up to 20 resources against that session for handing to threads # Notice the single-threaded access to the Session and each Resource resource1 = session.resource('dynamodb') resource2 = session.resource('dynamodb') # etc
错误处理
Boto3 中并未全部静态定义 AWS 服务异常。这是因为 AWS 服务的错误和异常差异很大,并且可能会发生变化。Boto3 将所有服务异常封装为 ClientError
,并以结构化 JSON 公开详细信息。例如,错误响应的结构可能如下所示:
{ 'Error': { 'Code': 'SomeServiceException', 'Message': 'Details/context around the exception or error' }, 'ResponseMetadata': { 'RequestId': '1234567890ABCDEF', 'HostId': 'host ID data will appear here as a hash', 'HTTPStatusCode': 400, 'HTTPHeaders': {'header metadata key/values will appear here'}, 'RetryAttempts': 0 } }
以下代码会捕获任何 ClientError
异常,并查看 Error
中 Code
的字符串值来确定要采取的操作:
import botocore import boto3 dynamodb = boto3.client('dynamodb') try: response = dynamodb.put_item(...) except botocore.exceptions.ClientError as err: print('Error Code: {}'.format(err.response['Error']['Code'])) print('Error Message: {}'.format(err.response['Error']['Message'])) print('Http Code: {}'.format(err.response['ResponseMetadata']['HTTPStatusCode'])) print('Request ID: {}'.format(err.response['ResponseMetadata']['RequestId'])) if err.response['Error']['Code'] in ('ProvisionedThroughputExceededException', 'ThrottlingException'): print("Received a throttle") elif err.response['Error']['Code'] == 'InternalServerError': print("Received a server error") else: raise err
一些(但不是全部)异常代码已被具体化为顶级类。您可以选择直接处理这些类。使用客户端接口时,这些异常会在您的客户端上动态填充,您可以使用客户端实例捕获这些异常,如下所示:
except ddb_client.exceptions.ProvisionedThroughputExceededException:
使用资源接口时,必须使用 .meta.client
从资源遍历到底层客户端才能访问异常,如下所示:
except ddb_resource.meta.client.exceptions.ProvisionedThroughputExceededException:
要查看具体化异常类型的列表,可以动态生成列表:
ddb = boto3.client("dynamodb") print([e for e in dir(ddb.exceptions) if e.endswith('Exception') or e.endswith('Error')])
使用条件表达式执行写入操作时,您可以请求:如果表达式失败,则应在错误响应中返回该项目的值。
try: response = table.put_item( Item=item, ConditionExpression='attribute_not_exists(pk)', ReturnValuesOnConditionCheckFailure='ALL_OLD' ) except table.meta.client.exceptions.ConditionalCheckFailedException as e: print('Item already exists:', e.response['Item'])
要进一步了解错误处理和异常,请执行以下操作:
-
boto3 错误处理指南
提供了有关错误处理技巧的更多信息。 -
DynamoDB 开发人员指南中关于编程错误的部分列出了您可能遇到的错误。
-
每个 API 操作的文档都列出了调用可能产生的错误(例如 BatchWriteItem)。
日志记录
Boto3 库与 Python 的内置日志记录模块集成,用于跟踪会话期间发生的情况。要控制日志记录级别,可以配置日志记录模块:
import logging logging.basicConfig(level=logging.INFO)
这会将根记录器配置为记录 INFO
及更高级别的消息。严重程度低于这些级别的日志消息将被忽略。日志记录级别包括 DEBUG
、INFO
、WARNING
、ERROR
和 CRITICAL
。默认为 WARNING
。
Boto3 中的记录器是分层的。该库使用几个不同的记录器,每个记录器对应库的不同部分。您可以分别控制每个记录器的行为:
-
boto3:boto3 模块的主记录器。
-
botocore:botocore 软件包的主记录器。
-
botocore.auth:用于记录请求的 AWS 签名创建过程。
-
botocore.credentials:用于记录凭证获取和刷新的过程。
-
botocore.endpoint:用于在请求通过网络发送之前记录请求的创建过程。
-
botocore.hooks:用于记录库中触发的事件。
-
botocore.loaders:用于记录加载部分 AWS 服务模型的情况。
-
botocore.parsers:用于在解析 AWS 服务响应之前对其进行记录。
-
botocore.retryhandler:用于记录 AWS 服务请求重试的处理情况(传统模式)。
-
botocore.retries.standard:用于记录 AWS 服务请求重试的处理情况(标准模式或自适应模式)。
-
botocore.utils:用于记录库中的其它活动。
-
botocore.waiter:用于记录 Waiter 的功能,后者轮询 AWS 服务直到达到特定状态。
其它库也记录。在内部,boto3 使用第三方 urllib3 进行 HTTP 连接处理。如果延迟很重要,您可以通过查看 urllib3 何时建立新连接或关闭空闲连接来查看其日志,以确保您的池得到充分利用。
-
urllib3.connectionpool:用于记录连接池处理事件。
以下代码段将端点和连接池活动的大部分记录设置为 INFO
和 DEBUG
:
import logging logging.getLogger('boto3').setLevel(logging.INFO) logging.getLogger('botocore').setLevel(logging.INFO) logging.getLogger('botocore.endpoint').setLevel(logging.DEBUG) logging.getLogger('urllib3.connectionpool').setLevel(logging.DEBUG)
事件钩子
Botocore 在其执行的各个阶段都会发出事件。您可以为这些事件注册处理程序,这样无论何时发出事件,您的处理程序都会被调用。这使您无需修改内部结构即可扩展 botocore 的行为。
例如,假设您要跟踪应用程序中的任何 DynamoDB 表上每次调用 PutItem
操作的情况。每次在关联的会话上调用 PutItem
操作时,您都可以在 'provide-client-params.dynamodb.PutItem'
事件上注册以捕获并记录。示例如下:
import boto3 import botocore import logging def log_put_params(params, **kwargs): if 'TableName' in params and 'Item' in params: logging.info(f"PutItem on table {params['TableName']}: {params['Item']}") logging.basicConfig(level=logging.INFO) session = boto3.Session() event_system = session.events # Register our interest in hooking in when the parameters are provided to PutItem event_system.register('provide-client-params.dynamodb.PutItem', log_put_params) # Now, every time you use this session to put an item in DynamoDB, # it will log the table name and item data. dynamodb = session.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': '123', 'sk': 'cart#123', 'item_data': 'YourItemData', # ... more attributes ... } )
在处理程序中,您甚至可以通过编程方式操作参数来改变行为:
params['TableName'] = "NewTableName"
有关事件的更多信息,请参阅事件的 botocore 文档
分页和分页工具
某些请求(例如查询和扫描)会限制针对单个请求返回的数据大小,并要求您重复请求才能显示后续页面。
您可以使用 limit
参数控制每页可读取的最大项目数。例如,如果您想要读取最后 10 个项目,则可以使用 limit
仅检索最后 10 个项目。请注意,限制是在应用任何筛选条件之前应从表中读取多少项目。筛选后无法确切指定想要检索 10 个项目;只有在切实检索到 10 个项目后,您才能控制预先筛选的数量并检查客户端。不管限制如何,每个响应始终有 1 MB 的最大大小。
如果响应包含 LastEvaluatedKey
,则表示响应已结束,因为它达到了计数或大小限制。此密钥是针对该响应评估的最后一个密钥。您可以检索此 LastEvaluatedKey
并将其作为 ExclusiveStartKey
传递给后续调用,以便从该起点读取下一个数据块。如果未返回 LastEvaluatedKey
,则表示没有其它与查询或扫描匹配的项目。
以下是一个简单的示例(使用资源接口,但客户端接口具有相同的模式),它每页最多读取 100 个项目,并循环操作,直到读取完所有项目。
import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') query_params = { 'KeyConditionExpression': Key('pk').eq('123') & Key('sk').gt(1000), 'Limit': 100 } while True: response = table.query(**query_params) # Process the items however you like for item in response['Items']: print(item) # No LastEvaluatedKey means no more items to retrieve if 'LastEvaluatedKey' not in response: break # If there are possibly more items, update the start key for the next page query_params['ExclusiveStartKey'] = response['LastEvaluatedKey']
为方便起见,boto3 可以用分页工具为您执行此操作。但是,它仅适用于客户端接口。以下是为使用分页工具而重写的代码:
import boto3 dynamodb = boto3.client('dynamodb') paginator = dynamodb.get_paginator('query') query_params = { 'TableName': 'YourTableName', 'KeyConditionExpression': 'pk = :pk_val AND sk > :sk_val', 'ExpressionAttributeValues': { ':pk_val': {'S': '123'}, ':sk_val': {'N': '1000'}, }, 'Limit': 100 } page_iterator = paginator.paginate(**query_params) for page in page_iterator: # Process the items however you like for item in page['Items']: print(item)
有关更多信息,请参阅分页工具指南
注意
分页工具也有自己的配置设置,名为 MaxItems
、StartingToken
和 PageSize
。要使用 DynamoDB 进行分页,则应忽略这些设置。
Waiter
通过 Waiter 可以等待某件事完成后再继续操作。目前,它们仅支持等待创建或删除表。在后台,Waiter 操作每 20 秒为您检查一次,最多 25 次。您可以自己做,但是在编写自动化时使用 Waiter 会很舒服。
以下代码演示了如何等待创建特定表:
# Create a table, wait until it exists, and print its ARN response = client.create_table(...) waiter = client.get_waiter('table_exists') waiter.wait(TableName='YourTableName') print('Table created:', response['TableDescription']['TableArn']
有关更多信息,请参阅 Waiter 指南