Python 的 Amazon QLDB 驅動程式 – 技術指南參考 - Amazon Quantum Ledger Database (Amazon QLDB)

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Python 的 Amazon QLDB 驅動程式 – 技術指南參考

重要

支援終止通知:現有客戶將可以使用 Amazon QLDB,直到 07/31/2025 的支援結束為止。如需詳細資訊,請參閱將 Amazon QLDB Ledger 遷移至 Amazon Aurora PostgreSQL

此參考指南顯示適用於 Python 的 Amazon QLDB 驅動程式的常見使用案例。它提供 Python 程式碼範例,示範如何使用驅動程式執行基本的建立、讀取、更新和刪除 (CRUD) 操作。它也包含處理 Amazon Ion 資料的程式碼範例。此外,本指南重點介紹了建立交易等冪和實作唯一性限制的最佳實務。

注意

在適用的情況下,某些使用案例對於 Python 的 QLDB 驅動程式的每個支援主要版本都有不同的程式碼範例。

匯入驅動程式

下列程式碼範例會匯入驅動程式。

3.x
from pyqldb.driver.qldb_driver import QldbDriver import amazon.ion.simpleion as simpleion
2.x
from pyqldb.driver.pooled_qldb_driver import PooledQldbDriver import amazon.ion.simpleion as simpleion
注意

此範例也會匯入 Amazon Ion 套件 (amazon.ion.simpleion)。您需要此套件,才能在此參考中執行某些資料操作時處理 Ion 資料。如需進一步了解,請參閱 使用 Amazon Ion

執行個體化驅動程式

下列程式碼範例會建立驅動程式的執行個體,使用預設設定連線至指定的分類帳名稱。

3.x
qldb_driver = QldbDriver(ledger_name='vehicle-registration')
2.x
qldb_driver = PooledQldbDriver(ledger_name='vehicle-registration')

CRUD 操作

QLDB 會在交易中執行建立、讀取、更新和刪除 (CRUD) 操作。

警告

最佳實務是,讓您的寫入交易具有嚴格等冪性。

使交易具有等冪性

我們建議您將寫入交易設為等冪,以避免重試時發生任何非預期的副作用。如果交易可以多次執行,並且每次產生相同的結果,則交易是冪的。

例如,請考慮將文件插入名為 的資料表的交易Person。交易應先檢查資料表中是否已存在該文件。如果沒有此檢查,資料表最終可能會顯示重複的文件。

假設 QLDB 在伺服器端成功遞交交易,但用戶端在等待回應時逾時。如果交易不等冪,在重試的情況下,相同的文件可以插入多次。

使用索引來避免完整資料表掃描

我們也建議您在索引欄位或文件 ID 上使用等式運算子,使用WHERE述詞子來執行陳述式;例如, WHERE indexedField = 123WHERE indexedField IN (456, 789)。如果沒有此索引查詢,QLDB 需要執行資料表掃描,這可能會導致交易逾時或樂觀並行控制 (OCC) 衝突。

如需 OCC 的詳細資訊,請參閱 Amazon QLDB 並行模型

隱含建立的交易

pyqldb.driver.qldb_driver.execute_lambda 方法接受 lambda 函數,該函數會接收 pyqldb.execution.executor.Executor 的執行個體,您可以使用該函數來執行陳述式。執行個體會Executor包裝隱含建立的交易。

您可以使用交易執行器的 execute_statement 方法,在 lambda 函數中執行陳述式。當 lambda 函數傳回時,驅動程式會隱含遞交交易。

注意

execute_statement方法同時支援 Amazon Ion 類型和 Python 原生類型。如果您將 Python 原生類型做為引數傳遞至 execute_statement,驅動程式會使用 amazon.ion.simpleion模組將其轉換為 Ion 類型 (前提是支援指定 Python 資料類型的轉換)。如需支援的資料類型和轉換規則,請參閱簡易原始程式碼

下列各節說明如何執行基本 CRUD 操作、指定自訂重試邏輯,以及實作唯一性限制條件。

建立資料表

def create_table(transaction_executor): transaction_executor.execute_statement("CREATE TABLE Person") qldb_driver.execute_lambda(lambda executor: create_table(executor))

建立索引

def create_index(transaction_executor): transaction_executor.execute_statement("CREATE INDEX ON Person(GovId)") qldb_driver.execute_lambda(lambda executor: create_index(executor))

讀取文件

