Python용 SDK (Boto3)를 사용한 Amazon S3 예제 - AWS SDK 코드 예제

Doc AWS SDK 예제 GitHub 리포지토리에서 더 많은 SDK 예제를 사용할 수 있습니다. AWS

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Python용 SDK (Boto3)를 사용한 Amazon S3 예제

다음 코드 예제에서는 Amazon S3에서를 사용하여 작업을 수행하고 일반적인 시나리오 AWS SDK for Python (Boto3) 를 구현하는 방법을 보여줍니다.

기본 사항은 서비스 내에서 필수 작업을 수행하는 방법을 보여주는 코드 예제입니다.

작업은 대규모 프로그램에서 발췌한 코드이며 컨텍스트에 맞춰 실행해야 합니다. 작업은 관련 시나리오의 컨텍스트에 따라 표시되며, 개별 서비스 함수를 직접적으로 호출하는 방법을 보여줍니다.

시나리오는 동일한 서비스 내에서 또는 다른 AWS 서비스와 결합된 상태에서 여러 함수를 호출하여 특정 태스크를 수행하는 방법을 보여주는 코드 예제입니다.

각 예시에는 전체 소스 코드에 대한 링크가 포함되어 있으며, 여기에서 컨텍스트에 맞춰 코드를 설정하고 실행하는 방법에 대한 지침을 찾을 수 있습니다.

시작하기

다음 코드 예제에서는 Amazon S3를 사용하여 시작하는 방법을 보여줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

import boto3 def hello_s3(): """ Use the AWS SDK for Python (Boto3) to create an Amazon Simple Storage Service (Amazon S3) client and list the buckets in your account. This example uses the default settings specified in your shared credentials and config files. """ # Create an S3 client. s3_client = boto3.client("s3") print("Hello, Amazon S3! Let's list your buckets:") # Create a paginator for the list_buckets operation. paginator = s3_client.get_paginator("list_buckets") # Use the paginator to get a list of all buckets. response_iterator = paginator.paginate( PaginationConfig={ "PageSize": 50, # Adjust PageSize as needed. "StartingToken": None, } ) # Iterate through the pages of the response. buckets_found = False for page in response_iterator: if "Buckets" in page and page["Buckets"]: buckets_found = True for bucket in page["Buckets"]: print(f"\t{bucket['Name']}") if not buckets_found: print("No buckets found!") if __name__ == "__main__": hello_s3()
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조ListBuckets를 참조하십시오.

기본 사항

다음 코드 예제는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • 버킷을 만들고 버킷에 파일을 업로드합니다.

  • 버킷에서 객체를 다운로드합니다.

  • 버킷의 하위 폴더에 객체를 복사합니다.

  • 버킷의 객체를 나열합니다.

  • 버킷 객체와 버킷을 삭제합니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

import io import os import uuid import boto3 from boto3.s3.transfer import S3UploadFailedError from botocore.exceptions import ClientError def do_scenario(s3_resource): print("-" * 88) print("Welcome to the Amazon S3 getting started demo!") print("-" * 88) bucket_name = f"amzn-s3-demo-bucket-{uuid.uuid4()}" bucket = s3_resource.Bucket(bucket_name) try: bucket.create( CreateBucketConfiguration={ "LocationConstraint": s3_resource.meta.client.meta.region_name } ) print(f"Created demo bucket named {bucket.name}.") except ClientError as err: print(f"Tried and failed to create demo bucket {bucket_name}.") print(f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}") print(f"\nCan't continue the demo without a bucket!") return file_name = None while file_name is None: file_name = input("\nEnter a file you want to upload to your bucket: ") if not os.path.exists(file_name): print(f"Couldn't find file {file_name}. Are you sure it exists?") file_name = None obj = bucket.Object(os.path.basename(file_name)) try: obj.upload_file(file_name) print( f"Uploaded file {file_name} into bucket {bucket.name} with key {obj.key}." ) except S3UploadFailedError as err: print(f"Couldn't upload file {file_name} to {bucket.name}.") print(f"\t{err}") answer = input(f"\nDo you want to download {obj.key} into memory (y/n)? ") if answer.lower() == "y": data = io.BytesIO() try: obj.download_fileobj(data) data.seek(0) print(f"Got your object. Here are the first 20 bytes:\n") print(f"\t{data.read(20)}") except ClientError as err: print(f"Couldn't download {obj.key}.") print( f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}" ) answer = input( f"\nDo you want to copy {obj.key} to a subfolder in your bucket (y/n)? " ) if answer.lower() == "y": dest_obj = bucket.Object(f"demo-folder/{obj.key}") try: dest_obj.copy({"Bucket": bucket.name, "Key": obj.key}) print(f"Copied {obj.key} to {dest_obj.key}.") except ClientError as err: print(f"Couldn't copy {obj.key} to {dest_obj.key}.") print( f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}" ) print("\nYour bucket contains the following objects:") try: for o in bucket.objects.all(): print(f"\t{o.key}") except ClientError as err: print(f"Couldn't list the objects in bucket {bucket.name}.") print(f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}") answer = input( "\nDo you want to delete all of the objects as well as the bucket (y/n)? " ) if answer.lower() == "y": try: bucket.objects.delete() bucket.delete() print(f"Emptied and deleted bucket {bucket.name}.\n") except ClientError as err: print(f"Couldn't empty and delete bucket {bucket.name}.") print( f"\t{err.response['Error']['Code']}:{err.response['Error']['Message']}" ) print("Thanks for watching!") print("-" * 88) if __name__ == "__main__": do_scenario(boto3.resource("s3"))

작업

다음 코드 예시에서는 CopyObject을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def copy(self, dest_object): """ Copies the object to another bucket. :param dest_object: The destination object initialized with a bucket and key. This is a Boto3 Object resource. """ try: dest_object.copy_from( CopySource={"Bucket": self.object.bucket_name, "Key": self.object.key} ) dest_object.wait_until_exists() logger.info( "Copied object from %s:%s to %s:%s.", self.object.bucket_name, self.object.key, dest_object.bucket_name, dest_object.key, ) except ClientError: logger.exception( "Couldn't copy object from %s/%s to %s/%s.", self.object.bucket_name, self.object.key, dest_object.bucket_name, dest_object.key, ) raise

조건부 요청을 사용하여 객체를 복사합니다.

class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def copy_object_conditional( self, source_key: str, dest_key: str, source_bucket: str, dest_bucket: str, condition_type: str, condition_value: str, ): """ Copies an object from one Amazon S3 bucket to another with a conditional request. :param source_key: The key of the source object to copy. :param dest_key: The key of the destination object. :param source_bucket: The source bucket of the object. :param dest_bucket: The destination bucket of the object. :param condition_type: The type of condition to apply, e.g. 'CopySourceIfMatch', 'CopySourceIfNoneMatch', 'CopySourceIfModifiedSince', 'CopySourceIfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: self.s3.copy_object( Bucket=dest_bucket, Key=dest_key, CopySource={"Bucket": source_bucket, "Key": source_key}, **{condition_type: condition_value}, ) print( f"\tConditional copy successful for key {dest_key} in bucket {dest_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional copy failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional copy failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조CopyObject를 참조하십시오.

다음 코드 예시에서는 CreateBucket을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

기본 설정으로 버킷을 생성합니다.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def create(self, region_override=None): """ Create an Amazon S3 bucket in the default Region for the account or in the specified Region. :param region_override: The Region in which to create the bucket. If this is not specified, the Region configured in your shared credentials is used. """ if region_override is not None: region = region_override else: region = self.bucket.meta.client.meta.region_name try: self.bucket.create(CreateBucketConfiguration={"LocationConstraint": region}) self.bucket.wait_until_exists() logger.info("Created bucket '%s' in region=%s", self.bucket.name, region) except ClientError as error: logger.exception( "Couldn't create bucket named '%s' in region=%s.", self.bucket.name, region, ) raise error

수명 주기 구성으로 버전이 지정된 버킷을 생성합니다.

def create_versioned_bucket(bucket_name, prefix): """ Creates an Amazon S3 bucket, enables it for versioning, and configures a lifecycle that expires noncurrent object versions after 7 days. Adding a lifecycle configuration to a versioned bucket is a best practice. It helps prevent objects in the bucket from accumulating a large number of noncurrent versions, which can slow down request performance. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket_name: The name of the bucket to create. :param prefix: Identifies which objects are automatically expired under the configured lifecycle rules. :return: The newly created bucket. """ try: bucket = s3.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ "LocationConstraint": s3.meta.client.meta.region_name }, ) logger.info("Created bucket %s.", bucket.name) except ClientError as error: if error.response["Error"]["Code"] == "BucketAlreadyOwnedByYou": logger.warning("Bucket %s already exists! Using it.", bucket_name) bucket = s3.Bucket(bucket_name) else: logger.exception("Couldn't create bucket %s.", bucket_name) raise try: bucket.Versioning().enable() logger.info("Enabled versioning on bucket %s.", bucket.name) except ClientError: logger.exception("Couldn't enable versioning on bucket %s.", bucket.name) raise try: expiration = 7 bucket.LifecycleConfiguration().put( LifecycleConfiguration={ "Rules": [ { "Status": "Enabled", "Prefix": prefix, "NoncurrentVersionExpiration": {"NoncurrentDays": expiration}, } ] } ) logger.info( "Configured lifecycle to expire noncurrent versions after %s days " "on bucket %s.", expiration, bucket.name, ) except ClientError as error: logger.warning( "Couldn't configure lifecycle on bucket %s because %s. " "Continuing anyway.", bucket.name, error, ) return bucket
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조CreateBucket를 참조하세요.

다음 코드 예시에서는 DeleteBucket을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete(self): """ Delete the bucket. The bucket must be empty or an error is raised. """ try: self.bucket.delete() self.bucket.wait_until_not_exists() logger.info("Bucket %s successfully deleted.", self.bucket.name) except ClientError: logger.exception("Couldn't delete bucket %s.", self.bucket.name) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteBucket를 참조하십시오.

다음 코드 예시에서는 DeleteBucketCors을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete_cors(self): """ Delete the CORS rules from the bucket. :param bucket_name: The name of the bucket to update. """ try: self.bucket.Cors().delete() logger.info("Deleted CORS from bucket '%s'.", self.bucket.name) except ClientError: logger.exception("Couldn't delete CORS from bucket '%s'.", self.bucket.name) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteBucketCors를 참조하십시오.

다음 코드 예시에서는 DeleteBucketLifecycle을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete_lifecycle_configuration(self): """ Remove the lifecycle configuration from the specified bucket. """ try: self.bucket.LifecycleConfiguration().delete() logger.info( "Deleted lifecycle configuration for bucket '%s'.", self.bucket.name ) except ClientError: logger.exception( "Couldn't delete lifecycle configuration for bucket '%s'.", self.bucket.name, ) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteBucketLifecycle를 참조하십시오.

다음 코드 예시에서는 DeleteBucketPolicy을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def delete_policy(self): """ Delete the security policy from the bucket. """ try: self.bucket.Policy().delete() logger.info("Deleted policy for bucket '%s'.", self.bucket.name) except ClientError: logger.exception( "Couldn't delete policy for bucket '%s'.", self.bucket.name ) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteBucketPolicy를 참조하십시오.

다음 코드 예시에서는 DeleteObject을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체를 삭제합니다.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def delete(self): """ Deletes the object. """ try: self.object.delete() self.object.wait_until_not_exists() logger.info( "Deleted object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) except ClientError: logger.exception( "Couldn't delete object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) raise

객체의 최신 버전을 삭제하여 객체를 이전 버전으로 롤백합니다.

def rollback_object(bucket, object_key, version_id): """ Rolls back an object to an earlier version by deleting all versions that occurred after the specified rollback version. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that holds the object to roll back. :param object_key: The object to roll back. :param version_id: The version ID to roll back to. """ # Versions must be sorted by last_modified date because delete markers are # at the end of the list even when they are interspersed in time. versions = sorted( bucket.object_versions.filter(Prefix=object_key), key=attrgetter("last_modified"), reverse=True, ) logger.debug( "Got versions:\n%s", "\n".join( [ f"\t{version.version_id}, last modified {version.last_modified}" for version in versions ] ), ) if version_id in [ver.version_id for ver in versions]: print(f"Rolling back to version {version_id}") for version in versions: if version.version_id != version_id: version.delete() print(f"Deleted version {version.version_id}") else: break print(f"Active version is now {bucket.Object(object_key).version_id}") else: raise KeyError( f"{version_id} was not found in the list of versions for " f"{object_key}." )

객체의 활성 삭제 마커를 제거하여 삭제된 객체를 다시 활성화합니다.

def revive_object(bucket, object_key): """ Revives a versioned object that was deleted by removing the object's active delete marker. A versioned object presents as deleted when its latest version is a delete marker. By removing the delete marker, we make the previous version the latest version and the object then presents as *not* deleted. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to revive. """ # Get the latest version for the object. response = s3.meta.client.list_object_versions( Bucket=bucket.name, Prefix=object_key, MaxKeys=1 ) if "DeleteMarkers" in response: latest_version = response["DeleteMarkers"][0] if latest_version["IsLatest"]: logger.info( "Object %s was indeed deleted on %s. Let's revive it.", object_key, latest_version["LastModified"], ) obj = bucket.Object(object_key) obj.Version(latest_version["VersionId"]).delete() logger.info( "Revived %s, active version is now %s with body '%s'", object_key, obj.version_id, obj.get()["Body"].read(), ) else: logger.warning( "Delete marker is not the latest version for %s!", object_key ) elif "Versions" in response: logger.warning("Got an active version for %s, nothing to do.", object_key) else: logger.error("Couldn't get any version info for %s.", object_key)

S3 객체에서 삭제 마커를 제거하는 Lambda 핸들러를 생성합니다. 이 핸들러를 사용하면 버전이 지정된 버킷에서 불필요한 삭제 마커를 효율적으로 정리할 수 있습니다.

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteObject를 참조하십시오.

다음 코드 예시에서는 DeleteObjects을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체 키 목록을 사용하여 객체 세트를 삭제합니다.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key @staticmethod def delete_objects(bucket, object_keys): """ Removes a list of objects from a bucket. This operation is done as a batch in a single request. :param bucket: The bucket that contains the objects. This is a Boto3 Bucket resource. :param object_keys: The list of keys that identify the objects to remove. :return: The response that contains data about which objects were deleted and any that could not be deleted. """ try: response = bucket.delete_objects( Delete={"Objects": [{"Key": key} for key in object_keys]} ) if "Deleted" in response: logger.info( "Deleted objects '%s' from bucket '%s'.", [del_obj["Key"] for del_obj in response["Deleted"]], bucket.name, ) if "Errors" in response: logger.warning( "Could not delete objects '%s' from bucket '%s'.", [ f"{del_obj['Key']}: {del_obj['Code']}" for del_obj in response["Errors"] ], bucket.name, ) except ClientError: logger.exception("Couldn't delete any objects from bucket %s.", bucket.name) raise else: return response

버킷의 모든 객체를 삭제합니다.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key @staticmethod def empty_bucket(bucket): """ Remove all objects from a bucket. :param bucket: The bucket to empty. This is a Boto3 Bucket resource. """ try: bucket.objects.delete() logger.info("Emptied bucket '%s'.", bucket.name) except ClientError: logger.exception("Couldn't empty bucket '%s'.", bucket.name) raise

모든 버전을 삭제하여 버전이 지정된 객체를 영구적으로 삭제합니다.

def permanently_delete_object(bucket, object_key): """ Permanently deletes a versioned object by deleting all of its versions. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to delete. """ try: bucket.object_versions.filter(Prefix=object_key).delete() logger.info("Permanently deleted all versions of object %s.", object_key) except ClientError: logger.exception("Couldn't delete all versions of %s.", object_key) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteObjects를 참조하십시오.

다음 코드 예시에서는 GetBucketAcl을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_acl(self): """ Get the ACL of the bucket. :return: The ACL of the bucket. """ try: acl = self.bucket.Acl() logger.info( "Got ACL for bucket %s. Owner is %s.", self.bucket.name, acl.owner ) except ClientError: logger.exception("Couldn't get ACL for bucket %s.", self.bucket.name) raise else: return acl
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조GetBucketAcl를 참조하십시오.

다음 코드 예시에서는 GetBucketCors을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_cors(self): """ Get the CORS rules for the bucket. :return The CORS rules for the specified bucket. """ try: cors = self.bucket.Cors() logger.info( "Got CORS rules %s for bucket '%s'.", cors.cors_rules, self.bucket.name ) except ClientError: logger.exception(("Couldn't get CORS for bucket %s.", self.bucket.name)) raise else: return cors
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조GetBucketCors를 참조하십시오.

다음 코드 예시에서는 GetBucketLifecycleConfiguration을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_lifecycle_configuration(self): """ Get the lifecycle configuration of the bucket. :return: The lifecycle rules of the specified bucket. """ try: config = self.bucket.LifecycleConfiguration() logger.info( "Got lifecycle rules %s for bucket '%s'.", config.rules, self.bucket.name, ) except: logger.exception( "Couldn't get lifecycle rules for bucket '%s'.", self.bucket.name ) raise else: return config.rules

다음 코드 예시에서는 GetBucketPolicy을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def get_policy(self): """ Get the security policy of the bucket. :return: The security policy of the specified bucket, in JSON format. """ try: policy = self.bucket.Policy() logger.info( "Got policy %s for bucket '%s'.", policy.policy, self.bucket.name ) except ClientError: logger.exception("Couldn't get policy for bucket '%s'.", self.bucket.name) raise else: return json.loads(policy.policy)
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조GetBucketPolicy를 참조하십시오.

다음 코드 예시에서는 GetObject을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def get(self): """ Gets the object. :return: The object data in bytes. """ try: body = self.object.get()["Body"].read() logger.info( "Got object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) except ClientError: logger.exception( "Couldn't get object '%s' from bucket '%s'.", self.object.key, self.object.bucket_name, ) raise else: return body

조건부 요청을 사용하여 객체를 가져옵니다.

class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def get_object_conditional( self, object_key: str, source_bucket: str, condition_type: str, condition_value: str, ): """ Retrieves an object from Amazon S3 with a conditional request. :param object_key: The key of the object to retrieve. :param source_bucket: The source bucket of the object. :param condition_type: The type of condition: 'IfMatch', 'IfNoneMatch', 'IfModifiedSince', 'IfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: response = self.s3.get_object( Bucket=source_bucket, Key=object_key, **{condition_type: condition_value}, ) sample_bytes = response["Body"].read(20) print( f"\tConditional read successful. Here are the first 20 bytes of the object:\n" ) print(f"\t{sample_bytes}") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional read failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional read failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조GetObject를 참조하십시오.

다음 코드 예시에서는 GetObjectAcl을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def get_acl(self): """ Gets the ACL of the object. :return: The ACL of the object. """ try: acl = self.object.Acl() logger.info( "Got ACL for object %s owned by %s.", self.object.key, acl.owner["DisplayName"], ) except ClientError: logger.exception("Couldn't get ACL for object %s.", self.object.key) raise else: return acl
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조GetObjectAcl를 참조하십시오.

다음 코드 예시에서는 GetObjectLegalHold을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체에 법적 보존을 적용하세요.

def get_legal_hold(s3_client, bucket: str, key: str) -> None: """ Get the legal hold status of a specific file in a bucket. Args: s3_client: Boto3 S3 client. bucket: The name of the bucket containing the file. key: The key of the file to get the legal hold status of. """ print() logger.info("Getting legal hold status of file [%s] in bucket [%s]", key, bucket) try: response = s3_client.get_object_legal_hold(Bucket=bucket, Key=key) legal_hold_status = response["LegalHold"]["Status"] logger.debug( "Legal hold status of file [%s] in bucket [%s] is [%s]", key, bucket, legal_hold_status, ) except Exception as e: logger.error( "Failed to get legal hold status of file [%s] in bucket [%s]: %s", key, bucket, e, )
  • API 세부 정보는AWS SDK for Python(Boto3) API 참조GetObjectLegalHold를 참조하세요.

다음 코드 예시에서는 GetObjectLockConfiguration을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체 잠금 구성을 가져오세요.

def is_object_lock_enabled(s3_client, bucket: str) -> bool: """ Check if object lock is enabled for a bucket. Args: s3_client: Boto3 S3 client. bucket: The name of the bucket to check. Returns: True if object lock is enabled, False otherwise. """ try: response = s3_client.get_object_lock_configuration(Bucket=bucket) return ( "ObjectLockConfiguration" in response and response["ObjectLockConfiguration"]["ObjectLockEnabled"] == "Enabled" ) except s3_client.exceptions.ClientError as e: if e.response["Error"]["Code"] == "ObjectLockConfigurationNotFoundError": return False else: raise

다음 코드 예시에서는 HeadBucket을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def exists(self): """ Determine whether the bucket exists and you have access to it. :return: True when the bucket exists; otherwise, False. """ try: self.bucket.meta.client.head_bucket(Bucket=self.bucket.name) logger.info("Bucket %s exists.", self.bucket.name) exists = True except ClientError: logger.warning( "Bucket %s doesn't exist or you don't have access to it.", self.bucket.name, ) exists = False return exists
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조HeadBucket를 참조하십시오.

다음 코드 예시에서는 ListBuckets을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name @staticmethod def list(s3_resource): """ Get the buckets in all Regions for the current account. :param s3_resource: A Boto3 S3 resource. This is a high-level resource in Boto3 that contains collections and factory methods to create other high-level S3 sub-resources. :return: The list of buckets. """ try: buckets = list(s3_resource.buckets.all()) logger.info("Got buckets: %s.", buckets) except ClientError: logger.exception("Couldn't get buckets.") raise else: return buckets
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조ListBuckets를 참조하십시오.

다음 코드 예시에서는 ListObjectsV2을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key @staticmethod def list(bucket, prefix=None): """ Lists the objects in a bucket, optionally filtered by a prefix. :param bucket: The bucket to query. This is a Boto3 Bucket resource. :param prefix: When specified, only objects that start with this prefix are listed. :return: The list of objects. """ try: if not prefix: objects = list(bucket.objects.all()) else: objects = list(bucket.objects.filter(Prefix=prefix)) logger.info( "Got objects %s from bucket '%s'", [o.key for o in objects], bucket.name ) except ClientError: logger.exception("Couldn't get objects for bucket '%s'.", bucket.name) raise else: return objects
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조ListObjectsV2를 참조하십시오.

다음 코드 예시에서는 PutBucketAcl을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def grant_log_delivery_access(self): """ Grant the AWS Log Delivery group write access to the bucket so that Amazon S3 can deliver access logs to the bucket. This is the only recommended use of an S3 bucket ACL. """ try: acl = self.bucket.Acl() # Putting an ACL overwrites the existing ACL. If you want to preserve # existing grants, append new grants to the list of existing grants. grants = acl.grants if acl.grants else [] grants.append( { "Grantee": { "Type": "Group", "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery", }, "Permission": "WRITE", } ) acl.put(AccessControlPolicy={"Grants": grants, "Owner": acl.owner}) logger.info("Granted log delivery access to bucket '%s'", self.bucket.name) except ClientError: logger.exception("Couldn't add ACL to bucket '%s'.", self.bucket.name) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조PutBucketAcl를 참조하십시오.

다음 코드 예시에서는 PutBucketCors을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def put_cors(self, cors_rules): """ Apply CORS rules to the bucket. CORS rules specify the HTTP actions that are allowed from other domains. :param cors_rules: The CORS rules to apply. """ try: self.bucket.Cors().put(CORSConfiguration={"CORSRules": cors_rules}) logger.info( "Put CORS rules %s for bucket '%s'.", cors_rules, self.bucket.name ) except ClientError: logger.exception("Couldn't put CORS rules for bucket %s.", self.bucket.name) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조PutBucketCors를 참조하십시오.

다음 코드 예시에서는 PutBucketLifecycleConfiguration을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def put_lifecycle_configuration(self, lifecycle_rules): """ Apply a lifecycle configuration to the bucket. The lifecycle configuration can be used to archive or delete the objects in the bucket according to specified parameters, such as a number of days. :param lifecycle_rules: The lifecycle rules to apply. """ try: self.bucket.LifecycleConfiguration().put( LifecycleConfiguration={"Rules": lifecycle_rules} ) logger.info( "Put lifecycle rules %s for bucket '%s'.", lifecycle_rules, self.bucket.name, ) except ClientError: logger.exception( "Couldn't put lifecycle rules for bucket '%s'.", self.bucket.name ) raise

다음 코드 예시에서는 PutBucketPolicy을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def put_policy(self, policy): """ Apply a security policy to the bucket. Policies control users' ability to perform specific actions, such as listing the objects in the bucket. :param policy: The policy to apply to the bucket. """ try: self.bucket.Policy().put(Policy=json.dumps(policy)) logger.info("Put policy %s for bucket '%s'.", policy, self.bucket.name) except ClientError: logger.exception("Couldn't apply policy to bucket '%s'.", self.bucket.name) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조PutBucketPolicy를 참조하십시오.

다음 코드 예시에서는 PutObject을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def put(self, data): """ Upload data to the object. :param data: The data to upload. This can either be bytes or a string. When this argument is a string, it is interpreted as a file name, which is opened in read bytes mode. """ put_data = data if isinstance(data, str): try: put_data = open(data, "rb") except IOError: logger.exception("Expected file name or binary data, got '%s'.", data) raise try: self.object.put(Body=put_data) self.object.wait_until_exists() logger.info( "Put object '%s' to bucket '%s'.", self.object.key, self.object.bucket_name, ) except ClientError: logger.exception( "Couldn't put object '%s' to bucket '%s'.", self.object.key, self.object.bucket_name, ) raise finally: if getattr(put_data, "close", None): put_data.close()

조건부 요청을 사용하여 객체를 업로드합니다.

class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def put_object_conditional(self, object_key: str, source_bucket: str, data: bytes): """ Uploads an object to Amazon S3 with a conditional request. Prevents overwrite using an IfNoneMatch condition for the object key. :param object_key: The key of the object to upload. :param source_bucket: The source bucket of the object. :param data: The data to upload. """ try: self.s3.put_object( Bucket=source_bucket, Key=object_key, Body=data, IfNoneMatch="*" ) print( f"\tConditional write successful for key {object_key} in bucket {source_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional write failed: Precondition failed") else: logger.error(f"Unexpected error: {error_code}") raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조PutObject를 참조하십시오.

다음 코드 예시에서는 PutObjectAcl을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

class ObjectWrapper: """Encapsulates S3 object actions.""" def __init__(self, s3_object): """ :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3 that wraps object actions in a class-like structure. """ self.object = s3_object self.key = self.object.key def put_acl(self, email): """ Applies an ACL to the object that grants read access to an AWS user identified by email address. :param email: The email address of the user to grant access. """ try: acl = self.object.Acl() # Putting an ACL overwrites the existing ACL, so append new grants # if you want to preserve existing grants. grants = acl.grants if acl.grants else [] grants.append( { "Grantee": {"Type": "AmazonCustomerByEmail", "EmailAddress": email}, "Permission": "READ", } ) acl.put(AccessControlPolicy={"Grants": grants, "Owner": acl.owner}) logger.info("Granted read access to %s.", email) except ClientError: logger.exception("Couldn't add ACL to object '%s'.", self.object.key) raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조PutObjectAcl를 참조하십시오.

다음 코드 예시에서는 PutObjectLegalHold을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체에 법적 보존을 적용하세요.

def set_legal_hold(s3_client, bucket: str, key: str) -> None: """ Set a legal hold on a specific file in a bucket. Args: s3_client: Boto3 S3 client. bucket: The name of the bucket containing the file. key: The key of the file to set the legal hold on. """ print() logger.info("Setting legal hold on file [%s] in bucket [%s]", key, bucket) try: before_status = "OFF" after_status = "ON" s3_client.put_object_legal_hold( Bucket=bucket, Key=key, LegalHold={"Status": after_status} ) logger.debug( "Legal hold set successfully on file [%s] in bucket [%s]", key, bucket ) _print_legal_hold_update(bucket, key, before_status, after_status) except Exception as e: logger.error( "Failed to set legal hold on file [%s] in bucket [%s]: %s", key, bucket, e )
  • API 세부 정보는 AWS SDK for Python(Boto3) API 참조의 PutObjectLegalHold를 참조하세요.

다음 코드 예시에서는 PutObjectLockConfiguration을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체 잠금 구성을 적용하세요.

s3_client.put_object_lock_configuration( Bucket=bucket, ObjectLockConfiguration={"ObjectLockEnabled": "Disabled", "Rule": {}}, )

다음 코드 예시에서는 PutObjectRetention을 사용하는 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

객체에 보존을 적용하세요.

s3_client.put_object_retention( Bucket=bucket, Key=key, VersionId=version_id, Retention={"Mode": "GOVERNANCE", "RetainUntilDate": far_future_date}, BypassGovernanceRetention=True, )
  • API 세부 정보는 AWS SDK for Python(Boto3) API 참조의 PutObjectRetention을 참조하세요.

시나리오

다음 코드 예제에서는 Amazon S3에 대해 미리 서명된 URL을 생성하고 객체를 업로드하는 방법을 보여줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

제한된 시간 동안 S3 작업을 수행할 수 있는 미리 서명된 URL을 생성합니다. 요청 패키지를 사용하여 URL로 요청을 수행합니다.

import argparse import logging import boto3 from botocore.exceptions import ClientError import requests logger = logging.getLogger(__name__) def generate_presigned_url(s3_client, client_method, method_parameters, expires_in): """ Generate a presigned Amazon S3 URL that can be used to perform an action. :param s3_client: A Boto3 Amazon S3 client. :param client_method: The name of the client method that the URL performs. :param method_parameters: The parameters of the specified client method. :param expires_in: The number of seconds the presigned URL is valid for. :return: The presigned URL. """ try: url = s3_client.generate_presigned_url( ClientMethod=client_method, Params=method_parameters, ExpiresIn=expires_in ) logger.info("Got presigned URL: %s", url) except ClientError: logger.exception( "Couldn't get a presigned URL for client method '%s'.", client_method ) raise return url def usage_demo(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the Amazon S3 presigned URL demo.") print("-" * 88) parser = argparse.ArgumentParser() parser.add_argument("bucket", help="The name of the bucket.") parser.add_argument( "key", help="For a GET operation, the key of the object in Amazon S3. For a " "PUT operation, the name of a file to upload.", ) parser.add_argument("action", choices=("get", "put"), help="The action to perform.") args = parser.parse_args() s3_client = boto3.client("s3") client_action = "get_object" if args.action == "get" else "put_object" url = generate_presigned_url( s3_client, client_action, {"Bucket": args.bucket, "Key": args.key}, 1000 ) print("Using the Requests package to send a request to the URL.") response = None if args.action == "get": response = requests.get(url) if response.status_code == 200: with open(args.key.split("/")[-1], 'wb') as object_file: object_file.write(response.content) elif args.action == "put": print("Putting data to the URL.") try: with open(args.key, "rb") as object_file: object_text = object_file.read() response = requests.put(url, data=object_text) except FileNotFoundError: print( f"Couldn't find {args.key}. For a PUT operation, the key must be the " f"name of a file that exists on your computer." ) if response is not None: print(f"Status: {response.status_code}\nReason: {response.reason}") print("-" * 88) if __name__ == "__main__": usage_demo()

미리 서명된 POST 요청을 생성하여 파일을 업로드합니다.

class BucketWrapper: """Encapsulates S3 bucket actions.""" def __init__(self, bucket): """ :param bucket: A Boto3 Bucket resource. This is a high-level resource in Boto3 that wraps bucket actions in a class-like structure. """ self.bucket = bucket self.name = bucket.name def generate_presigned_post(self, object_key, expires_in): """ Generate a presigned Amazon S3 POST request to upload a file. A presigned POST can be used for a limited time to let someone without an AWS account upload a file to a bucket. :param object_key: The object key to identify the uploaded object. :param expires_in: The number of seconds the presigned POST is valid. :return: A dictionary that contains the URL and form fields that contain required access data. """ try: response = self.bucket.meta.client.generate_presigned_post( Bucket=self.bucket.name, Key=object_key, ExpiresIn=expires_in ) logger.info("Got presigned POST URL: %s", response["url"]) except ClientError: logger.exception( "Couldn't get a presigned POST URL for bucket '%s' and object '%s'", self.bucket.name, object_key, ) raise return response

다음 코드 예제는 대화형 애플리케이션을 통해 Amazon Textract 출력을 탐색하는 방법을 보여줍니다.

SDK for Python(Boto3)

Amazon Textract와 AWS SDK for Python (Boto3) 함께를 사용하여 문서 이미지에서 텍스트, 양식 및 테이블 요소를 감지하는 방법을 보여줍니다. 입력 이미지와 Amazon Textract 출력은 탐지된 요소를 탐색할 수 있는 Tkinter 애플리케이션에 표시됩니다.

  • 문서 이미지를 Amazon Textract에 제출하고 감지된 요소의 출력을 탐색합니다.

  • Amazon Textract로 직접, 또는 Amazon Simple Storage Service (Amazon S3) 버킷을 통해 이미지를 제출합니다.

  • 비동기식 API를 사용하여 작업이 완료되면 Amazon Simple Notification Service(Amazon SNS) 주제에 알림을 게시하는 작업을 시작합니다.

  • Amazon Simple Queue Service(Amazon SQS) 대기열에서 작업 완료 메시지를 폴링하고 결과를 표시합니다.

전체 소스 코드와 설정 및 실행 방법에 대한 지침은 GitHub에서 전체 예제를 참조하세요.

이 예제에서 사용되는 서비스
  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

다음 코드 예제에서는 Amazon Comprehend를 사용하여 Amazon S3에 저장된 이미지에서 Amazon Textract를 통해 추출한 텍스트의 엔터티를 감지하는 방법을 보여줍니다.

SDK for Python(Boto3)

Jupyter 노트북 AWS SDK for Python (Boto3) 에서를 사용하여 이미지에서 추출된 텍스트의 개체를 감지하는 방법을 보여줍니다. 이 예제에서는 Amazon Textract를 통해 Amazon Simple Storage Service (Amazon S3) 및 Amazon Comprehend에 저장된 이미지에서 텍스트를 추출하여 추출된 텍스트의 엔터티를 감지합니다.

이 예제는 Jupyter Notebook에 관한 것이며, 노트북을 호스팅할 수 있는 환경에서 실행되어야 합니다. Amazon SageMaker AI를 사용하여 예제를 실행하는 방법에 대한 지침은 TextractAndComprehendNotebook.ipynb의 지침을 참조하세요.

전체 소스 코드와 설정 및 실행 방법에 대한 지침은 GitHub에서 전체 예제를 참조하세요.

이 예시에서 사용되는 서비스
  • Amazon Comprehend

  • Amazon S3

  • Amazon Textract

다음 코드 예제는 Amazon Rekognition을 사용하여 이미지의 범주별로 객체를 감지하는 앱을 빌드하는 방법을 보여줍니다.

SDK for Python(Boto3)

AWS SDK for Python (Boto3) 를 사용하여 다음을 수행할 수 있는 웹 애플리케이션을 생성하는 방법을 보여줍니다.

  • 사진을 Amazon Simple Storage Service (Amazon S3) 버킷에 업로드합니다.

  • Amazon Rekognition을 사용하여 사진을 분석하고 레이블을 지정합니다.

  • Amazon Simple Email Service(Amazon SES)를 사용하여 이미지 분석에 대한 이메일 보고서를 보냅니다.

이 예제에는 두 가지 주요 구성 요소가 포함되어 있습니다. 바로 JavaScript로 작성되고 React로 빌드된 웹 페이지와 Python으로 작성되고 Flask-RESTful로 빌드된 REST 서비스입니다.

React 웹 페이지를 사용하여 다음을 수행할 수 있습니다.

  • S3 버킷에 저장된 이미지 목록을 표시합니다.

  • 컴퓨터에서 S3 버킷에 이미지를 업로드합니다.

  • 이미지에서 감지된 항목을 식별하는 이미지와 레이블을 표시합니다.

  • S3 버킷의 모든 이미지에 대한 보고서를 받고 보고서의 이메일을 보냅니다.

웹 페이지가 REST 서비스를 호출합니다. 서비스가 다음 작업을 수행하기 위해 AWS 에 요청을 전송합니다.

  • S3 버킷의 이미지 목록을 가져오고 필터링합니다.

  • S3 버킷에 사진을 업로드합니다.

  • Amazon Rekognition을 사용하여 개별 사진을 분석하고 사진에서 감지된 항목을 식별하는 레이블 목록을 가져옵니다.

  • S3 버킷의 모든 사진을 분석하고 Amazon SES를 사용하여 보고서를 이메일로 보냅니다.

전체 소스 코드와 설정 및 실행 방법에 대한 지침은 GitHub에서 전체 예제를 참조하십시오.

이 예제에서 사용되는 서비스
  • Amazon Rekognition

  • Amazon S3

  • Amazon SES

다음 코드 예제에서는 Amazon S3 요청에 사전 조건을 추가하는 방법을 보여줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

Amazon S3 조건부 요청을 보여주는 대화형 시나리오를 실행합니다.

""" Purpose Shows how to use AWS SDK for Python (Boto3) to get started using conditional requests for Amazon Simple Storage Service (Amazon S3). """ import logging import random import sys import datetime import boto3 from botocore.exceptions import ClientError from s3_conditional_requests import S3ConditionalRequests # Add relative path to include demo_tools in this code example without need for setup. sys.path.append("../../../..") import demo_tools.question as q # noqa # Constants FILE_CONTENT = "This is a test file for S3 conditional requests." RANDOM_SUFFIX = str(random.randint(100, 999)) logger = logging.getLogger(__name__) class ConditionalRequestsScenario: """Runs a scenario that shows how to use S3 Conditional Requests.""" def __init__(self, conditional_requests, s3_client): """ :param conditional_requests: An object that wraps S3 conditional request actions. :param s3_client: A Boto3 S3 client for setup and cleanup operations. """ self.conditional_requests = conditional_requests self.s3_client = s3_client def setup_scenario(self, source_bucket: str, dest_bucket: str, object_key: str): """ Sets up the scenario by creating a source and destination bucket. Prompts the user to provide a bucket name prefix. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. :param object_key: The name of a test file to add to the source bucket. """ # Create the buckets. try: self.s3_client.create_bucket(Bucket=source_bucket) self.s3_client.create_bucket(Bucket=dest_bucket) print( f"Created source bucket: {source_bucket} and destination bucket: {dest_bucket}" ) except ClientError as e: error_code = e.response["Error"]["Code"] logger.error(f"Error creating buckets: {error_code}") raise # Upload test file into the source bucket. try: print(f"Uploading file {object_key} to bucket {source_bucket}") response = self.s3_client.put_object( Bucket=source_bucket, Key=object_key, Body=FILE_CONTENT ) object_etag = response["ETag"] return object_etag except Exception as e: logger.error( f"Failed to upload file {object_key} to bucket {source_bucket}: {e}" ) def cleanup_scenario(self, source_bucket: str, dest_bucket: str): """ Cleans up the scenario by deleting the source and destination buckets. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. """ self.cleanup_bucket(source_bucket) self.cleanup_bucket(dest_bucket) def cleanup_bucket(self, bucket_name: str): """ Cleans up the bucket by deleting all objects and then the bucket itself. :param bucket_name: The name of the bucket. """ try: # Get list of all objects in the bucket. list_response = self.s3_client.list_objects_v2(Bucket=bucket_name) objs = list_response.get("Contents", []) for obj in objs: key = obj["Key"] self.s3_client.delete_object(Bucket=bucket_name, Key=key) self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Cleaned up bucket: {bucket_name}.") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "NoSuchBucket": logger.info(f"Bucket {bucket_name} does not exist, skipping cleanup.") else: logger.error(f"Error deleting bucket: {error_code}") raise def display_buckets(self, source_bucket: str, dest_bucket: str): """ Display a list of the objects in the test buckets. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. """ self.list_bucket_contents(source_bucket) self.list_bucket_contents(dest_bucket) def list_bucket_contents(self, bucket_name): """ Display a list of the objects in the bucket. :param bucket_name: The name of the bucket. """ try: # Get list of all objects in the bucket. print(f"\t Items in bucket {bucket_name}") list_response = self.s3_client.list_objects_v2(Bucket=bucket_name) objs = list_response.get("Contents", []) if not objs: print("\t\tNo objects found.") for obj in objs: key = obj["Key"] print(f"\t\t object: {key} ETag {obj['ETag']}") return objs except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "NoSuchBucket": logger.info(f"Bucket {bucket_name} does not exist.") else: logger.error(f"Error listing bucket and objects: {error_code}") raise def display_menu( self, source_bucket: str, dest_bucket: str, object_key: str, etag: str ): """ Displays the menu of conditional request options for the user. :param source_bucket: The name of the source bucket. :param dest_bucket: The name of the destination bucket. :param object_key: The key of the test object in the source bucket. :param etag: The etag of the test object in the source bucket. """ actions = [ "Print list of bucket items.", "Perform a conditional read.", "Perform a conditional copy.", "Perform a conditional write.", "Clean up and exit.", ] conditions = [ "If-Match: using the object's ETag. This condition should succeed.", "If-None-Match: using the object's ETag. This condition should fail.", "If-Modified-Since: using yesterday's date. This condition should succeed.", "If-Unmodified-Since: using yesterday's date. This condition should fail.", ] condition_types = [ "IfMatch", "IfNoneMatch", "IfModifiedSince", "IfUnmodifiedSince", ] copy_condition_types = [ "CopySourceIfMatch", "CopySourceIfNoneMatch", "CopySourceIfModifiedSince", "CopySourceIfUnmodifiedSince", ] yesterday_date = datetime.datetime.utcnow() - datetime.timedelta(days=1) choice = 0 while choice != 4: print("-" * 88) print("Choose an action to explore some example conditional requests.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: print("Listing the objects and buckets.") self.display_buckets(source_bucket, dest_bucket) elif choice == 1: print("Perform a conditional read.") condition_type = q.choose("Enter the condition type : ", conditions) if condition_type == 0 or condition_type == 1: self.conditional_requests.get_object_conditional( object_key, source_bucket, condition_types[condition_type], etag ) elif condition_type == 2 or condition_type == 3: self.conditional_requests.get_object_conditional( object_key, source_bucket, condition_types[condition_type], yesterday_date, ) elif choice == 2: print("Perform a conditional copy.") condition_type = q.choose("Enter the condition type : ", conditions) dest_key = q.ask("Enter an object key: ", q.non_empty) if condition_type == 0 or condition_type == 1: self.conditional_requests.copy_object_conditional( object_key, dest_key, source_bucket, dest_bucket, copy_condition_types[condition_type], etag, ) elif condition_type == 2 or condition_type == 3: self.conditional_requests.copy_object_conditional( object_key, dest_key, copy_condition_types[condition_type], yesterday_date, ) elif choice == 3: print( "Perform a conditional write using IfNoneMatch condition on the object key." ) print("If the key is a duplicate, the write will fail.") object_key = q.ask("Enter an object key: ", q.non_empty) self.conditional_requests.put_object_conditional( object_key, source_bucket, b"Conditional write example data." ) elif choice == 4: print("Proceeding to cleanup.") def run_scenario(self): """ Runs the interactive scenario. """ print("-" * 88) print("Welcome to the Amazon S3 conditional requests example.") print("-" * 88) print( f"""\ This example demonstrates the use of conditional requests for S3 operations. You can use conditional requests to add preconditions to S3 read requests to return or copy an object based on its Entity tag (ETag), or last modified date. You can use a conditional write requests to prevent overwrites by ensuring there is no existing object with the same key. This example will allow you to perform conditional reads and writes that will succeed or fail based on your selected options. Sample buckets and a sample object will be created as part of the example. """ ) bucket_prefix = q.ask("Enter a bucket name prefix: ", q.non_empty) source_bucket_name = f"{bucket_prefix}-source-{RANDOM_SUFFIX}" dest_bucket_name = f"{bucket_prefix}-dest-{RANDOM_SUFFIX}" object_key = "test-upload-file.txt" try: etag = self.setup_scenario(source_bucket_name, dest_bucket_name, object_key) self.display_menu(source_bucket_name, dest_bucket_name, object_key, etag) finally: self.cleanup_scenario(source_bucket_name, dest_bucket_name) print("-" * 88) print("Thanks for watching.") print("-" * 88) if __name__ == "__main__": scenario = ConditionalRequestsScenario( S3ConditionalRequests.from_client(), boto3.client("s3") ) scenario.run_scenario()

조건부 요청 작업을 정의하는 래퍼 클래스입니다.

import boto3 import logging from botocore.exceptions import ClientError # Configure logging logger = logging.getLogger(__name__) class S3ConditionalRequests: """Encapsulates S3 conditional request operations.""" def __init__(self, s3_client): self.s3 = s3_client @classmethod def from_client(cls): """ Instantiates this class from a Boto3 client. """ s3_client = boto3.client("s3") return cls(s3_client) def get_object_conditional( self, object_key: str, source_bucket: str, condition_type: str, condition_value: str, ): """ Retrieves an object from Amazon S3 with a conditional request. :param object_key: The key of the object to retrieve. :param source_bucket: The source bucket of the object. :param condition_type: The type of condition: 'IfMatch', 'IfNoneMatch', 'IfModifiedSince', 'IfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: response = self.s3.get_object( Bucket=source_bucket, Key=object_key, **{condition_type: condition_value}, ) sample_bytes = response["Body"].read(20) print( f"\tConditional read successful. Here are the first 20 bytes of the object:\n" ) print(f"\t{sample_bytes}") except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional read failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional read failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise def put_object_conditional(self, object_key: str, source_bucket: str, data: bytes): """ Uploads an object to Amazon S3 with a conditional request. Prevents overwrite using an IfNoneMatch condition for the object key. :param object_key: The key of the object to upload. :param source_bucket: The source bucket of the object. :param data: The data to upload. """ try: self.s3.put_object( Bucket=source_bucket, Key=object_key, Body=data, IfNoneMatch="*" ) print( f"\tConditional write successful for key {object_key} in bucket {source_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional write failed: Precondition failed") else: logger.error(f"Unexpected error: {error_code}") raise def copy_object_conditional( self, source_key: str, dest_key: str, source_bucket: str, dest_bucket: str, condition_type: str, condition_value: str, ): """ Copies an object from one Amazon S3 bucket to another with a conditional request. :param source_key: The key of the source object to copy. :param dest_key: The key of the destination object. :param source_bucket: The source bucket of the object. :param dest_bucket: The destination bucket of the object. :param condition_type: The type of condition to apply, e.g. 'CopySourceIfMatch', 'CopySourceIfNoneMatch', 'CopySourceIfModifiedSince', 'CopySourceIfUnmodifiedSince'. :param condition_value: The value to use for the condition. """ try: self.s3.copy_object( Bucket=dest_bucket, Key=dest_key, CopySource={"Bucket": source_bucket, "Key": source_key}, **{condition_type: condition_value}, ) print( f"\tConditional copy successful for key {dest_key} in bucket {dest_bucket}." ) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "PreconditionFailed": print("\tConditional copy failed: Precondition failed") elif error_code == "304": # Not modified error code. print("\tConditional copy failed: Object not modified") else: logger.error(f"Unexpected error: {error_code}") raise

다음 코드 예제에서는 Lambda 함수를 사용하여 버전이 지정된 S3 객체를 배치 단위로 관리하는 방법을 보여줍니다.

SDK for Python(Boto3)

처리를 수행하기 위해 AWS Lambda 함수를 호출하는 작업을 생성하여 Amazon Simple Storage Service(Amazon S3) 버전 객체를 일괄적으로 조작하는 방법을 보여줍니다. 이 예제에서는 버전 관리를 사용한 버킷을 생성하고, Lewis Carroll의 You Are Old, Father William이라는 시의 시구를 업로드하며, Amazon S3 배치 작업을 사용하여 다양한 방법으로 시를 번형합니다.

다음 작업을 수행하는 방법에 대해 알아보세요.
  • 버전이 지정된 객체에서 작동하는 Lambda 함수를 생성합니다.

  • 업데이트할 객체의 매니페스트를 만듭니다.

  • 객체를 업데이트하기 위해 Lambda 함수를 호출하는 배치 작업을 생성합니다.

  • Lambda 함수를 삭제합니다.

  • 버전이 지정된 버킷을 비운 다음 삭제합니다.

이 예제는 GitHub에서 가장 잘 볼 수 있습니다. 전체 소스 코드와 설정 및 실행 방법에 대한 지침은 GitHub에서 전체 예제를 참조하세요.

이 예제에서 사용되는 서비스
  • Amazon S3

다음 코드 예제는 Amazon S3에 대용량 파일을 업로드하고 Amazon S3에서 대용량 파일을 다운로드하는 방법을 보여줍니다.

자세한 내용은 멀티파트 업로드를 사용하여 객체 업로드를 참조하십시오.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

사용 가능한 여러 전송 관리자 설정을 사용하여 파일을 전송하는 함수를 생성합니다. 콜백 클래스를 사용하여 파일 전송 중에 콜백 진행률을 작성합니다.

import sys import threading import boto3 from boto3.s3.transfer import TransferConfig MB = 1024 * 1024 s3 = boto3.resource("s3") class TransferCallback: """ Handle callbacks from the transfer manager. The transfer manager periodically calls the __call__ method throughout the upload and download process so that it can take action, such as displaying progress to the user and collecting data about the transfer. """ def __init__(self, target_size): self._target_size = target_size self._total_transferred = 0 self._lock = threading.Lock() self.thread_info = {} def __call__(self, bytes_transferred): """ The callback method that is called by the transfer manager. Display progress during file transfer and collect per-thread transfer data. This method can be called by multiple threads, so shared instance data is protected by a thread lock. """ thread = threading.current_thread() with self._lock: self._total_transferred += bytes_transferred if thread.ident not in self.thread_info.keys(): self.thread_info[thread.ident] = bytes_transferred else: self.thread_info[thread.ident] += bytes_transferred target = self._target_size * MB sys.stdout.write( f"\r{self._total_transferred} of {target} transferred " f"({(self._total_transferred / target) * 100:.2f}%)." ) sys.stdout.flush() def upload_with_default_configuration( local_file_path, bucket_name, object_key, file_size_mb ): """ Upload a file from a local folder to an Amazon S3 bucket, using the default configuration. """ transfer_callback = TransferCallback(file_size_mb) s3.Bucket(bucket_name).upload_file( local_file_path, object_key, Callback=transfer_callback ) return transfer_callback.thread_info def upload_with_chunksize_and_meta( local_file_path, bucket_name, object_key, file_size_mb, metadata=None ): """ Upload a file from a local folder to an Amazon S3 bucket, setting a multipart chunk size and adding metadata to the Amazon S3 object. The multipart chunk size controls the size of the chunks of data that are sent in the request. A smaller chunk size typically results in the transfer manager using more threads for the upload. The metadata is a set of key-value pairs that are stored with the object in Amazon S3. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(multipart_chunksize=1 * MB) extra_args = {"Metadata": metadata} if metadata else None s3.Bucket(bucket_name).upload_file( local_file_path, object_key, Config=config, ExtraArgs=extra_args, Callback=transfer_callback, ) return transfer_callback.thread_info def upload_with_high_threshold(local_file_path, bucket_name, object_key, file_size_mb): """ Upload a file from a local folder to an Amazon S3 bucket, setting a multipart threshold larger than the size of the file. Setting a multipart threshold larger than the size of the file results in the transfer manager sending the file as a standard upload instead of a multipart upload. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(multipart_threshold=file_size_mb * 2 * MB) s3.Bucket(bucket_name).upload_file( local_file_path, object_key, Config=config, Callback=transfer_callback ) return transfer_callback.thread_info def upload_with_sse( local_file_path, bucket_name, object_key, file_size_mb, sse_key=None ): """ Upload a file from a local folder to an Amazon S3 bucket, adding server-side encryption with customer-provided encryption keys to the object. When this kind of encryption is specified, Amazon S3 encrypts the object at rest and allows downloads only when the expected encryption key is provided in the download request. """ transfer_callback = TransferCallback(file_size_mb) if sse_key: extra_args = {"SSECustomerAlgorithm": "AES256", "SSECustomerKey": sse_key} else: extra_args = None s3.Bucket(bucket_name).upload_file( local_file_path, object_key, ExtraArgs=extra_args, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_default_configuration( bucket_name, object_key, download_file_path, file_size_mb ): """ Download a file from an Amazon S3 bucket to a local folder, using the default configuration. """ transfer_callback = TransferCallback(file_size_mb) s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_single_thread( bucket_name, object_key, download_file_path, file_size_mb ): """ Download a file from an Amazon S3 bucket to a local folder, using a single thread. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(use_threads=False) s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, Config=config, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_high_threshold( bucket_name, object_key, download_file_path, file_size_mb ): """ Download a file from an Amazon S3 bucket to a local folder, setting a multipart threshold larger than the size of the file. Setting a multipart threshold larger than the size of the file results in the transfer manager sending the file as a standard download instead of a multipart download. """ transfer_callback = TransferCallback(file_size_mb) config = TransferConfig(multipart_threshold=file_size_mb * 2 * MB) s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, Config=config, Callback=transfer_callback ) return transfer_callback.thread_info def download_with_sse( bucket_name, object_key, download_file_path, file_size_mb, sse_key ): """ Download a file from an Amazon S3 bucket to a local folder, adding a customer-provided encryption key to the request. When this kind of encryption is specified, Amazon S3 encrypts the object at rest and allows downloads only when the expected encryption key is provided in the download request. """ transfer_callback = TransferCallback(file_size_mb) if sse_key: extra_args = {"SSECustomerAlgorithm": "AES256", "SSECustomerKey": sse_key} else: extra_args = None s3.Bucket(bucket_name).Object(object_key).download_file( download_file_path, ExtraArgs=extra_args, Callback=transfer_callback ) return transfer_callback.thread_info

전송 관리자 기능을 시연하고 결과를 보고합니다.

import hashlib import os import platform import shutil import time import boto3 from boto3.s3.transfer import TransferConfig from botocore.exceptions import ClientError from botocore.exceptions import ParamValidationError from botocore.exceptions import NoCredentialsError import file_transfer MB = 1024 * 1024 # These configuration attributes affect both uploads and downloads. CONFIG_ATTRS = ( "multipart_threshold", "multipart_chunksize", "max_concurrency", "use_threads", ) # These configuration attributes affect only downloads. DOWNLOAD_CONFIG_ATTRS = ("max_io_queue", "io_chunksize", "num_download_attempts") class TransferDemoManager: """ Manages the demonstration. Collects user input from a command line, reports transfer results, maintains a list of artifacts created during the demonstration, and cleans them up after the demonstration is completed. """ def __init__(self): self._s3 = boto3.resource("s3") self._chore_list = [] self._create_file_cmd = None self._size_multiplier = 0 self.file_size_mb = 30 self.demo_folder = None self.demo_bucket = None self._setup_platform_specific() self._terminal_width = shutil.get_terminal_size(fallback=(80, 80))[0] def collect_user_info(self): """ Collect local folder and Amazon S3 bucket name from the user. These locations are used to store files during the demonstration. """ while not self.demo_folder: self.demo_folder = input( "Which file folder do you want to use to store " "demonstration files? " ) if not os.path.isdir(self.demo_folder): print(f"{self.demo_folder} isn't a folder!") self.demo_folder = None while not self.demo_bucket: self.demo_bucket = input( "Which Amazon S3 bucket do you want to use to store " "demonstration files? " ) try: self._s3.meta.client.head_bucket(Bucket=self.demo_bucket) except ParamValidationError as err: print(err) self.demo_bucket = None except ClientError as err: print(err) print( f"Either {self.demo_bucket} doesn't exist or you don't " f"have access to it." ) self.demo_bucket = None def demo( self, question, upload_func, download_func, upload_args=None, download_args=None ): """Run a demonstration. Ask the user if they want to run this specific demonstration. If they say yes, create a file on the local path, upload it using the specified upload function, then download it using the specified download function. """ if download_args is None: download_args = {} if upload_args is None: upload_args = {} question = question.format(self.file_size_mb) answer = input(f"{question} (y/n)") if answer.lower() == "y": local_file_path, object_key, download_file_path = self._create_demo_file() file_transfer.TransferConfig = self._config_wrapper( TransferConfig, CONFIG_ATTRS ) self._report_transfer_params( "Uploading", local_file_path, object_key, **upload_args ) start_time = time.perf_counter() thread_info = upload_func( local_file_path, self.demo_bucket, object_key, self.file_size_mb, **upload_args, ) end_time = time.perf_counter() self._report_transfer_result(thread_info, end_time - start_time) file_transfer.TransferConfig = self._config_wrapper( TransferConfig, CONFIG_ATTRS + DOWNLOAD_CONFIG_ATTRS ) self._report_transfer_params( "Downloading", object_key, download_file_path, **download_args ) start_time = time.perf_counter() thread_info = download_func( self.demo_bucket, object_key, download_file_path, self.file_size_mb, **download_args, ) end_time = time.perf_counter() self._report_transfer_result(thread_info, end_time - start_time) def last_name_set(self): """Get the name set used for the last demo.""" return self._chore_list[-1] def cleanup(self): """ Remove files from the demo folder, and uploaded objects from the Amazon S3 bucket. """ print("-" * self._terminal_width) for local_file_path, s3_object_key, downloaded_file_path in self._chore_list: print(f"Removing {local_file_path}") try: os.remove(local_file_path) except FileNotFoundError as err: print(err) print(f"Removing {downloaded_file_path}") try: os.remove(downloaded_file_path) except FileNotFoundError as err: print(err) if self.demo_bucket: print(f"Removing {self.demo_bucket}:{s3_object_key}") try: self._s3.Bucket(self.demo_bucket).Object(s3_object_key).delete() except ClientError as err: print(err) def _setup_platform_specific(self): """Set up platform-specific command used to create a large file.""" if platform.system() == "Windows": self._create_file_cmd = "fsutil file createnew {} {}" self._size_multiplier = MB elif platform.system() == "Linux" or platform.system() == "Darwin": self._create_file_cmd = f"dd if=/dev/urandom of={{}} " f"bs={MB} count={{}}" self._size_multiplier = 1 else: raise EnvironmentError( f"Demo of platform {platform.system()} isn't supported." ) def _create_demo_file(self): """ Create a file in the demo folder specified by the user. Store the local path, object name, and download path for later cleanup. Only the local file is created by this method. The Amazon S3 object and download file are created later during the demonstration. Returns: A tuple that contains the local file path, object name, and download file path. """ file_name_template = "TestFile{}-{}.demo" local_suffix = "local" object_suffix = "s3object" download_suffix = "downloaded" file_tag = len(self._chore_list) + 1 local_file_path = os.path.join( self.demo_folder, file_name_template.format(file_tag, local_suffix) ) s3_object_key = file_name_template.format(file_tag, object_suffix) downloaded_file_path = os.path.join( self.demo_folder, file_name_template.format(file_tag, download_suffix) ) filled_cmd = self._create_file_cmd.format( local_file_path, self.file_size_mb * self._size_multiplier ) print( f"Creating file of size {self.file_size_mb} MB " f"in {self.demo_folder} by running:" ) print(f"{'':4}{filled_cmd}") os.system(filled_cmd) chore = (local_file_path, s3_object_key, downloaded_file_path) self._chore_list.append(chore) return chore def _report_transfer_params(self, verb, source_name, dest_name, **kwargs): """Report configuration and extra arguments used for a file transfer.""" print("-" * self._terminal_width) print(f"{verb} {source_name} ({self.file_size_mb} MB) to {dest_name}") if kwargs: print("With extra args:") for arg, value in kwargs.items(): print(f'{"":4}{arg:<20}: {value}') @staticmethod def ask_user(question): """ Ask the user a yes or no question. Returns: True when the user answers 'y' or 'Y'; otherwise, False. """ answer = input(f"{question} (y/n) ") return answer.lower() == "y" @staticmethod def _config_wrapper(func, config_attrs): def wrapper(*args, **kwargs): config = func(*args, **kwargs) print("With configuration:") for attr in config_attrs: print(f'{"":4}{attr:<20}: {getattr(config, attr)}') return config return wrapper @staticmethod def _report_transfer_result(thread_info, elapsed): """Report the result of a transfer, including per-thread data.""" print(f"\nUsed {len(thread_info)} threads.") for ident, byte_count in thread_info.items(): print(f"{'':4}Thread {ident} copied {byte_count} bytes.") print(f"Your transfer took {elapsed:.2f} seconds.") def main(): """ Run the demonstration script for s3_file_transfer. """ demo_manager = TransferDemoManager() demo_manager.collect_user_info() # Upload and download with default configuration. Because the file is 30 MB # and the default multipart_threshold is 8 MB, both upload and download are # multipart transfers. demo_manager.demo( "Do you want to upload and download a {} MB file " "using the default configuration?", file_transfer.upload_with_default_configuration, file_transfer.download_with_default_configuration, ) # Upload and download with multipart_threshold set higher than the size of # the file. This causes the transfer manager to use standard transfers # instead of multipart transfers. demo_manager.demo( "Do you want to upload and download a {} MB file " "as a standard (not multipart) transfer?", file_transfer.upload_with_high_threshold, file_transfer.download_with_high_threshold, ) # Upload with specific chunk size and additional metadata. # Download with a single thread. demo_manager.demo( "Do you want to upload a {} MB file with a smaller chunk size and " "then download the same file using a single thread?", file_transfer.upload_with_chunksize_and_meta, file_transfer.download_with_single_thread, upload_args={ "metadata": { "upload_type": "chunky", "favorite_color": "aqua", "size": "medium", } }, ) # Upload using server-side encryption with customer-provided # encryption keys. # Generate a 256-bit key from a passphrase. sse_key = hashlib.sha256("demo_passphrase".encode("utf-8")).digest() demo_manager.demo( "Do you want to upload and download a {} MB file using " "server-side encryption?", file_transfer.upload_with_sse, file_transfer.download_with_sse, upload_args={"sse_key": sse_key}, download_args={"sse_key": sse_key}, ) # Download without specifying an encryption key to show that the # encryption key must be included to download an encrypted object. if demo_manager.ask_user( "Do you want to try to download the encrypted " "object without sending the required key?" ): try: _, object_key, download_file_path = demo_manager.last_name_set() file_transfer.download_with_default_configuration( demo_manager.demo_bucket, object_key, download_file_path, demo_manager.file_size_mb, ) except ClientError as err: print( "Got expected error when trying to download an encrypted " "object without specifying encryption info:" ) print(f"{'':4}{err}") # Remove all created and downloaded files, remove all objects from # S3 storage. if demo_manager.ask_user( "Demonstration complete. Do you want to remove local files " "and S3 objects?" ): demo_manager.cleanup() if __name__ == "__main__": try: main() except NoCredentialsError as error: print(error) print( "To run this example, you must have valid credentials in " "a shared credential file or set in environment variables." )

다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • 버전이 지정된 S3 버킷을 생성합니다.

  • 객체의 모든 버전을 가져옵니다.

  • 객체를 이전 버전으로 롤백합니다.

  • 버전이 지정된 객체를 삭제하고 복원합니다.

  • 객체의 모든 버전을 영구 삭제합니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

S3 작업을 래핑하는 함수를 만듭니다.

def create_versioned_bucket(bucket_name, prefix): """ Creates an Amazon S3 bucket, enables it for versioning, and configures a lifecycle that expires noncurrent object versions after 7 days. Adding a lifecycle configuration to a versioned bucket is a best practice. It helps prevent objects in the bucket from accumulating a large number of noncurrent versions, which can slow down request performance. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket_name: The name of the bucket to create. :param prefix: Identifies which objects are automatically expired under the configured lifecycle rules. :return: The newly created bucket. """ try: bucket = s3.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ "LocationConstraint": s3.meta.client.meta.region_name }, ) logger.info("Created bucket %s.", bucket.name) except ClientError as error: if error.response["Error"]["Code"] == "BucketAlreadyOwnedByYou": logger.warning("Bucket %s already exists! Using it.", bucket_name) bucket = s3.Bucket(bucket_name) else: logger.exception("Couldn't create bucket %s.", bucket_name) raise try: bucket.Versioning().enable() logger.info("Enabled versioning on bucket %s.", bucket.name) except ClientError: logger.exception("Couldn't enable versioning on bucket %s.", bucket.name) raise try: expiration = 7 bucket.LifecycleConfiguration().put( LifecycleConfiguration={ "Rules": [ { "Status": "Enabled", "Prefix": prefix, "NoncurrentVersionExpiration": {"NoncurrentDays": expiration}, } ] } ) logger.info( "Configured lifecycle to expire noncurrent versions after %s days " "on bucket %s.", expiration, bucket.name, ) except ClientError as error: logger.warning( "Couldn't configure lifecycle on bucket %s because %s. " "Continuing anyway.", bucket.name, error, ) return bucket def rollback_object(bucket, object_key, version_id): """ Rolls back an object to an earlier version by deleting all versions that occurred after the specified rollback version. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that holds the object to roll back. :param object_key: The object to roll back. :param version_id: The version ID to roll back to. """ # Versions must be sorted by last_modified date because delete markers are # at the end of the list even when they are interspersed in time. versions = sorted( bucket.object_versions.filter(Prefix=object_key), key=attrgetter("last_modified"), reverse=True, ) logger.debug( "Got versions:\n%s", "\n".join( [ f"\t{version.version_id}, last modified {version.last_modified}" for version in versions ] ), ) if version_id in [ver.version_id for ver in versions]: print(f"Rolling back to version {version_id}") for version in versions: if version.version_id != version_id: version.delete() print(f"Deleted version {version.version_id}") else: break print(f"Active version is now {bucket.Object(object_key).version_id}") else: raise KeyError( f"{version_id} was not found in the list of versions for " f"{object_key}." ) def revive_object(bucket, object_key): """ Revives a versioned object that was deleted by removing the object's active delete marker. A versioned object presents as deleted when its latest version is a delete marker. By removing the delete marker, we make the previous version the latest version and the object then presents as *not* deleted. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to revive. """ # Get the latest version for the object. response = s3.meta.client.list_object_versions( Bucket=bucket.name, Prefix=object_key, MaxKeys=1 ) if "DeleteMarkers" in response: latest_version = response["DeleteMarkers"][0] if latest_version["IsLatest"]: logger.info( "Object %s was indeed deleted on %s. Let's revive it.", object_key, latest_version["LastModified"], ) obj = bucket.Object(object_key) obj.Version(latest_version["VersionId"]).delete() logger.info( "Revived %s, active version is now %s with body '%s'", object_key, obj.version_id, obj.get()["Body"].read(), ) else: logger.warning( "Delete marker is not the latest version for %s!", object_key ) elif "Versions" in response: logger.warning("Got an active version for %s, nothing to do.", object_key) else: logger.error("Couldn't get any version info for %s.", object_key) def permanently_delete_object(bucket, object_key): """ Permanently deletes a versioned object by deleting all of its versions. Usage is shown in the usage_demo_single_object function at the end of this module. :param bucket: The bucket that contains the object. :param object_key: The object to delete. """ try: bucket.object_versions.filter(Prefix=object_key).delete() logger.info("Permanently deleted all versions of object %s.", object_key) except ClientError: logger.exception("Couldn't delete all versions of %s.", object_key) raise

버전이 지정된 개체에 시의 스탠자를 업로드하고 해당 개체에 대해 일련의 작업을 수행합니다.

def usage_demo_single_object(obj_prefix="demo-versioning/"): """ Demonstrates usage of versioned object functions. This demo uploads a stanza of a poem and performs a series of revisions, deletions, and revivals on it. :param obj_prefix: The prefix to assign to objects created by this demo. """ with open("father_william.txt") as file: stanzas = file.read().split("\n\n") width = get_terminal_size((80, 20))[0] print("-" * width) print("Welcome to the usage demonstration of Amazon S3 versioning.") print( "This demonstration uploads a single stanza of a poem to an Amazon " "S3 bucket and then applies various revisions to it." ) print("-" * width) print("Creating a version-enabled bucket for the demo...") bucket = create_versioned_bucket("bucket-" + str(uuid.uuid1()), obj_prefix) print("\nThe initial version of our stanza:") print(stanzas[0]) # Add the first stanza and revise it a few times. print("\nApplying some revisions to the stanza...") obj_stanza_1 = bucket.Object(f"{obj_prefix}stanza-1") obj_stanza_1.put(Body=bytes(stanzas[0], "utf-8")) obj_stanza_1.put(Body=bytes(stanzas[0].upper(), "utf-8")) obj_stanza_1.put(Body=bytes(stanzas[0].lower(), "utf-8")) obj_stanza_1.put(Body=bytes(stanzas[0][::-1], "utf-8")) print( "The latest version of the stanza is now:", obj_stanza_1.get()["Body"].read().decode("utf-8"), sep="\n", ) # Versions are returned in order, most recent first. obj_stanza_1_versions = bucket.object_versions.filter(Prefix=obj_stanza_1.key) print( "The version data of the stanza revisions:", *[ f" {version.version_id}, last modified {version.last_modified}" for version in obj_stanza_1_versions ], sep="\n", ) # Rollback two versions. print("\nRolling back two versions...") rollback_object(bucket, obj_stanza_1.key, list(obj_stanza_1_versions)[2].version_id) print( "The latest version of the stanza:", obj_stanza_1.get()["Body"].read().decode("utf-8"), sep="\n", ) # Delete the stanza print("\nDeleting the stanza...") obj_stanza_1.delete() try: obj_stanza_1.get() except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": print("The stanza is now deleted (as expected).") else: raise # Revive the stanza print("\nRestoring the stanza...") revive_object(bucket, obj_stanza_1.key) print( "The stanza is restored! The latest version is again:", obj_stanza_1.get()["Body"].read().decode("utf-8"), sep="\n", ) # Permanently delete all versions of the object. This cannot be undone! print("\nPermanently deleting all versions of the stanza...") permanently_delete_object(bucket, obj_stanza_1.key) obj_stanza_1_versions = bucket.object_versions.filter(Prefix=obj_stanza_1.key) if len(list(obj_stanza_1_versions)) == 0: print("The stanza has been permanently deleted and now has no versions.") else: print("Something went wrong. The stanza still exists!") print(f"\nRemoving {bucket.name}...") bucket.delete() print(f"{bucket.name} deleted.") print("Demo done!")

서버리스 예제

다음 코드 예제는 S3 버킷에 객체를 업로드하여 트리거된 이벤트를 수신하는 Lambda 함수를 구현하는 방법을 보여줍니다. 해당 함수는 이벤트 파라미터에서 S3 버킷 이름과 객체 키를 검색하고 Amazon S3 API를 호출하여 객체의 콘텐츠 유형을 검색하고 로깅합니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Python을 사용하여 Lambda로 S3 이벤트를 사용합니다.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json import urllib.parse import boto3 print('Loading function') s3 = boto3.client('s3') def lambda_handler(event, context): #print("Received event: " + json.dumps(event, indent=2)) # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') try: response = s3.get_object(Bucket=bucket, Key=key) print("CONTENT TYPE: " + response['ContentType']) return response['ContentType'] except Exception as e: print(e) print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket)) raise e