IAM examples using SDK for Python (Boto3)

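The following code examples show how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with IAM.

Basics are code examples that show how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show how to call individual service functions, you can see actions in context in their related scenarios. In the excerpts on this page, names such as iam and logger are defined by the surrounding program, which is not shown; judging by calls such as iam.Policy(...) and iam.meta.client, iam is a Boto3 IAM service resource.

Scenarios are code examples that show how to accomplish specific tasks by calling multiple functions within a service or by combining it with other AWS services.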



The following code example shows how to use IAM.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import boto3
from botocore.exceptions import BotoCoreError, ClientError


def main():
    """
    Lists the managed policies in your AWS account using the AWS SDK for Python (Boto3).
    """
    iam = boto3.client("iam")

    try:
        # Get a paginator for the list_policies operation
        paginator = iam.get_paginator("list_policies")

        # Iterate through the pages of results
        for page in paginator.paginate(Scope="All", OnlyAttached=False):
            for policy in page["Policies"]:
                print(f"Policy name: {policy['PolicyName']}")
                print(f"  Policy ARN: {policy['Arn']}")
    # Both exception classes live in botocore.exceptions. ClientError covers
    # service-side errors such as AccessDenied.
    except (BotoCoreError, ClientError) as e:
        print(f"Encountered an error while listing policies: {e}")


if __name__ == "__main__":
    main()

For API details, see ListPolicies in AWS SDK for Python (Boto3) API Reference.




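To avoid security risks, don't use IAM users for authentication when developing purpose-built software or working with real data. Instead, use federation with an identity provider such as AWS IAM Identity Center.

  • Create a user with no permissions.

  • Create a role that grants permission to list Amazon S3 buckets for the account.

  • Add a policy to let the user assume the role.

  • Assume the role and list S3 buckets using temporary credentials, then clean up resources.

SDK for Python (Boto3)

Create an IAM user and a role that grants permission to list Amazon S3 buckets. The user has rights only to assume the role. After assuming the role, use temporary credentials to list buckets for the account.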

import json
import sys
import time
from uuid import uuid4

import boto3
from botocore.exceptions import ClientError


def progress_bar(seconds):
    """Shows a simple progress bar in the command window."""
    for _ in range(seconds):
        time.sleep(1)
        print(".", end="")
        sys.stdout.flush()
    print()


def setup(iam_resource):
    """
    Creates a new user with no permissions.
    Creates an access key pair for the user.
    Creates a role with a policy that lets the user assume the role.
    Creates a policy that allows listing Amazon S3 buckets.
    Attaches the policy to the role.
    Creates an inline policy for the user that lets the user assume the role.

    :param iam_resource: A Boto3 AWS Identity and Access Management (IAM) resource
                         that has permissions to create users, roles, and policies
                         in the account.
    :return: The newly created user, user key, and role.
    """
    try:
        user = iam_resource.create_user(UserName=f"demo-user-{uuid4()}")
        print(f"Created user {user.name}.")
    except ClientError as error:
        print(
            f"Couldn't create a user for the demo. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    try:
        user_key = user.create_access_key_pair()
        print(f"Created access key pair for user.")
    except ClientError as error:
        print(
            f"Couldn't create access keys for user {user.name}. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    print(f"Wait for user to be ready.", end="")
    progress_bar(10)

    try:
        role = iam_resource.create_role(
            RoleName=f"demo-role-{uuid4()}",
            AssumeRolePolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Principal": {"AWS": user.arn},
                            "Action": "sts:AssumeRole",
                        }
                    ],
                }
            ),
        )
        print(f"Created role {role.name}.")
    except ClientError as error:
        print(
            f"Couldn't create a role for the demo. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    try:
        policy = iam_resource.create_policy(
            PolicyName=f"demo-policy-{uuid4()}",
            PolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Action": "s3:ListAllMyBuckets",
                            "Resource": "arn:aws:s3:::*",
                        }
                    ],
                }
            ),
        )
        role.attach_policy(PolicyArn=policy.arn)
        print(f"Created policy {policy.policy_name} and attached it to the role.")
    except ClientError as error:
        print(
            f"Couldn't create a policy and attach it to role {role.name}. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    try:
        user.create_policy(
            PolicyName=f"demo-user-policy-{uuid4()}",
            PolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Action": "sts:AssumeRole",
                            "Resource": role.arn,
                        }
                    ],
                }
            ),
        )
        print(
            f"Created an inline policy for {user.name} that lets the user assume "
            f"the role."
        )
    except ClientError as error:
        print(
            f"Couldn't create an inline policy for user {user.name}. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    print("Give AWS time to propagate these new resources and connections.", end="")
    progress_bar(10)

    return user, user_key, role


def show_access_denied_without_role(user_key):
    """
    Shows that listing buckets without first assuming the role is not allowed.

    :param user_key: The key of the user created during setup. This user does not
                     have permission to list buckets in the account.
    """
    print(f"Try to list buckets without first assuming the role.")
    s3_denied_resource = boto3.resource(
        "s3", aws_access_key_id=user_key.id, aws_secret_access_key=user_key.secret
    )
    try:
        for bucket in s3_denied_resource.buckets.all():
            print(bucket.name)
        raise RuntimeError("Expected to get AccessDenied error when listing buckets!")
    except ClientError as error:
        if error.response["Error"]["Code"] == "AccessDenied":
            print("Attempt to list buckets with no permissions: AccessDenied.")
        else:
            raise


def list_buckets_from_assumed_role(user_key, assume_role_arn, session_name):
    """
    Assumes a role that grants permission to list the Amazon S3 buckets in the account.
    Uses the temporary credentials from the role to list the buckets that are owned
    by the assumed role's account.

    :param user_key: The access key of a user that has permission to assume the role.
    :param assume_role_arn: The Amazon Resource Name (ARN) of the role that
                            grants access to list the other account's buckets.
    :param session_name: The name of the STS session.
    """
    sts_client = boto3.client(
        "sts", aws_access_key_id=user_key.id, aws_secret_access_key=user_key.secret
    )
    try:
        response = sts_client.assume_role(
            RoleArn=assume_role_arn, RoleSessionName=session_name
        )
        temp_credentials = response["Credentials"]
        print(f"Assumed role {assume_role_arn} and got temporary credentials.")
    except ClientError as error:
        print(
            f"Couldn't assume role {assume_role_arn}. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    # Create an S3 resource that can access the account with the temporary credentials.
    s3_resource = boto3.resource(
        "s3",
        aws_access_key_id=temp_credentials["AccessKeyId"],
        aws_secret_access_key=temp_credentials["SecretAccessKey"],
        aws_session_token=temp_credentials["SessionToken"],
    )
    print(f"Listing buckets for the assumed role's account:")
    try:
        for bucket in s3_resource.buckets.all():
            print(bucket.name)
    except ClientError as error:
        print(
            f"Couldn't list buckets for the account. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise


def teardown(user, role):
    """
    Removes all resources created during setup.

    :param user: The demo user.
    :param role: The demo role.
    """
    try:
        for attached in role.attached_policies.all():
            policy_name = attached.policy_name
            role.detach_policy(PolicyArn=attached.arn)
            attached.delete()
            print(f"Detached and deleted {policy_name}.")
        role.delete()
        print(f"Deleted {role.name}.")
    except ClientError as error:
        print(
            "Couldn't detach policy, delete policy, or delete role. Here's why: "
            f"{error.response['Error']['Message']}"
        )
        raise

    try:
        for user_pol in user.policies.all():
            user_pol.delete()
            print("Deleted inline user policy.")
        for key in user.access_keys.all():
            key.delete()
            print("Deleted user's access key.")
        user.delete()
        print(f"Deleted {user.name}.")
    except ClientError as error:
        print(
            "Couldn't delete user policy or delete user. Here's why: "
            f"{error.response['Error']['Message']}"
        )


def usage_demo():
    """Drives the demonstration."""
    print("-" * 88)
    print(f"Welcome to the IAM create user and assume role demo.")
    print("-" * 88)
    iam_resource = boto3.resource("iam")
    user = None
    role = None
    try:
        user, user_key, role = setup(iam_resource)
        print(f"Created {user.name} and {role.name}.")
        show_access_denied_without_role(user_key)
        list_buckets_from_assumed_role(user_key, role.arn, "AssumeRoleDemoSession")
    except Exception:
        print("Something went wrong!")
    finally:
        if user is not None and role is not None:
            teardown(user, role)
        print("Thanks for watching!")


if __name__ == "__main__":
    usage_demo()


The following code example shows how to use AttachRolePolicy.

SDK for Python (Boto3)

Attach a policy to a role using the Boto3 Policy object.

def attach_to_role(role_name, policy_arn):
    """
    Attaches a policy to a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    """
    try:
        iam.Policy(policy_arn).attach_role(RoleName=role_name)
        logger.info("Attached policy %s to role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name)
        raise

Attach a policy to a role using the Boto3 Role object.

def attach_policy(role_name, policy_arn):
    """
    Attaches a policy to a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    """
    try:
        iam.Role(role_name).attach_policy(PolicyArn=policy_arn)
        logger.info("Attached policy %s to role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name)
        raise

For API details, see AttachRolePolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use AttachUserPolicy.

SDK for Python (Boto3)

def attach_policy(user_name, policy_arn):
    """
    Attaches a policy to a user.

    :param user_name: The name of the user.
    :param policy_arn: The Amazon Resource Name (ARN) of the policy.
    """
    try:
        iam.User(user_name).attach_policy(PolicyArn=policy_arn)
        logger.info("Attached policy %s to user %s.", policy_arn, user_name)
    except ClientError:
        logger.exception("Couldn't attach policy %s to user %s.", policy_arn, user_name)
        raise

For API details, see AttachUserPolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateAccessKey.

SDK for Python (Boto3)

def create_key(user_name):
    """
    Creates an access key for the specified user. Each user can have a
    maximum of two keys.

    :param user_name: The name of the user.
    :return: The created access key.
    """
    try:
        key_pair = iam.User(user_name).create_access_key_pair()
        logger.info(
            "Created access key pair for %s. Key ID is %s.",
            key_pair.user_name,
            key_pair.id,
        )
    except ClientError:
        logger.exception("Couldn't create access key pair for %s.", user_name)
        raise
    else:
        return key_pair

For API details, see CreateAccessKey in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateAccountAlias.

SDK for Python (Boto3)

def create_alias(alias):
    """
    Creates an alias for the current account. The alias can be used in place of the
    account ID in the sign-in URL. An account can have only one alias. When a new
    alias is created, it replaces any existing alias.

    :param alias: The alias to assign to the account.
    """
    try:
        iam.create_account_alias(AccountAlias=alias)
        logger.info("Created an alias '%s' for your account.", alias)
    except ClientError:
        logger.exception("Couldn't create alias '%s' for your account.", alias)
        raise

For API details, see CreateAccountAlias in AWS SDK for Python (Boto3) API Reference.
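The docstring above says the alias can stand in for the account ID in the sign-in URL. The following is a minimal sketch of that substitution, not part of the original example. The helper name sign_in_url is hypothetical, and the alias pattern (3 to 63 characters, lowercase letters, digits, and non-consecutive hyphens) is an assumption based on the documented CreateAccountAlias constraints; check the API reference before relying on it.

```python
import re

# Assumed alias constraint: 3-63 characters, lowercase letters and digits,
# hyphens allowed only in the middle and never doubled.
_ALIAS_PATTERN = re.compile(r"[a-z0-9]([a-z0-9]|-(?!-)){1,61}[a-z0-9]")


def sign_in_url(alias):
    """Hypothetical helper: builds the console sign-in URL for an account alias."""
    if not _ALIAS_PATTERN.fullmatch(alias):
        raise ValueError(f"'{alias}' is not a valid account alias.")
    return f"https://{alias}.signin.aws.amazon.com/console"


print(sign_in_url("my-company"))
```

Validating locally before calling create_alias avoids a round trip for obviously malformed aliases, although the service remains the final authority on what it accepts.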

The following code example shows how to use CreateInstanceProfile.

SDK for Python (Boto3)

import logging
import time
from typing import Tuple

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class AutoScalingWrapper:
    """
    Encapsulates Amazon EC2 Auto Scaling and EC2 management actions.
    """

    def __init__(
        self,
        resource_prefix: str,
        inst_type: str,
        ami_param: str,
        autoscaling_client: boto3.client,
        ec2_client: boto3.client,
        ssm_client: boto3.client,
        iam_client: boto3.client,
    ):
        """
        Initializes the AutoScaler class with the necessary parameters.

        :param resource_prefix: The prefix for naming AWS resources that are created by this class.
        :param inst_type: The type of EC2 instance to create, such as t3.micro.
        :param ami_param: The Systems Manager parameter used to look up the AMI that is created.
        :param autoscaling_client: A Boto3 EC2 Auto Scaling client.
        :param ec2_client: A Boto3 EC2 client.
        :param ssm_client: A Boto3 Systems Manager client.
        :param iam_client: A Boto3 IAM client.
        """
        self.inst_type = inst_type
        self.ami_param = ami_param
        self.autoscaling_client = autoscaling_client
        self.ec2_client = ec2_client
        self.ssm_client = ssm_client
        self.iam_client = iam_client
        sts_client = boto3.client("sts")
        self.account_id = sts_client.get_caller_identity()["Account"]

        self.key_pair_name = f"{resource_prefix}-key-pair"
        self.launch_template_name = f"{resource_prefix}-template-"
        self.group_name = f"{resource_prefix}-group"

        # Happy path
        self.instance_policy_name = f"{resource_prefix}-pol"
        self.instance_role_name = f"{resource_prefix}-role"
        self.instance_profile_name = f"{resource_prefix}-prof"

        # Failure mode
        self.bad_creds_policy_name = f"{resource_prefix}-bc-pol"
        self.bad_creds_role_name = f"{resource_prefix}-bc-role"
        self.bad_creds_profile_name = f"{resource_prefix}-bc-prof"

    def create_instance_profile(
        self,
        policy_file: str,
        policy_name: str,
        role_name: str,
        profile_name: str,
        aws_managed_policies: Tuple[str, ...] = (),
    ) -> str:
        """
        Creates a policy, role, and profile that is associated with instances created by
        this class. An instance's associated profile defines a role that is assumed by the
        instance. The role has attached policies that specify the AWS permissions granted to
        clients that run on the instance.

        :param policy_file: The name of a JSON file that contains the policy definition to
                            create and attach to the role.
        :param policy_name: The name to give the created policy.
        :param role_name: The name to give the created role.
        :param profile_name: The name to give the created profile.
        :param aws_managed_policies: Additional AWS-managed policies that are attached to
                                     the role, such as AmazonSSMManagedInstanceCore to grant
                                     use of Systems Manager to send commands to the instance.
        :return: The ARN of the profile that is created.
        """
        assume_role_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "ec2.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        # create_policy, create_role, and attach_policy are other methods of this
        # class that are not shown in this excerpt.
        policy_arn = self.create_policy(policy_file, policy_name)
        self.create_role(role_name, assume_role_doc)
        self.attach_policy(role_name, policy_arn, aws_managed_policies)

        try:
            profile_response = self.iam_client.create_instance_profile(
                InstanceProfileName=profile_name
            )
            waiter = self.iam_client.get_waiter("instance_profile_exists")
            waiter.wait(InstanceProfileName=profile_name)
            time.sleep(10)  # wait a little longer
            profile_arn = profile_response["InstanceProfile"]["Arn"]
            self.iam_client.add_role_to_instance_profile(
                InstanceProfileName=profile_name, RoleName=role_name
            )
            log.info("Created profile %s and added role %s.", profile_name, role_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "EntityAlreadyExists":
                prof_response = self.iam_client.get_instance_profile(
                    InstanceProfileName=profile_name
                )
                profile_arn = prof_response["InstanceProfile"]["Arn"]
                log.info(
                    "Instance profile %s already exists, nothing to do.", profile_name
                )
            else:
                # Any other error leaves profile_arn unset, so log it and re-raise
                # instead of falling through to the return statement.
                log.error(f"Full error:\n\t{err}")
                raise
        return profile_arn

For API details, see CreateInstanceProfile in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreatePolicy.

SDK for Python (Boto3)

def create_policy(name, description, actions, resource_arn):
    """
    Creates a policy that contains a single statement.

    :param name: The name of the policy to create.
    :param description: The description of the policy.
    :param actions: The actions allowed by the policy. These typically take the
                    form of service:action, such as s3:PutObject.
    :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy
                         applies to. This ARN can contain wildcards, such as
                         'arn:aws:s3:::my-bucket/*' to allow actions on all objects
                         in the bucket named 'my-bucket'.
    :return: The newly created policy.
    """
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
    }
    try:
        policy = iam.create_policy(
            PolicyName=name,
            Description=description,
            PolicyDocument=json.dumps(policy_doc),
        )
        logger.info("Created policy %s.", policy.arn)
    except ClientError:
        logger.exception("Couldn't create policy %s.", name)
        raise
    else:
        return policy

For API details, see CreatePolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreatePolicyVersion.

SDK for Python (Boto3)

def create_policy_version(policy_arn, actions, resource_arn, set_as_default):
    """
    Creates a policy version. Policies can have up to five versions. The default
    version is the one that is used for all resources that reference the policy.

    :param policy_arn: The ARN of the policy.
    :param actions: The actions to allow in the policy version.
    :param resource_arn: The ARN of the resource this policy version applies to.
    :param set_as_default: When True, this policy version is set as the default
                           version for the policy. Otherwise, the default
                           is not changed.
    :return: The newly created policy version.
    """
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
    }
    try:
        policy = iam.Policy(policy_arn)
        policy_version = policy.create_version(
            PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default
        )
        logger.info(
            "Created policy version %s for policy %s.",
            policy_version.version_id,
            policy_version.arn,
        )
    except ClientError:
        logger.exception("Couldn't create a policy version for %s.", policy_arn)
        raise
    else:
        return policy_version

For API details, see CreatePolicyVersion in AWS SDK for Python (Boto3) API Reference.
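Because a policy can hold at most five versions, creating a new version can fail once the limit is reached. The sketch below is one possible way to make room first, not part of the original example. It assumes a Boto3 IAM client is passed in (for example boto3.client("iam")), and the helper name create_version_with_rotation is hypothetical.

```python
import json


def create_version_with_rotation(iam_client, policy_arn, policy_doc, max_versions=5):
    """
    Hypothetical helper: creates a new default policy version. When the policy
    already holds max_versions versions, the oldest non-default version is
    deleted first to make room.
    """
    versions = iam_client.list_policy_versions(PolicyArn=policy_arn)["Versions"]
    if len(versions) >= max_versions:
        # The default version cannot be deleted, so only consider the others.
        non_default = [v for v in versions if not v["IsDefaultVersion"]]
        oldest = min(non_default, key=lambda v: v["CreateDate"])
        iam_client.delete_policy_version(
            PolicyArn=policy_arn, VersionId=oldest["VersionId"]
        )
    response = iam_client.create_policy_version(
        PolicyArn=policy_arn,
        PolicyDocument=json.dumps(policy_doc),
        SetAsDefault=True,
    )
    return response["PolicyVersion"]["VersionId"]
```

Deleting the oldest non-default version is a design choice for this sketch; a program that needs to roll back to specific versions would want a different retention rule.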

The following code example shows how to use CreateRole.

SDK for Python (Boto3)

def create_role(role_name, allowed_services):
    """
    Creates a role that lets a list of specified services assume the role.

    :param role_name: The name of the role.
    :param allowed_services: The services that can assume the role.
    :return: The newly created role.
    """
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
            for service in allowed_services
        ],
    }

    try:
        role = iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
        logger.info("Created role %s.", role.name)
    except ClientError:
        logger.exception("Couldn't create role %s.", role_name)
        raise
    else:
        return role

For API details, see CreateRole in AWS SDK for Python (Boto3) API Reference.
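To see what the trust policy in create_role looks like before it is sent to IAM, the comprehension can be run on its own. This is an illustrative sketch only: build_trust_policy is a hypothetical name, and the two service principals are sample inputs rather than values taken from the example.

```python
import json


def build_trust_policy(allowed_services):
    """Hypothetical helper: one Allow statement per service principal."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
            for service in allowed_services
        ],
    }


# Sample service principals, chosen for illustration.
trust_policy = build_trust_policy(["lambda.amazonaws.com", "ec2.amazonaws.com"])
print(json.dumps(trust_policy, indent=2))
```

Each service gets its own statement, so the resulting document has as many statements as there are entries in allowed_services.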

The following code example shows how to use CreateServiceLinkedRole.

SDK for Python (Boto3)

def create_service_linked_role(service_name, description):
    """
    Creates a service-linked role.

    :param service_name: The name of the service that owns the role.
    :param description: A description to give the role.
    :return: The newly created role.
    """
    try:
        response = iam.meta.client.create_service_linked_role(
            AWSServiceName=service_name, Description=description
        )
        role = iam.Role(response["Role"]["RoleName"])
        logger.info("Created service-linked role %s.", role.name)
    except ClientError:
        logger.exception("Couldn't create service-linked role for %s.", service_name)
        raise
    else:
        return role

The following code example shows how to use CreateUser.

SDK for Python (Boto3)

def create_user(user_name):
    """
    Creates a user. By default, a user has no permissions or access keys.

    :param user_name: The name of the user.
    :return: The newly created user.
    """
    try:
        user = iam.create_user(UserName=user_name)
        logger.info("Created user %s.", user.name)
    except ClientError:
        logger.exception("Couldn't create user %s.", user_name)
        raise
    else:
        return user

For API details, see CreateUser in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteAccessKey.

SDK for Python (Boto3)

def delete_key(user_name, key_id):
    """
    Deletes a user's access key.

    :param user_name: The user that owns the key.
    :param key_id: The ID of the key to delete.
    """
    try:
        key = iam.AccessKey(user_name, key_id)
        key.delete()
        logger.info("Deleted access key %s for %s.", key.id, key.user_name)
    except ClientError:
        logger.exception("Couldn't delete key %s for %s", key_id, user_name)
        raise

For API details, see DeleteAccessKey in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteAccountAlias.

SDK for Python (Boto3)

def delete_alias(alias):
    """
    Removes the alias from the current account.

    :param alias: The alias to remove.
    """
    try:
        iam.meta.client.delete_account_alias(AccountAlias=alias)
        logger.info("Removed alias '%s' from your account.", alias)
    except ClientError:
        logger.exception("Couldn't remove alias '%s' from your account.", alias)
        raise

For API details, see DeleteAccountAlias in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteInstanceProfile.

SDK for Python (Boto3)

import logging

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class AutoScalingWrapper:
    """
    Encapsulates Amazon EC2 Auto Scaling and EC2 management actions.
    """

    def __init__(
        self,
        resource_prefix: str,
        inst_type: str,
        ami_param: str,
        autoscaling_client: boto3.client,
        ec2_client: boto3.client,
        ssm_client: boto3.client,
        iam_client: boto3.client,
    ):
        """
        Initializes the AutoScaler class with the necessary parameters.

        :param resource_prefix: The prefix for naming AWS resources that are created by this class.
        :param inst_type: The type of EC2 instance to create, such as t3.micro.
        :param ami_param: The Systems Manager parameter used to look up the AMI that is created.
        :param autoscaling_client: A Boto3 EC2 Auto Scaling client.
        :param ec2_client: A Boto3 EC2 client.
        :param ssm_client: A Boto3 Systems Manager client.
        :param iam_client: A Boto3 IAM client.
        """
        self.inst_type = inst_type
        self.ami_param = ami_param
        self.autoscaling_client = autoscaling_client
        self.ec2_client = ec2_client
        self.ssm_client = ssm_client
        self.iam_client = iam_client
        sts_client = boto3.client("sts")
        self.account_id = sts_client.get_caller_identity()["Account"]

        self.key_pair_name = f"{resource_prefix}-key-pair"
        self.launch_template_name = f"{resource_prefix}-template-"
        self.group_name = f"{resource_prefix}-group"

        # Happy path
        self.instance_policy_name = f"{resource_prefix}-pol"
        self.instance_role_name = f"{resource_prefix}-role"
        self.instance_profile_name = f"{resource_prefix}-prof"

        # Failure mode
        self.bad_creds_policy_name = f"{resource_prefix}-bc-pol"
        self.bad_creds_role_name = f"{resource_prefix}-bc-role"
        self.bad_creds_profile_name = f"{resource_prefix}-bc-prof"

    def delete_instance_profile(self, profile_name: str, role_name: str) -> None:
        """
        Detaches a role from an instance profile, detaches policies from the role,
        and deletes all the resources.

        :param profile_name: The name of the profile to delete.
        :param role_name: The name of the role to delete.
        """
        try:
            self.iam_client.remove_role_from_instance_profile(
                InstanceProfileName=profile_name, RoleName=role_name
            )
            self.iam_client.delete_instance_profile(InstanceProfileName=profile_name)
            log.info("Deleted instance profile %s.", profile_name)
            attached_policies = self.iam_client.list_attached_role_policies(
                RoleName=role_name
            )
            for pol in attached_policies["AttachedPolicies"]:
                self.iam_client.detach_role_policy(
                    RoleName=role_name, PolicyArn=pol["PolicyArn"]
                )
                # AWS-managed policies can be detached but not deleted.
                if not pol["PolicyArn"].startswith("arn:aws:iam::aws"):
                    self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"])
                log.info("Detached and deleted policy %s.", pol["PolicyName"])
            self.iam_client.delete_role(RoleName=role_name)
            log.info("Deleted role %s.", role_name)
        except ClientError as err:
            log.error(
                f"Couldn't delete instance profile {profile_name} or detach "
                f"policies and delete role {role_name}: {err}"
            )
            if err.response["Error"]["Code"] == "NoSuchEntity":
                log.info(
                    "Instance profile %s doesn't exist, nothing to do.", profile_name
                )

For API details, see DeleteInstanceProfile in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeletePolicy.

SDK for Python (Boto3)

def delete_policy(policy_arn):
    """
    Deletes a policy.

    :param policy_arn: The ARN of the policy to delete.
    """
    try:
        iam.Policy(policy_arn).delete()
        logger.info("Deleted policy %s.", policy_arn)
    except ClientError:
        logger.exception("Couldn't delete policy %s.", policy_arn)
        raise

For API details, see DeletePolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteRole.

SDK for Python (Boto3)

def delete_role(role_name):
    """
    Deletes a role.

    :param role_name: The name of the role to delete.
    """
    try:
        iam.Role(role_name).delete()
        logger.info("Deleted role %s.", role_name)
    except ClientError:
        logger.exception("Couldn't delete role %s.", role_name)
        raise

For API details, see DeleteRole in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteUser.

SDK for Python (Boto3)

def delete_user(user_name):
    """
    Deletes a user. Before a user can be deleted, all associated resources,
    such as access keys and policies, must be deleted or detached.

    :param user_name: The name of the user.
    """
    try:
        iam.User(user_name).delete()
        logger.info("Deleted user %s.", user_name)
    except ClientError:
        logger.exception("Couldn't delete user %s.", user_name)
        raise

For API details, see DeleteUser in AWS SDK for Python (Boto3) API Reference.
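The docstring above notes that access keys and policies must be removed before the user can be deleted. The sketch below shows one possible cleanup order, assuming a Boto3 IAM client is passed in; the function name is hypothetical. It covers only access keys, attached managed policies, and inline policies, reads a single page of each list, and does not handle other dependencies such as group memberships, login profiles, or MFA devices.

```python
def delete_user_and_dependencies(iam_client, user_name):
    """
    Hypothetical helper: removes a user's access keys, attached managed
    policies, and inline policies, then deletes the user.
    """
    keys = iam_client.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    for key in keys:
        iam_client.delete_access_key(
            UserName=user_name, AccessKeyId=key["AccessKeyId"]
        )

    attached = iam_client.list_attached_user_policies(UserName=user_name)
    for policy in attached["AttachedPolicies"]:
        iam_client.detach_user_policy(
            UserName=user_name, PolicyArn=policy["PolicyArn"]
        )

    inline = iam_client.list_user_policies(UserName=user_name)
    for policy_name in inline["PolicyNames"]:
        iam_client.delete_user_policy(UserName=user_name, PolicyName=policy_name)

    # With dependencies removed, the user itself can be deleted.
    iam_client.delete_user(UserName=user_name)
```

A caller could pass boto3.client("iam") as iam_client; for users with many keys or policies, the list calls would need their paginators.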

The following code example shows how to use DetachRolePolicy.

SDK for Python (Boto3)

Detach a policy from a role using the Boto3 Policy object.

def detach_from_role(role_name, policy_arn):
    """
    Detaches a policy from a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    """
    try:
        iam.Policy(policy_arn).detach_role(RoleName=role_name)
        logger.info("Detached policy %s from role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception(
            "Couldn't detach policy %s from role %s.", policy_arn, role_name
        )
        raise

Detach a policy from a role using the Boto3 Role object.

def detach_policy(role_name, policy_arn):
    """
    Detaches a policy from a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    """
    try:
        iam.Role(role_name).detach_policy(PolicyArn=policy_arn)
        logger.info("Detached policy %s from role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception(
            "Couldn't detach policy %s from role %s.", policy_arn, role_name
        )
        raise

For API details, see DetachRolePolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DetachUserPolicy.

SDK for Python (Boto3)

def detach_policy(user_name, policy_arn):
    """
    Detaches a policy from a user.

    :param user_name: The name of the user.
    :param policy_arn: The Amazon Resource Name (ARN) of the policy.
    """
    try:
        iam.User(user_name).detach_policy(PolicyArn=policy_arn)
        logger.info("Detached policy %s from user %s.", policy_arn, user_name)
    except ClientError:
        logger.exception(
            "Couldn't detach policy %s from user %s.", policy_arn, user_name
        )
        raise

For API details, see DetachUserPolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GenerateCredentialReport.

SDK for Python (Boto3)

def generate_credential_report():
    """
    Starts generation of a credentials report about the current account. After
    calling this function to generate the report, call get_credential_report
    to get the latest report. A new report can be generated a minimum of four hours
    after the last one was generated.
    """
    try:
        response = iam.meta.client.generate_credential_report()
        logger.info(
            "Generating credentials report for your account. " "Current state is %s.",
            response["State"],
        )
    except ClientError:
        logger.exception("Couldn't generate a credentials report for your account.")
        raise
    else:
        return response

The following code example shows how to use GetAccessKeyLastUsed.

SDK for Python (Boto3)

def get_last_use(key_id):
    """
    Gets information about when and how a key was last used.

    :param key_id: The ID of the key to look up.
    :return: Information about the key's last use.
    """
    try:
        response = iam.meta.client.get_access_key_last_used(AccessKeyId=key_id)
        last_used_date = response["AccessKeyLastUsed"].get("LastUsedDate", None)
        last_service = response["AccessKeyLastUsed"].get("ServiceName", None)
        logger.info(
            "Key %s was last used by %s on %s to access %s.",
            key_id,
            response["UserName"],
            last_used_date,
            last_service,
        )
    except ClientError:
        logger.exception("Couldn't get last use of key %s.", key_id)
        raise
    else:
        return response

For API details, see GetAccessKeyLastUsed in AWS SDK for Python (Boto3) API Reference.
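Last-used information is often combined with ListAccessKeys to find keys that have gone idle. The following is a minimal sketch of that idea, assuming a Boto3 IAM client is passed in. The function name find_stale_keys, the 90-day default, and the fallback to the key's creation date for never-used keys are choices made for this illustration, not part of the original example.

```python
from datetime import datetime, timedelta, timezone


def find_stale_keys(iam_client, user_name, max_idle_days=90, now=None):
    """
    Hypothetical helper: returns the IDs of a user's access keys that have not
    been used (or, if never used, not been created) within max_idle_days.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_idle_days)
    stale = []
    keys = iam_client.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    for key in keys:
        key_id = key["AccessKeyId"]
        response = iam_client.get_access_key_last_used(AccessKeyId=key_id)
        last_used = response["AccessKeyLastUsed"].get("LastUsedDate")
        # A key that was never used has no LastUsedDate, so fall back to the
        # creation date as the reference point.
        reference = last_used or key["CreateDate"]
        if reference < cutoff:
            stale.append(key_id)
    return stale
```

The now parameter exists so the cutoff can be pinned in tests; in normal use it can be left unset.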

The following code example shows how to use GetAccountAuthorizationDetails.

SDK for Python (Boto3)

def get_authorization_details(response_filter):
    """
    Gets an authorization detail report for the current account.

    :param response_filter: A list of resource types to include in the report, such
                            as users or roles. When not specified, all resources
                            are included.
    :return: The authorization detail report.
    """
    try:
        account_details = iam.meta.client.get_account_authorization_details(
            Filter=response_filter
        )
        logger.debug(account_details)
    except ClientError:
        logger.exception("Couldn't get details for your account.")
        raise
    else:
        return account_details
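In accounts with many entities, the authorization details come back in several pages, and a single call returns only the first. The sketch below uses the client paginator for this operation to walk all pages and count what it finds. It assumes a Boto3 IAM client is passed in; the function name and the choice to return counts are illustrative only.

```python
def count_authorization_entities(iam_client, entity_types=("User", "Role")):
    """
    Hypothetical helper: pages through the account authorization details and
    counts the entities returned for the requested types.

    entity_types uses the Filter values accepted by the API, such as
    "User", "Role", "Group", "LocalManagedPolicy", and "AWSManagedPolicy".
    """
    paginator = iam_client.get_paginator("get_account_authorization_details")
    counts = {"users": 0, "groups": 0, "roles": 0, "policies": 0}
    for page in paginator.paginate(Filter=list(entity_types)):
        counts["users"] += len(page.get("UserDetailList", []))
        counts["groups"] += len(page.get("GroupDetailList", []))
        counts["roles"] += len(page.get("RoleDetailList", []))
        counts["policies"] += len(page.get("Policies", []))
    return counts
```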

The following code example shows how to use GetAccountPasswordPolicy.

SDK for Python (Boto3)

def print_password_policy():
    """
    Prints the password policy for the account.
    """
    try:
        pw_policy = iam.AccountPasswordPolicy()
        print("Current account password policy:")
        print(
            f"\tallow_users_to_change_password: {pw_policy.allow_users_to_change_password}"
        )
        print(f"\texpire_passwords: {pw_policy.expire_passwords}")
        print(f"\thard_expiry: {pw_policy.hard_expiry}")
        print(f"\tmax_password_age: {pw_policy.max_password_age}")
        print(f"\tminimum_password_length: {pw_policy.minimum_password_length}")
        print(f"\tpassword_reuse_prevention: {pw_policy.password_reuse_prevention}")
        print(
            f"\trequire_lowercase_characters: {pw_policy.require_lowercase_characters}"
        )
        print(f"\trequire_numbers: {pw_policy.require_numbers}")
        print(f"\trequire_symbols: {pw_policy.require_symbols}")
        print(
            f"\trequire_uppercase_characters: {pw_policy.require_uppercase_characters}"
        )
        printed = True
    except ClientError as error:
        if error.response["Error"]["Code"] == "NoSuchEntity":
            print("The account does not have a password policy set.")
        else:
            logger.exception("Couldn't get account password policy.")
            raise
    else:
        return printed

The following code example shows how to use GetAccountSummary.

SDK for Python (Boto3)

def get_summary():
    """
    Gets a summary of account usage.

    :return: The summary of account usage.
    """
    try:
        summary = iam.AccountSummary()
        logger.debug(summary.summary_map)
    except ClientError:
        logger.exception("Couldn't get a summary for your account.")
        raise
    else:
        return summary.summary_map

For API details, see GetAccountSummary in AWS SDK for Python (Boto3) API Reference.
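The summary map pairs some usage counts with their limits, for example Users with UsersQuota. The sketch below turns such pairs into a usage fraction. It works on a plain dictionary, so it can be applied to the value returned by get_summary; the function name is hypothetical and the sample numbers are made up for illustration.

```python
def quota_usage(summary_map):
    """
    Hypothetical helper: for every "<Name>Quota" key that has a matching
    "<Name>" count in the summary map, returns count / quota.
    """
    usage = {}
    for key, quota in summary_map.items():
        if not key.endswith("Quota"):
            continue
        name = key[: -len("Quota")]
        # Some quota keys have no matching count key; skip those.
        if name in summary_map and quota > 0:
            usage[name] = summary_map[name] / quota
    return usage


# Made-up sample values, shaped like an account summary map.
sample = {"Users": 1250, "UsersQuota": 5000, "Roles": 250, "RolesQuota": 1000, "MFADevices": 2}
print(quota_usage(sample))
```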

The following code example shows how to use GetCredentialReport.

SDK for Python (Boto3)

def get_credential_report():
    """
    Gets the most recently generated credentials report about the current account.

    :return: The credentials report.
    """
    try:
        response = iam.meta.client.get_credential_report()
        logger.debug(response["Content"])
    except ClientError:
        logger.exception("Couldn't get credentials report.")
        raise
    else:
        return response["Content"]

For API details, see GetCredentialReport in AWS SDK for Python (Boto3) API Reference.
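Generating and fetching the report are separate calls, and the report is not ready until generation reports a COMPLETE state. The sketch below combines the two steps and parses the CSV content into rows. It assumes a Boto3 IAM client is passed in and that Content is returned as UTF-8 encoded bytes; the function name, polling interval, and attempt limit are illustrative choices.

```python
import csv
import io
import time


def fetch_credential_report(iam_client, poll_seconds=2, max_attempts=30):
    """
    Hypothetical helper: requests a credential report, waits until its state
    is COMPLETE, then returns the report as a list of dictionaries, one per
    CSV row.
    """
    for _ in range(max_attempts):
        state = iam_client.generate_credential_report()["State"]
        if state == "COMPLETE":
            break
        time.sleep(poll_seconds)
    else:
        # The loop finished without ever seeing COMPLETE.
        raise TimeoutError("Credential report was not ready in time.")

    content = iam_client.get_credential_report()["Content"]
    text = content.decode("utf-8")
    return list(csv.DictReader(io.StringIO(text)))
```

Each returned row is keyed by the report's column headers, so a caller can filter rows without knowing the column order.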

The following code example shows how to use GetPolicy.

SDK for Python (Boto3)

def get_default_policy_statement(policy_arn):
    """
    Gets the statement of the default version of the specified policy.

    :param policy_arn: The ARN of the policy to look up.
    :return: The statement of the default policy version.
    """
    try:
        policy = iam.Policy(policy_arn)
        # To get an attribute of a policy, the SDK first calls get_policy.
        policy_doc = policy.default_version.document
        policy_statement = policy_doc.get("Statement", None)
        logger.info("Got default policy doc for %s.", policy.policy_name)
        logger.info(policy_doc)
    except ClientError:
        logger.exception("Couldn't get default policy statement for %s.", policy_arn)
        raise
    else:
        return policy_statement

For API details, see GetPolicy in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetPolicyVersion.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def get_default_policy_statement(policy_arn):
    """
    Gets the statement of the default version of the specified policy.

    :param policy_arn: The ARN of the policy to look up.
    :return: The statement of the default policy version.
    """
    try:
        policy = iam.Policy(policy_arn)
        # To get an attribute of a policy, the SDK first calls get_policy.
        policy_doc = policy.default_version.document
        policy_statement = policy_doc.get("Statement", None)
        logger.info("Got default policy doc for %s.", policy.policy_name)
        logger.info(policy_doc)
    except ClientError:
        logger.exception("Couldn't get default policy statement for %s.", policy_arn)
        raise
    else:
        return policy_statement

  For API details, see GetPolicyVersion in the AWS SDK for Python (Boto3) API Reference.
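
In an IAM policy document, `Statement` can be a single object or a list of objects, and `Action` can be a string or a list. The sketch below shows one way to normalize the value returned by `get_default_policy_statement` before inspecting it. The helper names and the sample statement are hypothetical.

```python
from typing import Any, Dict, List, Union

Statement = Dict[str, Any]


def normalize_statements(
    policy_statement: Union[None, Statement, List[Statement]]
) -> List[Statement]:
    """Returns the statement value as a list, whatever shape it arrived in."""
    if policy_statement is None:
        return []
    if isinstance(policy_statement, dict):
        return [policy_statement]
    return list(policy_statement)


def allowed_actions(
    policy_statement: Union[None, Statement, List[Statement]]
) -> List[str]:
    """Collects the actions named by every statement whose Effect is Allow."""
    actions: List[str] = []
    for statement in normalize_statements(policy_statement):
        if statement.get("Effect") != "Allow":
            continue
        action = statement.get("Action", [])
        actions.extend([action] if isinstance(action, str) else action)
    return actions


# Hypothetical statement in the single-object form.
sample_statement = {
    "Effect": "Allow",
    "Action": "s3:ListAllMyBuckets",
    "Resource": "arn:aws:s3:::*",
}
print(allowed_actions(sample_statement))
```

This sketch ignores `NotAction` and conditions, so it is a starting point for inspection rather than a full policy evaluation.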

The following code example shows how to use GetRole.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def get_role(role_name):
    """
    Gets a role by name.

    :param role_name: The name of the role to retrieve.
    :return: The specified role.
    """
    try:
        role = iam.Role(role_name)
        role.load()  # calls GetRole to load attributes
        logger.info("Got role with arn %s.", role.arn)
    except ClientError:
        logger.exception("Couldn't get role named %s.", role_name)
        raise
    else:
        return role

  For API details, see GetRole in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListAccessKeys.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_keys(user_name):
    """
    Lists the keys owned by the specified user.

    :param user_name: The name of the user.
    :return: The list of keys owned by the user.
    """
    try:
        keys = list(iam.User(user_name).access_keys.all())
        logger.info("Got %s access keys for %s.", len(keys), user_name)
    except ClientError:
        logger.exception("Couldn't get access keys for %s.", user_name)
        raise
    else:
        return keys

  For API details, see ListAccessKeys in the AWS SDK for Python (Boto3) API Reference.
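
A common follow-up to listing keys is finding active keys that are older than some threshold, for example before rotating them. The sketch below assumes each key object exposes `status` and a timezone-aware `create_date`, as the Boto3 `AccessKey` resource does. The function name `find_stale_keys`, the 90-day threshold, and the stand-in key objects are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Iterable, List, Optional


def find_stale_keys(
    keys: Iterable, max_age_days: int, now: Optional[datetime] = None
) -> List:
    """
    Returns the active keys that were created more than max_age_days ago.

    :param keys: Objects with 'status' and timezone-aware 'create_date' attributes.
    :param max_age_days: The age in days after which a key counts as stale.
    :param now: The reference time. Defaults to the current UTC time.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    return [
        key for key in keys if key.status == "Active" and key.create_date < cutoff
    ]


# Stand-in objects so the sketch runs without calling AWS.
reference_time = datetime(2024, 6, 1, tzinfo=timezone.utc)
sample_keys = [
    SimpleNamespace(
        id="key-old-active",
        status="Active",
        create_date=datetime(2023, 1, 1, tzinfo=timezone.utc),
    ),
    SimpleNamespace(
        id="key-old-inactive",
        status="Inactive",
        create_date=datetime(2023, 1, 1, tzinfo=timezone.utc),
    ),
    SimpleNamespace(
        id="key-new-active",
        status="Active",
        create_date=datetime(2024, 5, 20, tzinfo=timezone.utc),
    ),
]
stale = find_stale_keys(sample_keys, max_age_days=90, now=reference_time)
print([key.id for key in stale])
```

In real use, the list returned by `list_keys(user_name)` would replace `sample_keys`.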

The following code example shows how to use ListAccountAliases.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_aliases():
    """
    Gets the list of aliases for the current account. An account has at most one alias.

    :return: The list of aliases for the account.
    """
    try:
        response = iam.meta.client.list_account_aliases()
        aliases = response["AccountAliases"]
        if len(aliases) > 0:
            logger.info("Got aliases for your account: %s.", ",".join(aliases))
        else:
            logger.info("Got no aliases for your account.")
    except ClientError:
        logger.exception("Couldn't list aliases for your account.")
        raise
    else:
        return response["AccountAliases"]

  For API details, see ListAccountAliases in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListAttachedRolePolicies.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_attached_policies(role_name):
    """
    Lists policies attached to a role.

    :param role_name: The name of the role to query.
    """
    try:
        role = iam.Role(role_name)
        for policy in role.attached_policies.all():
            logger.info("Got policy %s.", policy.arn)
    except ClientError:
        logger.exception("Couldn't list attached policies for %s.", role_name)
        raise

The following code example shows how to use ListGroups.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_groups(count):
    """
    Lists the specified number of groups for the account.

    :param count: The number of groups to list.
    """
    try:
        for group in iam.groups.limit(count):
            logger.info("Group: %s", group.name)
    except ClientError:
        logger.exception("Couldn't list groups for the account.")
        raise

  For API details, see ListGroups in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListPolicies.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_policies(scope):
    """
    Lists the policies in the current account.

    :param scope: Limits the kinds of policies that are returned. For example,
                  'Local' specifies that only locally managed policies are returned.
    :return: The list of policies.
    """
    try:
        policies = list(iam.policies.filter(Scope=scope))
        logger.info("Got %s policies in scope '%s'.", len(policies), scope)
    except ClientError:
        logger.exception("Couldn't get policies for scope '%s'.", scope)
        raise
    else:
        return policies

  For API details, see ListPolicies in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListRolePolicies.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_policies(role_name):
    """
    Lists inline policies for a role.

    :param role_name: The name of the role to query.
    """
    try:
        role = iam.Role(role_name)
        for policy in role.policies.all():
            logger.info("Got inline policy %s.", policy.name)
    except ClientError:
        logger.exception("Couldn't list inline policies for %s.", role_name)
        raise

  For API details, see ListRolePolicies in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListRoles.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_roles(count):
    """
    Lists the specified number of roles for the account.

    :param count: The number of roles to list.
    """
    try:
        roles = list(iam.roles.limit(count=count))
        for role in roles:
            logger.info("Role: %s", role.name)
    except ClientError:
        logger.exception("Couldn't list roles for the account.")
        raise
    else:
        return roles

  For API details, see ListRoles in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListSAMLProviders.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_saml_providers(count):
    """
    Lists the SAML providers for the account.

    :param count: The maximum number of providers to list.
    """
    try:
        found = 0
        for provider in iam.saml_providers.limit(count):
            logger.info("Got SAML provider %s.", provider.arn)
            found += 1
        if found == 0:
            logger.info("Your account has no SAML providers.")
    except ClientError:
        logger.exception("Couldn't list SAML providers.")
        raise

  For API details, see ListSAMLProviders in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use ListUsers.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def list_users():
    """
    Lists the users in the current account.

    :return: The list of users.
    """
    try:
        users = list(iam.users.all())
        logger.info("Got %s users.", len(users))
    except ClientError:
        logger.exception("Couldn't get users.")
        raise
    else:
        return users

  For API details, see ListUsers in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateAccessKey.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def update_key(user_name, key_id, activate):
    """
    Updates the status of a key.

    :param user_name: The user that owns the key.
    :param key_id: The ID of the key to update.
    :param activate: When True, the key is activated. Otherwise, the key is deactivated.
    """
    try:
        key = iam.User(user_name).AccessKey(key_id)
        if activate:
            key.activate()
        else:
            key.deactivate()
        logger.info("%s key %s.", "Activated" if activate else "Deactivated", key_id)
    except ClientError:
        logger.exception(
            "Couldn't %s key %s.", "Activate" if activate else "Deactivate", key_id
        )
        raise

  For API details, see UpdateAccessKey in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateUser.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

def update_user(user_name, new_user_name):
    """
    Updates a user's name.

    :param user_name: The current name of the user to update.
    :param new_user_name: The new name to assign to the user.
    :return: The updated user.
    """
    try:
        user = iam.User(user_name)
        user.update(NewUserName=new_user_name)
        logger.info("Renamed %s to %s.", user_name, new_user_name)
    except ClientError:
        logger.exception("Couldn't update name for user %s.", user_name)
        raise
    return user

  For API details, see UpdateUser in the AWS SDK for Python (Boto3) API Reference.


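The following code example shows how to create a load-balanced web service that returns book, movie, and song recommendations. The example shows how the service responds to failures, and how to restructure the service for more resilience when failures occur.

  • Use an Amazon EC2 Auto Scaling group to create Amazon Elastic Compute Cloud (Amazon EC2) instances based on a launch template and to keep the number of instances in a specified range.

  • Handle and distribute HTTP requests with Elastic Load Balancing.

  • Monitor the health of instances in an Auto Scaling group and forward requests only to healthy instances.

  • Run a Python web server on each EC2 instance to handle HTTP requests. The web server responds with recommendations and health checks.

  • Simulate a recommendation service with an Amazon DynamoDB table.

  • Control web server response to requests and health checks by updating AWS Systems Manager parameters.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.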
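
The last item in the list above, controlling the web server through Systems Manager parameters, can be sketched in plain Python. The demo code in this section sets a table name, a failure-response mode of `static`, and a health-check mode of `deep`. Everything else here is an assumption made for illustration: the function names, the status codes, the fallback payload, and the idea that any other parameter value means "no fallback" or "shallow check". The real web server script is not shown in this section and may behave differently.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical fallback payload returned when the table can't be read.
STATIC_RESPONSE = {"MediaType": "Book", "ItemId": 0, "Title": "Static fallback"}


def handle_request(
    params: Dict[str, str], fetch: Callable[[str], Dict[str, Any]]
) -> Tuple[int, Dict[str, Any]]:
    """
    Returns (status_code, body) for a recommendation request.

    When reading from the table fails, a static response is returned only if
    the 'failure_response' parameter is set to 'static'.
    """
    try:
        return 200, fetch(params["table"])
    except Exception as err:
        if params.get("failure_response") == "static":
            return 200, STATIC_RESPONSE
        return 502, {"error": str(err)}


def handle_health_check(
    params: Dict[str, str], table_ok: Callable[[str], bool]
) -> int:
    """
    Returns a status code for a health check.

    A 'deep' check also verifies the table. Any other mode only reports that
    the web server itself is responding.
    """
    if params.get("health_check") == "deep":
        return 200 if table_ok(params["table"]) else 503
    return 200


# Stand-ins for the DynamoDB calls so the sketch runs without AWS access.
def fake_fetch(table_name: str) -> Dict[str, Any]:
    if table_name != "recommendations":
        raise RuntimeError(f"Table {table_name} not found")
    return {"MediaType": "Movie", "ItemId": 1, "Title": "Example title"}


def fake_table_ok(table_name: str) -> bool:
    return table_name == "recommendations"


broken = {"table": "this-is-not-a-table"}
mitigated = {"table": "this-is-not-a-table", "failure_response": "static"}
deep = {"table": "this-is-not-a-table", "health_check": "deep"}

print(handle_request(broken, fake_fetch)[0])
print(handle_request(mitigated, fake_fetch)[0])
print(handle_health_check(broken, fake_table_ok))
print(handle_health_check(deep, fake_table_ok))
```

The point of the sketch is the separation of concerns: the static fallback keeps requests succeeding, while the deep health check lets the load balancer stop routing to an instance that can't reach its data.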


class Runner: """ Manages the deployment, demonstration, and destruction of resources for the resilient service. """ def __init__( self, resource_path: str, recommendation: RecommendationService, autoscaler: AutoScalingWrapper, loadbalancer: ElasticLoadBalancerWrapper, param_helper: ParameterHelper, ): """ Initializes the Runner class with the necessary parameters. :param resource_path: The path to resource files used by this example, such as IAM policies and instance scripts. :param recommendation: An instance of the RecommendationService class. :param autoscaler: An instance of the AutoScaler class. :param loadbalancer: An instance of the LoadBalancer class. :param param_helper: An instance of the ParameterHelper class. """ self.resource_path = resource_path self.recommendation = recommendation self.autoscaler = autoscaler self.loadbalancer = loadbalancer self.param_helper = param_helper self.protocol = "HTTP" self.port = 80 self.ssh_port = 22 prefix = "doc-example-resilience" self.target_group_name = f"{prefix}-tg" self.load_balancer_name = f"{prefix}-lb" def deploy(self) -> None: """ Deploys the resources required for the resilient service, including the DynamoDB table, EC2 instances, Auto Scaling group, and load balancer. """ recommendations_path = f"{self.resource_path}/recommendations.json" startup_script = f"{self.resource_path}/server_startup_script.sh" instance_policy = f"{self.resource_path}/instance_policy.json" logging.info("Starting deployment of resources for the resilient service.") logging.info( "Creating and populating DynamoDB table '%s'.", self.recommendation.table_name, ) self.recommendation.create() self.recommendation.populate(recommendations_path) logging.info( "Creating an EC2 launch template with the startup script '%s'.", startup_script, ) self.autoscaler.create_template(startup_script, instance_policy) logging.info( "Creating an EC2 Auto Scaling group across multiple Availability Zones." 
) zones = self.autoscaler.create_autoscaling_group(3) logging.info("Creating variables that control the flow of the demo.") self.param_helper.reset() logging.info("Creating Elastic Load Balancing target group and load balancer.") vpc = self.autoscaler.get_default_vpc() subnets = self.autoscaler.get_subnets(vpc["VpcId"], zones) target_group = self.loadbalancer.create_target_group( self.target_group_name, self.protocol, self.port, vpc["VpcId"] ) self.loadbalancer.create_load_balancer( self.load_balancer_name, [subnet["SubnetId"] for subnet in subnets] ) self.loadbalancer.create_listener(self.load_balancer_name, target_group) self.autoscaler.attach_load_balancer_target_group(target_group) logging.info("Verifying access to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) current_ip_address = requests.get("http://checkip.amazonaws.com").text.strip() if not lb_success: logging.warning( "Couldn't connect to the load balancer. Verifying that the port is open..." ) sec_group, port_is_open = self.autoscaler.verify_inbound_port( vpc, self.port, current_ip_address ) sec_group, ssh_port_is_open = self.autoscaler.verify_inbound_port( vpc, self.ssh_port, current_ip_address ) if not port_is_open: logging.warning( "The default security group for your VPC must allow access from this computer." ) if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.port, current_ip_address ) if not ssh_port_is_open: if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound SSH traffic on port {self.ssh_port} for debugging from your computer's IP address of {current_ip_address}? 
(y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.ssh_port, current_ip_address ) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) if lb_success: logging.info( "Load balancer is ready. Access it at: http://%s", current_ip_address ) else: logging.error( "Couldn't get a successful response from the load balancer endpoint. Please verify your VPC and security group settings." ) def demo_choices(self) -> None: """ Presents choices for interacting with the deployed service, such as sending requests to the load balancer or checking the health of the targets. """ actions = [ "Send a GET request to the load balancer endpoint.", "Check the health of load balancer targets.", "Go to the next part of the demo.", ] choice = 0 while choice != 2: logging.info("Choose an action to interact with the service.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: logging.info("Sending a GET request to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) logging.info("GET http://%s", endpoint) response = requests.get(f"http://{endpoint}") logging.info("Response: %s", response.status_code) if response.headers.get("content-type") == "application/json": pp(response.json()) elif choice == 1: logging.info("Checking the health of load balancer targets.") health = self.loadbalancer.check_target_health(self.target_group_name) for target in health: state = target["TargetHealth"]["State"] logging.info( "Target %s on port %d is %s", target["Target"]["Id"], target["Target"]["Port"], state, ) if state != "healthy": logging.warning( "%s: %s", target["TargetHealth"]["Reason"], target["TargetHealth"]["Description"], ) logging.info( "Note that it can take a minute or two for the health check to update." 
) elif choice == 2: logging.info("Proceeding to the next part of the demo.") def demo(self) -> None: """ Runs the demonstration, showing how the service responds to different failure scenarios and how a resilient architecture can keep the service running. """ ssm_only_policy = f"{self.resource_path}/ssm_only_policy.json" logging.info("Resetting parameters to starting values for the demo.") self.param_helper.reset() logging.info( "Starting demonstration of the service's resilience under various failure conditions." ) self.demo_choices() logging.info( "Simulating failure by changing the Systems Manager parameter to a non-existent table." ) self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info("Sending GET requests will now return failure codes.") self.demo_choices() logging.info("Switching to static response mode to mitigate failure.") self.param_helper.put(self.param_helper.failure_response, "static") logging.info("Sending GET requests will now return static responses.") self.demo_choices() logging.info("Restoring normal operation of the recommendation service.") self.param_helper.put(self.param_helper.table, self.recommendation.table_name) logging.info( "Introducing a failure by assigning bad credentials to one of the instances." ) self.autoscaler.create_instance_profile( ssm_only_policy, self.autoscaler.bad_creds_policy_name, self.autoscaler.bad_creds_role_name, self.autoscaler.bad_creds_profile_name, ["AmazonSSMManagedInstanceCore"], ) instances = self.autoscaler.get_instances() bad_instance_id = instances[0] instance_profile = self.autoscaler.get_instance_profile(bad_instance_id) logging.info( "Replacing instance profile with bad credentials for instance %s.", bad_instance_id, ) self.autoscaler.replace_instance_profile( bad_instance_id, self.autoscaler.bad_creds_profile_name, instance_profile["AssociationId"], ) logging.info( "Sending GET requests may return either a valid recommendation or a static response." 
) self.demo_choices() logging.info("Implementing deep health checks to detect unhealthy instances.") self.param_helper.put(self.param_helper.health_check, "deep") logging.info("Checking the health of the load balancer targets.") self.demo_choices() logging.info( "Terminating the unhealthy instance to let the auto scaler replace it." ) self.autoscaler.terminate_instance(bad_instance_id) logging.info("The service remains resilient during instance replacement.") self.demo_choices() logging.info("Simulating a complete failure of the recommendation service.") self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info( "All instances will report as unhealthy, but the service will still return static responses." ) self.demo_choices() self.param_helper.reset() def destroy(self, automation=False) -> None: """ Destroys all resources created for the demo, including the load balancer, Auto Scaling group, EC2 instances, and DynamoDB table. """ logging.info( "This concludes the demo. Preparing to clean up all AWS resources created during the demo." ) if automation: cleanup = True else: cleanup = q.ask( "Do you want to clean up all demo resources? (y/n) ", q.is_yesno ) if cleanup: logging.info("Deleting load balancer and related resources.") self.loadbalancer.delete_load_balancer(self.load_balancer_name) self.loadbalancer.delete_target_group(self.target_group_name) self.autoscaler.delete_autoscaling_group(self.autoscaler.group_name) self.autoscaler.delete_key_pair() self.autoscaler.delete_template() self.autoscaler.delete_instance_profile( self.autoscaler.bad_creds_profile_name, self.autoscaler.bad_creds_role_name, ) logging.info("Deleting DynamoDB table and other resources.") self.recommendation.destroy() else: logging.warning( "Resources have not been deleted. Ensure you clean them up manually to avoid unexpected charges." ) def main() -> None: """ Main function to parse arguments and run the appropriate actions for the demo. 
""" parser = argparse.ArgumentParser() parser.add_argument( "--action", required=True, choices=["all", "deploy", "demo", "destroy"], help="The action to take for the demo. When 'all' is specified, resources are\n" "deployed, the demo is run, and resources are destroyed.", ) parser.add_argument( "--resource_path", default="../../../workflows/resilient_service/resources", help="The path to resource files used by this example, such as IAM policies and\n" "instance scripts.", ) args = parser.parse_args() logging.info("Starting the Resilient Service demo.") prefix = "doc-example-resilience" # Service Clients ddb_client = boto3.client("dynamodb") elb_client = boto3.client("elbv2") autoscaling_client = boto3.client("autoscaling") ec2_client = boto3.client("ec2") ssm_client = boto3.client("ssm") iam_client = boto3.client("iam") # Wrapper instantiations recommendation = RecommendationService( "doc-example-recommendation-service", ddb_client ) autoscaling_wrapper = AutoScalingWrapper( prefix, "t3.micro", "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2", autoscaling_client, ec2_client, ssm_client, iam_client, ) elb_wrapper = ElasticLoadBalancerWrapper(elb_client) param_helper = ParameterHelper(recommendation.table_name, ssm_client) # Demo invocation runner = Runner( args.resource_path, recommendation, autoscaling_wrapper, elb_wrapper, param_helper, ) actions = [args.action] if args.action != "all" else ["deploy", "demo", "destroy"] for action in actions: if action == "deploy": runner.deploy() elif action == "demo": runner.demo() elif action == "destroy": runner.destroy() logging.info("Demo completed successfully.") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") main()

Create a class that wraps Auto Scaling and Amazon EC2 actions.

class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_policy(self, policy_file: str, policy_name: str) -> str: """ Creates a new IAM policy or retrieves the ARN of an existing policy. :param policy_file: The path to a JSON file that contains the policy definition. :param policy_name: The name to give the created policy. :return: The ARN of the created or existing policy. 
""" with open(policy_file) as file: policy_doc = file.read() try: response = self.iam_client.create_policy( PolicyName=policy_name, PolicyDocument=policy_doc ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' created successfully. ARN: {policy_arn}") return policy_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the policy already exists, get its ARN response = self.iam_client.get_policy( PolicyArn=f"arn:aws:iam::{self.account_id}:policy/{policy_name}" ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' already exists. ARN: {policy_arn}") return policy_arn log.error(f"Full error:\n\t{err}") def create_role(self, role_name: str, assume_role_doc: dict) -> str: """ Creates a new IAM role or retrieves the ARN of an existing role. :param role_name: The name to give the created role. :param assume_role_doc: The assume role policy document that specifies which entities can assume the role. :return: The ARN of the created or existing role. """ try: response = self.iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc) ) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' created successfully. ARN: {role_arn}") return role_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the role already exists, get its ARN response = self.iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' already exists. ARN: {role_arn}") return role_arn log.error(f"Full error:\n\t{err}") def attach_policy( self, role_name: str, policy_arn: str, aws_managed_policies: Tuple[str, ...] = (), ) -> None: """ Attaches an IAM policy to a role and optionally attaches additional AWS-managed policies. :param role_name: The name of the role to attach the policy to. :param policy_arn: The ARN of the policy to attach. 
        :param aws_managed_policies: A tuple of AWS-managed policy names to attach to the role.
        """
        try:
            self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
            for aws_policy in aws_managed_policies:
                self.iam_client.attach_role_policy(
                    RoleName=role_name,
                    PolicyArn=f"arn:aws:iam::aws:policy/{aws_policy}",
                )
            log.info(f"Attached policy {policy_arn} to role {role_name}.")
        except ClientError as err:
            log.error(f"Failed to attach policy {policy_arn} to role {role_name}.")
            log.error(f"Full error:\n\t{err}")

    def create_instance_profile(
        self,
        policy_file: str,
        policy_name: str,
        role_name: str,
        profile_name: str,
        aws_managed_policies: Tuple[str, ...] = (),
    ) -> str:
        """
        Creates a policy, role, and profile that is associated with instances created by
        this class. An instance's associated profile defines a role that is assumed by the
        instance. The role has attached policies that specify the AWS permissions granted to
        clients that run on the instance.

        :param policy_file: The name of a JSON file that contains the policy definition to
                            create and attach to the role.
        :param policy_name: The name to give the created policy.
        :param role_name: The name to give the created role.
        :param profile_name: The name to give the created profile.
        :param aws_managed_policies: Additional AWS-managed policies that are attached to
                                     the role, such as AmazonSSMManagedInstanceCore to grant
                                     use of Systems Manager to send commands to the instance.
        :return: The ARN of the profile that is created.
        """
        assume_role_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": "ec2.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        policy_arn = self.create_policy(policy_file, policy_name)
        self.create_role(role_name, assume_role_doc)
        self.attach_policy(role_name, policy_arn, aws_managed_policies)

        try:
            profile_response = self.iam_client.create_instance_profile(
                InstanceProfileName=profile_name
            )
            waiter = self.iam_client.get_waiter("instance_profile_exists")
            waiter.wait(InstanceProfileName=profile_name)
            time.sleep(10)  # wait a little longer
            profile_arn = profile_response["InstanceProfile"]["Arn"]
            self.iam_client.add_role_to_instance_profile(
                InstanceProfileName=profile_name, RoleName=role_name
            )
            log.info("Created profile %s and added role %s.", profile_name, role_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "EntityAlreadyExists":
                prof_response = self.iam_client.get_instance_profile(
                    InstanceProfileName=profile_name
                )
                profile_arn = prof_response["InstanceProfile"]["Arn"]
                log.info(
                    "Instance profile %s already exists, nothing to do.", profile_name
                )
            else:
                # Re-raise unexpected errors so profile_arn is never returned unbound.
                log.error(f"Full error:\n\t{err}")
                raise
        return profile_arn

    def get_instance_profile(self, instance_id: str) -> Dict[str, Any]:
        """
        Gets data about the profile associated with an instance.

        :param instance_id: The ID of the instance to look up.
        :return: The profile data.
        """
        try:
            response = self.ec2_client.describe_iam_instance_profile_associations(
                Filters=[{"Name": "instance-id", "Values": [instance_id]}]
            )
            if not response["IamInstanceProfileAssociations"]:
                log.info(f"No instance profile found for instance {instance_id}.")
            profile_data = response["IamInstanceProfileAssociations"][0]
            log.info(f"Retrieved instance profile for instance {instance_id}.")
            return profile_data
        except ClientError as err:
            log.error(
                f"Failed to retrieve instance profile for instance {instance_id}."
) error_code = err.response["Error"]["Code"] if error_code == "InvalidInstanceID.NotFound": log.error(f"The instance ID '{instance_id}' does not exist.") log.error(f"Full error:\n\t{err}") def replace_instance_profile( self, instance_id: str, new_instance_profile_name: str, profile_association_id: str, ) -> None: """ Replaces the profile associated with a running instance. After the profile is replaced, the instance is rebooted to ensure that it uses the new profile. When the instance is ready, Systems Manager is used to restart the Python web server. :param instance_id: The ID of the instance to restart. :param new_instance_profile_name: The name of the new profile to associate with the specified instance. :param profile_association_id: The ID of the existing profile association for the instance. """ try: self.ec2_client.replace_iam_instance_profile_association( IamInstanceProfile={"Name": new_instance_profile_name}, AssociationId=profile_association_id, ) log.info( "Replaced instance profile for association %s with profile %s.", profile_association_id, new_instance_profile_name, ) time.sleep(5) self.ec2_client.reboot_instances(InstanceIds=[instance_id]) log.info("Rebooting instance %s.", instance_id) waiter = self.ec2_client.get_waiter("instance_running") log.info("Waiting for instance %s to be running.", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info("Instance %s is now running.", instance_id) self.ssm_client.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={"commands": ["cd / && sudo python3 server.py 80"]}, ) log.info(f"Restarted the Python web server on instance '{instance_id}'.") except ClientError as err: log.error("Failed to replace instance profile.") error_code = err.response["Error"]["Code"] if error_code == "InvalidAssociationID.NotFound": log.error( f"Association ID '{profile_association_id}' does not exist." "Please check the association ID and try again." 
) if error_code == "InvalidInstanceId": log.error( f"The specified instance ID '{instance_id}' does not exist or is not available for SSM. " f"Please verify the instance ID and try again." ) log.error(f"Full error:\n\t{err}") def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name ) def create_key_pair(self, key_pair_name: str) -> None: """ Creates a new key pair. :param key_pair_name: The name of the key pair to create. 
""" try: response = self.ec2_client.create_key_pair(KeyName=key_pair_name) with open(f"{key_pair_name}.pem", "w") as file: file.write(response["KeyMaterial"]) chmod(f"{key_pair_name}.pem", 0o600) log.info("Created key pair %s.", key_pair_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to create key pair {key_pair_name}.") if error_code == "InvalidKeyPair.Duplicate": log.error(f"A key pair with the name '{key_pair_name}' already exists.") log.error(f"Full error:\n\t{err}") def delete_key_pair(self) -> None: """ Deletes a key pair. """ try: self.ec2_client.delete_key_pair(KeyName=self.key_pair_name) remove(f"{self.key_pair_name}.pem") log.info("Deleted key pair %s.", self.key_pair_name) except ClientError as err: log.error(f"Couldn't delete key pair '{self.key_pair_name}'.") log.error(f"Full error:\n\t{err}") except FileNotFoundError as err: log.info("Key pair %s doesn't exist, nothing to do.", self.key_pair_name) log.error(f"Full error:\n\t{err}") def create_template( self, server_startup_script_file: str, instance_policy_file: str ) -> Dict[str, Any]: """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The launch template specifies a Bash script in its user data field that runs after the instance is started. This script installs Python packages and starts a Python web server on the instance. :param server_startup_script_file: The path to a Bash script file that is run when an instance starts. :param instance_policy_file: The path to a file that defines a permissions policy to create and attach to the instance profile. :return: Information about the newly created template. 
""" template = {} try: # Create key pair and instance profile self.create_key_pair(self.key_pair_name) self.create_instance_profile( instance_policy_file, self.instance_policy_name, self.instance_role_name, self.instance_profile_name, ) # Read the startup script with open(server_startup_script_file) as file: start_server_script = file.read() # Get the latest AMI ID ami_latest = self.ssm_client.get_parameter(Name=self.ami_param) ami_id = ami_latest["Parameter"]["Value"] # Create the launch template lt_response = self.ec2_client.create_launch_template( LaunchTemplateName=self.launch_template_name, LaunchTemplateData={ "InstanceType": self.inst_type, "ImageId": ami_id, "IamInstanceProfile": {"Name": self.instance_profile_name}, "UserData": base64.b64encode( start_server_script.encode(encoding="utf-8") ).decode(encoding="utf-8"), "KeyName": self.key_pair_name, }, ) template = lt_response["LaunchTemplate"] log.info( f"Created launch template {self.launch_template_name} for AMI {ami_id} on {self.inst_type}." ) except ClientError as err: log.error(f"Failed to create launch template {self.launch_template_name}.") error_code = err.response["Error"]["Code"] if error_code == "InvalidLaunchTemplateName.AlreadyExistsException": log.info( f"Launch template {self.launch_template_name} already exists, nothing to do." ) log.error(f"Full error:\n\t{err}") return template def delete_template(self): """ Deletes a launch template. 
""" try: self.ec2_client.delete_launch_template( LaunchTemplateName=self.launch_template_name ) self.delete_instance_profile( self.instance_profile_name, self.instance_role_name ) log.info("Launch template %s deleted.", self.launch_template_name) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): log.info( "Launch template %s does not exist, nothing to do.", self.launch_template_name, ) log.error(f"Full error:\n\t{err}") def get_availability_zones(self) -> List[str]: """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] log.info(f"Retrieved {len(zones)} availability zones: {zones}.") except ClientError as err: log.error("Failed to retrieve availability zones.") log.error(f"Full error:\n\t{err}") else: return zones def create_autoscaling_group(self, group_size: int) -> List[str]: """ Creates an EC2 Auto Scaling group with the specified size. :param group_size: The number of instances to set for the minimum and maximum in the group. :return: The list of Availability Zones specified for the group. """ try: zones = self.get_availability_zones() self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=self.group_name, AvailabilityZones=zones, LaunchTemplate={ "LaunchTemplateName": self.launch_template_name, "Version": "$Default", }, MinSize=group_size, MaxSize=group_size, ) log.info( f"Created EC2 Auto Scaling group {self.group_name} with availability zones {zones}." ) except ClientError as err: error_code = err.response["Error"]["Code"] if error_code == "AlreadyExists": log.info( f"EC2 Auto Scaling group {self.group_name} already exists, nothing to do." 
) else: log.error(f"Failed to create EC2 Auto Scaling group {self.group_name}.") log.error(f"Full error:\n\t{err}") else: return zones def get_instances(self) -> List[str]: """ Gets data about the instances in the EC2 Auto Scaling group. :return: A list of instance IDs in the Auto Scaling group. """ try: as_response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) instance_ids = [ i["InstanceId"] for i in as_response["AutoScalingGroups"][0]["Instances"] ] log.info( f"Retrieved {len(instance_ids)} instances for Auto Scaling group {self.group_name}." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to retrieve instances for Auto Scaling group {self.group_name}." ) if error_code == "ResourceNotFound": log.error(f"The Auto Scaling group '{self.group_name}' does not exist.") log.error(f"Full error:\n\t{err}") else: return instance_ids def terminate_instance(self, instance_id: str, decrementsetting=False) -> None: """ Terminates an instance in an EC2 Auto Scaling group. After an instance is terminated, it can no longer be accessed. :param instance_id: The ID of the instance to terminate. :param decrementsetting: If True, do not replace terminated instances. """ try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrementsetting, ) log.info("Terminated instance %s.", instance_id) # Adding a waiter to ensure the instance is terminated waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for instance %s to be terminated...", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info( f"Instance '{instance_id}' has been terminated and will be replaced." 
) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to terminate instance '{instance_id}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to terminate the instance again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) log.error(f"Full error:\n\t{err}") def attach_load_balancer_target_group( self, lb_target_group: Dict[str, Any] ) -> None: """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forwards requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to attach load balancer target group '{lb_target_group['TargetGroupName']}'." ) if error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) elif error_code == "ServiceLinkedRoleFailure": log.error( "The operation failed because the service-linked role is not ready or does not exist. " "Check that the service-linked role exists and is correctly configured." ) log.error(f"Full error:\n\t{err}") def delete_autoscaling_group(self, group_name: str) -> None: """ Terminates all instances in the group, then deletes the EC2 Auto Scaling group. 
:param group_name: The name of the group to delete. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self.terminate_instance(inst_id) # Wait for all instances to be terminated if instance_ids: waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for all instances to be terminated...") waiter.wait(InstanceIds=instance_ids) log.info("All instances have been terminated.") else: log.info(f"No groups found named '{group_name}'! Nothing to do.") except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete Auto Scaling group '{group_name}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to delete the group again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the group." ) log.error(f"Full error:\n\t{err}") def get_default_vpc(self) -> Dict[str, Any]: """ Gets the default VPC for the account. :return: Data about the default VPC. """ try: response = self.ec2_client.describe_vpcs( Filters=[{"Name": "is-default", "Values": ["true"]}] ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error("Failed to retrieve the default VPC.") if error_code == "UnauthorizedOperation": log.error( "You do not have the necessary permissions to describe VPCs. " "Ensure that your AWS IAM user or role has the correct permissions." ) elif error_code == "InvalidParameterValue": log.error( "One or more parameters are invalid. 
Check the request parameters." ) log.error(f"Full error:\n\t{err}") else: if "Vpcs" in response and response["Vpcs"]: log.info(f"Retrieved default VPC: {response['Vpcs'][0]['VpcId']}") return response["Vpcs"][0] else: pass def verify_inbound_port( self, vpc: Dict[str, Any], port: int, ip_address: str ) -> Tuple[Dict[str, Any], bool]: """ Verify the default security group of the specified VPC allows ingress from this computer. This can be done by allowing ingress from this computer's IP address. In some situations, such as connecting from a corporate network, you must instead specify a prefix list ID. You can also temporarily open the port to any IP address while running this example. If you do, be sure to remove public access when you're done. :param vpc: The VPC used by this example. :param port: The port to verify. :param ip_address: This computer's IP address. :return: The default security group of the specified VPC, and a value that indicates whether the specified port is open. """ try: response = self.ec2_client.describe_security_groups( Filters=[ {"Name": "group-name", "Values": ["default"]}, {"Name": "vpc-id", "Values": [vpc["VpcId"]]}, ] ) sec_group = response["SecurityGroups"][0] port_is_open = False log.info(f"Found default security group {sec_group['GroupId']}.") for ip_perm in sec_group["IpPermissions"]: if ip_perm.get("FromPort", 0) == port: log.info(f"Found inbound rule: {ip_perm}") for ip_range in ip_perm["IpRanges"]: cidr = ip_range.get("CidrIp", "") if cidr.startswith(ip_address) or cidr == "0.0.0.0/0": port_is_open = True if ip_perm["PrefixListIds"]: port_is_open = True if not port_is_open: log.info( f"The inbound rule does not appear to be open to either this computer's IP " f"address of {ip_address}, to all IP addresses (0.0.0.0/0), or to a prefix list ID." ) else: break except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to verify inbound rule for port {port} for VPC {vpc['VpcId']}."
) if error_code == "InvalidVpcID.NotFound": log.error( f"The specified VPC ID '{vpc['VpcId']}' does not exist. Please check the VPC ID." ) log.error(f"Full error:\n\t{err}") else: return sec_group, port_is_open def open_inbound_port(self, sec_group_id: str, port: int, ip_address: str) -> None: """ Add an ingress rule to the specified security group that allows access on the specified port from the specified IP address. :param sec_group_id: The ID of the security group to modify. :param port: The port to open. :param ip_address: The IP address that is granted access. """ try: self.ec2_client.authorize_security_group_ingress( GroupId=sec_group_id, CidrIp=f"{ip_address}/32", FromPort=port, ToPort=port, IpProtocol="tcp", ) log.info( "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to authorize ingress to security group '{sec_group_id}' on port {port} from {ip_address}." ) if error_code == "InvalidGroupId.Malformed": log.error( "The security group ID is malformed. " "Please verify that the security group ID is correct." ) elif error_code == "InvalidPermission.Duplicate": log.error( "The specified rule already exists in the security group. " "Check the existing rules for this security group." ) log.error(f"Full error:\n\t{err}") def get_subnets(self, vpc_id: str, zones: List[str] = None) -> List[Dict[str, Any]]: """ Gets the default subnets in a VPC for a specified list of Availability Zones. :param vpc_id: The ID of the VPC to look up. :param zones: The list of Availability Zones to look up. :return: The list of subnets found. 
""" # Ensure that 'zones' is a list, even if None is passed if zones is None: zones = [] try: paginator = self.ec2_client.get_paginator("describe_subnets") page_iterator = paginator.paginate( Filters=[ {"Name": "vpc-id", "Values": [vpc_id]}, {"Name": "availability-zone", "Values": zones}, {"Name": "default-for-az", "Values": ["true"]}, ] ) subnets = [] for page in page_iterator: subnets.extend(page["Subnets"]) log.info("Found %s subnets for the specified zones.", len(subnets)) return subnets except ClientError as err: log.error( f"Failed to retrieve subnets for VPC '{vpc_id}' in zones {zones}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidVpcID.NotFound": log.error( "The specified VPC ID does not exist. " "Please check the VPC ID and try again." ) # Add more error-specific handling as needed log.error(f"Full error:\n\t{err}")

Create a class that wraps Elastic Load Balancing actions.

class ElasticLoadBalancerWrapper: """Encapsulates Elastic Load Balancing (ELB) actions.""" def __init__(self, elb_client: boto3.client): """ Initializes the LoadBalancer class with the necessary parameters. """ self.elb_client = elb_client def create_target_group( self, target_group_name: str, protocol: str, port: int, vpc_id: str ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing target group. The target group specifies how the load balancer forwards requests to instances in the group and how instance health is checked. To speed up this demo, the health check is configured with shortened times and lower thresholds. In production, you might want to decrease the sensitivity of your health checks to avoid unwanted failures. :param target_group_name: The name of the target group to create. :param protocol: The protocol to use to forward requests, such as 'HTTP'. :param port: The port to use to forward requests, such as 80. :param vpc_id: The ID of the VPC in which the load balancer exists. :return: Data about the newly created target group. """ try: response = self.elb_client.create_target_group( Name=target_group_name, Protocol=protocol, Port=port, HealthCheckPath="/healthcheck", HealthCheckIntervalSeconds=10, HealthCheckTimeoutSeconds=5, HealthyThresholdCount=2, UnhealthyThresholdCount=2, VpcId=vpc_id, ) target_group = response["TargetGroups"][0] log.info(f"Created load balancing target group '{target_group_name}'.") return target_group except ClientError as err: log.error( f"Couldn't create load balancing target group '{target_group_name}'." ) error_code = err.response["Error"]["Code"] if error_code == "DuplicateTargetGroupName": log.error( f"Target group name {target_group_name} already exists. " "Check if the target group already exists." "Consider using a different name or deleting the existing target group if appropriate." ) elif error_code == "TooManyTargetGroups": log.error( "Too many target groups exist in the account. 
" "Consider deleting unused target groups to create space for new ones." ) log.error(f"Full error:\n\t{err}") def delete_target_group(self, target_group_name) -> None: """ Deletes the target group. """ try: # Describe the target group to get its ARN response = self.elb_client.describe_target_groups(Names=[target_group_name]) tg_arn = response["TargetGroups"][0]["TargetGroupArn"] # Delete the target group self.elb_client.delete_target_group(TargetGroupArn=tg_arn) log.info("Deleted load balancing target group %s.", target_group_name) # Use a custom waiter to wait until the target group is no longer available self.wait_for_target_group_deletion(self.elb_client, tg_arn) log.info("Target group %s successfully deleted.", target_group_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete target group '{target_group_name}'.") if error_code == "TargetGroupNotFound": log.error( "Load balancer target group either already deleted or never existed. " "Verify the name and check that the resource exists in the AWS Console." ) elif error_code == "ResourceInUseException": log.error( "Target group still in use by another resource. " "Ensure that the target group is no longer associated with any load balancers or resources.", ) log.error(f"Full error:\n\t{err}") def wait_for_target_group_deletion( self, elb_client, target_group_arn, max_attempts=10, delay=30 ): for attempt in range(max_attempts): try: elb_client.describe_target_groups(TargetGroupArns=[target_group_arn]) print( f"Attempt {attempt + 1}: Target group {target_group_arn} still exists." ) except ClientError as e: if e.response["Error"]["Code"] == "TargetGroupNotFound": print( f"Target group {target_group_arn} has been successfully deleted." ) return else: raise time.sleep(delay) raise TimeoutError( f"Target group {target_group_arn} was not deleted after {max_attempts * delay} seconds." 
) def create_load_balancer( self, load_balancer_name: str, subnet_ids: List[str], ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing load balancer that uses the specified subnets and forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create. :param subnet_ids: A list of subnets to associate with the load balancer. :return: Data about the newly created load balancer. """ try: response = self.elb_client.create_load_balancer( Name=load_balancer_name, Subnets=subnet_ids ) load_balancer = response["LoadBalancers"][0] log.info(f"Created load balancer '{load_balancer_name}'.") waiter = self.elb_client.get_waiter("load_balancer_available") log.info( f"Waiting for load balancer '{load_balancer_name}' to be available..." ) waiter.wait(Names=[load_balancer_name]) log.info(f"Load balancer '{load_balancer_name}' is now available!") except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to create load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "DuplicateLoadBalancerNameException": log.error( f"A load balancer with the name '{load_balancer_name}' already exists. " "Load balancer names must be unique within the AWS region. " "Please choose a different name and try again." ) if error_code == "TooManyLoadBalancersException": log.error( "The maximum number of load balancers has been reached in this account and region. " "You can delete unused load balancers or request an increase in the service quota from AWS Support." ) log.error(f"Full error:\n\t{err}") else: return load_balancer def create_listener( self, load_balancer_name: str, target_group: Dict[str, Any], ) -> Dict[str, Any]: """ Creates a listener for the specified load balancer that forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create a listener for. 
:param target_group: An existing target group that is added as a listener to the load balancer. :return: Data about the newly created listener. """ try: # Retrieve the load balancer ARN load_balancer_response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) load_balancer_arn = load_balancer_response["LoadBalancers"][0][ "LoadBalancerArn" ] # Create the listener response = self.elb_client.create_listener( LoadBalancerArn=load_balancer_arn, Protocol=target_group["Protocol"], Port=target_group["Port"], DefaultActions=[ { "Type": "forward", "TargetGroupArn": target_group["TargetGroupArn"], } ], ) log.info( f"Created listener to forward traffic from load balancer '{load_balancer_name}' to target group '{target_group['TargetGroupName']}'." ) return response["Listeners"][0] except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to add a listener on '{load_balancer_name}' for target group '{target_group['TargetGroupName']}'." ) if error_code == "ListenerNotFoundException": log.error( f"The listener could not be found for the load balancer '{load_balancer_name}'. " "Please check the load balancer name and target group configuration." ) if error_code == "InvalidConfigurationRequestException": log.error( f"The configuration provided for the listener on load balancer '{load_balancer_name}' is invalid. " "Please review the provided protocol, port, and target group settings." ) log.error(f"Full error:\n\t{err}") def delete_load_balancer(self, load_balancer_name) -> None: """ Deletes a load balancer. :param load_balancer_name: The name of the load balancer to delete. 
""" try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"] self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn) log.info("Deleted load balancer %s.", load_balancer_name) waiter = self.elb_client.get_waiter("load_balancers_deleted") log.info("Waiting for load balancer to be deleted...") waiter.wait(Names=[load_balancer_name]) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Couldn't delete load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "LoadBalancerNotFoundException": log.error( f"The load balancer '{load_balancer_name}' does not exist. " "Please check the name and try again." ) log.error(f"Full error:\n\t{err}") def get_endpoint(self, load_balancer_name) -> str: """ Gets the HTTP endpoint of the load balancer. :return: The endpoint. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) return response["LoadBalancers"][0]["DNSName"] except ClientError as err: log.error( f"Couldn't get the endpoint for load balancer {load_balancer_name}" ) error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Verify load balancer name and ensure it exists in the AWS console." ) log.error(f"Full error:\n\t{err}") @staticmethod def verify_load_balancer_endpoint(endpoint) -> bool: """ Verify this computer can successfully send a GET request to the load balancer endpoint. :param endpoint: The endpoint to verify. :return: True if the GET request is successful, False otherwise. 
""" retries = 3 verified = False while not verified and retries > 0: try: lb_response = requests.get(f"http://{endpoint}") log.info( "Got response %s from load balancer endpoint.", lb_response.status_code, ) if lb_response.status_code == 200: verified = True else: retries = 0 except requests.exceptions.ConnectionError: log.info( "Got connection error from load balancer endpoint, retrying..." ) retries -= 1 time.sleep(10) return verified def check_target_health(self, target_group_name: str) -> List[Dict[str, Any]]: """ Checks the health of the instances in the target group. :return: The health status of the target group. """ try: tg_response = self.elb_client.describe_target_groups( Names=[target_group_name] ) health_response = self.elb_client.describe_target_health( TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"] ) except ClientError as err: log.error(f"Couldn't check health of {target_group_name} target(s).") error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Load balancer associated with the target group was not found. " "Ensure the load balancer exists, is in the correct AWS region, and " "that you have the necessary permissions to access it.", ) elif error_code == "TargetGroupNotFoundException": log.error( "Target group was not found. " "Verify the target group name, check that it exists in the correct region, " "and ensure it has not been deleted or created in a different account.", ) log.error(f"Full error:\n\t{err}") else: return health_response["TargetHealthDescriptions"]

Create a class that uses DynamoDB to simulate a recommendation service.

class RecommendationService:
    """
    Encapsulates a DynamoDB table to use as a service that recommends books,
    movies, and songs.
    """

    def __init__(self, table_name: str, dynamodb_client: boto3.client):
        """
        Initializes the RecommendationService class with the necessary parameters.

        :param table_name: The name of the DynamoDB recommendations table.
        :param dynamodb_client: A Boto3 DynamoDB client.
        """
        self.table_name = table_name
        self.dynamodb_client = dynamodb_client

    def create(self) -> Dict[str, Any]:
        """
        Creates a DynamoDB table to use as a recommendation service. The table has a
        hash key named 'MediaType' that defines the type of media recommended, such as
        Book or Movie, and a range key named 'ItemId' that, combined with the MediaType,
        forms a unique identifier for the recommended item.

        :return: Data about the newly created table.
        :raises RecommendationServiceError: If the table creation fails.
        """
        try:
            response = self.dynamodb_client.create_table(
                TableName=self.table_name,
                AttributeDefinitions=[
                    {"AttributeName": "MediaType", "AttributeType": "S"},
                    {"AttributeName": "ItemId", "AttributeType": "N"},
                ],
                KeySchema=[
                    {"AttributeName": "MediaType", "KeyType": "HASH"},
                    {"AttributeName": "ItemId", "KeyType": "RANGE"},
                ],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            )
            log.info("Creating table %s...", self.table_name)
            waiter = self.dynamodb_client.get_waiter("table_exists")
            waiter.wait(TableName=self.table_name)
            log.info("Table %s created.", self.table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceInUseException":
                log.info("Table %s exists, nothing to be done.", self.table_name)
            else:
                raise RecommendationServiceError(
                    self.table_name, f"ClientError when creating table: {err}."
                )
        else:
            return response

    def populate(self, data_file: str) -> None:
        """
        Populates the recommendations table from a JSON file.

        :param data_file: The path to the data file.
        :raises RecommendationServiceError: If the table population fails.
        """
        try:
            with open(data_file) as data:
                items = json.load(data)
            batch = [{"PutRequest": {"Item": item}} for item in items]
            self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch})
            log.info(
                "Populated table %s with items from %s.", self.table_name, data_file
            )
        except ClientError as err:
            raise RecommendationServiceError(
                self.table_name, f"Couldn't populate table from {data_file}: {err}"
            )

    def destroy(self) -> None:
        """
        Deletes the recommendations table.

        :raises RecommendationServiceError: If the table deletion fails.
        """
        try:
            self.dynamodb_client.delete_table(TableName=self.table_name)
            log.info("Deleting table %s...", self.table_name)
            waiter = self.dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName=self.table_name)
            log.info("Table %s deleted.", self.table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                log.info("Table %s does not exist, nothing to do.", self.table_name)
            else:
                raise RecommendationServiceError(
                    self.table_name, f"ClientError when deleting table: {err}."
                )

Create a class that wraps Systems Manager actions.

class ParameterHelper:
    """
    Encapsulates Systems Manager parameters. This example uses these parameters to
    drive the demonstration of resilient architecture, such as failure of a dependency
    or how the service responds to a health check.
    """

    table: str = "doc-example-resilient-architecture-table"
    failure_response: str = "doc-example-resilient-architecture-failure-response"
    health_check: str = "doc-example-resilient-architecture-health-check"

    def __init__(self, table_name: str, ssm_client: boto3.client):
        """
        Initializes the ParameterHelper class with the necessary parameters.

        :param table_name: The name of the DynamoDB table that is used as a
                           recommendation service.
        :param ssm_client: A Boto3 Systems Manager client.
        """
        self.ssm_client = ssm_client
        self.table_name = table_name

    def reset(self) -> None:
        """
        Resets the Systems Manager parameters to starting values for the demo.
        These are the name of the DynamoDB recommendation table, no response when a
        dependency fails, and shallow health checks.
        """
        self.put(self.table, self.table_name)
        self.put(self.failure_response, "none")
        self.put(self.health_check, "shallow")

    def put(self, name: str, value: str) -> None:
        """
        Sets the value of a named Systems Manager parameter.

        :param name: The name of the parameter.
        :param value: The new value of the parameter.
        :raises ParameterHelperError: If the parameter value cannot be set.
        """
        try:
            self.ssm_client.put_parameter(
                Name=name, Value=value, Overwrite=True, Type="String"
            )
            log.info("Setting parameter %s to '%s'.", name, value)
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            log.error(f"Failed to set parameter {name}.")
            if error_code == "ParameterLimitExceeded":
                log.error(
                    "The parameter limit has been exceeded. "
                    "Consider deleting unused parameters or request a limit increase."
                )
            elif error_code == "ParameterAlreadyExists":
                log.error(
                    "The parameter already exists and overwrite is set to False. "
                    "Use Overwrite=True to update the parameter."
                )
            log.error(f"Full error:\n\t{err}")



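To avoid security risks, don't use IAM users for authentication when developing purpose-built software or working with real data. Instead, use federation with an identity provider such as AWS IAM Identity Center.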

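  • Create two IAM users.

  • Attach a policy to one user that allows putting objects in an Amazon S3 bucket.

  • Attach a policy to the second user that allows getting objects from the bucket.

  • Get different permissions to the bucket based on user credentials.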
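The difference between the two users comes down to the actions listed in each policy document. As a rough sketch, the two documents might be built as follows. The bucket name is a placeholder and the action lists are assumptions: here the first user is given both get and put, and the second user only get.

```python
import json
from typing import Any, Dict, List


def build_policy_document(actions: List[str], resource_arn: str) -> Dict[str, Any]:
    """Build a single-statement policy that allows `actions` on `resource_arn`."""
    return {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
    }


# Placeholder bucket; the wildcard covers every object in the bucket.
bucket_objects_arn = "arn:aws:s3:::amzn-s3-demo-bucket/*"

# Assumed action lists for the two users in this scenario.
read_write_doc = build_policy_document(["s3:GetObject", "s3:PutObject"], bucket_objects_arn)
read_only_doc = build_policy_document(["s3:GetObject"], bucket_objects_arn)

print(json.dumps(read_write_doc, indent=2))
print(json.dumps(read_only_doc, indent=2))
```

Each document would be serialized with `json.dumps` and passed as the `PolicyDocument` argument when the policy is created.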

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.


import logging
import time

import boto3
from botocore.exceptions import ClientError

import access_key_wrapper
import policy_wrapper

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def create_user(user_name):
    """
    Creates a user. By default, a user has no permissions or access keys.

    :param user_name: The name of the user.
    :return: The newly created user.
    """
    try:
        user = iam.create_user(UserName=user_name)
        logger.info("Created user %s.", user.name)
    except ClientError:
        logger.exception("Couldn't create user %s.", user_name)
        raise
    else:
        return user


def update_user(user_name, new_user_name):
    """
    Updates a user's name.

    :param user_name: The current name of the user to update.
    :param new_user_name: The new name to assign to the user.
    :return: The updated user.
    """
    try:
        user = iam.User(user_name)
        user.update(NewUserName=new_user_name)
        logger.info("Renamed %s to %s.", user_name, new_user_name)
    except ClientError:
        logger.exception("Couldn't update name for user %s.", user_name)
        raise
    return user


def list_users():
    """
    Lists the users in the current account.

    :return: The list of users.
    """
    try:
        users = list(iam.users.all())
        logger.info("Got %s users.", len(users))
    except ClientError:
        logger.exception("Couldn't get users.")
        raise
    else:
        return users


def delete_user(user_name):
    """
    Deletes a user. Before a user can be deleted, all associated resources,
    such as access keys and policies, must be deleted or detached.

    :param user_name: The name of the user.
    """
    try:
        iam.User(user_name).delete()
        logger.info("Deleted user %s.", user_name)
    except ClientError:
        logger.exception("Couldn't delete user %s.", user_name)
        raise


def attach_policy(user_name, policy_arn):
    """
    Attaches a policy to a user.

    :param user_name: The name of the user.
    :param policy_arn: The Amazon Resource Name (ARN) of the policy.
    """
    try:
        iam.User(user_name).attach_policy(PolicyArn=policy_arn)
        logger.info("Attached policy %s to user %s.", policy_arn, user_name)
    except ClientError:
        logger.exception("Couldn't attach policy %s to user %s.", policy_arn, user_name)
        raise


def detach_policy(user_name, policy_arn):
    """
    Detaches a policy from a user.

    :param user_name: The name of the user.
    :param policy_arn: The Amazon Resource Name (ARN) of the policy.
    """
    try:
        iam.User(user_name).detach_policy(PolicyArn=policy_arn)
        logger.info("Detached policy %s from user %s.", policy_arn, user_name)
    except ClientError:
        logger.exception(
            "Couldn't detach policy %s from user %s.", policy_arn, user_name
        )
        raise


import json
import logging
import operator
import pprint
import time

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def create_policy(name, description, actions, resource_arn):
    """
    Creates a policy that contains a single statement.

    :param name: The name of the policy to create.
    :param description: The description of the policy.
    :param actions: The actions allowed by the policy. These typically take the
                    form of service:action, such as s3:PutObject.
    :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy
                         applies to. This ARN can contain wildcards, such as
                         'arn:aws:s3:::my-bucket/*' to allow actions on all objects
                         in the bucket named 'my-bucket'.
    :return: The newly created policy.
    """
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
    }
    try:
        policy = iam.create_policy(
            PolicyName=name,
            Description=description,
            PolicyDocument=json.dumps(policy_doc),
        )
        logger.info("Created policy %s.", policy.arn)
    except ClientError:
        logger.exception("Couldn't create policy %s.", name)
        raise
    else:
        return policy


def delete_policy(policy_arn):
    """
    Deletes a policy.

    :param policy_arn: The ARN of the policy to delete.
    """
    try:
        iam.Policy(policy_arn).delete()
        logger.info("Deleted policy %s.", policy_arn)
    except ClientError:
        logger.exception("Couldn't delete policy %s.", policy_arn)
        raise


import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def create_key(user_name):
    """
    Creates an access key for the specified user. Each user can have a
    maximum of two keys.

    :param user_name: The name of the user.
    :return: The created access key.
    """
    try:
        key_pair = iam.User(user_name).create_access_key_pair()
        logger.info(
            "Created access key pair for %s. Key ID is %s.",
            key_pair.user_name,
            key_pair.id,
        )
    except ClientError:
        logger.exception("Couldn't create access key pair for %s.", user_name)
        raise
    else:
        return key_pair


def delete_key(user_name, key_id):
    """
    Deletes a user's access key.

    :param user_name: The user that owns the key.
    :param key_id: The ID of the key to delete.
    """
    try:
        key = iam.AccessKey(user_name, key_id)
        key.delete()
        logger.info("Deleted access key %s for %s.", key.id, key.user_name)
    except ClientError:
        logger.exception("Couldn't delete key %s for %s", key_id, user_name)
        raise

Use the wrapper functions to create users with different policies, and use their credentials to access an Amazon S3 bucket.

def usage_demo():
    """
    Shows how to manage users, keys, and policies.
    This demonstration creates two users: one user who can put and get objects in an
    Amazon S3 bucket, and another user who can only get objects from the bucket.
    The demo then shows how the users can perform only the actions they are permitted
    to perform.
    """
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    print("-" * 88)
    print("Welcome to the AWS Identity and Access Management user demo.")
    print("-" * 88)
    print(
        "Users can have policies and roles attached to grant them specific "
        "permissions."
    )
    s3 = boto3.resource("s3")
    bucket = s3.create_bucket(
        Bucket=f"demo-iam-bucket-{time.time_ns()}",
        CreateBucketConfiguration={
            "LocationConstraint": s3.meta.client.meta.region_name
        },
    )
    print(f"Created an Amazon S3 bucket named {bucket.name}.")
    user_read_writer = create_user("demo-iam-read-writer")
    user_reader = create_user("demo-iam-reader")
    print(f"Created two IAM users: {user_read_writer.name} and {user_reader.name}")
    update_user(user_read_writer.name, "demo-iam-creator")
    update_user(user_reader.name, "demo-iam-getter")
    # Reload the users so the local objects reflect the new names.
    users = list_users()
    user_read_writer = next(
        user for user in users if user.user_id == user_read_writer.user_id
    )
    user_reader = next(user for user in users if user.user_id == user_reader.user_id)
    print(
        f"Changed the names of the users to {user_read_writer.name} "
        f"and {user_reader.name}."
    )

    read_write_policy = policy_wrapper.create_policy(
        "demo-iam-read-write-policy",
        "Grants rights to create and get an object in the demo bucket.",
        ["s3:PutObject", "s3:GetObject"],
        f"arn:aws:s3:::{bucket.name}/*",
    )
    print(
        f"Created policy {read_write_policy.policy_name} with ARN: {read_write_policy.arn}"
    )
    print(read_write_policy.description)
    read_policy = policy_wrapper.create_policy(
        "demo-iam-read-policy",
        "Grants rights to get an object from the demo bucket.",
        "s3:GetObject",
        f"arn:aws:s3:::{bucket.name}/*",
    )
    print(f"Created policy {read_policy.policy_name} with ARN: {read_policy.arn}")
    print(read_policy.description)
    attach_policy(user_read_writer.name, read_write_policy.arn)
    print(f"Attached {read_write_policy.policy_name} to {user_read_writer.name}.")
    attach_policy(user_reader.name, read_policy.arn)
    print(f"Attached {read_policy.policy_name} to {user_reader.name}.")

    user_read_writer_key = access_key_wrapper.create_key(user_read_writer.name)
    print(f"Created access key pair for {user_read_writer.name}.")
    user_reader_key = access_key_wrapper.create_key(user_reader.name)
    print(f"Created access key pair for {user_reader.name}.")

    s3_read_writer_resource = boto3.resource(
        "s3",
        aws_access_key_id=user_read_writer_key.id,
        aws_secret_access_key=user_read_writer_key.secret,
    )
    demo_object_key = f"object-{time.time_ns()}"
    demo_object = None
    # A new access key can take a moment to become usable, so retry until it works.
    while demo_object is None:
        try:
            demo_object = s3_read_writer_resource.Bucket(bucket.name).put_object(
                Key=demo_object_key, Body=b"AWS IAM demo object content!"
            )
        except ClientError as error:
            if error.response["Error"]["Code"] == "InvalidAccessKeyId":
                print("Access key not yet available. Waiting...")
                time.sleep(1)
            else:
                raise
    print(
        f"Put {demo_object_key} into {bucket.name} using "
        f"{user_read_writer.name}'s credentials."
    )

    read_writer_object = s3_read_writer_resource.Bucket(bucket.name).Object(
        demo_object_key
    )
    read_writer_content = read_writer_object.get()["Body"].read()
    print(f"Got object {read_writer_object.key} using read-writer user's credentials.")
    print(f"Object content: {read_writer_content}")

    s3_reader_resource = boto3.resource(
        "s3",
        aws_access_key_id=user_reader_key.id,
        aws_secret_access_key=user_reader_key.secret,
    )
    demo_content = None
    while demo_content is None:
        try:
            demo_object = s3_reader_resource.Bucket(bucket.name).Object(demo_object_key)
            demo_content = demo_object.get()["Body"].read()
            print(f"Got object {demo_object.key} using reader user's credentials.")
            print(f"Object content: {demo_content}")
        except ClientError as error:
            if error.response["Error"]["Code"] == "InvalidAccessKeyId":
                print("Access key not yet available. Waiting...")
                time.sleep(1)
            else:
                raise

    # The reader's policy allows only s3:GetObject, so this delete is expected to fail.
    try:
        demo_object.delete()
    except ClientError as error:
        if error.response["Error"]["Code"] == "AccessDenied":
            print("-" * 88)
            print(
                "Tried to delete the object using the reader user's credentials. "
                "Got expected AccessDenied error because the reader is not "
                "allowed to delete objects."
            )
            print("-" * 88)

    # Clean up: keys and policies must be removed before a user can be deleted.
    access_key_wrapper.delete_key(user_reader.name, user_reader_key.id)
    detach_policy(user_reader.name, read_policy.arn)
    policy_wrapper.delete_policy(read_policy.arn)
    delete_user(user_reader.name)
    print(f"Deleted keys, detached and deleted policy, and deleted {user_reader.name}.")

    access_key_wrapper.delete_key(user_read_writer.name, user_read_writer_key.id)
    detach_policy(user_read_writer.name, read_write_policy.arn)
    policy_wrapper.delete_policy(read_write_policy.arn)
    delete_user(user_read_writer.name)
    print(
        f"Deleted keys, detached and deleted policy, and deleted {user_read_writer.name}."
    )

    bucket.objects.delete()
    bucket.delete()
    print(f"Emptied and deleted {bucket.name}.")
    print("Thanks for watching!")
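The demo above retries its first Amazon S3 call in an unbounded loop while a new access key becomes usable. A bounded variant might look like the following minimal sketch. The names `retry_until_ready`, `NotReadyError`, and all parameters are hypothetical and are not part of Boto3; in real code, the `is_retryable` check would typically inspect `error.response["Error"]["Code"]` on a `ClientError`.

```python
import time


class NotReadyError(Exception):
    """Hypothetical stand-in for a 'credentials not yet usable' error."""


def retry_until_ready(
    action, is_retryable, max_attempts=10, delay_seconds=1.0, sleep=time.sleep
):
    """
    Calls action() until it succeeds or max_attempts is reached.

    :param action: A zero-argument callable to invoke.
    :param is_retryable: Takes the raised exception and returns True when the
                         call should be tried again.
    :param max_attempts: The total number of calls to make before giving up.
    :param delay_seconds: How long to wait between attempts.
    :param sleep: The function used to wait (injectable so tests run fast).
    :return: Whatever action() returns on its first success.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception as error:
            # Give up on the last attempt or on any error that is not transient.
            if attempt == max_attempts or not is_retryable(error):
                raise
            sleep(delay_seconds)
```

Capping the attempts means a permanently invalid key surfaces as an error instead of looping forever.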



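To avoid security risks, don't use IAM users for authentication when developing purpose-built software or working with real data. Instead, use federation with an identity provider such as AWS IAM Identity Center.

  • Create and list access keys.

  • Find out when and how an access key was last used.

  • Update and delete access keys.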

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.


import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def list_keys(user_name):
    """
    Lists the keys owned by the specified user.

    :param user_name: The name of the user.
    :return: The list of keys owned by the user.
    """
    try:
        keys = list(iam.User(user_name).access_keys.all())
        logger.info("Got %s access keys for %s.", len(keys), user_name)
    except ClientError:
        logger.exception("Couldn't get access keys for %s.", user_name)
        raise
    else:
        return keys


def create_key(user_name):
    """
    Creates an access key for the specified user. Each user can have a
    maximum of two keys.

    :param user_name: The name of the user.
    :return: The created access key.
    """
    try:
        key_pair = iam.User(user_name).create_access_key_pair()
        logger.info(
            "Created access key pair for %s. Key ID is %s.",
            key_pair.user_name,
            key_pair.id,
        )
    except ClientError:
        logger.exception("Couldn't create access key pair for %s.", user_name)
        raise
    else:
        return key_pair


def get_last_use(key_id):
    """
    Gets information about when and how a key was last used.

    :param key_id: The ID of the key to look up.
    :return: Information about the key's last use.
    """
    try:
        response = iam.meta.client.get_access_key_last_used(AccessKeyId=key_id)
        last_used_date = response["AccessKeyLastUsed"].get("LastUsedDate", None)
        last_service = response["AccessKeyLastUsed"].get("ServiceName", None)
        logger.info(
            "Key %s was last used by %s on %s to access %s.",
            key_id,
            response["UserName"],
            last_used_date,
            last_service,
        )
    except ClientError:
        logger.exception("Couldn't get last use of key %s.", key_id)
        raise
    else:
        return response


def update_key(user_name, key_id, activate):
    """
    Updates the status of a key.

    :param user_name: The user that owns the key.
    :param key_id: The ID of the key to update.
    :param activate: When True, the key is activated. Otherwise, the key is deactivated.
    """
    try:
        key = iam.User(user_name).AccessKey(key_id)
        if activate:
            key.activate()
        else:
            key.deactivate()
        logger.info("%s key %s.", "Activated" if activate else "Deactivated", key_id)
    except ClientError:
        logger.exception(
            "Couldn't %s key %s.", "Activate" if activate else "Deactivate", key_id
        )
        raise


def delete_key(user_name, key_id):
    """
    Deletes a user's access key.

    :param user_name: The user that owns the key.
    :param key_id: The ID of the key to delete.
    """
    try:
        key = iam.AccessKey(user_name, key_id)
        key.delete()
        logger.info("Deleted access key %s for %s.", key.id, key.user_name)
    except ClientError:
        logger.exception("Couldn't delete key %s for %s", key_id, user_name)
        raise


def usage_demo():
    """Shows how to create and manage access keys."""

    def print_keys():
        """Gets and prints the current keys for a user."""
        current_keys = list_keys(current_user_name)
        print("The current user's keys are now:")
        print(*[f"{key.id}: {key.status}" for key in current_keys], sep="\n")

    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    print("-" * 88)
    print("Welcome to the AWS Identity and Access Management access key demo.")
    print("-" * 88)
    current_user_name = iam.CurrentUser().user_name
    print(
        f"This demo creates an access key for the current user "
        f"({current_user_name}), manipulates the key in a few ways, and then "
        f"deletes it."
    )
    all_keys = list_keys(current_user_name)
    if len(all_keys) == 2:
        print(
            "The current user already has the maximum of 2 access keys. To run "
            "this demo, either delete one of the access keys or use a user "
            "that has only 1 access key."
        )
    else:
        new_key = create_key(current_user_name)
        print(f"Created a new key with id {new_key.id} and secret {new_key.secret}.")
        print_keys()
        # Look up the last use of a key that existed before the demo, if there is one.
        existing_key = next((key for key in all_keys if key != new_key), None)
        if existing_key is not None:
            last_use = get_last_use(existing_key.id)["AccessKeyLastUsed"]
            print(
                f"Key {existing_key.id} was last used to access "
                f"{last_use.get('ServiceName')} on {last_use.get('LastUsedDate')}"
            )
        update_key(current_user_name, new_key.id, False)
        print(f"Key {new_key.id} is now deactivated.")
        print_keys()
        delete_key(current_user_name, new_key.id)
        print_keys()
        print("Thanks for watching!")
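The `get_last_use` function reads `LastUsedDate` and `ServiceName` with `.get`, because a key that has not been used may not carry a last-used date. The sketch below shows one way to turn that part of the response into a message. The function name `describe_last_use` and the message wording are hypothetical, and the assumption that `ServiceName` falls back to `"N/A"` should be checked against the API reference.

```python
from datetime import datetime, timezone


def describe_last_use(key_id, last_used):
    """
    Builds a readable message from the AccessKeyLastUsed part of a response.

    :param key_id: The ID of the access key.
    :param last_used: A dict shaped like response["AccessKeyLastUsed"].
    :return: A one-line description of the key's last use.
    """
    used_on = last_used.get("LastUsedDate")
    service = last_used.get("ServiceName", "N/A")
    if used_on is None:
        # No date present: treat the key as having no recorded use.
        return f"Key {key_id} has no recorded use."
    return f"Key {key_id} was last used to access {service} on {used_on:%Y-%m-%d}."


# Usage with made-up values:
print(
    describe_last_use(
        "AKIAEXAMPLE",
        {"LastUsedDate": datetime(2024, 1, 15, tzinfo=timezone.utc), "ServiceName": "s3"},
    )
)
```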


  • Create and list policies.

  • Create and get policy versions.

  • Roll back a policy to a previous version.

  • Delete policies.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.


import json
import logging
import operator
import pprint
import time

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def create_policy(name, description, actions, resource_arn):
    """
    Creates a policy that contains a single statement.

    :param name: The name of the policy to create.
    :param description: The description of the policy.
    :param actions: The actions allowed by the policy. These typically take the
                    form of service:action, such as s3:PutObject.
    :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy
                         applies to. This ARN can contain wildcards, such as
                         'arn:aws:s3:::my-bucket/*' to allow actions on all objects
                         in the bucket named 'my-bucket'.
    :return: The newly created policy.
    """
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
    }
    try:
        policy = iam.create_policy(
            PolicyName=name,
            Description=description,
            PolicyDocument=json.dumps(policy_doc),
        )
        logger.info("Created policy %s.", policy.arn)
    except ClientError:
        logger.exception("Couldn't create policy %s.", name)
        raise
    else:
        return policy


def list_policies(scope):
    """
    Lists the policies in the current account.

    :param scope: Limits the kinds of policies that are returned. For example,
                  'Local' specifies that only locally managed policies are returned.
    :return: The list of policies.
    """
    try:
        policies = list(iam.policies.filter(Scope=scope))
        logger.info("Got %s policies in scope '%s'.", len(policies), scope)
    except ClientError:
        logger.exception("Couldn't get policies for scope '%s'.", scope)
        raise
    else:
        return policies


def create_policy_version(policy_arn, actions, resource_arn, set_as_default):
    """
    Creates a policy version. Policies can have up to five versions. The default
    version is the one that is used for all resources that reference the policy.

    :param policy_arn: The ARN of the policy.
    :param actions: The actions to allow in the policy version.
    :param resource_arn: The ARN of the resource this policy version applies to.
    :param set_as_default: When True, this policy version is set as the default
                           version for the policy. Otherwise, the default
                           is not changed.
    :return: The newly created policy version.
    """
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
    }
    try:
        policy = iam.Policy(policy_arn)
        policy_version = policy.create_version(
            PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default
        )
        logger.info(
            "Created policy version %s for policy %s.",
            policy_version.version_id,
            policy_version.arn,
        )
    except ClientError:
        logger.exception("Couldn't create a policy version for %s.", policy_arn)
        raise
    else:
        return policy_version


def get_default_policy_statement(policy_arn):
    """
    Gets the statement of the default version of the specified policy.

    :param policy_arn: The ARN of the policy to look up.
    :return: The statement of the default policy version.
    """
    try:
        policy = iam.Policy(policy_arn)
        # To get an attribute of a policy, the SDK first calls get_policy.
        policy_doc = policy.default_version.document
        policy_statement = policy_doc.get("Statement", None)
        logger.info("Got default policy doc for %s.", policy.policy_name)
        logger.info(policy_doc)
    except ClientError:
        logger.exception("Couldn't get default policy statement for %s.", policy_arn)
        raise
    else:
        return policy_statement


def rollback_policy_version(policy_arn):
    """
    Rolls back to the previous default policy, if it exists.

    1. Gets the list of policy versions in order by date.
    2. Finds the default.
    3. Makes the previous policy the default.
    4. Deletes the old default version.

    :param policy_arn: The ARN of the policy to roll back.
    :return: The default version of the policy after the rollback.
    """
    try:
        policy_versions = sorted(
            iam.Policy(policy_arn).versions.all(),
            key=operator.attrgetter("create_date"),
        )
        logger.info("Got %s versions for %s.", len(policy_versions), policy_arn)
    except ClientError:
        logger.exception("Couldn't get versions for %s.", policy_arn)
        raise

    default_version = None
    rollback_version = None
    try:
        while default_version is None:
            ver = policy_versions.pop()
            if ver.is_default_version:
                default_version = ver
        rollback_version = policy_versions.pop()
        rollback_version.set_as_default()
        logger.info("Set %s as the default version.", rollback_version.version_id)
        default_version.delete()
        logger.info("Deleted original default version %s.", default_version.version_id)
    except IndexError:
        if default_version is None:
            logger.warning("No default version found for %s.", policy_arn)
        elif rollback_version is None:
            logger.warning(
                "Default version %s found for %s, but no previous version exists, so "
                "nothing to roll back to.",
                default_version.version_id,
                policy_arn,
            )
    except ClientError:
        logger.exception("Couldn't roll back version for %s.", policy_arn)
        raise
    else:
        return rollback_version


def delete_policy(policy_arn):
    """
    Deletes a policy.

    :param policy_arn: The ARN of the policy to delete.
    """
    try:
        iam.Policy(policy_arn).delete()
        logger.info("Deleted policy %s.", policy_arn)
    except ClientError:
        logger.exception("Couldn't delete policy %s.", policy_arn)
        raise


def usage_demo():
    """Shows how to use the policy functions."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    print("-" * 88)
    print("Welcome to the AWS Identity and Access Management policy demo.")
    print("-" * 88)
    print(
        "Policies let you define sets of permissions that can be attached to "
        "other IAM resources, like users and roles."
    )
    bucket_arn = "arn:aws:s3:::made-up-bucket-name"
    policy = create_policy(
        "demo-iam-policy",
        "Policy for IAM demonstration.",
        ["s3:ListObjects"],
        bucket_arn,
    )
    print(f"Created policy {policy.policy_name}.")
    policies = list_policies("Local")
    print(f"Your account has {len(policies)} managed policies:")
    print(*[pol.policy_name for pol in policies], sep=", ")
    time.sleep(1)
    policy_version = create_policy_version(
        policy.arn, ["s3:PutObject"], bucket_arn, True
    )
    print(
        f"Added policy version {policy_version.version_id} to policy "
        f"{policy.policy_name}."
    )
    default_statement = get_default_policy_statement(policy.arn)
    print(f"The default policy statement for {policy.policy_name} is:")
    pprint.pprint(default_statement)
    rollback_version = rollback_policy_version(policy.arn)
    print(
        f"Rolled back to version {rollback_version.version_id} for "
        f"{policy.policy_name}."
    )
    default_statement = get_default_policy_statement(policy.arn)
    print(f"The default policy statement for {policy.policy_name} is now:")
    pprint.pprint(default_statement)
    delete_policy(policy.arn)
    print(f"Deleted policy {policy.policy_name}.")
    print("Thanks for watching!")
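The `create_policy_version` docstring notes that a policy can have up to five versions, so a long-lived policy eventually needs an old version removed before a new one can be added. The following sketch picks a candidate to delete. `VersionInfo`, `MAX_POLICY_VERSIONS`, and `version_to_prune` are hypothetical names; `VersionInfo` is a stand-in that only mirrors the attribute names used in the code above (`version_id`, `is_default_version`, `create_date`), and choosing the oldest non-default version is one possible policy, not the only one.

```python
from collections import namedtuple
from datetime import datetime

# Limit taken from the create_policy_version docstring above.
MAX_POLICY_VERSIONS = 5

# Hypothetical stand-in for a policy version object.
VersionInfo = namedtuple(
    "VersionInfo", ["version_id", "is_default_version", "create_date"]
)


def version_to_prune(versions, limit=MAX_POLICY_VERSIONS):
    """
    Chooses a version to delete so that a new version can be created.

    :param versions: The existing versions of a policy.
    :param limit: The maximum number of versions a policy can have.
    :return: The oldest non-default version when the policy is at the limit,
             otherwise None.
    """
    if len(versions) < limit:
        return None
    # The default version is in use, so it is never a candidate.
    candidates = [ver for ver in versions if not ver.is_default_version]
    if not candidates:
        return None
    return min(candidates, key=lambda ver: ver.create_date)


# Usage with made-up data: v1 is the default, so v2 is the oldest candidate.
demo_versions = [
    VersionInfo(f"v{day}", day == 1, datetime(2024, 1, day)) for day in range(1, 6)
]
print(version_to_prune(demo_versions).version_id)
```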


  • Create an IAM role.

  • Attach and detach policies for a role.

  • Delete a role.

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.


import json
import logging
import pprint

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def create_role(role_name, allowed_services):
    """
    Creates a role that lets a list of specified services assume the role.

    :param role_name: The name of the role.
    :param allowed_services: The services that can assume the role.
    :return: The newly created role.
    """
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
            for service in allowed_services
        ],
    }

    try:
        role = iam.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
        logger.info("Created role %s.", role.name)
    except ClientError:
        logger.exception("Couldn't create role %s.", role_name)
        raise
    else:
        return role


def attach_policy(role_name, policy_arn):
    """
    Attaches a policy to a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    """
    try:
        iam.Role(role_name).attach_policy(PolicyArn=policy_arn)
        logger.info("Attached policy %s to role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name)
        raise


def detach_policy(role_name, policy_arn):
    """
    Detaches a policy from a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    """
    try:
        iam.Role(role_name).detach_policy(PolicyArn=policy_arn)
        logger.info("Detached policy %s from role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception(
            "Couldn't detach policy %s from role %s.", policy_arn, role_name
        )
        raise


def delete_role(role_name):
    """
    Deletes a role.

    :param role_name: The name of the role to delete.
    """
    try:
        iam.Role(role_name).delete()
        logger.info("Deleted role %s.", role_name)
    except ClientError:
        logger.exception("Couldn't delete role %s.", role_name)
        raise


def usage_demo():
    """Shows how to use the role functions."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    print("-" * 88)
    print("Welcome to the AWS Identity and Access Management role demo.")
    print("-" * 88)
    print(
        "Roles let you define sets of permissions and can be assumed by "
        "other entities, like users and services."
    )
    # list_roles, list_policies, and list_attached_policies are helper functions
    # that are not shown in this excerpt.
    print("The first 10 roles currently in your account are:")
    roles = list_roles(10)
    print(f"The inline policies for role {roles[0].name} are:")
    list_policies(roles[0].name)
    role = create_role(
        "demo-iam-role", ["lambda.amazonaws.com", "batchoperations.s3.amazonaws.com"]
    )
    print(f"Created role {role.name}, with trust policy:")
    pprint.pprint(role.assume_role_policy_document)
    policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
    attach_policy(role.name, policy_arn)
    print(f"Attached policy {policy_arn} to {role.name}.")
    print(f"Policies attached to role {role.name} are:")
    list_attached_policies(role.name)
    detach_policy(role.name, policy_arn)
    print(f"Detached policy {policy_arn} from {role.name}.")
    delete_role(role.name)
    print(f"Deleted {role.name}.")
    print("Thanks for watching!")
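The demo prints the role's trust policy with `pprint`. When only the list of trusted services matters, a small helper can extract it from the policy document. This is a minimal sketch: `trusted_services` is a hypothetical name, and it assumes the document is already a dict in the same shape that `create_role` builds above, where `Principal.Service` can be a single string or a list.

```python
def trusted_services(trust_policy):
    """
    Lists the service principals that a trust policy allows.

    :param trust_policy: A trust policy document as a dict.
    :return: A sorted list of unique service principal names.
    """
    statements = trust_policy.get("Statement", [])
    if isinstance(statements, dict):
        # A policy can hold a single statement instead of a list.
        statements = [statements]

    services = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):
            # For example a "*" principal, which names no specific service.
            continue
        value = principal.get("Service", [])
        if isinstance(value, str):
            value = [value]
        services.update(value)
    return sorted(services)


# Usage with a document shaped like the one create_role builds:
demo_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": service},
            "Action": "sts:AssumeRole",
        }
        for service in ["lambda.amazonaws.com", "batchoperations.s3.amazonaws.com"]
    ],
}
print(trusted_services(demo_trust_policy))
```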


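  • Get and update the account alias.

  • Generate a report of users and credentials.

  • Get a summary of account usage.

  • Get details for all users, groups, roles, and policies in your account, including their relationships to each other.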

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.


import logging
import pprint
import sys
import time

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def list_aliases():
    """
    Gets the list of aliases for the current account. An account has at most one alias.

    :return: The list of aliases for the account.
    """
    try:
        response = iam.meta.client.list_account_aliases()
        aliases = response["AccountAliases"]
        if len(aliases) > 0:
            logger.info("Got aliases for your account: %s.", ",".join(aliases))
        else:
            logger.info("Got no aliases for your account.")
    except ClientError:
        logger.exception("Couldn't list aliases for your account.")
        raise
    else:
        return response["AccountAliases"]


def create_alias(alias):
    """
    Creates an alias for the current account. The alias can be used in place of the
    account ID in the sign-in URL. An account can have only one alias. When a new
    alias is created, it replaces any existing alias.

    :param alias: The alias to assign to the account.
    """
    try:
        iam.create_account_alias(AccountAlias=alias)
        logger.info("Created an alias '%s' for your account.", alias)
    except ClientError:
        logger.exception("Couldn't create alias '%s' for your account.", alias)
        raise


def delete_alias(alias):
    """
    Removes the alias from the current account.

    :param alias: The alias to remove.
    """
    try:
        iam.meta.client.delete_account_alias(AccountAlias=alias)
        logger.info("Removed alias '%s' from your account.", alias)
    except ClientError:
        logger.exception("Couldn't remove alias '%s' from your account.", alias)
        raise


def generate_credential_report():
    """
    Starts generation of a credentials report about the current account. After
    calling this function to generate the report, call get_credential_report
    to get the latest report. A new report can be generated a minimum of four hours
    after the last one was generated.
    """
    try:
        response = iam.meta.client.generate_credential_report()
        logger.info(
            "Generating credentials report for your account. "
            "Current state is %s.",
            response["State"],
        )
    except ClientError:
        logger.exception("Couldn't generate a credentials report for your account.")
        raise
    else:
        return response


def get_credential_report():
    """
    Gets the most recently generated credentials report about the current account.

    :return: The credentials report.
    """
    try:
        response = iam.meta.client.get_credential_report()
        logger.debug(response["Content"])
    except ClientError:
        logger.exception("Couldn't get credentials report.")
        raise
    else:
        return response["Content"]


def get_summary():
    """
    Gets a summary of account usage.

    :return: The summary of account usage.
    """
    try:
        summary = iam.AccountSummary()
        logger.debug(summary.summary_map)
    except ClientError:
        logger.exception("Couldn't get a summary for your account.")
        raise
    else:
        return summary.summary_map


def get_authorization_details(response_filter):
    """
    Gets an authorization detail report for the current account.

    :param response_filter: A list of resource types to include in the report, such
                            as users or roles. When not specified, all resources
                            are included.
    :return: The authorization detail report.
    """
    try:
        account_details = iam.meta.client.get_account_authorization_details(
            Filter=response_filter
        )
        logger.debug(account_details)
    except ClientError:
        logger.exception("Couldn't get details for your account.")
        raise
    else:
        return account_details


def usage_demo():
    """Shows how to use the account functions."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    print("-" * 88)
    print("Welcome to the AWS Identity and Access Management account demo.")
    print("-" * 88)
    print(
        "Setting an account alias lets you use the alias in your sign-in URL "
        "instead of your account number."
    )
    old_aliases = list_aliases()
    if len(old_aliases) > 0:
        print(f"Your account currently uses '{old_aliases[0]}' as its alias.")
    else:
        print("Your account currently has no alias.")
    for index in range(1, 3):
        new_alias = f"alias-{index}-{time.time_ns()}"
        print(f"Setting your account alias to {new_alias}")
        create_alias(new_alias)
        current_aliases = list_aliases()
        print(f"Your account alias is now {current_aliases}.")
    delete_alias(current_aliases[0])
    print("Your account now has no alias.")
    if len(old_aliases) > 0:
        print(f"Restoring your original alias back to {old_aliases[0]}...")
        create_alias(old_aliases[0])

    print("-" * 88)
    print("You can get various reports about your account.")
    print("Let's generate a credentials report...")
    report_state = None
    while report_state != "COMPLETE":
        cred_report_response = generate_credential_report()
        old_report_state = report_state
        report_state = cred_report_response["State"]
        # Print each new state once, then a dot per poll, all on one line.
        if report_state != old_report_state:
            print(report_state, end="")
        else:
            print(".", end="")
        sys.stdout.flush()
        time.sleep(1)
    print()
    cred_report = get_credential_report()
    col_count = 3
    print(f"Got credentials report. Showing only the first {col_count} columns.")
    cred_lines = [
        line.split(",")[:col_count] for line in cred_report.decode("utf-8").split("\n")
    ]
    col_width = max([len(item) for line in cred_lines for item in line]) + 2
    for line in cred_report.decode("utf-8").split("\n"):
        print(
            "".join(element.ljust(col_width) for element in line.split(",")[:col_count])
        )

    print("-" * 88)
    print("Let's get an account summary.")
    summary = get_summary()
    print("Here's your summary:")
    pprint.pprint(summary)

    print("-" * 88)
    print("Let's get authorization details!")
    details = get_authorization_details([])
    see_details = input("These are pretty long, do you want to see them (y/n)? ")
    if see_details.lower() == "y":
        pprint.pprint(details)

    print("-" * 88)
    # print_password_policy and list_saml_providers are helper functions that are
    # not shown in this excerpt.
    pw_policy_created = None
    see_pw_policy = input("Want to see the password policy for the account (y/n)? ")
    if see_pw_policy.lower() == "y":
        while True:
            if print_password_policy():
                break
            else:
                answer = input(
                    "Do you want to create a default password policy (y/n)? "
                )
                if answer.lower() == "y":
                    pw_policy_created = iam.create_account_password_policy()
                else:
                    break
    if pw_policy_created is not None:
        answer = input("Do you want to delete the password policy (y/n)? ")
        if answer.lower() == "y":
            pw_policy_created.delete()
            print("Password policy deleted.")

    print("The SAML providers for your account are:")
    list_saml_providers(10)

    print("-" * 88)
    print("Thanks for watching.")
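The demo splits the credentials report on commas by hand. Because the report content is CSV bytes, Python's `csv` module can turn it into one dict per row keyed by the header line, which is easier to query. This is a sketch only: `parse_credential_report` is a hypothetical helper, and `SAMPLE_REPORT` is made-up data limited to three columns, not real account output.

```python
import csv
import io

# Made-up sample data for illustration only.
SAMPLE_REPORT = (
    b"user,arn,user_creation_time\n"
    b"<root_account>,arn:aws:iam::123456789012:root,2020-01-01T00:00:00+00:00\n"
    b"demo-user,arn:aws:iam::123456789012:user/demo-user,2021-06-01T12:00:00+00:00\n"
)


def parse_credential_report(content, columns=None):
    """
    Parses credentials report bytes into a list of dicts.

    :param content: The report as UTF-8 encoded CSV bytes, such as the value
                    returned by get_credential_report.
    :param columns: An optional list of column names to keep. When None, all
                    columns are kept.
    :return: One dict per report row, keyed by column name.
    """
    reader = csv.DictReader(io.StringIO(content.decode("utf-8")))
    rows = []
    for row in reader:
        if columns is not None:
            row = {name: row[name] for name in columns}
        rows.append(dict(row))
    return rows


# Usage: print the name and creation time of each entry in the sample.
for entry in parse_credential_report(SAMPLE_REPORT, ["user", "user_creation_time"]):
    print(entry["user"], entry["user_creation_time"])
```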


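  • Get the list of policy versions in order by date.

  • Find the default policy version.

  • Make the previous policy version the default.

  • Delete the old default version.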

SDK for Python (Boto3)

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging
import operator

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource("iam")


def rollback_policy_version(policy_arn):
    """
    Rolls back to the previous default policy, if it exists.

    1. Gets the list of policy versions in order by date.
    2. Finds the default.
    3. Makes the previous policy the default.
    4. Deletes the old default version.

    :param policy_arn: The ARN of the policy to roll back.
    :return: The default version of the policy after the rollback.
    """
    try:
        policy_versions = sorted(
            iam.Policy(policy_arn).versions.all(),
            key=operator.attrgetter("create_date"),
        )
        logger.info("Got %s versions for %s.", len(policy_versions), policy_arn)
    except ClientError:
        logger.exception("Couldn't get versions for %s.", policy_arn)
        raise

    default_version = None
    rollback_version = None
    try:
        # Pop from newest to oldest until the default version is found.
        while default_version is None:
            ver = policy_versions.pop()
            if ver.is_default_version:
                default_version = ver
        # The next-newest remaining version becomes the new default.
        rollback_version = policy_versions.pop()
        rollback_version.set_as_default()
        logger.info("Set %s as the default version.", rollback_version.version_id)
        default_version.delete()
        logger.info("Deleted original default version %s.", default_version.version_id)
    except IndexError:
        if default_version is None:
            logger.warning("No default version found for %s.", policy_arn)
        elif rollback_version is None:
            logger.warning(
                "Default version %s found for %s, but no previous version exists, so "
                "nothing to roll back to.",
                default_version.version_id,
                policy_arn,
            )
    except ClientError:
        logger.exception("Couldn't roll back version for %s.", policy_arn)
        raise
    else:
        return rollback_version
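The selection step of the rollback (find the default, then take the version created just before it) can be separated from the IAM calls so that it is easy to test without an AWS account. The sketch below does that on plain data. `VersionInfo` and `pick_rollback` are hypothetical names, and `VersionInfo` is a stand-in that only mirrors the attribute names used in `rollback_policy_version`.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for a policy version object.
VersionInfo = namedtuple(
    "VersionInfo", ["version_id", "is_default_version", "create_date"]
)


def pick_rollback(versions):
    """
    Finds the default version and the version created immediately before it.

    :param versions: The versions of a policy, in any order.
    :return: A (default_version, rollback_version) tuple. Either item is None
             when it does not exist.
    """
    ordered = sorted(versions, key=lambda ver: ver.create_date)
    for index, version in enumerate(ordered):
        if version.is_default_version:
            previous = ordered[index - 1] if index > 0 else None
            return version, previous
    return None, None


# Usage with made-up data: v2 is the default, so v1 is the rollback target.
demo_versions = [
    VersionInfo("v3", False, datetime(2024, 1, 3)),
    VersionInfo("v1", False, datetime(2024, 1, 1)),
    VersionInfo("v2", True, datetime(2024, 1, 2)),
]
default_version, rollback_version = pick_rollback(demo_versions)
print(default_version.version_id, rollback_version.version_id)
```

Returning a pair instead of mutating the list makes both "no default" and "nothing older than the default" explicit cases that a caller can check before changing anything.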