Amazon S3 examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

Amazon S3 examples using SDK for Python (Boto3)

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Amazon S3.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or combined with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code examples show how to get started using Amazon S3.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import boto3 def hello_s3(): """ Use the AWS SDK for Python (Boto3) to create an Amazon Simple Storage Service (Amazon S3) resource and list the buckets in your account. This example uses the default settings specified in your shared credentials and config files. """ s3_resource = boto3.resource("s3") print("Hello, Amazon S3! Let's list your buckets:") for bucket in s3_resource.buckets.all(): print(f"\t{bucket.name}") if __name__ == "__main__": hello_s3()
  • For API details, see ListBuckets in AWS SDK for Python (Boto3) API Reference.

Basics

The following code example shows how to:

  • Create a bucket and upload a file to it.

  • Download an object from a bucket.

  • Copy an object to a subfolder in a bucket.

  • List the objects in a bucket.

  • Delete the bucket objects and the bucket.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import io import os import uuid import boto3 from boto3.s3.transfer import S3UploadFailedError from botocore.exceptions import ClientError def do_scenario(s3_resource): print("-" * 88) print("Welcome to the Amazon S3 getting started demo!") print("-" * 88) bucket_name = f"amzn-s3-demo-bucket-{uuid.uuid4()}" bucket = s3_resource.Bucket(bucket_name) try: bucket.create( CreateBucketConfiguration={ "LocationConstraint": s3_resource.meta.client.meta.region_name } ) print(f"Created demo bucket named {bucket.name}.") except ClientError as err: print(f"Tried and failed to create demo bucket {bucket_name}.") print(f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}") print(f"\nCan't continue the demo without a bucket!") return file_name = None while file_name is None: file_name = input("\nEnter a file you want to upload to your bucket: ") if not os.path.exists(file_name): print(f"Couldn't find file {file_name}. Are you sure it exists?") file_name = None obj = bucket.Object(os.path.basename(file_name)) try: obj.upload_file(file_name) print( f"Uploaded file {file_name} into bucket {bucket.name} with key {obj.key}." ) except S3UploadFailedError as err: print(f"Couldn't upload file {file_name} to {bucket.name}.") print(f"\t{err}") answer = input(f"\nDo you want to download {obj.key} into memory (y/n)? ") if answer.lower() == "y": data = io.BytesIO() try: obj.download_fileobj(data) data.seek(0) print(f"Got your object. Here are the first 20 bytes:\n") print(f"\t{data.read(20)}") except ClientError as err: print(f"Couldn't download {obj.key}.") print( f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}" ) answer = input( f"\nDo you want to copy {obj.key} to a subfolder in your bucket (y/n)? " ) if answer.lower() == "y": dest_obj = bucket.Object(f"demo-folder/{obj.key}") try: dest_obj.copy({"Bucket": bucket.name, "Key": obj.key}) print(f"Copied {obj.key} to {dest_obj.key}.") except ClientError as err: print(f"Couldn't copy {obj.key} to {dest_obj.key}.") print( f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}" ) print("\nYour bucket contains the following objects:") try: for o in bucket.objects.all(): print(f"\t{o.key}") except ClientError as err: print(f"Couldn't list the objects in bucket {bucket.name}.") print(f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}") answer = input( "\nDo you want to delete all of the objects as well as the bucket (y/n)? " ) if answer.lower() == "y": try: bucket.objects.delete() bucket.delete() print(f"Emptied and deleted bucket {bucket.name}.\n") except ClientError as err: print(f"Couldn't empty and delete bucket {bucket.name}.") print( f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}" ) print("Thanks for watching!") print("-" * 88) if __name__ == "__main__": do_scenario(boto3.resource("s3"))

Actions

The following code example shows how to use CopyObject.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def copy(self, dest_object): """ Copies the object to another bucket. :param dest_object: The destination object initialized with a bucket and key. This is a Boto3 Object resource. """ try: dest_object.copy_from( CopySource={"Bucket": self.object.bucket_name, "Key": self.object.key} ) dest_object.wait_until_exists() logger.info( "Copied object from %s:%s to %s:%s.", self.object.bucket_name, self.object.key, dest_object.bucket_name, dest_object.key, ) except ClientError: logger.exception( "Couldn't copy object from %s/%s to %s/%s.", self.object.bucket_name, self.object.key, dest_object.bucket_name, dest_object.key, ) raise

Copy an object using a conditional request.

class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def copy_object_conditional( self, source_key: str, dest_key: str, source_bucket: str, dest_bucket: str, condition_type: str, condition_value: str, ): """ Copies an object from one Amazon S3 bucket to another with a conditional request. :param source_key: The key of the source object to copy. :param dest_key: The key of the destination object. :param source_bucket: The source bucket of the object. :param dest_bucket: The destination bucket of the object. :param condition_type: The type of condition to apply, e.g. 'CopySourceIfMatch', 'CopySourceIfNoneMatch', 'CopySourceIfModifiedSince', 'CopySourceIfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: self.s3.copy_object( Bucket=dest_bucket, Key=dest_key, CopySource={"Bucket": source_bucket, "Key": source_key}, **{condition_type: condition_value}, ) print( f"\tConditional copy successful for key {dest_key} in bucket {dest_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional copy failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional copy failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise
  • For API details, see CopyObject in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateBucket.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a bucket with default settings.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def create(self, region_override=None): """ Create an Amazon S3 bucket in the default Region for the account or in the specified Region. :param region_override: The Region in which to create the bucket. If this is not specified, the Region configured in your shared credentials is used. """ if region_override is not None: region = region_override else: region = self.bucket.meta.client.meta.region_name try: self.bucket.create(CreateBucketConfiguration={"LocationConstraint": region}) self.bucket.wait_until_exists() logger.info("Created bucket '%s' in region=%s", self.bucket.name, region) except ClientError as error: logger.exception( "Couldn't create bucket named '%s' in region=%s.", self.bucket.name, region, ) raise error

Create a versioned bucket with a lifecycle configuration.

def create_versioned_bucket(bucket_name, prefix): """ Creates an Amazon S3 bucket, enables it for versioning, and configures a lifecycle that expires noncurrent object versions after 7 days. Adding a lifecycle configuration to a versioned bucket is a best practice. It helps prevent objects in the bucket from accumulating a large number of noncurrent versions, which can slow down request performance. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket_name: The name of the bucket to create. :param prefix: Identifies which objects are automatically expired under the configured lifecycle rules. :return: The newly created bucket. """ try: bucket = s3.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ "LocationConstraint": s3.meta.client.meta.region_name }, ) logger.info("Created bucket %s.", bucket.name) except ClientError as error: if error.response["Error"]["Code"] == "BucketAlreadyOwnedByYou": logger.warning("Bucket %s already exists! Using it.", bucket_name) bucket = s3.Bucket(bucket_name) else: logger.exception("Couldn't create bucket %s.", bucket_name) raise try: bucket.Versioning().enable() logger.info("Enabled versioning on bucket %s.", bucket.name) except ClientError: logger.exception("Couldn't enable versioning on bucket %s.", bucket.name) raise try: expiration = 7 bucket.LifecycleConfiguration().put( LifecycleConfiguration={ "Rules": [ { "Status": "Enabled", "Prefix": prefix, "NoncurrentVersionExpiration": {"NoncurrentDays": expiration}, } ] } ) logger.info( "Configured lifecycle to expire noncurrent versions after %s days " "on bucket %s.", expiration, bucket.name, ) except ClientError as error: logger.warning( "Couldn't configure lifecycle on bucket %s because %s. " "Continuing anyway.", bucket.name, error, ) return bucket
  • For API details, see CreateBucket in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteBucket.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete(self): """ Delete the bucket. The bucket must be empty or an error is raised. """ try: self.bucket.delete() self.bucket.wait_until_not_exists() logger.info("Bucket %s successfully deleted.", self.bucket.name) except ClientError: logger.exception("Couldn't delete bucket %s.", self.bucket.name) raise
  • For API details, see DeleteBucket in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteBucketCors.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete_cors(self): """ Delete the CORS rules from the bucket. :param bucket_name: The name of the bucket to update. """ try: self.bucket.Cors().delete() logger.info("Deleted CORS from bucket '%s'.", self.bucket.name) except ClientError: logger.exception("Couldn't delete CORS from bucket '%s'.", self.bucket.name) raise
  • For API details, see DeleteBucketCors in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteBucketLifecycle.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete_lifecycle_configuration(self): """ Remove the lifecycle configuration from the specified bucket. """ try: self.bucket.LifecycleConfiguration().delete() logger.info( "Deleted lifecycle configuration for bucket '%s'.", self.bucket.name ) except ClientError: logger.exception( "Couldn't delete lifecycle configuration for bucket '%s'.", self.bucket.name, ) raise

The following code example shows how to use DeleteBucketPolicy.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete_policy(self): """ Delete the security policy from the bucket. """ try: self.bucket.Policy().delete() logger.info("Deleted policy for bucket '%s'.", self.bucket.name) except ClientError: logger.exception( "Couldn't delete policy for bucket '%s'.", self.bucket.name ) raise

The following code example shows how to use DeleteObject.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Delete an object.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def delete(self): """ Deletes the object. """ try: self.object.delete() self.object.wait_until_not_exists() logger.info( "Deleted object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) except ClientError: logger.exception( "Couldn't delete object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) raise

Roll an object back to a previous version by deleting later versions of the object.

def rollback_object(bucket, object_key, version_id): """ Rolls back an object to an earlier version by deleting all versions that occurred after the specified rollback version. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that holds the object to roll back. :param object_key: The object to roll back. :param version_id: The version ID to roll back to. """ # Versions must be sorted by last_modified date because delete markers are # at the end of the list even when they are interspersed in time. versions = sorted( bucket.object_versions.filter(Prefix=object_key), key=attrgetter("last_modified"), reverse=True, ) logger.debug( "Got versions:\n%s", "\n".join( [ f"\t{version.version_id}, last modified {version.last_modified}" for version in versions ] ), ) if version_id in [ver.version_id for ver in versions]: print(f"Rolling back to version {version_id}") for version in versions: if version.version_id != version_id: version.delete() print(f"Deleted version {version.version_id}") else: break print(f"Active version is now {bucket.Object(object_key).version_id}") else: raise KeyError( f"{version_id} was not found in the list of versions for " f"{object_key}." )

Revive a deleted object by removing the object's active delete marker.

def revive_object(bucket, object_key): """ Revives a versioned object that was deleted by removing the object's active delete marker. A versioned object presents as deleted when its latest version is a delete marker. By removing the delete marker, we make the previous version the latest version and the object then presents as *not* deleted. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to revive. """ # Get the latest version for the object. response = s3.meta.client.list_object_versions( Bucket=bucket.name, Prefix=object_key, MaxKeys=1 ) if "DeleteMarkers" in response: latest_version = response["DeleteMarkers"][0] if latest_version["IsLatest"]: logger.info( "Object %s was indeed deleted on %s. Let's revive it.", object_key, latest_version["LastModified"], ) obj = bucket.Object(object_key) obj.Version(latest_version["VersionId"]).delete() logger.info( "Revived %s, active version is now %s with body '%s'", object_key, obj.version_id, obj.get()["Body"].read(), ) else: logger.warning( "Delete marker is not the latest version for %s!", object_key ) elif "Versions" in response: logger.warning("Got an active version for %s, nothing to do.", object_key) else: logger.error("Couldn't get any version info for %s.", object_key)

