使用 DynamoDB 锁客户端进行分布式锁定
DynamoDB 锁客户端是一个开源库,对于需要传统“锁定-获取-释放”语义的应用程序,该客户端使用 DynamoDB 表作为锁定存储来实施分布式锁定。当您需要对多个应用程序实例协调对外部资源(例如 S3 对象或共享配置)的访问时,此方法非常有用。
锁客户端以开源 Java 库
工作原理
锁客户端使用专用 DynamoDB 表来跟踪锁定。每个锁定都表示为具有以下关键属性的项目:
分区键,标识锁定的资源。
租赁持续时间,指定锁的有效期限。如果锁持有程序崩溃或无响应,则该锁将在租赁持续时间过后自动过期。
检测信号,由锁持有程序定期发送,用于延长租赁。这样可以防止持有程序仍在活跃地进行处理时,锁定变得过期。
锁客户端使用条件写入来确保一次只有一个进程可以获取某个锁。如果锁已经被持有,则调用方可以选择等待并重试或者立即失败。
使用锁客户端的情况
锁客户端非常适合以下情况:
需要跨多个应用程序实例或微服务协调对共享资源的访问。
关键环节的运行时间很长(几秒到几分钟),出现冲突时重试整个操作成本很高。
您需要自动锁定到期来妥善处理进程故障。
常见的示例包括编排分布式工作流、跨多个实例协调 cron 作业以及管理对共享外部资源的访问。
权衡
- 其他基础设施
需要专用 DynamoDB 表用于锁定管理,并且需要额外的读取和写入容量用于锁定操作和检测信号。
- 时钟依赖性
锁定到期取决于时间戳。客户端之间的严重时钟偏差可能会导致意外行为,尤其是对于较短的租赁持续时间。
- 死锁风险
如果应用程序在多个资源上获取锁定,则必须按一致的顺序进行获取以避免死锁。租赁持续时间可以自动从未响应的持有程序上释放锁定,提供了一层安全保护机制。
实现
以下示例说明如何使用 DynamoDB 锁客户端获取和释放锁定:
import java.io.IOException; import java.util.Optional; import java.util.concurrent.TimeUnit; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; final DynamoDbClient dynamoDB = DynamoDbClient.builder() .region(Region.US_WEST_2) .build(); final AmazonDynamoDBLockClient lockClient = new AmazonDynamoDBLockClient( AmazonDynamoDBLockClientOptions.builder(dynamoDB, "Locks") .withTimeUnit(TimeUnit.SECONDS) .withLeaseDuration(10L) .withHeartbeatPeriod(3L) .withCreateHeartbeatBackgroundThread(true) .build()); // Try to acquire a lock on a resource final Optional<LockItem> lock = lockClient.tryAcquireLock(AcquireLockOptions.builder("my-shared-resource").build()); if (lock.isPresent()) { try { // Perform operations that require exclusive access processSharedResource(); } finally { // Always release the lock when done lockClient.releaseLock(lock.get()); } } else { System.out.println("Failed to acquire lock."); } lockClient.close();
重要
始终释放 finally 块中的锁,以确保即使处理逻辑引发异常也会释放锁定。未释放的锁定会阻塞其他进程,直到租赁到期。
您还可以直接使用条件写入,来实施无需锁客户端库的简单锁定机制。以下示例使用 UpdateItem 与条件表达式来获取锁定,使用 DeleteItem 将其释放:
from datetime import datetime, timedelta from boto3.dynamodb.conditions import Attr def acquire_lock(table, resource_name, owner_id, ttl_seconds): """Attempt to acquire a lock. Returns True if successful.""" expiry = (datetime.now() + timedelta(seconds=ttl_seconds)).isoformat() now = datetime.now().isoformat() try: table.update_item( Key={'LockID': resource_name}, UpdateExpression='SET #owner = :owner, #expiry = :expiry', ConditionExpression=Attr('LockID').not_exists() | Attr('ExpiresAt').lt(now), ExpressionAttributeNames={'#owner': 'OwnerID', '#expiry': 'ExpiresAt'}, ExpressionAttributeValues={':owner': owner_id, ':expiry': expiry} ) return True except table.meta.client.exceptions.ConditionalCheckFailedException: return False def release_lock(table, resource_name, owner_id): """Release a lock. Only succeeds if the caller is the lock owner.""" try: table.delete_item( Key={'LockID': resource_name}, ConditionExpression=Attr('OwnerID').eq(owner_id) ) return True except table.meta.client.exceptions.ConditionalCheckFailedException: return False
此方法使用条件表达式来确保只能获取尚不存在的锁或者已过期的锁,并且只能由获取该锁的进程释放锁定。请考虑在锁定表上启用生存时间(TTL)来自动清理过期的锁定项目。