使用发出 Amazon S3 有条件的请求 AWS SDK。 - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用发出 Amazon S3 有条件的请求 AWS SDK。

以下代码示例显示了如何向 Amazon S3 请求添加先决条件。

Python
SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

运行演示 Amazon S3 条件请求的交互式场景。

""" Purpose Shows how to use AWS SDK for Python (Boto3) to get started using conditional requests for Amazon Simple Storage Service (Amazon S3). """ import logging import random import sys import datetime import boto3 from botocore.exceptions import ClientError from s3_conditional_requests import S3ConditionalRequests # Add relative path to include demo_tools in this code example without need for setup. sys.path.append("../../../..") import demo_tools.question as q # noqa # Constants FILE_CONTENT = "This is a test file for S3 conditional requests." RANDOM_SUFFIX = str(random.randint(100, 999)) logger = logging.getLogger(__name__) class ConditionalRequestsScenario: """Runs a scenario that shows how to use S3 Conditional Requests.""" def __init__(self, conditional_requests, s3_client): """ :param conditional_requests: An object that wraps S3 conditional request actions. :param s3_client: A Boto3 S3 client for setup and cleanup operations. """ self.conditional_requests = conditional_requests self.s3_client = s3_client def setup_scenario(self, source_bucket: str, dest_bucket: str, object_key: str): """ Sets up the scenario by creating a source and destination bucket. Prompts the user to provide a bucket name prefix. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. :param object_key: The name of a test file to add to the source bucket. """ # Create the buckets. try: self.s3_client.create_bucket(Bucket=source_bucket) self.s3_client.create_bucket(Bucket=dest_bucket) print( f"Created source bucket: {source_bucket} and destination bucket: {dest_bucket}" ) except ClientError as e: error_code = e.response["Error"]["Code"] logger.error(f"Error creating buckets: {error_code}") raise # Upload test file into the source bucket. try: print(f"Uploading file {object_key} to bucket {source_bucket}") response = self.s3_client.put_object( Bucket=source_bucket, Key=object_key, Body=FILE_CONTENT ) object_etag = response["ETag"] return object_etag except Exception as e: logger.error( f"Failed to upload file {object_key} to bucket {source_bucket}: {e}" ) def cleanup_scenario(self, source_bucket: str, dest_bucket: str): """ Cleans up the scenario by deleting the source and destination buckets. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. """ self.cleanup_bucket(source_bucket) self.cleanup_bucket(dest_bucket) def cleanup_bucket(self, bucket_name: str): """ Cleans up the bucket by deleting all objects and then the bucket itself. :param bucket_name: The name of the bucket. """ try: # Get list of all objects in the bucket. list_response = self.s3_client.list_objects_v2(Bucket=bucket_name) objs = list_response.get("Contents", []) for obj in objs: key = obj["Key"] self.s3_client.delete_object(Bucket=bucket_name, Key=key) self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Cleaned up bucket: {bucket_name}.") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "NoSuchBucket": logger.info(f"Bucket {bucket_name} does not exist, skipping cleanup.") else: logger.error(f"Error deleting bucket: {error_code}") raise def display_buckets(self, source_bucket: str, dest_bucket: str): """ Display a list of the objects in the test buckets. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. """ self.list_bucket_contents(source_bucket) self.list_bucket_contents(dest_bucket) def list_bucket_contents(self, bucket_name): """ Display a list of the objects in the bucket. :param bucket_name: The name of the bucket. """ try: # Get list of all objects in the bucket. print(f"\t Items in bucket {bucket_name}") list_response = self.s3_client.list_objects_v2(Bucket=bucket_name) objs = list_response.get("Contents", []) if not objs: print("\t\tNo objects found.") for obj in objs: key = obj["Key"] print(f"\t\t object: {key} ETag {obj['ETag']}") return objs except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "NoSuchBucket": logger.info(f"Bucket {bucket_name} does not exist.") else: logger.error(f"Error listing bucket and objects: {error_code}") raise def display_menu( self, source_bucket: str, dest_bucket: str, object_key: str, etag: str ): """ Displays the menu of conditional request options for the user. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. :param object_key: The key of the test object in the source bucket. :param etag: The etag of the test object in the source bucket. """ actions = [ "Print list of bucket items.", "Perform a conditional read.", "Perform a conditional copy.", "Perform a conditional write.", "Clean up and exit.", ] conditions = [ "If-Match: using the object's ETag. This condition should succeed.", "If-None-Match: using the object's ETag. This condition should fail.", "If-Modified-Since: using yesterday's date. This condition should succeed.", "If-Unmodified-Since: using yesterday's date. This condition should fail.", ] condition_types = [ "IfMatch", "IfNoneMatch", "IfModifiedSince", "IfUnmodifiedSince", ] copy_condition_types = [ "CopySourceIfMatch", "CopySourceIfNoneMatch", "CopySourceIfModifiedSince", "CopySourceIfUnmodifiedSince", ] yesterday_date = datetime.datetime.utcnow() - datetime.timedelta(days=1) choice = 0 while choice != 4: print("-" * 88) print("Choose an action to explore some example conditional requests.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: print("Listing the objects and buckets.") self.display_buckets(source_bucket, dest_bucket) elif choice == 1: print("Perform a conditional read.") condition_type = q.choose("Enter the condition type : ", conditions) if condition_type == 0 or condition_type == 1: self.conditional_requests.get_object_conditional( object_key, source_bucket, condition_types[condition_type], etag ) elif condition_type == 2 or condition_type == 3: self.conditional_requests.get_object_conditional( object_key, source_bucket, condition_types[condition_type], yesterday_date, ) elif choice == 2: print("Perform a conditional copy.") condition_type = q.choose("Enter the condition type : ", conditions) dest_key = q.ask("Enter an object key: ", q.non_empty) if condition_type == 0 or condition_type == 1: self.conditional_requests.copy_object_conditional( object_key, dest_key, source_bucket, dest_bucket, copy_condition_types[condition_type], etag, ) elif condition_type == 2 or condition_type == 3: self.conditional_requests.copy_object_conditional( object_key, dest_key, copy_condition_types[condition_type], yesterday_date, ) elif choice == 3: print( "Perform a conditional write using IfNoneMatch condition on the object key." ) print("If the key is a duplicate, the write will fail.") object_key = q.ask("Enter an object key: ", q.non_empty) self.conditional_requests.put_object_conditional( object_key, source_bucket, b"Conditional write example data." ) elif choice == 4: print("Proceeding to cleanup.") def run_scenario(self): """ Runs the interactive scenario. """ print("-" * 88) print("Welcome to the Amazon S3 conditional requests example.") print("-" * 88) print( f"""\ This example demonstrates the use of conditional requests for S3 operations. You can use conditional requests to add preconditions to S3 read requests to return or copy an object based on its Entity tag (ETag), or last modified date. You can use a conditional write requests to prevent overwrites by ensuring there is no existing object with the same key. This example will allow you to perform conditional reads and writes that will succeed or fail based on your selected options. Sample buckets and a sample object will be created as part of the example. """ ) bucket_prefix = q.ask("Enter a bucket name prefix: ", q.non_empty) source_bucket_name = f"{bucket_prefix}-source-{RANDOM_SUFFIX}" dest_bucket_name = f"{bucket_prefix}-dest-{RANDOM_SUFFIX}" object_key = "test-upload-file.txt" try: etag = self.setup_scenario(source_bucket_name, dest_bucket_name, object_key) self.display_menu(source_bucket_name, dest_bucket_name, object_key, etag) finally: self.cleanup_scenario(source_bucket_name, dest_bucket_name) print("-" * 88) print("Thanks for watching.") print("-" * 88) if __name__ == "__main__": scenario = ConditionalRequestsScenario( S3ConditionalRequests.from_client(), boto3.client("s3") ) scenario.run_scenario()