Create a Lambda handler that removes a delete marker from an S3 object. This handler can be used to efficiently clean up extraneous delete markers in a versioned bucket.

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
  • For API details, see DeleteObject in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteObjects.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Delete a set of objects by using a list of object keys.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key @staticmethod def delete_objects(bucket, object_keys): """ Removes a list of objects from a bucket. This operation is done as a batch in a single request. :param bucket: The bucket that contains the objects. This is a Boto3 Bucket resource. :param object_keys: The list of keys that identify the objects to remove. :return: The response that contains data about which objects were deleted and any that could not be deleted. """ try: response = bucket.delete_objects( Delete={"Objects": [{"Key": key} for key in object_keys]} ) if "Deleted" in response: logger.info( "Deleted objects '%s' from bucket '%s'.", [del_obj["Key"] for del_obj in response["Deleted"]], bucket.name, ) if "Errors" in response: logger.warning( "Could not delete objects '%s' from bucket '%s'.", [ f"{del_obj['Key']}: {del_obj['Code']}" for del_obj in response["Errors"] ], bucket.name, ) except ClientError: logger.exception("Couldn't delete any objects from bucket %s.", bucket.name) raise else: return response

Delete all objects in a bucket.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key @staticmethod def empty_bucket(bucket): """ Remove all objects from a bucket. :param bucket: The bucket to empty. This is a Boto3 Bucket resource. """ try: bucket.objects.delete() logger.info("Emptied bucket '%s'.", bucket.name) except ClientError: logger.exception("Couldn't empty bucket '%s'.", bucket.name) raise

Permanently delete a versioned object by deleting all of its versions.

def permanently_delete_object(bucket, object_key): """ Permanently deletes a versioned object by deleting all of its versions. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to delete. """ try: bucket.object_versions.filter(Prefix=object_key).delete() logger.info("Permanently deleted all versions of object %s.", object_key) except ClientError: logger.exception("Couldn't delete all versions of %s.", object_key) raise
  • For API details, see DeleteObjects in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetBucketAcl.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_acl(self): """ Get the ACL of the bucket. :return: The ACL of the bucket. """ try: acl = self.bucket.Acl() logger.info( "Got ACL for bucket %s. Owner is %s.", self.bucket.name, acl.owner ) except ClientError: logger.exception("Couldn't get ACL for bucket %s.", self.bucket.name) raise else: return acl
  • For API details, see GetBucketAcl in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetBucketCors.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_cors(self): """ Get the CORS rules for the bucket. :return The CORS rules for the specified bucket. """ try: cors = self.bucket.Cors() logger.info( "Got CORS rules %s for bucket '%s'.", cors.cors_rules, self.bucket.name ) except ClientError: logger.exception(("Couldn't get CORS for bucket %s.", self.bucket.name)) raise else: return cors
  • For API details, see GetBucketCors in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetBucketLifecycleConfiguration.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_lifecycle_configuration(self): """ Get the lifecycle configuration of the bucket. :return: The lifecycle rules of the specified bucket. """ try: config = self.bucket.LifecycleConfiguration() logger.info( "Got lifecycle rules %s for bucket '%s'.", config.rules, self.bucket.name, ) except: logger.exception( "Couldn't get lifecycle rules for bucket '%s'.", self.bucket.name ) raise else: return config.rules

The following code example shows how to use GetBucketPolicy.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_policy(self): """ Get the security policy of the bucket. :return: The security policy of the specified bucket, in JSON format. """ try: policy = self.bucket.Policy() logger.info( "Got policy %s for bucket '%s'.", policy.policy, self.bucket.name ) except ClientError: logger.exception("Couldn't get policy for bucket '%s'.", self.bucket.name) raise else: return json.loads(policy.policy)
  • For API details, see GetBucketPolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetObject.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def get(self): """ Gets the object. :return: The object data in bytes. """ try: body = self.object.get()["Body"].read() logger.info( "Got object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) except ClientError: logger.exception( "Couldn't get object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) raise else: return body

Get an object using a conditional request.

class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def get_object_conditional( self, object_key: str, source_bucket: str, condition_type: str, condition_value: str, ): """ Retrieves an object from Amazon S3 with a conditional request. :param object_key: The key of the object to retrieve. :param source_bucket: The source bucket of the object. :param condition_type: The type of condition: 'IfMatch', 'IfNoneMatch', 'IfModifiedSince', 'IfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: response = self.s3.get_object( Bucket=source_bucket, Key=object_key, **{condition_type: condition_value}, ) sample_bytes = response["Body"].read(20) print( f"\tConditional read successful. Here are the first 20 bytes of the object:\n" ) print(f"\t{sample_bytes}") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional read failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional read failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise
  • For API details, see GetObject in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetObjectAcl.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def get_acl(self): """ Gets the ACL of the object. :return: The ACL of the object. """ try: acl = self.object.Acl() logger.info( "Got ACL for object %s owned by %s.", self.object.key, acl.owner["DisplayName"], ) except ClientError: logger.exception("Couldn't get ACL for object %s.", self.object.key) raise else: return acl
  • For API details, see GetObjectAcl in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetObjectLegalHold.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Put an object legal hold.

def get_legal_hold(s3_client, bucket: str, key: str) -> None: """ Get the legal hold status of a specific file in a bucket. Args: s3_client: Boto3 S3 client. bucket: The name of the bucket containing the file. key: The key of the file to get the legal hold status of. """ print() logger.info("Getting legal hold status of file [%s] in bucket [%s]", key, bucket) try: response = s3_client.get_object_legal_hold(Bucket=bucket, Key=key) legal_hold_status = response["LegalHold"]["Status"] logger.debug( "Legal hold status of file [%s] in bucket [%s] is [%s]", key, bucket, legal_hold_status, ) except Exception as e: logger.error( "Failed to get legal hold status of file [%s] in bucket [%s]: %s", key, bucket, e, )

The following code example shows how to use GetObjectLockConfiguration.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Get the object lock configuration.

def is_object_lock_enabled(s3_client, bucket: str) -> bool: """ Check if object lock is enabled for a bucket. Args: s3_client: Boto3 S3 client. bucket: The name of the bucket to check. Returns: True if object lock is enabled, False otherwise. """ try: response = s3_client.get_object_lock_configuration(Bucket=bucket) return ( "ObjectLockConfiguration" in response and response["ObjectLockConfiguration"]["ObjectLockEnabled"] == "Enabled" ) except s3_client.exceptions.ClientError as e: if e.response["Error"]["Code"] == "ObjectLockConfigurationNotFoundError": return False else: raise

The following code example shows how to use HeadBucket.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def exists(self): """ Determine whether the bucket exists and you have access to it. :return: True when the bucket exists; otherwise, False. """ try: self.bucket.meta.client.head_bucket(Bucket=self.bucket.name) logger.info("Bucket %s exists.", self.bucket.name) exists = True except ClientError: logger.warning( "Bucket %s doesn't exist or you don't have access to it.", self.bucket.name, ) exists = False return exists
  • For API details, see HeadBucket in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListBuckets.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name @staticmethod def list(s3_resource): """ Get the buckets in all Regions for the current account. :param s3_resource: A Boto3 S3 resource. This is a high-level resource in Boto3 that contains collections and factory methods to create other high-level S3 sub-resources. :return: The list of buckets. """ try: buckets = list(s3_resource.buckets.all()) logger.info("Got buckets: %s.", buckets) except ClientError: logger.exception("Couldn't get buckets.") raise else: return buckets
  • For API details, see ListBuckets in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListObjectsV2.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key @staticmethod def list(bucket, prefix=None): """ Lists the objects in a bucket, optionally filtered by a prefix. :param bucket: The bucket to query. This is a Boto3 Bucket resource. :param prefix: When specified, only objects that start with this prefix are listed. :return: The list of objects. """ try: if not prefix: objects = list(bucket.objects.all()) else: objects = list(bucket.objects.filter(Prefix=prefix)) logger.info( "Got objects %s from bucket '%s'", [o.key for o in objects], bucket.name ) except ClientError: logger.exception("Couldn't get objects for bucket '%s'.", bucket.name) raise else: return objects
  • For API details, see ListObjectsV2 in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutBucketAcl.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def grant_log_delivery_access(self): """ Grant the AWS Log Delivery group write access to the bucket so that Amazon S3 can deliver access logs to the bucket. This is the only recommended use of an S3 bucket ACL. """ try: acl = self.bucket.Acl() # Putting an ACL overwrites the existing ACL. If you want to preserve # existing grants, append new grants to the list of existing grants. grants = acl.grants if acl.grants else [] grants.append( { "Grantee": { "Type": "Group", "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", }, "Permission": "WRITE", } ) acl.put(AccessControlPolicy={"Grants": grants, "Owner": acl.owner}) logger.info("Granted log delivery access to bucket '%s'", self.bucket.name) except ClientError: logger.exception("Couldn't add ACL to bucket '%s'.", self.bucket.name) raise
  • For API details, see PutBucketAcl in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutBucketCors.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def put_cors(self, cors_rules): """ Apply CORS rules to the bucket. CORS rules specify the HTTP actions that are allowed from other domains. :param cors_rules: The CORS rules to apply. """ try: self.bucket.Cors().put(CORSConfiguration={"CORSRules": cors_rules}) logger.info( "Put CORS rules %s for bucket '%s'.", cors_rules, self.bucket.name ) except ClientError: logger.exception("Couldn't put CORS rules for bucket %s.", self.bucket.name) raise
  • For API details, see PutBucketCors in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutBucketLifecycleConfiguration.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def put_lifecycle_configuration(self, lifecycle_rules): """ Apply a lifecycle configuration to the bucket. The lifecycle configuration can be used to archive or delete the objects in the bucket according to specified parameters, such as a number of days. :param lifecycle_rules: The lifecycle rules to apply. """ try: self.bucket.LifecycleConfiguration().put( LifecycleConfiguration={"Rules": lifecycle_rules} ) logger.info( "Put lifecycle rules %s for bucket '%s'.", lifecycle_rules, self.bucket.name, ) except ClientError: logger.exception( "Couldn't put lifecycle rules for bucket '%s'.", self.bucket.name ) raise

The following code example shows how to use PutBucketPolicy.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def put_policy(self, policy): """ Apply a security policy to the bucket. Policies control users' ability to perform specific actions, such as listing the objects in the bucket. :param policy: The policy to apply to the bucket. """ try: self.bucket.Policy().put(Policy=json.dumps(policy)) logger.info("Put policy %s for bucket '%s'.", policy, self.bucket.name) except ClientError: logger.exception("Couldn't apply policy to bucket '%s'.", self.bucket.name) raise
  • For API details, see PutBucketPolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutObject.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def put(self, data): """ Upload data to the object. :param data: The data to upload. This can either be bytes or a string. When this argument is a string, it is interpreted as a file name, which is opened in read bytes mode. """ put_data = data if isinstance(data, str): try: put_data = open(data, "rb") except IOError: logger.exception("Expected file name or binary data, got '%s'.", data) raise try: self.object.put(Body=put_data) self.object.wait_until_exists() logger.info( "Put object '%s' to bucket '%s'.", self.object.key, self.object.bucket_name, ) except ClientError: logger.exception( "Couldn't put object '%s' to bucket '%s'.", self.object.key, self.object.bucket_name, ) raise finally: if getattr(put_data, "close", None): put_data.close()

Upload an object using a conditional request.

class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def put_object_conditional(self, object_key: str, source_bucket: str, data: bytes): """ Uploads an object to Amazon S3 with a conditional request. Prevents overwrite using an IfNoneMatch condition for the object key. :param object_key: The key of the object to upload. :param source_bucket: The source bucket of the object. :param data: The data to upload. """ try: self.s3.put_object( Bucket=source_bucket, Key=object_key, Body=data, IfNoneMatch="*" ) print( f"\tConditional write successful for key {object_key} in bucket {source_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional write failed: Precondition failed") else: logger.error(f"Unexpected error: {error_code}") raise
  • For API details, see PutObject in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutObjectAcl.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def put_acl(self, email): """ Applies an ACL to the object that grants read access to an AWS user identified by email address. :param email: The email address of the user to grant access. """ try: acl = self.object.Acl() # Putting an ACL overwrites the existing ACL, so append new grants # if you want to preserve existing grants. grants = acl.grants if acl.grants else [] grants.append( { "Grantee": {"Type": "AmazonCustomerByEmail", "EmailAddress": email}, "Permission": "READ", } ) acl.put(AccessControlPolicy={"Grants": grants, "Owner": acl.owner}) logger.info("Granted read access to %s.", email) except ClientError: logger.exception("Couldn't add ACL to object '%s'.", self.object.key) raise
  • For API details, see PutObjectAcl in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutObjectLegalHold.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Put an object legal hold.

def set_legal_hold(s3_client, bucket: str, key: str) -> None: """ Set a legal hold on a specific file in a bucket. Args: s3_client: Boto3 S3 client. bucket: The name of the bucket containing the file. key: The key of the file to set the legal hold on. """ print() logger.info("Setting legal hold on file [%s] in bucket [%s]", key, bucket) try: before_status = "OFF" after_status = "ON" s3_client.put_object_legal_hold( Bucket=bucket, Key=key, LegalHold={"Status": after_status} ) logger.debug( "Legal hold set successfully on file [%s] in bucket [%s]", key, bucket ) _print_legal_hold_update(bucket, key, before_status, after_status) except Exception as e: logger.error( "Failed to set legal hold on file [%s] in bucket [%s]: %s", key, bucket, e )

The following code example shows how to use PutObjectLockConfiguration.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Put object lock configuration.

s3_client.put_object_lock_configuration( Bucket=bucket, ObjectLockConfiguration={"ObjectLockEnabled": "Disabled", "Rule": {}}, )

The following code example shows how to use PutObjectRetention.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Put an object retention.

s3_client.put_object_retention( Bucket=bucket, Key=key, VersionId=version_id, Retention={"Mode": "GOVERNANCE", "RetainUntilDate": far_future_date}, BypassGovernanceRetention=True, )

Scenarios

The following code example shows how to create a presigned URL for Amazon S3 and upload an object.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Generate a presigned URL that can perform an S3 action for a limited time. Use the Requests package to make a request with the URL.

import argparse import logging import boto3 from botocore.exceptions import ClientError import requests logger = logging.getLogger(__name__) def generate_presigned_url(s3_client, client_method, method_parameters, expires_in): """ Generate a presigned Amazon S3 URL that can be used to perform an action. :param s3_client: A Boto3 Amazon S3 client. :param client_method: The name of the client method that the URL performs. :param method_parameters: The parameters of the specified client method. :param expires_in: The number of seconds the presigned URL is valid for. :return: The presigned URL. """ try: url = s3_client.generate_presigned_url( ClientMethod=client_method, Params=method_parameters, ExpiresIn=expires_in ) logger.info("Got presigned URL: %s", url) except ClientError: logger.exception( "Couldn't get a presigned URL for client method '%s'.", client_method ) raise return url def usage_demo(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the Amazon S3 presigned URL demo.") print("-" * 88) parser = argparse.ArgumentParser() parser.add_argument("bucket", help="The name of the bucket.") parser.add_argument( "key", help="For a GET operation, the key of the object in Amazon S3. For a " "PUT operation, the name of a file to upload.", ) parser.add_argument("action", choices=("get", "put"), help="The action to perform.") args = parser.parse_args() s3_client = boto3.client("s3") client_action = "get_object" if args.action == "get" else "put_object" url = generate_presigned_url( s3_client, client_action, {"Bucket": args.bucket, "Key": args.key}, 1000 ) print("Using the Requests package to send a request to the URL.") response = None if args.action == "get": response = requests.get(url) if response.status_code == 200: with open(args.key.split("/")[-1], 'wb') as object_file: object_file.write(response.content) elif args.action == "put": print("Putting data to the URL.") try: with open(args.key, "rb") as object_file: object_text = object_file.read() response = requests.put(url, data=object_text) except FileNotFoundError: print( f"Couldn't find {args.key}. For a PUT operation, the key must be the " f"name of a file that exists on your computer." ) if response is not None: print(f"Status: {response.status_code}\nReason: {response.reason}") print("-" * 88) if __name__ == "__main__": usage_demo()

Generate a presigned POST request to upload a file.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def generate_presigned_post(self, object_key, expires_in): """ Generate a presigned Amazon S3 POST request to upload a file. A presigned POST can be used for a limited time to let someone without an AWS account upload a file to a bucket. :param object_key: The object key to identify the uploaded object. :param expires_in: The number of seconds the presigned POST is valid. :return: A dictionary that contains the URL and form fields that contain required access data. """ try: response = self.bucket.meta.client.generate_presigned_post( Bucket=self.bucket.name, Key=object_key, ExpiresIn=expires_in ) logger.info("Got presigned POST URL: %s", response["url"]) except ClientError: logger.exception( "Couldn't get a presigned POST URL for bucket '%s' and object '%s'", self.bucket.name, object_key, ) raise return response

The following code example shows how to explore Amazon Textract output through an interactive application.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) with Amazon Textract to detect text, form, and table elements in a document image. The input image and Amazon Textract output are shown in a Tkinter application that lets you explore the detected elements.

  • Submit a document image to Amazon Textract and explore the output of detected elements.

  • Submit images directly to Amazon Textract or through an Amazon Simple Storage Service (Amazon S3) bucket.

  • Use asynchronous APIs to start a job that publishes a notification to an Amazon Simple Notification Service (Amazon SNS) topic when the job completes.

  • Poll an Amazon Simple Queue Service (Amazon SQS) queue for a job completion message and display the results.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

The following code example shows how to use Amazon Comprehend to detect entities in text extracted by Amazon Textract from an image that is stored in Amazon S3.

SDK for Python (Boto3)

Shows how to use the AWS SDK for Python (Boto3) in a Jupyter notebook to detect entities in text that is extracted from an image. This example uses Amazon Textract to extract text from an image stored in Amazon Simple Storage Service (Amazon S3) and Amazon Comprehend to detect entities in the extracted text.

This example is a Jupyter notebook and must be run in an environment that can host notebooks. For instructions on how to run the example using Amazon SageMaker, see the directions in TextractAndComprehendNotebook.ipynb.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon Comprehend

  • Amazon S3

  • Amazon Textract

The following code example shows how to build an app that uses Amazon Rekognition to detect objects by category in images.

SDK for Python (Boto3)

Shows you how to use the AWS SDK for Python (Boto3) to create a web application that lets you do the following:

  • Upload photos to an Amazon Simple Storage Service (Amazon S3) bucket.

  • Use Amazon Rekognition to analyze and label the photos.

  • Use Amazon Simple Email Service (Amazon SES) to send email reports of image analysis.

This example contains two main components: a webpage written in JavaScript that is built with React, and a REST service written in Python that is built with Flask-RESTful.

You can use the React webpage to:

  • Display a list of images that are stored in your S3 bucket.

  • Upload images from your computer to your S3 bucket.

  • Display images and labels that identify items that are detected in the image.

  • Get a report of all images in your S3 bucket and send an email of the report.

The webpage calls the REST service. The service sends requests to AWS to perform the following actions:

  • Get and filter the list of images in your S3 bucket.

  • Upload photos to your S3 bucket.

  • Use Amazon Rekognition to analyze individual photos and get a list of labels that identify items that are detected in the photo.

  • Analyze all photos in your S3 bucket and use Amazon SES to email a report.

For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon Rekognition

  • Amazon S3

  • Amazon SES

The following code example shows how to add preconditions to Amazon S3 requests.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Run an interactive scenario demonstrating Amazon S3 conditional requests.

""" Purpose Shows how to use AWS SDK for Python (Boto3) to get started using conditional requests for Amazon Simple Storage Service (Amazon S3). """ import logging import random import sys import datetime import boto3 from botocore.exceptions import ClientError from s3_conditional_requests import S3ConditionalRequests # Add relative path to include demo_tools in this code example without need for setup. sys.path.append("../../../..") import demo_tools.question as q # noqa # Constants FILE_CONTENT = "This is a test file for S3 conditional requests." RANDOM_SUFFIX = str(random.randint(100, 999)) logger = logging.getLogger(__name__) class ConditionalRequestsScenario: """Runs a scenario that shows how to use S3 Conditional Requests.""" def __init__(self, conditional_requests, s3_client): """ :param conditional_requests: An object that wraps S3 conditional request actions. :param s3_client: A Boto3 S3 client for setup and cleanup operations. """ self.conditional_requests = conditional_requests self.s3_client = s3_client def setup_scenario(self, source_bucket: str, dest_bucket: str, object_key: str): """ Sets up the scenario by creating a source and destination bucket. Prompts the user to provide a bucket name prefix. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. :param object_key: The name of a test file to add to the source bucket. """ # Create the buckets. try: self.s3_client.create_bucket(Bucket=source_bucket) self.s3_client.create_bucket(Bucket=dest_bucket) print( f"Created source bucket: {source_bucket} and destination bucket: {dest_bucket}" ) except ClientError as e: error_code = e.response["Error"]["Code"] logger.error(f"Error creating buckets: {error_code}") raise # Upload test file into the source bucket. try: print(f"Uploading file {object_key} to bucket {source_bucket}") response = self.s3_client.put_object( Bucket=source_bucket, Key=object_key, Body=FILE_CONTENT ) object_etag = response["ETag"] return object_etag except Exception as e: logger.error( f"Failed to upload file {object_key} to bucket {source_bucket}: {e}" ) def cleanup_scenario(self, source_bucket: str, dest_bucket: str): """ Cleans up the scenario by deleting the source and destination buckets. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. """ self.cleanup_bucket(source_bucket) self.cleanup_bucket(dest_bucket) def cleanup_bucket(self, bucket_name: str): """ Cleans up the bucket by deleting all objects and then the bucket itself. :param bucket_name: The name of the bucket. """ try: # Get list of all objects in the bucket. list_response = self.s3_client.list_objects_v2(Bucket=bucket_name) objs = list_response.get("Contents", []) for obj in objs: key = obj["Key"] self.s3_client.delete_object(Bucket=bucket_name, Key=key) self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Cleaned up bucket: {bucket_name}.") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "NoSuchBucket": logger.info(f"Bucket {bucket_name} does not exist, skipping cleanup.") else: logger.error(f"Error deleting bucket: {error_code}") raise def display_buckets(self, source_bucket: str, dest_bucket: str): """ Display a list of the objects in the test buckets. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. """ self.list_bucket_contents(source_bucket) self.list_bucket_contents(dest_bucket) def list_bucket_contents(self, bucket_name): """ Display a list of the objects in the bucket. :param bucket_name: The name of the bucket. """ try: # Get list of all objects in the bucket. print(f"\t Items in bucket {bucket_name}") list_response = self.s3_client.list_objects_v2(Bucket=bucket_name) objs = list_response.get("Contents", []) if not objs: print("\t\tNo objects found.") for obj in objs: key = obj["Key"] print(f"\t\t object: {key} ETag {obj['ETag']}") return objs except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "NoSuchBucket": logger.info(f"Bucket {bucket_name} does not exist.") else: logger.error(f"Error listing bucket and objects: {error_code}") raise def display_menu( self, source_bucket: str, dest_bucket: str, object_key: str, etag: str ): """ Displays the menu of conditional request options for the user. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. :param object_key: The key of the test object in the source bucket. :param etag: The etag of the test object in the source bucket. """ actions = [ "Print list of bucket items.", "Perform a conditional read.", "Perform a conditional copy.", "Perform a conditional write.", "Clean up and exit.", ] conditions = [ "If-Match: using the object's ETag. This condition should succeed.", "If-None-Match: using the object's ETag. This condition should fail.", "If-Modified-Since: using yesterday's date. This condition should succeed.", "If-Unmodified-Since: using yesterday's date. This condition should fail.", ] condition_types = [ "IfMatch", "IfNoneMatch", "IfModifiedSince", "IfUnmodifiedSince", ] copy_condition_types = [ "CopySourceIfMatch", "CopySourceIfNoneMatch", "CopySourceIfModifiedSince", "CopySourceIfUnmodifiedSince", ] yesterday_date = datetime.datetime.utcnow() - datetime.timedelta(days=1) choice = 0 while choice != 4: print("-" * 88) print("Choose an action to explore some example conditional requests.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: print("Listing the objects and buckets.") self.display_buckets(source_bucket, dest_bucket) elif choice == 1: print("Perform a conditional read.") condition_type = q.choose("Enter the condition type : ", conditions) if condition_type == 0 or condition_type == 1: self.conditional_requests.get_object_conditional( object_key, source_bucket, condition_types[condition_type], etag ) elif condition_type == 2 or condition_type == 3: self.conditional_requests.get_object_conditional( object_key, source_bucket, condition_types[condition_type], yesterday_date, ) elif choice == 2: print("Perform a conditional copy.") condition_type = q.choose("Enter the condition type : ", conditions) dest_key = q.ask("Enter an object key: ", q.non_empty) if condition_type == 0 or condition_type == 1: self.conditional_requests.copy_object_conditional( object_key, dest_key, source_bucket, dest_bucket, copy_condition_types[condition_type], etag, ) elif condition_type == 2 or condition_type == 3: self.conditional_requests.copy_object_conditional( object_key, dest_key, copy_condition_types[condition_type], yesterday_date, ) elif choice == 3: print( "Perform a conditional write using IfNoneMatch condition on the object key." ) print("If the key is a duplicate, the write will fail.") object_key = q.ask("Enter an object key: ", q.non_empty) self.conditional_requests.put_object_conditional( object_key, source_bucket, b"Conditional write example data." ) elif choice == 4: print("Proceeding to cleanup.") def run_scenario(self): """ Runs the interactive scenario. """ print("-" * 88) print("Welcome to the Amazon S3 conditional requests example.") print("-" * 88) print( f"""\ This example demonstrates the use of conditional requests for S3 operations. You can use conditional requests to add preconditions to S3 read requests to return or copy an object based on its Entity tag (ETag), or last modified date. You can use a conditional write requests to prevent overwrites by ensuring there is no existing object with the same key. This example will allow you to perform conditional reads and writes that will succeed or fail based on your selected options. Sample buckets and a sample object will be created as part of the example. """ ) bucket_prefix = q.ask("Enter a bucket name prefix: ", q.non_empty) source_bucket_name = f"{bucket_prefix}-source-{RANDOM_SUFFIX}" dest_bucket_name = f"{bucket_prefix}-dest-{RANDOM_SUFFIX}" object_key = "test-upload-file.txt" try: etag = self.setup_scenario(source_bucket_name, dest_bucket_name, object_key) self.display_menu(source_bucket_name, dest_bucket_name, object_key, etag) finally: self.cleanup_scenario(source_bucket_name, dest_bucket_name) print("-" * 88) print("Thanks for watching.") print("-" * 88) if __name__ == "__main__": scenario = ConditionalRequestsScenario( S3ConditionalRequests.from_client(), boto3.client("s3") ) scenario.run_scenario()

A wrapper class that defines the conditional request operations.

import boto3 import logging from botocore.exceptions import ClientError # Configure logging logger = logging.getLogger(__name__) class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def get_object_conditional( self, object_key: str, source_bucket: str, condition_type: str, condition_value: str, ): """ Retrieves an object from Amazon S3 with a conditional request. :param object_key: The key of the object to retrieve. :param source_bucket: The source bucket of the object. :param condition_type: The type of condition: 'IfMatch', 'IfNoneMatch', 'IfModifiedSince', 'IfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: response = self.s3.get_object( Bucket=source_bucket, Key=object_key, **{condition_type: condition_value}, ) sample_bytes = response["Body"].read(20) print( f"\tConditional read successful. Here are the first 20 bytes of the object:\n" ) print(f"\t{sample_bytes}") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional read failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional read failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise def put_object_conditional(self, object_key: str, source_bucket: str, data: bytes): """ Uploads an object to Amazon S3 with a conditional request. Prevents overwrite using an IfNoneMatch condition for the object key. :param object_key: The key of the object to upload. :param source_bucket: The source bucket of the object. :param data: The data to upload. """ try: self.s3.put_object( Bucket=source_bucket, Key=object_key, Body=data, IfNoneMatch="*" ) print( f"\tConditional write successful for key {object_key} in bucket {source_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional write failed: Precondition failed") else: logger.error(f"Unexpected error: {error_code}") raise def copy_object_conditional( self, source_key: str, dest_key: str, source_bucket: str, dest_bucket: str, condition_type: str, condition_value: str, ): """ Copies an object from one Amazon S3 bucket to another with a conditional request. :param source_key: The key of the source object to copy. :param dest_key: The key of the destination object. :param source_bucket: The source bucket of the object. :param dest_bucket: The destination bucket of the object. :param condition_type: The type of condition to apply, e.g. 'CopySourceIfMatch', 'CopySourceIfNoneMatch', 'CopySourceIfModifiedSince', 'CopySourceIfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: self.s3.copy_object( Bucket=dest_bucket, Key=dest_key, CopySource={"Bucket": source_bucket, "Key": source_key}, **{condition_type: condition_value}, ) print( f"\tConditional copy successful for key {dest_key} in bucket {dest_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional copy failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional copy failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise

The following code example shows how to manage versioned S3 objects in batches with a Lambda function.

SDK for Python (Boto3)

Shows how to manipulate Amazon Simple Storage Service (Amazon S3) versioned objects in batches by creating jobs that call AWS Lambda functions to perform processing. This example creates a version-enabled bucket, uploads the stanzas from the poem You Are Old, Father William by Lewis Carroll, and uses Amazon S3 batch jobs to twist the poem in various ways.

Learn how to:
  • Create Lambda functions that operate on versioned objects.

  • Create a manifest of objects to update.

  • Create batch jobs that invoke Lambda functions to update objects.

  • Delete Lambda functions.

  • Empty and delete a versioned bucket.

This example is best viewed on GitHub. For complete source code and instructions on how to set up and run, see the full example on GitHub.

Services used in this example
  • Amazon S3

The following code example shows how to upload or download large files to and from Amazon S3.

For more information, see Uploading an object using multipart upload.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create functions that transfer files using several of the available transfer manager settings. Use a callback class to write callback progress during file transfer.

import sys import threading import boto3 from boto3.s3.transfer import TransferConfig MB = 1024 * 1024 s3 = boto3.resource("s3") class TransferCallback: """ Handle callbacks from the transfer manager. The transfer manager periodically calls the __call__ method throughout the upload and download process so that it can take action, such as displaying progress to the user and collecting data about the transfer. """ def __init__(self, target_size): self._target_size = target_size self._total_transferred = 0 self._lock = threading.Lock() self.thread_info = {} def __call__(self, bytes_transferred): """ The callback method that is called by the transfer manager. Display progress during file transfer and collect per-thread transfer data. This method can be called by multiple threads, so shared instance data is protected by a thread lock. """ thread = threading.current_thread() with self._lock: self._total_transferred += bytes_transferred if thread.ident not in self.thread_info.keys(): self.thread_info[thread.ident] = bytes_transferred else: self.thread_info[thread.ident] += bytes_transferred target = self._target_size * MB sys.stdout.write( f"\r{self._total_transferred} of {target} transferred " f"({(self._total_transferred / target) * 100:.2f}%)." ) sys.stdout.flush() def upload_with_default_configuration( local_file_path, bucket_name, object_key, file_size_mb ): """ Upload a file from a local folder to an Amazon S3 bucket, using the default configuration. """ transfer_callback = TransferCallback(file_size_mb) s3.Bucket(bucket_name).upload_file( local_file_path, object_key, Callback=transfer_callback ) return transfer_callback.thread_info def upload_with_chunksize_and_meta( local_file_path, bucket_name, object_key, file_size_mb, metadata=None ): """ Upload a file from a local folder to an Amazon S3 bucket, setting a multipart chunk size and adding metadata to the Amazon S3 object. The multipart chunk size controls the size of the chunks of data that are sent in the request. A smaller chunk size typically results in the transfer manager using more threads for the upload. The metadata is a set of key-value pairs that are stored with the object in Amazon S3. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(multipart_chunksize=1 * MB) extra_args = {"Metadata": metadata} if metadata else None s3.Bucket(bucket_name).upload_file( local_file_path, object_key, Config=config, ExtraArgs=extra_args, Callback=transfer_callback, ) return transfer_callback.thread_info def upload_with_high_threshold(local_file_path, bucket_name, object_key, file_size_mb): """ Upload a file from a local folder to an Amazon S3 bucket, setting a multipart threshold larger than the size of the file. Setting a multipart threshold larger than the size of the file results in the transfer manager sending the file as a standard upload instead of a multipart upload. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(multipart_threshold=file_size_mb * 2 * MB) s3.Bucket(bucket_name).upload_file( local_file_path, object_key, Config=config, Callback=transfer_callback ) return transfer_callback.thread_info def upload_with_sse( local_file_path, bucket_name, object_key, file_size_mb, sse_key=None ): """ Upload a file from a local folder to an Amazon S3 bucket, adding server-side encryption with customer-provided encryption keys to the object. When this kind of encryption is specified, Amazon S3 encrypts the object at rest and allows downloads only when the expected encryption key is provided in the download request. """ transfer_callback = TransferCallback(file_size_mb) if sse_key: extra_args = {"SSECustomerAlgorithm": "AES256", "SSECustomerKey": sse_key} else: extra_args = None s3.Bucket(bucket_name).upload_file( local_file_path, object_key, ExtraArgs=extra_args, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_default_configuration( bucket_name, object_key, download_file_path, file_size_mb ): """ Download a file from an Amazon S3 bucket to a local folder, using the default configuration. """ transfer_callback = TransferCallback(file_size_mb) s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_single_thread( bucket_name, object_key, download_file_path, file_size_mb ): """ Download a file from an Amazon S3 bucket to a local folder, using a single thread. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(use_threads=False) s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, Config=config, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_high_threshold( bucket_name, object_key, download_file_path, file_size_mb ): """ Download a file from an Amazon S3 bucket to a local folder, setting a multipart threshold larger than the size of the file. Setting a multipart threshold larger than the size of the file results in the transfer manager sending the file as a standard download instead of a multipart download. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(multipart_threshold=file_size_mb * 2 * MB) s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, Config=config, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_sse( bucket_name, object_key, download_file_path, file_size_mb, sse_key ): """ Download a file from an Amazon S3 bucket to a local folder, adding a customer-provided encryption key to the request. When this kind of encryption is specified, Amazon S3 encrypts the object at rest and allows downloads only when the expected encryption key is provided in the download request. """ transfer_callback = TransferCallback(file_size_mb) if sse_key: extra_args = {"SSECustomerAlgorithm": "AES256", "SSECustomerKey": sse_key} else: extra_args = None s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, ExtraArgs=extra_args, Callback=transfer_callback ) return transfer_callback.thread_info

Demonstrate the transfer manager functions and report results.

import hashlib import os import platform import shutil import time import boto3 from boto3.s3.transfer import TransferConfig from botocore.exceptions import ClientError from botocore.exceptions import ParamValidationError from botocore.exceptions import NoCredentialsError import file_transfer MB = 1024 * 1024 # These configuration attributes affect both uploads and downloads. CONFIG_ATTRS = ( "multipart_threshold", "multipart_chunksize", "max_concurrency", "use_threads", ) # These configuration attributes affect only downloads. DOWNLOAD_CONFIG_ATTRS = ("max_io_queue", "io_chunksize", "num_download_attempts") class TransferDemoManager: """ Manages the demonstration. Collects user input from a command line, reports transfer results, maintains a list of artifacts created during the demonstration, and cleans them up after the demonstration is completed. """ def __init__(self): self._s3 = boto3.resource("s3") self._chore_list = [] self._create_file_cmd = None self._size_multiplier = 0 self.file_size_mb = 30 self.demo_folder = None self.demo_bucket = None self._setup_platform_specific() self._terminal_width = shutil.get_terminal_size(fallback=(80, 80))[0] def collect_user_info(self): """ Collect local folder and Amazon S3 bucket name from the user. These locations are used to store files during the demonstration. """ while not self.demo_folder: self.demo_folder = input( "Which file folder do you want to use to store " "demonstration files? " ) if not os.path.isdir(self.demo_folder): print(f"{self.demo_folder} isn't a folder!") self.demo_folder = None while not self.demo_bucket: self.demo_bucket = input( "Which Amazon S3 bucket do you want to use to store " "demonstration files? " ) try: self._s3.meta.client.head_bucket(Bucket=self.demo_bucket) except ParamValidationError as err: print(err) self.demo_bucket = None except ClientError as err: print(err) print( f"Either {self.demo_bucket} doesn't exist or you don't " f"have access to it." ) self.demo_bucket = None def demo( self, question, upload_func, download_func, upload_args=None, download_args=None ): """Run a demonstration. Ask the user if they want to run this specific demonstration. If they say yes, create a file on the local path, upload it using the specified upload function, then download it using the specified download function. """ if download_args is None: download_args = {} if upload_args is None: upload_args = {} question = question.format(self.file_size_mb) answer = input(f"{question} (y/n)") if answer.lower() == "y": local_file_path, object_key, download_file_path = self._create_demo_file() file_transfer.TransferConfig = self._config_wrapper( TransferConfig, CONFIG_ATTRS ) self._report_transfer_params( "Uploading", local_file_path, object_key, **upload_args ) start_time = time.perf_counter() thread_info = upload_func( local_file_path, self.demo_bucket, object_key, self.file_size_mb, **upload_args, ) end_time = time.perf_counter() self._report_transfer_result(thread_info, end_time - start_time) file_transfer.TransferConfig = self._config_wrapper( TransferConfig, CONFIG_ATTRS + DOWNLOAD_CONFIG_ATTRS ) self._report_transfer_params( "Downloading", object_key, download_file_path, **download_args ) start_time = time.perf_counter() thread_info = download_func( self.demo_bucket, object_key, download_file_path, self.file_size_mb, **download_args, ) end_time = time.perf_counter() self._report_transfer_result(thread_info, end_time - start_time) def last_name_set(self): """Get the name set used for the last demo.""" return self._chore_list[-1] def cleanup(self): """ Remove files from the demo folder, and uploaded objects from the Amazon S3 bucket. """ print("-" * self._terminal_width) for local_file_path, s3_object_key, downloaded_file_path in self._chore_list: print(f"Removing {local_file_path}") try: os.remove(local_file_path) except FileNotFoundError as err: print(err) print(f"Removing {downloaded_file_path}") try: os.remove(downloaded_file_path) except FileNotFoundError as err: print(err) if self.demo_bucket: print(f"Removing {self.demo_bucket}:{s3_object_key}") try: self._s3.Bucket(self.demo_bucket).Object(s3_object_key).delete() except ClientError as err: print(err) def _setup_platform_specific(self): """Set up platform-specific command used to create a large file.""" if platform.system() == "Windows": self._create_file_cmd = "fsutil file createnew {} {}" self._size_multiplier = MB elif platform.system() == "Linux" or platform.system() == "Darwin": self._create_file_cmd = f"dd if=/dev/urandom of={{}} " f"bs={MB} count={{}}" self._size_multiplier = 1 else: raise EnvironmentError( f"Demo of platform {platform.system()} isn't supported." ) def _create_demo_file(self): """ Create a file in the demo folder specified by the user. Store the local path, object name, and download path for later cleanup. Only the local file is created by this method. The Amazon S3 object and download file are created later during the demonstration. Returns: A tuple that contains the local file path, object name, and download file path. """ file_name_template = "TestFile{}-{}.demo" local_suffix = "local" object_suffix = "s3object" download_suffix = "downloaded" file_tag = len(self._chore_list) + 1 local_file_path = os.path.join( self.demo_folder, file_name_template.format(file_tag, local_suffix) ) s3_object_key = file_name_template.format(file_tag, object_suffix) downloaded_file_path = os.path.join( self.demo_folder, file_name_template.format(file_tag, download_suffix) ) filled_cmd = self._create_file_cmd.format( local_file_path, self.file_size_mb * self._size_multiplier ) print( f"Creating file of size {self.file_size_mb} MB " f"in {self.demo_folder} by running:" ) print(f"{'':4}{filled_cmd}") os.system(filled_cmd) chore = (local_file_path, s3_object_key, downloaded_file_path) self._chore_list.append(chore) return chore def _report_transfer_params(self, verb, source_name, dest_name, **kwargs): """Report configuration and extra arguments used for a file transfer.""" print("-" * self._terminal_width) print(f"{verb} {source_name} ({self.file_size_mb} MB) to {dest_name}") if kwargs: print("With extra args:") for arg, value in kwargs.items(): print(f'{"":4}{arg:<20}: {value}') @staticmethod def ask_user(question): """ Ask the user a yes or no question. Returns: True when the user answers 'y' or 'Y'; otherwise, False. """ answer = input(f"{question} (y/n) ") return answer.lower() == "y" @staticmethod def _config_wrapper(func, config_attrs): def wrapper(*args, **kwargs): config = func(*args, **kwargs) print("With configuration:") for attr in config_attrs: print(f'{"":4}{attr:<20}: {getattr(config, attr)}') return config return wrapper @staticmethod def _report_transfer_result(thread_info, elapsed): """Report the result of a transfer, including per-thread data.""" print(f"\nUsed {len(thread_info)} threads.") for ident, byte_count in thread_info.items(): print(f"{'':4}Thread {ident} copied {byte_count} bytes.") print(f"Your transfer took {elapsed:.2f} seconds.") def main(): """ Run the demonstration script for s3_file_transfer. """ demo_manager = TransferDemoManager() demo_manager.collect_user_info() # Upload and download with default configuration. Because the file is 30 MB # and the default multipart_threshold is 8 MB, both upload and download are # multipart transfers. demo_manager.demo( "Do you want to upload and download a {} MB file " "using the default configuration?", file_transfer.upload_with_default_configuration, file_transfer.download_with_default_configuration, ) # Upload and download with multipart_threshold set higher than the size of # the file. This causes the transfer manager to use standard transfers # instead of multipart transfers. demo_manager.demo( "Do you want to upload and download a {} MB file " "as a standard (not multipart) transfer?", file_transfer.upload_with_high_threshold, file_transfer.download_with_high_threshold, ) # Upload with specific chunk size and additional metadata. # Download with a single thread. demo_manager.demo( "Do you want to upload a {} MB file with a smaller chunk size and " "then download the same file using a single thread?", file_transfer.upload_with_chunksize_and_meta, file_transfer.download_with_single_thread, upload_args={ "metadata": { "upload_type": "chunky", "favorite_color": "aqua", "size": "medium", } }, ) # Upload using server-side encryption with customer-provided # encryption keys. # Generate a 256-bit key from a passphrase. sse_key = hashlib.sha256("demo_passphrase".encode("utf-8")).digest() demo_manager.demo( "Do you want to upload and download a {} MB file using " "server-side encryption?", file_transfer.upload_with_sse, file_transfer.download_with_sse, upload_args={"sse_key": sse_key}, download_args={"sse_key": sse_key}, ) # Download without specifying an encryption key to show that the # encryption key must be included to download an encrypted object. if demo_manager.ask_user( "Do you want to try to download the encrypted " "object without sending the required key?" ): try: _, object_key, download_file_path = demo_manager.last_name_set() file_transfer.download_with_default_configuration( demo_manager.demo_bucket, object_key, download_file_path, demo_manager.file_size_mb, ) except ClientError as err: print( "Got expected error when trying to download an encrypted " "object without specifying encryption info:" ) print(f"{'':4}{err}") # Remove all created and downloaded files, remove all objects from # S3 storage. if demo_manager.ask_user( "Demonstration complete. Do you want to remove local files " "and S3 objects?" ): demo_manager.cleanup() if __name__ == "__main__": try: main() except NoCredentialsError as error: print(error) print( "To run this example, you must have valid credentials in " "a shared credential file or set in environment variables." )

The following code example shows how to:

  • Create a versioned S3 bucket.

  • Get all versions of an object.

  • Roll an object back to a previous version.

  • Delete and restore a versioned object.

  • Permanently delete all versions of an object.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create functions that wrap S3 actions.

def create_versioned_bucket(bucket_name, prefix): """ Creates an Amazon S3 bucket, enables it for versioning, and configures a lifecycle that expires noncurrent object versions after 7 days. Adding a lifecycle configuration to a versioned bucket is a best practice. It helps prevent objects in the bucket from accumulating a large number of noncurrent versions, which can slow down request performance. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket_name: The name of the bucket to create. :param prefix: Identifies which objects are automatically expired under the configured lifecycle rules. :return: The newly created bucket. """ try: bucket = s3.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ "LocationConstraint": s3.meta.client.meta.region_name }, ) logger.info("Created bucket %s.", bucket.name) except ClientError as error: if error.response["Error"]["Code"] == "BucketAlreadyOwnedByYou": logger.warning("Bucket %s already exists! Using it.", bucket_name) bucket = s3.Bucket(bucket_name) else: logger.exception("Couldn't create bucket %s.", bucket_name) raise try: bucket.Versioning().enable() logger.info("Enabled versioning on bucket %s.", bucket.name) except ClientError: logger.exception("Couldn't enable versioning on bucket %s.", bucket.name) raise try: expiration = 7 bucket.LifecycleConfiguration().put( LifecycleConfiguration={ "Rules": [ { "Status": "Enabled", "Prefix": prefix, "NoncurrentVersionExpiration": {"NoncurrentDays": expiration}, } ] } ) logger.info( "Configured lifecycle to expire noncurrent versions after %s days " "on bucket %s.", expiration, bucket.name, ) except ClientError as error: logger.warning( "Couldn't configure lifecycle on bucket %s because %s. " "Continuing anyway.", bucket.name, error, ) return bucket def rollback_object(bucket, object_key, version_id): """ Rolls back an object to an earlier version by deleting all versions that occurred after the specified rollback version. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that holds the object to roll back. :param object_key: The object to roll back. :param version_id: The version ID to roll back to. """ # Versions must be sorted by last_modified date because delete markers are # at the end of the list even when they are interspersed in time. versions = sorted( bucket.object_versions.filter(Prefix=object_key), key=attrgetter("last_modified"), reverse=True, ) logger.debug( "Got versions:\n%s", "\n".join( [ f"\t{version.version_id}, last modified {version.last_modified}" for version in versions ] ), ) if version_id in [ver.version_id for ver in versions]: print(f"Rolling back to version {version_id}") for version in versions: if version.version_id != version_id: version.delete() print(f"Deleted version {version.version_id}") else: break print(f"Active version is now {bucket.Object(object_key).version_id}") else: raise KeyError( f"{version_id} was not found in the list of versions for " f"{object_key}." ) def revive_object(bucket, object_key): """ Revives a versioned object that was deleted by removing the object's active delete marker. A versioned object presents as deleted when its latest version is a delete marker. By removing the delete marker, we make the previous version the latest version and the object then presents as *not* deleted. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to revive. """ # Get the latest version for the object. response = s3.meta.client.list_object_versions( Bucket=bucket.name, Prefix=object_key, MaxKeys=1 ) if "DeleteMarkers" in response: latest_version = response["DeleteMarkers"][0] if latest_version["IsLatest"]: logger.info( "Object %s was indeed deleted on %s. Let's revive it.", object_key, latest_version["LastModified"], ) obj = bucket.Object(object_key) obj.Version(latest_version["VersionId"]).delete() logger.info( "Revived %s, active version is now %s with body '%s'", object_key, obj.version_id, obj.get()["Body"].read(), ) else: logger.warning( "Delete marker is not the latest version for %s!", object_key ) elif "Versions" in response: logger.warning("Got an active version for %s, nothing to do.", object_key) else: logger.error("Couldn't get any version info for %s.", object_key) def permanently_delete_object(bucket, object_key): """ Permanently deletes a versioned object by deleting all of its versions. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to delete. """ try: bucket.object_versions.filter(Prefix=object_key).delete() logger.info("Permanently deleted all versions of object %s.", object_key) except ClientError: logger.exception("Couldn't delete all versions of %s.", object_key) raise

Upload the stanza of a poem to a versioned object and perform a series of actions on it.

def usage_demo_single_object(obj_prefix="demo-versioning/"): """ Demonstrates usage of versioned object functions. This demo uploads a stanza of a poem and performs a series of revisions, deletions, and revivals on it. :param obj_prefix: The prefix to assign to objects created by this demo. """ with open("father_william.txt") as file: stanzas = file.read().split("\n\n") width = get_terminal_size((80, 20))[0] print("-" * width) print("Welcome to the usage demonstration of Amazon S3 versioning.") print( "This demonstration uploads a single stanza of a poem to an Amazon " "S3 bucket and then applies various revisions to it." ) print("-" * width) print("Creating a version-enabled bucket for the demo...") bucket = create_versioned_bucket("bucket-" + str(uuid.uuid1()), obj_prefix) print("\nThe initial version of our stanza:") print(stanzas[0]) # Add the first stanza and revise it a few times. print("\nApplying some revisions to the stanza...") obj_stanza_1 = bucket.Object(f"{obj_prefix}stanza-1") obj_stanza_1.put(Body=bytes(stanzas[0], "utf-8")) obj_stanza_1.put(Body=bytes(stanzas[0].upper(), "utf-8")) obj_stanza_1.put(Body=bytes(stanzas[0].lower(), "utf-8")) obj_stanza_1.put(Body=bytes(stanzas[0][::-1], "utf-8")) print( "The latest version of the stanza is now:", obj_stanza_1.get()["Body"].read().decode("utf-8"), sep="\n", ) # Versions are returned in order, most recent first. obj_stanza_1_versions = bucket.object_versions.filter(Prefix=obj_stanza_1.key) print( "The version data of the stanza revisions:", *[ f" {version.version_id}, last modified {version.last_modified}" for version in obj_stanza_1_versions ], sep="\n", ) # Rollback two versions. print("\nRolling back two versions...") rollback_object(bucket, obj_stanza_1.key, list(obj_stanza_1_versions)[2].version_id) print( "The latest version of the stanza:", obj_stanza_1.get()["Body"].read().decode("utf-8"), sep="\n", ) # Delete the stanza print("\nDeleting the stanza...") obj_stanza_1.delete() try: obj_stanza_1.get() except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": print("The stanza is now deleted (as expected).") else: raise # Revive the stanza print("\nRestoring the stanza...") revive_object(bucket, obj_stanza_1.key) print( "The stanza is restored! The latest version is again:", obj_stanza_1.get()["Body"].read().decode("utf-8"), sep="\n", ) # Permanently delete all versions of the object. This cannot be undone! print("\nPermanently deleting all versions of the stanza...") permanently_delete_object(bucket, obj_stanza_1.key) obj_stanza_1_versions = bucket.object_versions.filter(Prefix=obj_stanza_1.key) if len(list(obj_stanza_1_versions)) == 0: print("The stanza has been permanently deleted and now has no versions.") else: print("Something went wrong. The stanza still exists!") print(f"\nRemoving {bucket.name}...") bucket.delete() print(f"{bucket.name} deleted.") print("Demo done!")

Serverless examples

The following code example shows how to implement a Lambda function that receives an event triggered by uploading an object to an S3 bucket. The function retrieves the S3 bucket name and object key from the event parameter and calls the Amazon S3 API to retrieve and log the content type of the object.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming an S3 event with Lambda using Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json import urllib.parse import boto3 print('Loading function') s3 = boto3.client('s3') def lambda_handler(event, context): #print("Received event: " + json.dumps(event, indent=2)) # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') try: response = s3.get_object(Bucket=bucket, Key=key) print("CONTENT TYPE: " + response['ContentType']) return response['ContentType'] except Exception as e: print(e) print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket)) raise e