# Assumes that Person table has documents as follows: # { "GovId": "TOYENC486FH", "FirstName": "Brent" } def read_documents(transaction_executor): cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = 'TOYENC486FH'") for doc in cursor: print(doc["GovId"]) # prints TOYENC486FH print(doc["FirstName"]) # prints Brent qldb_driver.execute_lambda(lambda executor: read_documents(executor))

使用查詢參數

下列程式碼範例使用原生類型查詢參數。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH')

下列程式碼範例使用 Ion 類型查詢參數。

name = ion.loads('Brent') cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE FirstName = ?", name)

下列程式碼範例使用多個查詢參數。

cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ? AND FirstName = ?", 'TOYENC486FH', "Brent")

下列程式碼範例使用查詢參數的清單。

gov_ids = ['TOYENC486FH','ROEE1','YH844'] cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId IN (?,?,?)", *gov_ids)
注意

當您在沒有索引查詢的情況下執行查詢時,它會叫用完整的資料表掃描。在此範例中,我們建議在 GovId 欄位中具有索引,以最佳化效能。如果沒有 上的索引GovId,查詢可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

插入文件

下列程式碼範例會插入原生資料類型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': "Brent", 'GovId': 'TOYENC486FH', } qldb_driver.execute_lambda(lambda executor: insert_documents(executor, doc_1))

下列程式碼範例會插入 Ion 資料類型。

def insert_documents(transaction_executor, arg_1): # Check if doc with GovId:TOYENC486FH exists # This is critical to make this transaction idempotent cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", 'TOYENC486FH') # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", arg_1) doc_1 = { 'FirstName': 'Brent', 'GovId': 'TOYENC486FH', } # create a sample Ion doc ion_doc_1 = simpleion.loads(simpleion.dumps(doc_1))) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, ion_doc_1))

此交易會將文件插入Person資料表。插入之前,它會先檢查文件是否已存在資料表中。此檢查會讓交易具有等冪性質。即使您多次執行此交易,也不會造成任何非預期的副作用。

注意

在此範例中,我們建議在 GovId 欄位中具有索引,以最佳化效能。如果沒有 上的索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

在一個陳述式中插入多個文件

若要使用單一INSERT陳述式插入多個文件,您可以將類型清單的參數傳遞至陳述式,如下所示。

# people is a list transaction_executor.execute_statement("INSERT INTO Person ?", people)

傳遞清單時,您不會將變數預留位置 (?) 括在雙角度括號 ( ) <<...>> 中。在手動 PartiQL 陳述式中,雙角括號表示稱為的未排序集合。

更新文件

下列程式碼範例使用原生資料類型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) gov_id = 'TOYENC486FH' name = 'John' qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))

下列程式碼範例使用 Ion 資料類型。

def update_documents(transaction_executor, gov_id, name): transaction_executor.execute_statement("UPDATE Person SET FirstName = ? WHERE GovId = ?", name, gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') name = simpleion.loads('John') qldb_driver.execute_lambda(lambda executor: update_documents(executor, gov_id, name))
注意

在此範例中,我們建議在 GovId 欄位中具有索引,以最佳化效能。如果沒有 上的索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

刪除文件

下列程式碼範例使用原生資料類型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) gov_id = 'TOYENC486FH' qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))

下列程式碼範例使用 Ion 資料類型。

def delete_documents(transaction_executor, gov_id): cursor = transaction_executor.execute_statement("DELETE FROM Person WHERE GovId = ?", gov_id) # Ion datatypes gov_id = simpleion.loads('TOYENC486FH') qldb_driver.execute_lambda(lambda executor: delete_documents(executor, gov_id))
注意

在此範例中,我們建議在 GovId 欄位中具有索引,以最佳化效能。如果沒有 上的索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

在交易中執行多個陳述式

# This code snippet is intentionally trivial. In reality you wouldn't do this because you'd # set your UPDATE to filter on vin and insured, and check if you updated something or not. def do_insure_car(transaction_executor, vin): cursor = transaction_executor.execute_statement( "SELECT insured FROM Vehicles WHERE vin = ? AND insured = FALSE", vin) first_record = next(cursor, None) if first_record: transaction_executor.execute_statement( "UPDATE Vehicles SET insured = TRUE WHERE vin = ?", vin) return True else: return False def insure_car(qldb_driver, vin_to_insure): return qldb_driver.execute_lambda( lambda executor: do_insure_car(executor, vin_to_insure))

重試邏輯

驅動程式的 execute_lambda方法具有內建重試機制,可在發生可重試例外狀況 (例如逾時或 OCC 衝突) 時重試交易。