定义条件请求操作的包装器类。

import boto3 import logging from botocore.exceptions import ClientError # Configure logging logger = logging.getLogger(__name__) class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def get_object_conditional( self, object_key: str, source_bucket: str, condition_type: str, condition_value: str, ): """ Retrieves an object from Amazon S3 with a conditional request. :param object_key: The key of the object to retrieve. :param source_bucket: The source bucket of the object. :param condition_type: The type of condition: 'IfMatch', 'IfNoneMatch', 'IfModifiedSince', 'IfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: response = self.s3.get_object( Bucket=source_bucket, Key=object_key, **{condition_type: condition_value}, ) sample_bytes = response["Body"].read(20) print( f"\tConditional read successful. Here are the first 20 bytes of the object:\n" ) print(f"\t{sample_bytes}") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional read failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional read failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise def put_object_conditional(self, object_key: str, source_bucket: str, data: bytes): """ Uploads an object to Amazon S3 with a conditional request. Prevents overwrite using an IfNoneMatch condition for the object key. :param object_key: The key of the object to upload. :param source_bucket: The source bucket of the object. :param data: The data to upload. """ try: self.s3.put_object( Bucket=source_bucket, Key=object_key, Body=data, IfNoneMatch="*" ) print( f"\tConditional write successful for key {object_key} in bucket {source_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional write failed: Precondition failed") else: logger.error(f"Unexpected error: {error_code}") raise def copy_object_conditional( self, source_key: str, dest_key: str, source_bucket: str, dest_bucket: str, condition_type: str, condition_value: str, ): """ Copies an object from one Amazon S3 bucket to another with a conditional request. :param source_key: The key of the source object to copy. :param dest_key: The key of the destination object. :param source_bucket: The source bucket of the object. :param dest_bucket: The destination bucket of the object. :param condition_type: The type of condition to apply, e.g. 'CopySourceIfMatch', 'CopySourceIfNoneMatch', 'CopySourceIfModifiedSince', 'CopySourceIfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: self.s3.copy_object( Bucket=dest_bucket, Key=dest_key, CopySource={"Bucket": source_bucket, "Key": source_key}, **{condition_type: condition_value}, ) print( f"\tConditional copy successful for key {dest_key} in bucket {dest_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional copy failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional copy failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise