适用于 Python 的亚马逊QLDB驱动程序 — 食谱参考 - 亚马逊 Quantum Ledger 数据库(亚马逊QLDB)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Python 的亚马逊QLDB驱动程序 — 食谱参考

重要

终止支持通知:现有客户将能够使用亚马逊,QLDB直到 2025 年 7 月 31 日终止支持。有关更多详细信息,请参阅将亚马逊QLDB账本迁移到亚马逊 Aurora Postgr SQL e。

本参考指南显示了 Python 版 Amazon QLDB 驱动程序的常见用例。它提供了 Python 代码示例,演示了如何使用驱动程序运行基本的创建、读取、更新和删除 (CRUD) 操作。它还包括用于处理 Amazon Ion 数据的代码示例。此外,本指南还重点介绍了使事务具有幂等性和实现唯一性约束的最佳实践。

注意

在适用的情况下,某些用例对应的 Python QLDB 驱动程序的每个主要版本都有不同的代码示例。

导入驱动程序

下面的代码示例导入驱动程序。

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
注意

此示例还导入了 Amazon Ion 软件包(amazon.ion.simpleion)。在本参考中运行某些数据操作时,您需要此软件包来处理 Ion 数据。要了解更多信息,请参阅使用 Amazon Ion

实例化驱动程序

以下代码示例使用默认设置创建连接到指定分类账名称的驱动程序实例。

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD操作

QLDB作为事务的一部分运行创建、读取、更新和删除 (CRUD) 操作。

警告

作为最佳实践,使写事务严格地幂等。

使事务幂等

我们建议将写事务设置为幂等,以避免重试时出现任何意想不到的副作用。如果事务可以运行多次并每次都产生相同的结果,则事务是幂等的

例如,假设有一个事务,要将文档插入名为 Person 的表中。事务应该首先检查文档是否已经存在于表中。如果没有这种检查,表最终可能会有重复的文档。

假设在服务器端QLDB成功提交了事务,但是客户端在等待响应时超时。如果事务不是幂等的,则在重试的情况下可以多次插入相同的文档。

使用索引避免全表扫描

我们还建议您在索引字段或文档 ID 上使用相等运算符来运行带有WHERE谓词子句的语句;例如,WHERE indexedField = 123WHERE indexedField IN (456, 789)。如果没有这种索引查找,则QLDB需要进行表扫描,这可能会导致事务超时或乐观的并发控制 () OCC 冲突

有关 OCC 的更多信息,请参阅 亚马逊QLDB并发模型

隐式创建的事务

pyqldb.driver.qldb_driver.execute_lambda 方法接受一个 lambda 函数,该函数接收一个 TransactionExecutor 的实例,您可以用它来运行语句。Executor 的实例封装了隐式创建的事务。

您可以使用事务执行器的 execute_statement 法在 lambda 函数中运行语句。当 lambda 函数返回时,驱动程序会隐式提交事务。

注意

execute_statement 方法同时支持 Amazon Ion 类型和 Python 本机类型。如果您将 Python 本地类型作为参数传递给 execute_statement,则驱动程序会使用 amazon.ion.simpleion 模块将其转换为 Ion 类型(前提是支持对给定 Python 数据类型的转换)。有关支持的数据类型和转换规则,请参阅 simpleion 源代码

以下各节介绍如何运行基本CRUD操作、指定自定义重试逻辑以及如何实现唯一性约束。

创建表

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

创建索引

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

阅读文档

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

使用查询参数

以下代码示例使用原生类型查询参数。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

以下代码示例使用 Ion 类型查询参数。

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

以下代码示例使用了多个查询参数。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

以下代码示例使用查询参数列表。

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
注意

当您在没有索引查找的情况下运行查询时,它会调用全表扫描。在此示例中,我们建议在GovId字段上设置 索引以优化性能。如果不开启索引GovId,查询可能会有更长的延迟,还可能导致OCC冲突异常或事务超时。

插入文档

以下代码示例插入本地数据类型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

以下代码示例插入 Ion 数据类型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

此事务将文档插入 Person 表中。在插入之前,它首先检查文档是否已存在于表格内。此检查使事务本质上是幂等。即使您多次运行此事务,也不会造成任何异常副作用。

注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启索引GovId,语句可能会有更长的延迟,还可能导致OCC冲突异常或事务超时。

在一条语句内插入多个文档

要使用单个 INSERT 语句插入多个文档,可以向该语句传递一个列表类型的参数,如下所示。

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

传递 Ion 列表时,不要将变量占位符(?)括在双尖括号(<<...>>)内。在手动 PartiQL 语句中,双尖括号表示名为bag的无序集合。

更新文档

以下代码示例使用原生数据类型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

以下代码示例使用 Ion 数据类型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启索引GovId,语句可能会有更长的延迟,还可能导致OCC冲突异常或事务超时。

删除文档

以下代码示例使用原生数据类型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

以下代码示例使用 Ion 数据类型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启索引GovId,语句可能会有更长的延迟,还可能导致OCC冲突异常或事务超时。

在一个事务中运行多条语句

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

重试逻辑

驱动程序的execute_lambda方法具有内置的重试机制,如果发生可重试的异常(例如超时或冲突),该机制可以重试事务。OCC

3.x

最大重试次数和退避策略为可配置。

默认的重试限制为 4,默认的退避策略是以10毫秒为基数的 Exponential Backoff and Jitter。你可以使用 pyqldb.config.retr y_config 的实例为每个驱动程序实例和每个事务设置重试配置。 RetryConfig

以下代码示例使用自定义重试限制和驱动程序实例的自定义退避策略指定重试逻辑。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

以下代码示例使用自定义重试限制和自定义回退策略为特定 lambda 执行指定重试逻辑。此execute_lambda配置将覆盖为驱动程序实例设置的重试逻辑。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

最大重试次数。您可以通过在初始化PooledQldbDriver 时设置 retry_limit 属性来配置重试限制。

默认重试限制为 4

实现唯一限制

QLDB不支持唯一索引,但你可以在应用程序中实现此行为。

假设您要对 Person 表中的 GovId 字段实现唯一性约束。据此,可以编写执行以下操作的事务:

  1. 断言该表中没有指定 GovId 的现有文档。

  2. 如果断言通过,请插入文档。

如果一个竞争事务同时通过断言,则只有一个事务将成功提交。另一笔交易将因OCC冲突异常而失败。

以下代码示例显示了如何实现此唯一约束。

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
注意

在此示例中,我们建议在 GovId 字段上设置索引以优化性能。如果不开启索引GovId,语句可能会有更长的延迟,还可能导致OCC冲突异常或事务超时。

使用 Amazon Ion

以下各节说明了如何使用 Amazon Ion 模块处理 Ion 数据。

导入 Ion 模块

import amazon.ion.simpleion as simpleion

创建 Ion 类型

以下代码示例从 Ion 文本创建 Ion 对象。

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

以下代码示例从 Python dict创建 Ion 对象。

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

获取 Ion 二进制转储

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

获取 Ion 文本转储

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

有关使用 Ion 的更多信息,请参阅上的 Amazon Ion 文档 GitHub。有关在中使用 Ion 的更多代码示例QLDB,请参阅在亚马逊中使用 Amazon Ion 数据类型 QLDB