3.x

重試嘗試次數上限和退避策略是可設定的。

預設重試限制為 4,預設退避策略為指數退避和抖動,基礎10為毫秒。您可以使用 pyqldb.config.retry_config.RetryConfig 執行個體,設定每個驅動程式執行個體和每個交易的重試組態。

下列程式碼範例指定具有自訂重試限制的重試邏輯,以及驅動程式執行個體的自訂退避策略。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config = RetryConfig(retry_limit=2) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_custom_backoff = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_custom_backoff)

下列程式碼範例指定具有自訂重試限制的重試邏輯,以及特定 lambda 執行的自訂退避策略。此 組態會execute_lambda覆寫為驅動程式執行個體設定的重試邏輯。

from pyqldb.config.retry_config import RetryConfig from pyqldb.driver.qldb_driver import QldbDriver # Configuring retry limit to 2 retry_config_1 = RetryConfig(retry_limit=4) qldb_driver = QldbDriver("test-ledger", retry_config=retry_config_1) # Configuring a custom backoff which increases delay by 1s for each attempt. def custom_backoff(retry_attempt, error, transaction_id): return 1000 * retry_attempt retry_config_2 = RetryConfig(retry_limit=2, custom_backoff=custom_backoff) # The config `retry_config_1` will be overriden by `retry_config_2` qldb_driver.execute_lambda(lambda txn: txn.execute_statement("CREATE TABLE Person"), retry_config_2)
2.x

重試嘗試次數上限為可設定。您可以在初始化 時設定 retry_limit 屬性,以設定重試限制PooledQldbDriver

預設重試限制為 4

實作唯一性限制

QLDB 不支援唯一索引,但您可以在應用程式中實作此行為。

假設您想要在Person資料表中的 GovId 欄位實作唯一性限制條件。若要執行此操作,您可以撰寫執行下列動作的交易:

  1. 宣告資料表沒有具有指定 的現有文件GovId

  2. 如果聲明通過,請插入文件。

如果競爭交易同時通過聲明,則只有一個交易會成功遞交。另一個交易將失敗,並出現 OCC 衝突例外狀況。

下列程式碼範例示範如何實作此唯一性限制邏輯。

def insert_documents(transaction_executor, gov_id, document): # Check if doc with GovId = gov_id exists cursor = transaction_executor.execute_statement("SELECT * FROM Person WHERE GovId = ?", gov_id) # Check if there is any record in the cursor first_record = next(cursor, None) if first_record: # Record already exists, no need to insert pass else: transaction_executor.execute_statement("INSERT INTO Person ?", document) qldb_driver.execute_lambda(lambda executor: insert_documents(executor, gov_id, document))
注意

在此範例中,我們建議在 GovId 欄位中具有索引,以最佳化效能。如果沒有 上的索引GovId,陳述式可能會有更多延遲,也可能導致 OCC 衝突例外狀況或交易逾時。

使用 Amazon Ion

下列各節說明如何使用 Amazon Ion 模組來處理 Ion 資料。

匯入 Ion 模組

import amazon.ion.simpleion as simpleion

建立 Ion 類型

下列程式碼範例會從 Ion 文字建立 Ion 物件。

ion_text = '{GovId: "TOYENC486FH", FirstName: "Brent"}' ion_obj = simpleion.loads(ion_text) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['Name']) # prints Brent

下列程式碼範例會從 Python 建立 Ion 物件dict

a_dict = { 'GovId': 'TOYENC486FH', 'FirstName': "Brent" } ion_obj = simpleion.loads(simpleion.dumps(a_dict)) print(ion_obj['GovId']) # prints TOYENC486FH print(ion_obj['FirstName']) # prints Brent

取得 Ion 二進位傾印

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj)) # b'\xe0\x01\x00\xea\xee\x97\x81\x83\xde\x93\x87\xbe\x90\x85GovId\x89FirstName\xde\x94\x8a\x8bTOYENC486FH\x8b\x85Brent'

取得 Ion 文字傾印

# ion_obj is an Ion struct print(simpleion.dumps(ion_obj, binary=False)) # prints $ion_1_0 {GovId:'TOYENC486FH',FirstName:"Brent"}

如需使用 Ion 的詳細資訊,請參閱 GitHub 上的 Amazon Ion 文件。如需在 QLDB 中使用 Ion 的更多程式碼範例,請參閱 在 Amazon QLDB 中使用 Amazon Ion 資料類型