Python 的 Amazon QLDB 驅動程序-食譜參考 - Amazon Quantum Ledger 資料庫 (Amazon QLDB)

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Python 的 Amazon QLDB 驅動程序-食譜參考

重要

支援結束通知:現有客戶將能夠使用 Amazon,QLDB直到 2025 年 7 月 31 日終止支援為止。有關更多詳細信息,請參閱將 Amazon QLDB 分類帳遷移到 Amazon Aurora 郵政. SQL

本參考指南顯示適用於 Python 的 Amazon QLDB 驅動程式的常見使用案例。它提供了 Python 代碼示例,演示瞭如何使用驅動程序來運行基本的創建,讀取,更新和刪除(CRUD)操作。它還包括用於處理 Amazon Ion 資料的程式碼範例。此外,本指南還重點介紹了使交易冪等和實施唯一性約束的最佳實踐。

注意

在適用的情況下,某些使用案例會針對每個支援的 Python QLDB 驅動程式主要版本提供不同的程式碼範例。

匯入驅動程式

下列程式碼範例會匯入驅動程式。

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
注意

此範例也會匯入 Amazon Ion 套件 (amazon.ion.simpleion)。在此參考中執行某些資料作業時,您需要此套件來處理 Ion 資料。如需進一步了解,請參閱與 Amazon 離子工作

實例化驅動程序

下列程式碼範例會建立使用預設設定連線至指定分類帳名稱的驅動程式執行環境。

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD操作

QLDB在交易中執行建立、讀取、更新和刪除 (CRUD) 作業。

警告

作為最佳實踐,使您的寫入事務嚴格為冪等。

使交易冪等

我們建議您將寫入事務設為冪等,以避免在重試的情況下出現任何意外的副作用。如果事務可以運行多次並每次產生相同的結果,則事務是冪等的。

例如,假設將文件插入名為的資料表中的交易Person。事務應該首先檢查文檔是否已經存在於表中。如果沒有此檢查,表格最終可能會出現重複的文件。

假設成QLDB功地在服務器端提交事務,但客戶端在等待響應時超時。如果事務不是冪等的,則在重試的情況下,可以多次插入相同的文檔。

使用索引來避免全表掃描

我們也建議您在索引欄位或文件 ID 上使用相等運算子來執行具有述WHERE詞子句的陳述式;例如,WHERE indexedField = 123WHERE indexedField IN (456, 789)。如果沒有此索引查找,則QLDB需要進行表掃描,這可能會導致事務超時或樂觀的並發控制(OCC)衝突。

如需有關 OCC 的詳細資訊,請參閱 Amazon QLDB 并发模型

隱含建立的交易

執行程序 .execute_lambda 方法接受一個 lambda 函數,該函數接受一個 lambda 函數,該函數接收執行程序。執行程序,您可以使用它來運行語句的實例Executor封裝隱含建立之交易的執行個體。

您可以使用交易執行程式的 execute_陳述式方法,在 Lambda 函數中執行陳述式。當 lambda 函數返回時,驅動程序隱式地提交交易。

注意

execute_statement方法同時支持 Amazon 離子類型和 Python 本機類型。如果您將 Python 原生類型作為引數傳遞給execute_statement,則驅動程序會使用amazon.ion.simpleion模塊將其轉換為 Ion 類型(前提是支持給定 Python 數據類型的轉換)。有關支持的數據類型和轉換規則,請參閱簡單源代碼

下列各節說明如何執行基本CRUD作業、指定自訂重試邏輯,以及實作唯一性條件約束。

建立資料表

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

建立索引

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

閱讀文件

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

使用查詢參數

下列程式碼範例會使用原生型別查詢參數。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

下列程式碼範例會使用 Ion 型別查詢參數。

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

下列程式碼範例會使用多個查詢參數。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

下列程式碼範例使用查詢參數清單。

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
注意

當您在沒有索引查閱的情況下執行查詢時,會叫用完整資料表掃描。在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,查詢可能會有更多延遲,也可能導致OCC衝突例外狀況或交易逾時。

插入文件

下列程式碼範例會插入原生資料類型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

下列程式碼範例會插入 Ion 資料型別。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

此事務將一個文檔插入到Person表中。在插入之前,它會先檢查文件是否已存在於表格中。這個檢查使事務本質上是冪等的。即使您多次運行此事務,也不會造成任何意外的副作用。

注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致OCC衝突例外狀況或交易逾時。

在一個語句中插入多個文檔

要通過使用單個INSERT語句插入多個文檔,你可以通過類型列表的參數到語句,如下所示。

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

傳遞清單時,您不會將變數預留位置 (?) 括在雙尖括號 (<<...>>) 中。在手動 PartiQL 陳述式中,雙角括號表示稱為袋子的無序集合。

更新文件

下列程式碼範例會使用原生資料類型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

下列程式碼範例會使用 Ion 資料型別。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致OCC衝突例外狀況或交易逾時。

刪除文件

下列程式碼範例會使用原生資料類型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

下列程式碼範例會使用 Ion 資料型別。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致OCC衝突例外狀況或交易逾時。

在交易中執行多個陳述式

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

重試邏輯

驅動程序的execute_lambda方法具有內置的重試機制,如果發生可重試的異常(例如超時或衝突),則會重試事務。OCC

3.x

重試嘗試次數上限和輪詢策略是可設定的。

預設重試限制為4,預設輪詢策略為指數輪詢和抖動,以毫秒為基礎。10您可以通過使用 pyqldb.config.retry_config 的實例來設置每個驅動程序實例和每個事務的重試配置。 RetryConfig

下列程式碼範例會指定具有自訂重試限制的重試邏輯,以及驅動程式執行個體的自訂輪詢策略。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

下列程式碼範例會指定具有自訂重試限制的重試邏輯,以及針對特定 lambda 執行的自訂輪詢策略。此組態execute_lambda會覆寫為驅動程式執行個體設定的重試邏輯。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

重試嘗試次數上限是可設定的。您可以在初始化PooledQldbDriver時設定retry_limit屬性來設定重試限制。

預設重試限制為4

實施唯一性約束

QLDB不支持唯一索引,但您可以在應用程序中實現此行為。

假設您要在Person表中的GovId字段上實現唯一性約束。要做到這一點,你可以寫一個事務,執行以下操作:

  1. 斷言該表沒有具有指定GovId的現有文檔。

  2. 插入文檔,如果斷言通過。

如果競爭交易同時通過斷言,則只有其中一個交易將成功提交。另一個交易將會失敗,並出現OCC衝突例外狀況。

下列程式碼範例會示範如何實作此唯一性條件約束邏輯。

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
注意

在此範例中,我們建議在GovId欄位上建立索引以最佳化效能。如果沒有開啟索引GovId,陳述式可能會有更多延遲,也可能導致OCC衝突例外狀況或交易逾時。

與 Amazon 離子工作

以下各節說明如何使用 Amazon Ion 模組來處理離子資料。

匯入離子模組

import amazon.ion.simpleion as simpleion

建立離子類型

下列程式碼範例會從 Ion 文字建立 Ion 物件。

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

下列程式碼範例會從 Python 建立離子物件dict

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

獲取離子二進制轉儲

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

獲取離子文本轉儲

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

如需有關使用 Ion 的詳細資訊,請參閱上的 Amazon Ion 文件 GitHub。如需中使用 Ion 的更多程式碼範例QLDB,請參閱使用 Amazon 離子數據類型在 Amazon QLDB