Doc AWS SDK 예제 GitHub 리포지토리에서 더 많은 SDK 예제를 사용할 수 있습니다. AWS
기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
SDK for Python (Boto3)을 사용한 IAM 예제
다음 코드 예제에서는 IAM과 AWS SDK for Python (Boto3) 함께를 사용하여 작업을 수행하고 일반적인 시나리오를 구현하는 방법을 보여줍니다.
기본 사항은 서비스 내에서 필수 작업을 수행하는 방법을 보여주는 코드 예제입니다.
작업은 대규모 프로그램에서 발췌한 코드이며 컨텍스트에 맞춰 실행해야 합니다. 작업은 관련 시나리오의 컨텍스트에 따라 표시되며, 개별 서비스 함수를 직접적으로 호출하는 방법을 보여줍니다.
시나리오는 동일한 서비스 내에서 또는 다른 AWS 서비스와 결합된 상태에서 여러 함수를 호출하여 특정 태스크를 수행하는 방법을 보여주는 코드 예제입니다.
각 예시에는 전체 소스 코드에 대한 링크가 포함되어 있으며, 여기에서 컨텍스트에 맞춰 코드를 설정하고 실행하는 방법에 대한 지침을 찾을 수 있습니다.
시작
다음 코드 예제에서는 IAM을 사용하여 시작하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. import boto3 def main(): """ Lists the managed policies in your AWS account using the AWS SDK for Python (Boto3). """ iam = boto3.client("iam") try: # Get a paginator for the list_policies operation paginator = iam.get_paginator("list_policies") # Iterate through the pages of results for page in paginator.paginate(Scope="All", OnlyAttached=False): for policy in page["Policies"]: print(f"Policy name: {policy['PolicyName']}") print(f" Policy ARN: {policy['Arn']}") except boto3.exceptions.BotoCoreError as e: print(f"Encountered an error while listing policies: {e}") if __name__ == "__main__": main()
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListPolicies를 참조하십시오.
-
기본 사항
다음 코드 예제에서는 사용자를 생성하고 역할을 수임하는 방법을 보여줍니다.
주의
보안 위험을 방지하려면 목적별 소프트웨어를 개발하거나 실제 데이터로 작업할 때 IAM 사용자를 인증에 사용하지 마세요. 대신 AWS IAM Identity Center과 같은 보안 인증 공급자를 통한 페더레이션을 사용하십시오.
권한이 없는 사용자를 생성합니다.
계정에 대한 Amazon S3 버킷을 나열할 수 있는 권한을 부여하는 역할을 생성합니다.
사용자가 역할을 수임할 수 있도록 정책을 추가합니다.
역할을 수임하고 임시 보안 인증 정보를 사용하여 S3 버킷을 나열한 후 리소스를 정리합니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. Amazon S3 버킷을 나열할 수 있는 권한을 부여하는 역할과 IAM 사용자를 생성합니다. 사용자는 역할을 수임할 수 있는 권한만 있습니다. 역할을 수임한 후 임시 자격 증명을 사용하여 계정의 버킷을 나열합니다.
import json import sys import time from uuid import uuid4 import boto3 from botocore.exceptions import ClientError def progress_bar(seconds): """Shows a simple progress bar in the command window.""" for _ in range(seconds): time.sleep(1) print(".", end="") sys.stdout.flush() print() def setup(iam_resource): """ Creates a new user with no permissions. Creates an access key pair for the user. Creates a role with a policy that lets the user assume the role. Creates a policy that allows listing Amazon S3 buckets. Attaches the policy to the role. Creates an inline policy for the user that lets the user assume the role. :param iam_resource: A Boto3 AWS Identity and Access Management (IAM) resource that has permissions to create users, roles, and policies in the account. :return: The newly created user, user key, and role. """ try: user = iam_resource.create_user(UserName=f"demo-user-{uuid4()}") print(f"Created user {user.name}.") except ClientError as error: print( f"Couldn't create a user for the demo. Here's why: " f"{error.response['Error']['Message']}" ) raise try: user_key = user.create_access_key_pair() print(f"Created access key pair for user.") except ClientError as error: print( f"Couldn't create access keys for user {user.name}. Here's why: " f"{error.response['Error']['Message']}" ) raise print(f"Wait for user to be ready.", end="") progress_bar(10) try: role = iam_resource.create_role( RoleName=f"demo-role-{uuid4()}", AssumeRolePolicyDocument=json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"AWS": user.arn}, "Action": "sts:AssumeRole", } ], } ), ) print(f"Created role {role.name}.") except ClientError as error: print( f"Couldn't create a role for the demo. Here's why: " f"{error.response['Error']['Message']}" ) raise try: policy = iam_resource.create_policy( PolicyName=f"demo-policy-{uuid4()}", PolicyDocument=json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "s3:ListAllMyBuckets", "Resource": "arn:aws:s3:::*", } ], } ), ) role.attach_policy(PolicyArn=policy.arn) print(f"Created policy {policy.policy_name} and attached it to the role.") except ClientError as error: print( f"Couldn't create a policy and attach it to role {role.name}. Here's why: " f"{error.response['Error']['Message']}" ) raise try: user.create_policy( PolicyName=f"demo-user-policy-{uuid4()}", PolicyDocument=json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": role.arn, } ], } ), ) print( f"Created an inline policy for {user.name} that lets the user assume " f"the role." ) except ClientError as error: print( f"Couldn't create an inline policy for user {user.name}. Here's why: " f"{error.response['Error']['Message']}" ) raise print("Give AWS time to propagate these new resources and connections.", end="") progress_bar(10) return user, user_key, role def show_access_denied_without_role(user_key): """ Shows that listing buckets without first assuming the role is not allowed. :param user_key: The key of the user created during setup. This user does not have permission to list buckets in the account. """ print(f"Try to list buckets without first assuming the role.") s3_denied_resource = boto3.resource( "s3", aws_access_key_id=user_key.id, aws_secret_access_key=user_key.secret ) try: for bucket in s3_denied_resource.buckets.all(): print(bucket.name) raise RuntimeError("Expected to get AccessDenied error when listing buckets!") except ClientError as error: if error.response["Error"]["Code"] == "AccessDenied": print("Attempt to list buckets with no permissions: AccessDenied.") else: raise def list_buckets_from_assumed_role(user_key, assume_role_arn, session_name): """ Assumes a role that grants permission to list the Amazon S3 buckets in the account. Uses the temporary credentials from the role to list the buckets that are owned by the assumed role's account. :param user_key: The access key of a user that has permission to assume the role. :param assume_role_arn: The Amazon Resource Name (ARN) of the role that grants access to list the other account's buckets. :param session_name: The name of the STS session. """ sts_client = boto3.client( "sts", aws_access_key_id=user_key.id, aws_secret_access_key=user_key.secret ) try: response = sts_client.assume_role( RoleArn=assume_role_arn, RoleSessionName=session_name ) temp_credentials = response["Credentials"] print(f"Assumed role {assume_role_arn} and got temporary credentials.") except ClientError as error: print( f"Couldn't assume role {assume_role_arn}. Here's why: " f"{error.response['Error']['Message']}" ) raise # Create an S3 resource that can access the account with the temporary credentials. s3_resource = boto3.resource( "s3", aws_access_key_id=temp_credentials["AccessKeyId"], aws_secret_access_key=temp_credentials["SecretAccessKey"], aws_session_token=temp_credentials["SessionToken"], ) print(f"Listing buckets for the assumed role's account:") try: for bucket in s3_resource.buckets.all(): print(bucket.name) except ClientError as error: print( f"Couldn't list buckets for the account. Here's why: " f"{error.response['Error']['Message']}" ) raise def teardown(user, role): """ Removes all resources created during setup. :param user: The demo user. :param role: The demo role. """ try: for attached in role.attached_policies.all(): policy_name = attached.policy_name role.detach_policy(PolicyArn=attached.arn) attached.delete() print(f"Detached and deleted {policy_name}.") role.delete() print(f"Deleted {role.name}.") except ClientError as error: print( "Couldn't detach policy, delete policy, or delete role. Here's why: " f"{error.response['Error']['Message']}" ) raise try: for user_pol in user.policies.all(): user_pol.delete() print("Deleted inline user policy.") for key in user.access_keys.all(): key.delete() print("Deleted user's access key.") user.delete() print(f"Deleted {user.name}.") except ClientError as error: print( "Couldn't delete user policy or delete user. Here's why: " f"{error.response['Error']['Message']}" ) def usage_demo(): """Drives the demonstration.""" print("-" * 88) print(f"Welcome to the IAM create user and assume role demo.") print("-" * 88) iam_resource = boto3.resource("iam") user = None role = None try: user, user_key, role = setup(iam_resource) print(f"Created {user.name} and {role.name}.") show_access_denied_without_role(user_key) list_buckets_from_assumed_role(user_key, role.arn, "AssumeRoleDemoSession") except Exception: print("Something went wrong!") finally: if user is not None and role is not None: teardown(user, role) print("Thanks for watching!") if __name__ == "__main__": usage_demo()
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
작업
다음 코드 예시에서는 AttachRolePolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. Boto3 정책 객체를 사용하여 역할에 정책을 연결합니다.
def attach_to_role(role_name, policy_arn): """ Attaches a policy to a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Policy(policy_arn).attach_role(RoleName=role_name) logger.info("Attached policy %s to role %s.", policy_arn, role_name) except ClientError: logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name) raise
Boto3 역할 객체를 사용하여 역할에 정책을 연결합니다.
def attach_policy(role_name, policy_arn): """ Attaches a policy to a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to role %s.", policy_arn, role_name) except ClientError: logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 AttachRolePolicy를 참조하십시오.
-
다음 코드 예시에서는 AttachUserPolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def attach_policy(user_name, policy_arn): """ Attaches a policy to a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to user %s.", policy_arn, user_name) except ClientError: logger.exception("Couldn't attach policy %s to user %s.", policy_arn, user_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 AttachUserPolicy를 참조하십시오.
-
다음 코드 예시에서는 CreateAccessKey
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_key(user_name): """ Creates an access key for the specified user. Each user can have a maximum of two keys. :param user_name: The name of the user. :return: The created access key. """ try: key_pair = iam.User(user_name).create_access_key_pair() logger.info( "Created access key pair for %s. Key ID is %s.", key_pair.user_name, key_pair.id, ) except ClientError: logger.exception("Couldn't create access key pair for %s.", user_name) raise else: return key_pair
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreateAccessKey를 참조하십시오.
-
다음 코드 예시에서는 CreateAccountAlias
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_alias(alias): """ Creates an alias for the current account. The alias can be used in place of the account ID in the sign-in URL. An account can have only one alias. When a new alias is created, it replaces any existing alias. :param alias: The alias to assign to the account. """ try: iam.create_account_alias(AccountAlias=alias) logger.info("Created an alias '%s' for your account.", alias) except ClientError: logger.exception("Couldn't create alias '%s' for your account.", alias) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreateAccountAlias를 참조하세요.
-
다음 코드 예시에서는 CreateInstanceProfile
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. 이 예제에서는 정책, 역할, 인스턴스 프로파일을 생성하고 이를 모두 연결합니다.
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_instance_profile( self, policy_file: str, policy_name: str, role_name: str, profile_name: str, aws_managed_policies: Tuple[str, ...] = (), ) -> str: """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. """ assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = self.create_policy(policy_file, policy_name) self.create_role(role_name, assume_role_doc) self.attach_policy(role_name, policy_arn, aws_managed_policies) try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) log.error(f"Full error:\n\t{err}") return profile_arn
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreateInstanceProfile를 참조하십시오.
-
다음 코드 예시에서는 CreatePolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_policy(name, description, actions, resource_arn): """ Creates a policy that contains a single statement. :param name: The name of the policy to create. :param description: The description of the policy. :param actions: The actions allowed by the policy. These typically take the form of service:action, such as s3:PutObject. :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy applies to. This ARN can contain wildcards, such as 'arn:aws:s3:::my-bucket/*' to allow actions on all objects in the bucket named 'my-bucket'. :return: The newly created policy. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.create_policy( PolicyName=name, Description=description, PolicyDocument=json.dumps(policy_doc), ) logger.info("Created policy %s.", policy.arn) except ClientError: logger.exception("Couldn't create policy %s.", name) raise else: return policy
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreatePolicy를 참조하십시오.
-
다음 코드 예시에서는 CreatePolicyVersion
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_policy_version(policy_arn, actions, resource_arn, set_as_default): """ Creates a policy version. Policies can have up to five versions. The default version is the one that is used for all resources that reference the policy. :param policy_arn: The ARN of the policy. :param actions: The actions to allow in the policy version. :param resource_arn: The ARN of the resource this policy version applies to. :param set_as_default: When True, this policy version is set as the default version for the policy. Otherwise, the default is not changed. :return: The newly created policy version. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.Policy(policy_arn) policy_version = policy.create_version( PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default ) logger.info( "Created policy version %s for policy %s.", policy_version.version_id, policy_version.arn, ) except ClientError: logger.exception("Couldn't create a policy version for %s.", policy_arn) raise else: return policy_version
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreatePolicyVersion를 참조하십시오.
-
다음 코드 예시에서는 CreateRole
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_role(role_name, allowed_services): """ Creates a role that lets a list of specified services assume the role. :param role_name: The name of the role. :param allowed_services: The services that can assume the role. :return: The newly created role. """ trust_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": service}, "Action": "sts:AssumeRole", } for service in allowed_services ], } try: role = iam.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy) ) logger.info("Created role %s.", role.name) except ClientError: logger.exception("Couldn't create role %s.", role_name) raise else: return role
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreateRole를 참조하세요.
-
다음 코드 예시에서는 CreateServiceLinkedRole
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_service_linked_role(service_name, description): """ Creates a service-linked role. :param service_name: The name of the service that owns the role. :param description: A description to give the role. :return: The newly created role. """ try: response = iam.meta.client.create_service_linked_role( AWSServiceName=service_name, Description=description ) role = iam.Role(response["Role"]["RoleName"]) logger.info("Created service-linked role %s.", role.name) except ClientError: logger.exception("Couldn't create service-linked role for %s.", service_name) raise else: return role
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreateServiceLinkedRole를 참조하십시오.
-
다음 코드 예시에서는 CreateUser
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def create_user(user_name): """ Creates a user. By default, a user has no permissions or access keys. :param user_name: The name of the user. :return: The newly created user. """ try: user = iam.create_user(UserName=user_name) logger.info("Created user %s.", user.name) except ClientError: logger.exception("Couldn't create user %s.", user_name) raise else: return user
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 CreateUser를 참조하십시오.
-
다음 코드 예시에서는 DeleteAccessKey
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def delete_key(user_name, key_id): """ Deletes a user's access key. :param user_name: The user that owns the key. :param key_id: The ID of the key to delete. """ try: key = iam.AccessKey(user_name, key_id) key.delete() logger.info("Deleted access key %s for %s.", key.id, key.user_name) except ClientError: logger.exception("Couldn't delete key %s for %s", key_id, user_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DeleteAccessKey를 참조하십시오.
-
다음 코드 예시에서는 DeleteAccountAlias
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def delete_alias(alias): """ Removes the alias from the current account. :param alias: The alias to remove. """ try: iam.meta.client.delete_account_alias(AccountAlias=alias) logger.info("Removed alias '%s' from your account.", alias) except ClientError: logger.exception("Couldn't remove alias '%s' from your account.", alias) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DeleteAccountAlias를 참조하세요.
-
다음 코드 예시에서는 DeleteInstanceProfile
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. 이 예제에서는 인스턴스 프로파일에서 역할을 제거하고 역할에 연결된 모든 정책을 분리하며 모든 리소스를 삭제합니다.
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name )
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DeleteInstanceProfile를 참조하십시오.
-
다음 코드 예시에서는 DeletePolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def delete_policy(policy_arn): """ Deletes a policy. :param policy_arn: The ARN of the policy to delete. """ try: iam.Policy(policy_arn).delete() logger.info("Deleted policy %s.", policy_arn) except ClientError: logger.exception("Couldn't delete policy %s.", policy_arn) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DeletePolicy를 참조하십시오.
-
다음 코드 예시에서는 DeleteRole
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def delete_role(role_name): """ Deletes a role. :param role_name: The name of the role to delete. """ try: iam.Role(role_name).delete() logger.info("Deleted role %s.", role_name) except ClientError: logger.exception("Couldn't delete role %s.", role_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DeleteRole를 참조하십시오.
-
다음 코드 예시에서는 DeleteUser
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def delete_user(user_name): """ Deletes a user. Before a user can be deleted, all associated resources, such as access keys and policies, must be deleted or detached. :param user_name: The name of the user. """ try: iam.User(user_name).delete() logger.info("Deleted user %s.", user_name) except ClientError: logger.exception("Couldn't delete user %s.", user_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DeleteUser를 참조하십시오.
-
다음 코드 예시에서는 DetachRolePolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. Boto3 정책 객체를 사용하여 역할에서 정책을 분리합니다.
def detach_from_role(role_name, policy_arn): """ Detaches a policy from a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Policy(policy_arn).detach_role(RoleName=role_name) logger.info("Detached policy %s from role %s.", policy_arn, role_name) except ClientError: logger.exception( "Couldn't detach policy %s from role %s.", policy_arn, role_name ) raise
Boto3 역할 객체를 사용하여 역할에서 정책을 분리합니다.
def detach_policy(role_name, policy_arn): """ Detaches a policy from a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from role %s.", policy_arn, role_name) except ClientError: logger.exception( "Couldn't detach policy %s from role %s.", policy_arn, role_name ) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DetachRolePolicy를 참조하십시오.
-
다음 코드 예시에서는 DetachUserPolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def detach_policy(user_name, policy_arn): """ Detaches a policy from a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from user %s.", policy_arn, user_name) except ClientError: logger.exception( "Couldn't detach policy %s from user %s.", policy_arn, user_name ) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 DetachUserPolicy를 참조하십시오.
-
다음 코드 예시에서는 GenerateCredentialReport
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def generate_credential_report(): """ Starts generation of a credentials report about the current account. After calling this function to generate the report, call get_credential_report to get the latest report. A new report can be generated a minimum of four hours after the last one was generated. """ try: response = iam.meta.client.generate_credential_report() logger.info( "Generating credentials report for your account. " "Current state is %s.", response["State"], ) except ClientError: logger.exception("Couldn't generate a credentials report for your account.") raise else: return response
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GenerateCredentialReport를 참조하십시오.
-
다음 코드 예시에서는 GetAccessKeyLastUsed
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_last_use(key_id): """ Gets information about when and how a key was last used. :param key_id: The ID of the key to look up. :return: Information about the key's last use. """ try: response = iam.meta.client.get_access_key_last_used(AccessKeyId=key_id) last_used_date = response["AccessKeyLastUsed"].get("LastUsedDate", None) last_service = response["AccessKeyLastUsed"].get("ServiceName", None) logger.info( "Key %s was last used by %s on %s to access %s.", key_id, response["UserName"], last_used_date, last_service, ) except ClientError: logger.exception("Couldn't get last use of key %s.", key_id) raise else: return response
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetAccessKeyLastUsed를 참조하십시오.
-
다음 코드 예시에서는 GetAccountAuthorizationDetails
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_authorization_details(response_filter): """ Gets an authorization detail report for the current account. :param response_filter: A list of resource types to include in the report, such as users or roles. When not specified, all resources are included. :return: The authorization detail report. """ try: account_details = iam.meta.client.get_account_authorization_details( Filter=response_filter ) logger.debug(account_details) except ClientError: logger.exception("Couldn't get details for your account.") raise else: return account_details
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetAccountAuthorizationDetails를 참조하십시오.
-
다음 코드 예시에서는 GetAccountPasswordPolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def print_password_policy(): """ Prints the password policy for the account. """ try: pw_policy = iam.AccountPasswordPolicy() print("Current account password policy:") print( f"\tallow_users_to_change_password: {pw_policy.allow_users_to_change_password}" ) print(f"\texpire_passwords: {pw_policy.expire_passwords}") print(f"\thard_expiry: {pw_policy.hard_expiry}") print(f"\tmax_password_age: {pw_policy.max_password_age}") print(f"\tminimum_password_length: {pw_policy.minimum_password_length}") print(f"\tpassword_reuse_prevention: {pw_policy.password_reuse_prevention}") print( f"\trequire_lowercase_characters: {pw_policy.require_lowercase_characters}" ) print(f"\trequire_numbers: {pw_policy.require_numbers}") print(f"\trequire_symbols: {pw_policy.require_symbols}") print( f"\trequire_uppercase_characters: {pw_policy.require_uppercase_characters}" ) printed = True except ClientError as error: if error.response["Error"]["Code"] == "NoSuchEntity": print("The account does not have a password policy set.") else: logger.exception("Couldn't get account password policy.") raise else: return printed
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetAccountPasswordPolicy를 참조하십시오.
-
다음 코드 예시에서는 GetAccountSummary
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_summary(): """ Gets a summary of account usage. :return: The summary of account usage. """ try: summary = iam.AccountSummary() logger.debug(summary.summary_map) except ClientError: logger.exception("Couldn't get a summary for your account.") raise else: return summary.summary_map
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetAccountSummary를 참조하십시오.
-
다음 코드 예시에서는 GetCredentialReport
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_credential_report(): """ Gets the most recently generated credentials report about the current account. :return: The credentials report. """ try: response = iam.meta.client.get_credential_report() logger.debug(response["Content"]) except ClientError: logger.exception("Couldn't get credentials report.") raise else: return response["Content"]
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetCredentialReport를 참조하십시오.
-
다음 코드 예시에서는 GetPolicy
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_default_policy_statement(policy_arn): """ Gets the statement of the default version of the specified policy. :param policy_arn: The ARN of the policy to look up. :return: The statement of the default policy version. """ try: policy = iam.Policy(policy_arn) # To get an attribute of a policy, the SDK first calls get_policy. policy_doc = policy.default_version.document policy_statement = policy_doc.get("Statement", None) logger.info("Got default policy doc for %s.", policy.policy_name) logger.info(policy_doc) except ClientError: logger.exception("Couldn't get default policy statement for %s.", policy_arn) raise else: return policy_statement
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetPolicy를 참조하십시오.
-
다음 코드 예시에서는 GetPolicyVersion
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_default_policy_statement(policy_arn): """ Gets the statement of the default version of the specified policy. :param policy_arn: The ARN of the policy to look up. :return: The statement of the default policy version. """ try: policy = iam.Policy(policy_arn) # To get an attribute of a policy, the SDK first calls get_policy. policy_doc = policy.default_version.document policy_statement = policy_doc.get("Statement", None) logger.info("Got default policy doc for %s.", policy.policy_name) logger.info(policy_doc) except ClientError: logger.exception("Couldn't get default policy statement for %s.", policy_arn) raise else: return policy_statement
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetPolicyVersion를 참조하십시오.
-
다음 코드 예시에서는 GetRole
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def get_role(role_name): """ Gets a role by name. :param role_name: The name of the role to retrieve. :return: The specified role. """ try: role = iam.Role(role_name) role.load() # calls GetRole to load attributes logger.info("Got role with arn %s.", role.arn) except ClientError: logger.exception("Couldn't get role named %s.", role_name) raise else: return role
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 GetRole를 참조하십시오.
-
다음 코드 예시에서는 ListAccessKeys
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_keys(user_name): """ Lists the keys owned by the specified user. :param user_name: The name of the user. :return: The list of keys owned by the user. """ try: keys = list(iam.User(user_name).access_keys.all()) logger.info("Got %s access keys for %s.", len(keys), user_name) except ClientError: logger.exception("Couldn't get access keys for %s.", user_name) raise else: return keys
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListAccessKeys를 참조하십시오.
-
다음 코드 예시에서는 ListAccountAliases
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_aliases(): """ Gets the list of aliases for the current account. An account has at most one alias. :return: The list of aliases for the account. """ try: response = iam.meta.client.list_account_aliases() aliases = response["AccountAliases"] if len(aliases) > 0: logger.info("Got aliases for your account: %s.", ",".join(aliases)) else: logger.info("Got no aliases for your account.") except ClientError: logger.exception("Couldn't list aliases for your account.") raise else: return response["AccountAliases"]
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListAccountAliases를 참조하세요.
-
다음 코드 예시에서는 ListAttachedRolePolicies
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_attached_policies(role_name): """ Lists policies attached to a role. :param role_name: The name of the role to query. """ try: role = iam.Role(role_name) for policy in role.attached_policies.all(): logger.info("Got policy %s.", policy.arn) except ClientError: logger.exception("Couldn't list attached policies for %s.", role_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListAttachedRolePolicies를 참조하십시오.
-
다음 코드 예시에서는 ListGroups
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_groups(count): """ Lists the specified number of groups for the account. :param count: The number of groups to list. """ try: for group in iam.groups.limit(count): logger.info("Group: %s", group.name) except ClientError: logger.exception("Couldn't list groups for the account.") raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListGroups를 참조하십시오.
-
다음 코드 예시에서는 ListPolicies
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_policies(scope): """ Lists the policies in the current account. :param scope: Limits the kinds of policies that are returned. For example, 'Local' specifies that only locally managed policies are returned. :return: The list of policies. """ try: policies = list(iam.policies.filter(Scope=scope)) logger.info("Got %s policies in scope '%s'.", len(policies), scope) except ClientError: logger.exception("Couldn't get policies for scope '%s'.", scope) raise else: return policies
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListPolicies를 참조하십시오.
-
다음 코드 예시에서는 ListRolePolicies
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_policies(role_name): """ Lists inline policies for a role. :param role_name: The name of the role to query. """ try: role = iam.Role(role_name) for policy in role.policies.all(): logger.info("Got inline policy %s.", policy.name) except ClientError: logger.exception("Couldn't list inline policies for %s.", role_name) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListRolePolicies를 참조하십시오.
-
다음 코드 예시에서는 ListRoles
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_roles(count): """ Lists the specified number of roles for the account. :param count: The number of roles to list. """ try: roles = list(iam.roles.limit(count=count)) for role in roles: logger.info("Role: %s", role.name) except ClientError: logger.exception("Couldn't list roles for the account.") raise else: return roles
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListRoles를 참조하십시오.
-
다음 코드 예시에서는 ListSAMLProviders
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_saml_providers(count): """ Lists the SAML providers for the account. :param count: The maximum number of providers to list. """ try: found = 0 for provider in iam.saml_providers.limit(count): logger.info("Got SAML provider %s.", provider.arn) found += 1 if found == 0: logger.info("Your account has no SAML providers.") except ClientError: logger.exception("Couldn't list SAML providers.") raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListSAMLProviders를 참조하십시오.
-
다음 코드 예시에서는 ListUsers
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def list_users(): """ Lists the users in the current account. :return: The list of users. """ try: users = list(iam.users.all()) logger.info("Got %s users.", len(users)) except ClientError: logger.exception("Couldn't get users.") raise else: return users
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 ListUsers를 참조하십시오.
-
다음 코드 예시에서는 UpdateAccessKey
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def update_key(user_name, key_id, activate): """ Updates the status of a key. :param user_name: The user that owns the key. :param key_id: The ID of the key to update. :param activate: When True, the key is activated. Otherwise, the key is deactivated. """ try: key = iam.User(user_name).AccessKey(key_id) if activate: key.activate() else: key.deactivate() logger.info("%s key %s.", "Activated" if activate else "Deactivated", key_id) except ClientError: logger.exception( "Couldn't %s key %s.", "Activate" if activate else "Deactivate", key_id ) raise
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 UpdateAccessKey를 참조하십시오.
-
다음 코드 예시에서는 UpdateUser
을 사용하는 방법을 보여 줍니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def update_user(user_name, new_user_name): """ Updates a user's name. :param user_name: The current name of the user to update. :param new_user_name: The new name to assign to the user. :return: The updated user. """ try: user = iam.User(user_name) user.update(NewUserName=new_user_name) logger.info("Renamed %s to %s.", user_name, new_user_name) except ClientError: logger.exception("Couldn't update name for user %s.", user_name) raise return user
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 UpdateUser를 참조하십시오.
-
시나리오
다음 코드 예제에서는 책, 영화, 노래 추천을 반환하는 로드 밸런싱 웹 서비스를 만드는 방법을 보여줍니다. 이 예제에서는 서비스가 장애에 대응하는 방법과 장애 발생 시 복원력을 높이기 위해 서비스를 재구성하는 방법을 보여줍니다.
Amazon EC2 Auto Scaling 그룹을 사용하여 시작 템플릿을 기반으로 Amazon Elastic Compute Cloud(Amazon EC2) 인스턴스를 생성하고 인스턴스 수를 지정된 범위 내로 유지합니다.
Elastic Load Balancing으로 HTTP 요청을 처리하고 배포합니다.
Auto Scaling 그룹의 인스턴스 상태를 모니터링하고 요청을 정상 인스턴스로만 전달합니다.
각 EC2 인스턴스에서 Python 웹 서버를 실행하여 HTTP 요청을 처리합니다. 웹 서버는 추천 및 상태 확인으로 응답합니다.
Amazon DynamoDB 테이블을 사용하여 추천 서비스를 시뮬레이션합니다.
AWS Systems Manager 파라미터를 업데이트하여 요청 및 상태 확인에 대한 웹 서버 응답을 제어합니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. 명령 프롬프트에서 대화형 시나리오를 실행합니다.
class Runner: """ Manages the deployment, demonstration, and destruction of resources for the resilient service. """ def __init__( self, resource_path: str, recommendation: RecommendationService, autoscaler: AutoScalingWrapper, loadbalancer: ElasticLoadBalancerWrapper, param_helper: ParameterHelper, ): """ Initializes the Runner class with the necessary parameters. :param resource_path: The path to resource files used by this example, such as IAM policies and instance scripts. :param recommendation: An instance of the RecommendationService class. :param autoscaler: An instance of the AutoScaler class. :param loadbalancer: An instance of the LoadBalancer class. :param param_helper: An instance of the ParameterHelper class. """ self.resource_path = resource_path self.recommendation = recommendation self.autoscaler = autoscaler self.loadbalancer = loadbalancer self.param_helper = param_helper self.protocol = "HTTP" self.port = 80 self.ssh_port = 22 prefix = "doc-example-resilience" self.target_group_name = f"{prefix}-tg" self.load_balancer_name = f"{prefix}-lb" def deploy(self) -> None: """ Deploys the resources required for the resilient service, including the DynamoDB table, EC2 instances, Auto Scaling group, and load balancer. """ recommendations_path = f"{self.resource_path}/recommendations.json" startup_script = f"{self.resource_path}/server_startup_script.sh" instance_policy = f"{self.resource_path}/instance_policy.json" logging.info("Starting deployment of resources for the resilient service.") logging.info( "Creating and populating DynamoDB table '%s'.", self.recommendation.table_name, ) self.recommendation.create() self.recommendation.populate(recommendations_path) logging.info( "Creating an EC2 launch template with the startup script '%s'.", startup_script, ) self.autoscaler.create_template(startup_script, instance_policy) logging.info( "Creating an EC2 Auto Scaling group across multiple Availability Zones." ) zones = self.autoscaler.create_autoscaling_group(3) logging.info("Creating variables that control the flow of the demo.") self.param_helper.reset() logging.info("Creating Elastic Load Balancing target group and load balancer.") vpc = self.autoscaler.get_default_vpc() subnets = self.autoscaler.get_subnets(vpc["VpcId"], zones) target_group = self.loadbalancer.create_target_group( self.target_group_name, self.protocol, self.port, vpc["VpcId"] ) self.loadbalancer.create_load_balancer( self.load_balancer_name, [subnet["SubnetId"] for subnet in subnets] ) self.loadbalancer.create_listener(self.load_balancer_name, target_group) self.autoscaler.attach_load_balancer_target_group(target_group) logging.info("Verifying access to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) current_ip_address = requests.get("http://checkip.amazonaws.com").text.strip() if not lb_success: logging.warning( "Couldn't connect to the load balancer. Verifying that the port is open..." ) sec_group, port_is_open = self.autoscaler.verify_inbound_port( vpc, self.port, current_ip_address ) sec_group, ssh_port_is_open = self.autoscaler.verify_inbound_port( vpc, self.ssh_port, current_ip_address ) if not port_is_open: logging.warning( "The default security group for your VPC must allow access from this computer." ) if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.port, current_ip_address ) if not ssh_port_is_open: if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound SSH traffic on port {self.ssh_port} for debugging from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.ssh_port, current_ip_address ) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) if lb_success: logging.info( "Load balancer is ready. Access it at: http://%s", current_ip_address ) else: logging.error( "Couldn't get a successful response from the load balancer endpoint. Please verify your VPC and security group settings." ) def demo_choices(self) -> None: """ Presents choices for interacting with the deployed service, such as sending requests to the load balancer or checking the health of the targets. """ actions = [ "Send a GET request to the load balancer endpoint.", "Check the health of load balancer targets.", "Go to the next part of the demo.", ] choice = 0 while choice != 2: logging.info("Choose an action to interact with the service.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: logging.info("Sending a GET request to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) logging.info("GET http://%s", endpoint) response = requests.get(f"http://{endpoint}") logging.info("Response: %s", response.status_code) if response.headers.get("content-type") == "application/json": pp(response.json()) elif choice == 1: logging.info("Checking the health of load balancer targets.") health = self.loadbalancer.check_target_health(self.target_group_name) for target in health: state = target["TargetHealth"]["State"] logging.info( "Target %s on port %d is %s", target["Target"]["Id"], target["Target"]["Port"], state, ) if state != "healthy": logging.warning( "%s: %s", target["TargetHealth"]["Reason"], target["TargetHealth"]["Description"], ) logging.info( "Note that it can take a minute or two for the health check to update." ) elif choice == 2: logging.info("Proceeding to the next part of the demo.") def demo(self) -> None: """ Runs the demonstration, showing how the service responds to different failure scenarios and how a resilient architecture can keep the service running. """ ssm_only_policy = f"{self.resource_path}/ssm_only_policy.json" logging.info("Resetting parameters to starting values for the demo.") self.param_helper.reset() logging.info( "Starting demonstration of the service's resilience under various failure conditions." ) self.demo_choices() logging.info( "Simulating failure by changing the Systems Manager parameter to a non-existent table." ) self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info("Sending GET requests will now return failure codes.") self.demo_choices() logging.info("Switching to static response mode to mitigate failure.") self.param_helper.put(self.param_helper.failure_response, "static") logging.info("Sending GET requests will now return static responses.") self.demo_choices() logging.info("Restoring normal operation of the recommendation service.") self.param_helper.put(self.param_helper.table, self.recommendation.table_name) logging.info( "Introducing a failure by assigning bad credentials to one of the instances." ) self.autoscaler.create_instance_profile( ssm_only_policy, self.autoscaler.bad_creds_policy_name, self.autoscaler.bad_creds_role_name, self.autoscaler.bad_creds_profile_name, ["AmazonSSMManagedInstanceCore"], ) instances = self.autoscaler.get_instances() bad_instance_id = instances[0] instance_profile = self.autoscaler.get_instance_profile(bad_instance_id) logging.info( "Replacing instance profile with bad credentials for instance %s.", bad_instance_id, ) self.autoscaler.replace_instance_profile( bad_instance_id, self.autoscaler.bad_creds_profile_name, instance_profile["AssociationId"], ) logging.info( "Sending GET requests may return either a valid recommendation or a static response." ) self.demo_choices() logging.info("Implementing deep health checks to detect unhealthy instances.") self.param_helper.put(self.param_helper.health_check, "deep") logging.info("Checking the health of the load balancer targets.") self.demo_choices() logging.info( "Terminating the unhealthy instance to let the auto scaler replace it." ) self.autoscaler.terminate_instance(bad_instance_id) logging.info("The service remains resilient during instance replacement.") self.demo_choices() logging.info("Simulating a complete failure of the recommendation service.") self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info( "All instances will report as unhealthy, but the service will still return static responses." ) self.demo_choices() self.param_helper.reset() def destroy(self, automation=False) -> None: """ Destroys all resources created for the demo, including the load balancer, Auto Scaling group, EC2 instances, and DynamoDB table. """ logging.info( "This concludes the demo. Preparing to clean up all AWS resources created during the demo." ) if automation: cleanup = True else: cleanup = q.ask( "Do you want to clean up all demo resources? (y/n) ", q.is_yesno ) if cleanup: logging.info("Deleting load balancer and related resources.") self.loadbalancer.delete_load_balancer(self.load_balancer_name) self.loadbalancer.delete_target_group(self.target_group_name) self.autoscaler.delete_autoscaling_group(self.autoscaler.group_name) self.autoscaler.delete_key_pair() self.autoscaler.delete_template() self.autoscaler.delete_instance_profile( self.autoscaler.bad_creds_profile_name, self.autoscaler.bad_creds_role_name, ) logging.info("Deleting DynamoDB table and other resources.") self.recommendation.destroy() else: logging.warning( "Resources have not been deleted. Ensure you clean them up manually to avoid unexpected charges." ) def main() -> None: """ Main function to parse arguments and run the appropriate actions for the demo. """ parser = argparse.ArgumentParser() parser.add_argument( "--action", required=True, choices=["all", "deploy", "demo", "destroy"], help="The action to take for the demo. When 'all' is specified, resources are\n" "deployed, the demo is run, and resources are destroyed.", ) parser.add_argument( "--resource_path", default="../../../scenarios/features/resilient_service/resources", help="The path to resource files used by this example, such as IAM policies and\n" "instance scripts.", ) args = parser.parse_args() logging.info("Starting the Resilient Service demo.") prefix = "doc-example-resilience" # Service Clients ddb_client = boto3.client("dynamodb") elb_client = boto3.client("elbv2") autoscaling_client = boto3.client("autoscaling") ec2_client = boto3.client("ec2") ssm_client = boto3.client("ssm") iam_client = boto3.client("iam") # Wrapper instantiations recommendation = RecommendationService( "doc-example-recommendation-service", ddb_client ) autoscaling_wrapper = AutoScalingWrapper( prefix, "t3.micro", "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2", autoscaling_client, ec2_client, ssm_client, iam_client, ) elb_wrapper = ElasticLoadBalancerWrapper(elb_client) param_helper = ParameterHelper(recommendation.table_name, ssm_client) # Demo invocation runner = Runner( args.resource_path, recommendation, autoscaling_wrapper, elb_wrapper, param_helper, ) actions = [args.action] if args.action != "all" else ["deploy", "demo", "destroy"] for action in actions: if action == "deploy": runner.deploy() elif action == "demo": runner.demo() elif action == "destroy": runner.destroy() logging.info("Demo completed successfully.") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") main()
Auto Scaling과 Amazon EC2 작업을 래핑하는 클래스를 생성합니다.
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_policy(self, policy_file: str, policy_name: str) -> str: """ Creates a new IAM policy or retrieves the ARN of an existing policy. :param policy_file: The path to a JSON file that contains the policy definition. :param policy_name: The name to give the created policy. :return: The ARN of the created or existing policy. """ with open(policy_file) as file: policy_doc = file.read() try: response = self.iam_client.create_policy( PolicyName=policy_name, PolicyDocument=policy_doc ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' created successfully. ARN: {policy_arn}") return policy_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the policy already exists, get its ARN response = self.iam_client.get_policy( PolicyArn=f"arn:aws:iam::{self.account_id}:policy/{policy_name}" ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' already exists. ARN: {policy_arn}") return policy_arn log.error(f"Full error:\n\t{err}") def create_role(self, role_name: str, assume_role_doc: dict) -> str: """ Creates a new IAM role or retrieves the ARN of an existing role. :param role_name: The name to give the created role. :param assume_role_doc: The assume role policy document that specifies which entities can assume the role. :return: The ARN of the created or existing role. """ try: response = self.iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc) ) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' created successfully. ARN: {role_arn}") return role_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the role already exists, get its ARN response = self.iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' already exists. ARN: {role_arn}") return role_arn log.error(f"Full error:\n\t{err}") def attach_policy( self, role_name: str, policy_arn: str, aws_managed_policies: Tuple[str, ...] = (), ) -> None: """ Attaches an IAM policy to a role and optionally attaches additional AWS-managed policies. :param role_name: The name of the role to attach the policy to. :param policy_arn: The ARN of the policy to attach. :param aws_managed_policies: A tuple of AWS-managed policy names to attach to the role. """ try: self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) for aws_policy in aws_managed_policies: self.iam_client.attach_role_policy( RoleName=role_name, PolicyArn=f"arn:aws:iam::aws:policy/{aws_policy}", ) log.info(f"Attached policy {policy_arn} to role {role_name}.") except ClientError as err: log.error(f"Failed to attach policy {policy_arn} to role {role_name}.") log.error(f"Full error:\n\t{err}") def create_instance_profile( self, policy_file: str, policy_name: str, role_name: str, profile_name: str, aws_managed_policies: Tuple[str, ...] = (), ) -> str: """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. """ assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = self.create_policy(policy_file, policy_name) self.create_role(role_name, assume_role_doc) self.attach_policy(role_name, policy_arn, aws_managed_policies) try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) log.error(f"Full error:\n\t{err}") return profile_arn def get_instance_profile(self, instance_id: str) -> Dict[str, Any]: """ Gets data about the profile associated with an instance. :param instance_id: The ID of the instance to look up. :return: The profile data. """ try: response = self.ec2_client.describe_iam_instance_profile_associations( Filters=[{"Name": "instance-id", "Values": [instance_id]}] ) if not response["IamInstanceProfileAssociations"]: log.info(f"No instance profile found for instance {instance_id}.") profile_data = response["IamInstanceProfileAssociations"][0] log.info(f"Retrieved instance profile for instance {instance_id}.") return profile_data except ClientError as err: log.error( f"Failed to retrieve instance profile for instance {instance_id}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidInstanceID.NotFound": log.error(f"The instance ID '{instance_id}' does not exist.") log.error(f"Full error:\n\t{err}") def replace_instance_profile( self, instance_id: str, new_instance_profile_name: str, profile_association_id: str, ) -> None: """ Replaces the profile associated with a running instance. After the profile is replaced, the instance is rebooted to ensure that it uses the new profile. When the instance is ready, Systems Manager is used to restart the Python web server. :param instance_id: The ID of the instance to restart. :param new_instance_profile_name: The name of the new profile to associate with the specified instance. :param profile_association_id: The ID of the existing profile association for the instance. """ try: self.ec2_client.replace_iam_instance_profile_association( IamInstanceProfile={"Name": new_instance_profile_name}, AssociationId=profile_association_id, ) log.info( "Replaced instance profile for association %s with profile %s.", profile_association_id, new_instance_profile_name, ) time.sleep(5) self.ec2_client.reboot_instances(InstanceIds=[instance_id]) log.info("Rebooting instance %s.", instance_id) waiter = self.ec2_client.get_waiter("instance_running") log.info("Waiting for instance %s to be running.", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info("Instance %s is now running.", instance_id) self.ssm_client.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={"commands": ["cd / && sudo python3 server.py 80"]}, ) log.info(f"Restarted the Python web server on instance '{instance_id}'.") except ClientError as err: log.error("Failed to replace instance profile.") error_code = err.response["Error"]["Code"] if error_code == "InvalidAssociationID.NotFound": log.error( f"Association ID '{profile_association_id}' does not exist." "Please check the association ID and try again." ) if error_code == "InvalidInstanceId": log.error( f"The specified instance ID '{instance_id}' does not exist or is not available for SSM. " f"Please verify the instance ID and try again." ) log.error(f"Full error:\n\t{err}") def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name ) def create_key_pair(self, key_pair_name: str) -> None: """ Creates a new key pair. :param key_pair_name: The name of the key pair to create. """ try: response = self.ec2_client.create_key_pair(KeyName=key_pair_name) with open(f"{key_pair_name}.pem", "w") as file: file.write(response["KeyMaterial"]) chmod(f"{key_pair_name}.pem", 0o600) log.info("Created key pair %s.", key_pair_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to create key pair {key_pair_name}.") if error_code == "InvalidKeyPair.Duplicate": log.error(f"A key pair with the name '{key_pair_name}' already exists.") log.error(f"Full error:\n\t{err}") def delete_key_pair(self) -> None: """ Deletes a key pair. """ try: self.ec2_client.delete_key_pair(KeyName=self.key_pair_name) remove(f"{self.key_pair_name}.pem") log.info("Deleted key pair %s.", self.key_pair_name) except ClientError as err: log.error(f"Couldn't delete key pair '{self.key_pair_name}'.") log.error(f"Full error:\n\t{err}") except FileNotFoundError as err: log.info("Key pair %s doesn't exist, nothing to do.", self.key_pair_name) log.error(f"Full error:\n\t{err}") def create_template( self, server_startup_script_file: str, instance_policy_file: str ) -> Dict[str, Any]: """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The launch template specifies a Bash script in its user data field that runs after the instance is started. This script installs Python packages and starts a Python web server on the instance. :param server_startup_script_file: The path to a Bash script file that is run when an instance starts. :param instance_policy_file: The path to a file that defines a permissions policy to create and attach to the instance profile. :return: Information about the newly created template. """ template = {} try: # Create key pair and instance profile self.create_key_pair(self.key_pair_name) self.create_instance_profile( instance_policy_file, self.instance_policy_name, self.instance_role_name, self.instance_profile_name, ) # Read the startup script with open(server_startup_script_file) as file: start_server_script = file.read() # Get the latest AMI ID ami_latest = self.ssm_client.get_parameter(Name=self.ami_param) ami_id = ami_latest["Parameter"]["Value"] # Create the launch template lt_response = self.ec2_client.create_launch_template( LaunchTemplateName=self.launch_template_name, LaunchTemplateData={ "InstanceType": self.inst_type, "ImageId": ami_id, "IamInstanceProfile": {"Name": self.instance_profile_name}, "UserData": base64.b64encode( start_server_script.encode(encoding="utf-8") ).decode(encoding="utf-8"), "KeyName": self.key_pair_name, }, ) template = lt_response["LaunchTemplate"] log.info( f"Created launch template {self.launch_template_name} for AMI {ami_id} on {self.inst_type}." ) except ClientError as err: log.error(f"Failed to create launch template {self.launch_template_name}.") error_code = err.response["Error"]["Code"] if error_code == "InvalidLaunchTemplateName.AlreadyExistsException": log.info( f"Launch template {self.launch_template_name} already exists, nothing to do." ) log.error(f"Full error:\n\t{err}") return template def delete_template(self): """ Deletes a launch template. """ try: self.ec2_client.delete_launch_template( LaunchTemplateName=self.launch_template_name ) self.delete_instance_profile( self.instance_profile_name, self.instance_role_name ) log.info("Launch template %s deleted.", self.launch_template_name) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): log.info( "Launch template %s does not exist, nothing to do.", self.launch_template_name, ) log.error(f"Full error:\n\t{err}") def get_availability_zones(self) -> List[str]: """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] log.info(f"Retrieved {len(zones)} availability zones: {zones}.") except ClientError as err: log.error("Failed to retrieve availability zones.") log.error(f"Full error:\n\t{err}") else: return zones def create_autoscaling_group(self, group_size: int) -> List[str]: """ Creates an EC2 Auto Scaling group with the specified size. :param group_size: The number of instances to set for the minimum and maximum in the group. :return: The list of Availability Zones specified for the group. """ try: zones = self.get_availability_zones() self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=self.group_name, AvailabilityZones=zones, LaunchTemplate={ "LaunchTemplateName": self.launch_template_name, "Version": "$Default", }, MinSize=group_size, MaxSize=group_size, ) log.info( f"Created EC2 Auto Scaling group {self.group_name} with availability zones {zones}." ) except ClientError as err: error_code = err.response["Error"]["Code"] if error_code == "AlreadyExists": log.info( f"EC2 Auto Scaling group {self.group_name} already exists, nothing to do." ) else: log.error(f"Failed to create EC2 Auto Scaling group {self.group_name}.") log.error(f"Full error:\n\t{err}") else: return zones def get_instances(self) -> List[str]: """ Gets data about the instances in the EC2 Auto Scaling group. :return: A list of instance IDs in the Auto Scaling group. """ try: as_response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) instance_ids = [ i["InstanceId"] for i in as_response["AutoScalingGroups"][0]["Instances"] ] log.info( f"Retrieved {len(instance_ids)} instances for Auto Scaling group {self.group_name}." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to retrieve instances for Auto Scaling group {self.group_name}." ) if error_code == "ResourceNotFound": log.error(f"The Auto Scaling group '{self.group_name}' does not exist.") log.error(f"Full error:\n\t{err}") else: return instance_ids def terminate_instance(self, instance_id: str, decrementsetting=False) -> None: """ Terminates an instance in an EC2 Auto Scaling group. After an instance is terminated, it can no longer be accessed. :param instance_id: The ID of the instance to terminate. :param decrementsetting: If True, do not replace terminated instances. """ try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrementsetting, ) log.info("Terminated instance %s.", instance_id) # Adding a waiter to ensure the instance is terminated waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for instance %s to be terminated...", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info( f"Instance '{instance_id}' has been terminated and will be replaced." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to terminate instance '{instance_id}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to terminate the instance again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) log.error(f"Full error:\n\t{err}") def attach_load_balancer_target_group( self, lb_target_group: Dict[str, Any] ) -> None: """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forwards requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to attach load balancer target group '{lb_target_group['TargetGroupName']}'." ) if error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) elif error_code == "ServiceLinkedRoleFailure": log.error( "The operation failed because the service-linked role is not ready or does not exist. " "Check that the service-linked role exists and is correctly configured." ) log.error(f"Full error:\n\t{err}") def delete_autoscaling_group(self, group_name: str) -> None: """ Terminates all instances in the group, then deletes the EC2 Auto Scaling group. :param group_name: The name of the group to delete. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self.terminate_instance(inst_id) # Wait for all instances to be terminated if instance_ids: waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for all instances to be terminated...") waiter.wait(InstanceIds=instance_ids) log.info("All instances have been terminated.") else: log.info(f"No groups found named '{group_name}'! Nothing to do.") except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete Auto Scaling group '{group_name}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to delete the group again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the group." ) log.error(f"Full error:\n\t{err}") def get_default_vpc(self) -> Dict[str, Any]: """ Gets the default VPC for the account. :return: Data about the default VPC. """ try: response = self.ec2_client.describe_vpcs( Filters=[{"Name": "is-default", "Values": ["true"]}] ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error("Failed to retrieve the default VPC.") if error_code == "UnauthorizedOperation": log.error( "You do not have the necessary permissions to describe VPCs. " "Ensure that your AWS IAM user or role has the correct permissions." ) elif error_code == "InvalidParameterValue": log.error( "One or more parameters are invalid. Check the request parameters." ) log.error(f"Full error:\n\t{err}") else: if "Vpcs" in response and response["Vpcs"]: log.info(f"Retrieved default VPC: {response['Vpcs'][0]['VpcId']}") return response["Vpcs"][0] else: pass def verify_inbound_port( self, vpc: Dict[str, Any], port: int, ip_address: str ) -> Tuple[Dict[str, Any], bool]: """ Verify the default security group of the specified VPC allows ingress from this computer. This can be done by allowing ingress from this computer's IP address. In some situations, such as connecting from a corporate network, you must instead specify a prefix list ID. You can also temporarily open the port to any IP address while running this example. If you do, be sure to remove public access when you're done. :param vpc: The VPC used by this example. :param port: The port to verify. :param ip_address: This computer's IP address. :return: The default security group of the specified VPC, and a value that indicates whether the specified port is open. """ try: response = self.ec2_client.describe_security_groups( Filters=[ {"Name": "group-name", "Values": ["default"]}, {"Name": "vpc-id", "Values": [vpc["VpcId"]]}, ] ) sec_group = response["SecurityGroups"][0] port_is_open = False log.info(f"Found default security group {sec_group['GroupId']}.") for ip_perm in sec_group["IpPermissions"]: if ip_perm.get("FromPort", 0) == port: log.info(f"Found inbound rule: {ip_perm}") for ip_range in ip_perm["IpRanges"]: cidr = ip_range.get("CidrIp", "") if cidr.startswith(ip_address) or cidr == "0.0.0.0/0": port_is_open = True if ip_perm["PrefixListIds"]: port_is_open = True if not port_is_open: log.info( f"The inbound rule does not appear to be open to either this computer's IP " f"address of {ip_address}, to all IP addresses (0.0.0.0/0), or to a prefix list ID." ) else: break except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to verify inbound rule for port {port} for VPC {vpc['VpcId']}." ) if error_code == "InvalidVpcID.NotFound": log.error( f"The specified VPC ID '{vpc['VpcId']}' does not exist. Please check the VPC ID." ) log.error(f"Full error:\n\t{err}") else: return sec_group, port_is_open def open_inbound_port(self, sec_group_id: str, port: int, ip_address: str) -> None: """ Add an ingress rule to the specified security group that allows access on the specified port from the specified IP address. :param sec_group_id: The ID of the security group to modify. :param port: The port to open. :param ip_address: The IP address that is granted access. """ try: self.ec2_client.authorize_security_group_ingress( GroupId=sec_group_id, CidrIp=f"{ip_address}/32", FromPort=port, ToPort=port, IpProtocol="tcp", ) log.info( "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to authorize ingress to security group '{sec_group_id}' on port {port} from {ip_address}." ) if error_code == "InvalidGroupId.Malformed": log.error( "The security group ID is malformed. " "Please verify that the security group ID is correct." ) elif error_code == "InvalidPermission.Duplicate": log.error( "The specified rule already exists in the security group. " "Check the existing rules for this security group." ) log.error(f"Full error:\n\t{err}") def get_subnets(self, vpc_id: str, zones: List[str] = None) -> List[Dict[str, Any]]: """ Gets the default subnets in a VPC for a specified list of Availability Zones. :param vpc_id: The ID of the VPC to look up. :param zones: The list of Availability Zones to look up. :return: The list of subnets found. """ # Ensure that 'zones' is a list, even if None is passed if zones is None: zones = [] try: paginator = self.ec2_client.get_paginator("describe_subnets") page_iterator = paginator.paginate( Filters=[ {"Name": "vpc-id", "Values": [vpc_id]}, {"Name": "availability-zone", "Values": zones}, {"Name": "default-for-az", "Values": ["true"]}, ] ) subnets = [] for page in page_iterator: subnets.extend(page["Subnets"]) log.info("Found %s subnets for the specified zones.", len(subnets)) return subnets except ClientError as err: log.error( f"Failed to retrieve subnets for VPC '{vpc_id}' in zones {zones}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidVpcID.NotFound": log.error( "The specified VPC ID does not exist. " "Please check the VPC ID and try again." ) # Add more error-specific handling as needed log.error(f"Full error:\n\t{err}")
Elastic Load Balancing 작업을 래핑하는 클래스를 생성합니다.
class ElasticLoadBalancerWrapper: """Encapsulates Elastic Load Balancing (ELB) actions.""" def __init__(self, elb_client: boto3.client): """ Initializes the LoadBalancer class with the necessary parameters. """ self.elb_client = elb_client def create_target_group( self, target_group_name: str, protocol: str, port: int, vpc_id: str ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing target group. The target group specifies how the load balancer forwards requests to instances in the group and how instance health is checked. To speed up this demo, the health check is configured with shortened times and lower thresholds. In production, you might want to decrease the sensitivity of your health checks to avoid unwanted failures. :param target_group_name: The name of the target group to create. :param protocol: The protocol to use to forward requests, such as 'HTTP'. :param port: The port to use to forward requests, such as 80. :param vpc_id: The ID of the VPC in which the load balancer exists. :return: Data about the newly created target group. """ try: response = self.elb_client.create_target_group( Name=target_group_name, Protocol=protocol, Port=port, HealthCheckPath="/healthcheck", HealthCheckIntervalSeconds=10, HealthCheckTimeoutSeconds=5, HealthyThresholdCount=2, UnhealthyThresholdCount=2, VpcId=vpc_id, ) target_group = response["TargetGroups"][0] log.info(f"Created load balancing target group '{target_group_name}'.") return target_group except ClientError as err: log.error( f"Couldn't create load balancing target group '{target_group_name}'." ) error_code = err.response["Error"]["Code"] if error_code == "DuplicateTargetGroupName": log.error( f"Target group name {target_group_name} already exists. " "Check if the target group already exists." "Consider using a different name or deleting the existing target group if appropriate." ) elif error_code == "TooManyTargetGroups": log.error( "Too many target groups exist in the account. " "Consider deleting unused target groups to create space for new ones." ) log.error(f"Full error:\n\t{err}") def delete_target_group(self, target_group_name) -> None: """ Deletes the target group. """ try: # Describe the target group to get its ARN response = self.elb_client.describe_target_groups(Names=[target_group_name]) tg_arn = response["TargetGroups"][0]["TargetGroupArn"] # Delete the target group self.elb_client.delete_target_group(TargetGroupArn=tg_arn) log.info("Deleted load balancing target group %s.", target_group_name) # Use a custom waiter to wait until the target group is no longer available self.wait_for_target_group_deletion(self.elb_client, tg_arn) log.info("Target group %s successfully deleted.", target_group_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete target group '{target_group_name}'.") if error_code == "TargetGroupNotFound": log.error( "Load balancer target group either already deleted or never existed. " "Verify the name and check that the resource exists in the AWS Console." ) elif error_code == "ResourceInUseException": log.error( "Target group still in use by another resource. " "Ensure that the target group is no longer associated with any load balancers or resources.", ) log.error(f"Full error:\n\t{err}") def wait_for_target_group_deletion( self, elb_client, target_group_arn, max_attempts=10, delay=30 ): for attempt in range(max_attempts): try: elb_client.describe_target_groups(TargetGroupArns=[target_group_arn]) print( f"Attempt {attempt + 1}: Target group {target_group_arn} still exists." ) except ClientError as e: if e.response["Error"]["Code"] == "TargetGroupNotFound": print( f"Target group {target_group_arn} has been successfully deleted." ) return else: raise time.sleep(delay) raise TimeoutError( f"Target group {target_group_arn} was not deleted after {max_attempts * delay} seconds." ) def create_load_balancer( self, load_balancer_name: str, subnet_ids: List[str], ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing load balancer that uses the specified subnets and forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create. :param subnet_ids: A list of subnets to associate with the load balancer. :return: Data about the newly created load balancer. """ try: response = self.elb_client.create_load_balancer( Name=load_balancer_name, Subnets=subnet_ids ) load_balancer = response["LoadBalancers"][0] log.info(f"Created load balancer '{load_balancer_name}'.") waiter = self.elb_client.get_waiter("load_balancer_available") log.info( f"Waiting for load balancer '{load_balancer_name}' to be available..." ) waiter.wait(Names=[load_balancer_name]) log.info(f"Load balancer '{load_balancer_name}' is now available!") except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to create load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "DuplicateLoadBalancerNameException": log.error( f"A load balancer with the name '{load_balancer_name}' already exists. " "Load balancer names must be unique within the AWS region. " "Please choose a different name and try again." ) if error_code == "TooManyLoadBalancersException": log.error( "The maximum number of load balancers has been reached in this account and region. " "You can delete unused load balancers or request an increase in the service quota from AWS Support." ) log.error(f"Full error:\n\t{err}") else: return load_balancer def create_listener( self, load_balancer_name: str, target_group: Dict[str, Any], ) -> Dict[str, Any]: """ Creates a listener for the specified load balancer that forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create a listener for. :param target_group: An existing target group that is added as a listener to the load balancer. :return: Data about the newly created listener. """ try: # Retrieve the load balancer ARN load_balancer_response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) load_balancer_arn = load_balancer_response["LoadBalancers"][0][ "LoadBalancerArn" ] # Create the listener response = self.elb_client.create_listener( LoadBalancerArn=load_balancer_arn, Protocol=target_group["Protocol"], Port=target_group["Port"], DefaultActions=[ { "Type": "forward", "TargetGroupArn": target_group["TargetGroupArn"], } ], ) log.info( f"Created listener to forward traffic from load balancer '{load_balancer_name}' to target group '{target_group['TargetGroupName']}'." ) return response["Listeners"][0] except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to add a listener on '{load_balancer_name}' for target group '{target_group['TargetGroupName']}'." ) if error_code == "ListenerNotFoundException": log.error( f"The listener could not be found for the load balancer '{load_balancer_name}'. " "Please check the load balancer name and target group configuration." ) if error_code == "InvalidConfigurationRequestException": log.error( f"The configuration provided for the listener on load balancer '{load_balancer_name}' is invalid. " "Please review the provided protocol, port, and target group settings." ) log.error(f"Full error:\n\t{err}") def delete_load_balancer(self, load_balancer_name) -> None: """ Deletes a load balancer. :param load_balancer_name: The name of the load balancer to delete. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"] self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn) log.info("Deleted load balancer %s.", load_balancer_name) waiter = self.elb_client.get_waiter("load_balancers_deleted") log.info("Waiting for load balancer to be deleted...") waiter.wait(Names=[load_balancer_name]) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Couldn't delete load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "LoadBalancerNotFoundException": log.error( f"The load balancer '{load_balancer_name}' does not exist. " "Please check the name and try again." ) log.error(f"Full error:\n\t{err}") def get_endpoint(self, load_balancer_name) -> str: """ Gets the HTTP endpoint of the load balancer. :return: The endpoint. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) return response["LoadBalancers"][0]["DNSName"] except ClientError as err: log.error( f"Couldn't get the endpoint for load balancer {load_balancer_name}" ) error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Verify load balancer name and ensure it exists in the AWS console." ) log.error(f"Full error:\n\t{err}") @staticmethod def verify_load_balancer_endpoint(endpoint) -> bool: """ Verify this computer can successfully send a GET request to the load balancer endpoint. :param endpoint: The endpoint to verify. :return: True if the GET request is successful, False otherwise. """ retries = 3 verified = False while not verified and retries > 0: try: lb_response = requests.get(f"http://{endpoint}") log.info( "Got response %s from load balancer endpoint.", lb_response.status_code, ) if lb_response.status_code == 200: verified = True else: retries = 0 except requests.exceptions.ConnectionError: log.info( "Got connection error from load balancer endpoint, retrying..." ) retries -= 1 time.sleep(10) return verified def check_target_health(self, target_group_name: str) -> List[Dict[str, Any]]: """ Checks the health of the instances in the target group. :return: The health status of the target group. """ try: tg_response = self.elb_client.describe_target_groups( Names=[target_group_name] ) health_response = self.elb_client.describe_target_health( TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"] ) except ClientError as err: log.error(f"Couldn't check health of {target_group_name} target(s).") error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Load balancer associated with the target group was not found. " "Ensure the load balancer exists, is in the correct AWS region, and " "that you have the necessary permissions to access it.", ) elif error_code == "TargetGroupNotFoundException": log.error( "Target group was not found. " "Verify the target group name, check that it exists in the correct region, " "and ensure it has not been deleted or created in a different account.", ) log.error(f"Full error:\n\t{err}") else: return health_response["TargetHealthDescriptions"]
DynamoDB를 사용하여 추천 서비스를 시뮬레이션하는 클래스를 생성합니다.
class RecommendationService: """ Encapsulates a DynamoDB table to use as a service that recommends books, movies, and songs. """ def __init__(self, table_name: str, dynamodb_client: boto3.client): """ Initializes the RecommendationService class with the necessary parameters. :param table_name: The name of the DynamoDB recommendations table. :param dynamodb_client: A Boto3 DynamoDB client. """ self.table_name = table_name self.dynamodb_client = dynamodb_client def create(self) -> Dict[str, Any]: """ Creates a DynamoDB table to use as a recommendation service. The table has a hash key named 'MediaType' that defines the type of media recommended, such as Book or Movie, and a range key named 'ItemId' that, combined with the MediaType, forms a unique identifier for the recommended item. :return: Data about the newly created table. :raises RecommendationServiceError: If the table creation fails. """ try: response = self.dynamodb_client.create_table( TableName=self.table_name, AttributeDefinitions=[ {"AttributeName": "MediaType", "AttributeType": "S"}, {"AttributeName": "ItemId", "AttributeType": "N"}, ], KeySchema=[ {"AttributeName": "MediaType", "KeyType": "HASH"}, {"AttributeName": "ItemId", "KeyType": "RANGE"}, ], ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}, ) log.info("Creating table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_exists") waiter.wait(TableName=self.table_name) log.info("Table %s created.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceInUseException": log.info("Table %s exists, nothing to be done.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when creating table: {err}." ) else: return response def populate(self, data_file: str) -> None: """ Populates the recommendations table from a JSON file. :param data_file: The path to the data file. :raises RecommendationServiceError: If the table population fails. """ try: with open(data_file) as data: items = json.load(data) batch = [{"PutRequest": {"Item": item}} for item in items] self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch}) log.info( "Populated table %s with items from %s.", self.table_name, data_file ) except ClientError as err: raise RecommendationServiceError( self.table_name, f"Couldn't populate table from {data_file}: {err}" ) def destroy(self) -> None: """ Deletes the recommendations table. :raises RecommendationServiceError: If the table deletion fails. """ try: self.dynamodb_client.delete_table(TableName=self.table_name) log.info("Deleting table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_not_exists") waiter.wait(TableName=self.table_name) log.info("Table %s deleted.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": log.info("Table %s does not exist, nothing to do.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when deleting table: {err}." )
Systems Manager 작업을 래핑하는 클래스를 생성합니다.
class ParameterHelper: """ Encapsulates Systems Manager parameters. This example uses these parameters to drive the demonstration of resilient architecture, such as failure of a dependency or how the service responds to a health check. """ table: str = "doc-example-resilient-architecture-table" failure_response: str = "doc-example-resilient-architecture-failure-response" health_check: str = "doc-example-resilient-architecture-health-check" def __init__(self, table_name: str, ssm_client: boto3.client): """ Initializes the ParameterHelper class with the necessary parameters. :param table_name: The name of the DynamoDB table that is used as a recommendation service. :param ssm_client: A Boto3 Systems Manager client. """ self.ssm_client = ssm_client self.table_name = table_name def reset(self) -> None: """ Resets the Systems Manager parameters to starting values for the demo. These are the name of the DynamoDB recommendation table, no response when a dependency fails, and shallow health checks. """ self.put(self.table, self.table_name) self.put(self.failure_response, "none") self.put(self.health_check, "shallow") def put(self, name: str, value: str) -> None: """ Sets the value of a named Systems Manager parameter. :param name: The name of the parameter. :param value: The new value of the parameter. :raises ParameterHelperError: If the parameter value cannot be set. """ try: self.ssm_client.put_parameter( Name=name, Value=value, Overwrite=True, Type="String" ) log.info("Setting parameter %s to '%s'.", name, value) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to set parameter {name}.") if error_code == "ParameterLimitExceeded": log.error( "The parameter limit has been exceeded. " "Consider deleting unused parameters or request a limit increase." ) elif error_code == "ParameterAlreadyExists": log.error( "The parameter already exists and overwrite is set to False. " "Use Overwrite=True to update the parameter." ) log.error(f"Full error:\n\t{err}")
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
다음 코드 예제에서는 사용자를 생성하고 정책을 사용자에게 연결하는 방법을 보여줍니다.
주의
보안 위험을 방지하려면 목적별 소프트웨어를 개발하거나 실제 데이터로 작업할 때 IAM 사용자를 인증에 사용하지 마세요. 대신 AWS IAM Identity Center과 같은 보안 인증 공급자를 통한 페더레이션을 사용하십시오.
두 IAM 사용자를 생성합니다.
Amazon S3 버킷의 객체를 가져와 넣기 위한 정책을 한 명의 사용자에게 연결합니다.
두 번째 사용자가 버킷에서 객체를 가져오도록 하기 위한 정책을 연결합니다.
사용자 보안 인증에 따라 버킷에 대해 서로 다른 권한이 부여됩니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. IAM 사용자 작업을 래핑하는 함수를 생성합니다.
import logging import time import boto3 from botocore.exceptions import ClientError import access_key_wrapper import policy_wrapper logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_user(user_name): """ Creates a user. By default, a user has no permissions or access keys. :param user_name: The name of the user. :return: The newly created user. """ try: user = iam.create_user(UserName=user_name) logger.info("Created user %s.", user.name) except ClientError: logger.exception("Couldn't create user %s.", user_name) raise else: return user def update_user(user_name, new_user_name): """ Updates a user's name. :param user_name: The current name of the user to update. :param new_user_name: The new name to assign to the user. :return: The updated user. """ try: user = iam.User(user_name) user.update(NewUserName=new_user_name) logger.info("Renamed %s to %s.", user_name, new_user_name) except ClientError: logger.exception("Couldn't update name for user %s.", user_name) raise return user def list_users(): """ Lists the users in the current account. :return: The list of users. """ try: users = list(iam.users.all()) logger.info("Got %s users.", len(users)) except ClientError: logger.exception("Couldn't get users.") raise else: return users def delete_user(user_name): """ Deletes a user. Before a user can be deleted, all associated resources, such as access keys and policies, must be deleted or detached. :param user_name: The name of the user. """ try: iam.User(user_name).delete() logger.info("Deleted user %s.", user_name) except ClientError: logger.exception("Couldn't delete user %s.", user_name) raise def attach_policy(user_name, policy_arn): """ Attaches a policy to a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to user %s.", policy_arn, user_name) except ClientError: logger.exception("Couldn't attach policy %s to user %s.", policy_arn, user_name) raise def detach_policy(user_name, policy_arn): """ Detaches a policy from a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from user %s.", policy_arn, user_name) except ClientError: logger.exception( "Couldn't detach policy %s from user %s.", policy_arn, user_name ) raise
IAM 정책 작업을 래핑하는 함수를 생성합니다.
import json import logging import operator import pprint import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_policy(name, description, actions, resource_arn): """ Creates a policy that contains a single statement. :param name: The name of the policy to create. :param description: The description of the policy. :param actions: The actions allowed by the policy. These typically take the form of service:action, such as s3:PutObject. :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy applies to. This ARN can contain wildcards, such as 'arn:aws:s3:::my-bucket/*' to allow actions on all objects in the bucket named 'my-bucket'. :return: The newly created policy. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.create_policy( PolicyName=name, Description=description, PolicyDocument=json.dumps(policy_doc), ) logger.info("Created policy %s.", policy.arn) except ClientError: logger.exception("Couldn't create policy %s.", name) raise else: return policy def delete_policy(policy_arn): """ Deletes a policy. :param policy_arn: The ARN of the policy to delete. """ try: iam.Policy(policy_arn).delete() logger.info("Deleted policy %s.", policy_arn) except ClientError: logger.exception("Couldn't delete policy %s.", policy_arn) raise
IAM 액세스 키 작업을 래핑하는 함수를 생성합니다.
import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_key(user_name): """ Creates an access key for the specified user. Each user can have a maximum of two keys. :param user_name: The name of the user. :return: The created access key. """ try: key_pair = iam.User(user_name).create_access_key_pair() logger.info( "Created access key pair for %s. Key ID is %s.", key_pair.user_name, key_pair.id, ) except ClientError: logger.exception("Couldn't create access key pair for %s.", user_name) raise else: return key_pair def delete_key(user_name, key_id): """ Deletes a user's access key. :param user_name: The user that owns the key. :param key_id: The ID of the key to delete. """ try: key = iam.AccessKey(user_name, key_id) key.delete() logger.info("Deleted access key %s for %s.", key.id, key.user_name) except ClientError: logger.exception("Couldn't delete key %s for %s", key_id, user_name) raise
래퍼 함수를 사용하여 다른 정책을 가진 사용자를 생성하고 자격 증명을 사용하여 Amazon S3 버킷에 액세스합니다.
def usage_demo(): """ Shows how to manage users, keys, and policies. This demonstration creates two users: one user who can put and get objects in an Amazon S3 bucket, and another user who can only get objects from the bucket. The demo then shows how the users can perform only the actions they are permitted to perform. """ logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management user demo.") print("-" * 88) print( "Users can have policies and roles attached to grant them specific " "permissions." ) s3 = boto3.resource("s3") bucket = s3.create_bucket( Bucket=f"demo-iam-bucket-{time.time_ns()}", CreateBucketConfiguration={ "LocationConstraint": s3.meta.client.meta.region_name }, ) print(f"Created an Amazon S3 bucket named {bucket.name}.") user_read_writer = create_user("demo-iam-read-writer") user_reader = create_user("demo-iam-reader") print(f"Created two IAM users: {user_read_writer.name} and {user_reader.name}") update_user(user_read_writer.name, "demo-iam-creator") update_user(user_reader.name, "demo-iam-getter") users = list_users() user_read_writer = next( user for user in users if user.user_id == user_read_writer.user_id ) user_reader = next(user for user in users if user.user_id == user_reader.user_id) print( f"Changed the names of the users to {user_read_writer.name} " f"and {user_reader.name}." ) read_write_policy = policy_wrapper.create_policy( "demo-iam-read-write-policy", "Grants rights to create and get an object in the demo bucket.", ["s3:PutObject", "s3:GetObject"], f"arn:aws:s3:::{bucket.name}/*", ) print( f"Created policy {read_write_policy.policy_name} with ARN: {read_write_policy.arn}" ) print(read_write_policy.description) read_policy = policy_wrapper.create_policy( "demo-iam-read-policy", "Grants rights to get an object from the demo bucket.", "s3:GetObject", f"arn:aws:s3:::{bucket.name}/*", ) print(f"Created policy {read_policy.policy_name} with ARN: {read_policy.arn}") print(read_policy.description) attach_policy(user_read_writer.name, read_write_policy.arn) print(f"Attached {read_write_policy.policy_name} to {user_read_writer.name}.") attach_policy(user_reader.name, read_policy.arn) print(f"Attached {read_policy.policy_name} to {user_reader.name}.") user_read_writer_key = access_key_wrapper.create_key(user_read_writer.name) print(f"Created access key pair for {user_read_writer.name}.") user_reader_key = access_key_wrapper.create_key(user_reader.name) print(f"Created access key pair for {user_reader.name}.") s3_read_writer_resource = boto3.resource( "s3", aws_access_key_id=user_read_writer_key.id, aws_secret_access_key=user_read_writer_key.secret, ) demo_object_key = f"object-{time.time_ns()}" demo_object = None while demo_object is None: try: demo_object = s3_read_writer_resource.Bucket(bucket.name).put_object( Key=demo_object_key, Body=b"AWS IAM demo object content!" ) except ClientError as error: if error.response["Error"]["Code"] == "InvalidAccessKeyId": print("Access key not yet available. Waiting...") time.sleep(1) else: raise print( f"Put {demo_object_key} into {bucket.name} using " f"{user_read_writer.name}'s credentials." ) read_writer_object = s3_read_writer_resource.Bucket(bucket.name).Object( demo_object_key ) read_writer_content = read_writer_object.get()["Body"].read() print(f"Got object {read_writer_object.key} using read-writer user's credentials.") print(f"Object content: {read_writer_content}") s3_reader_resource = boto3.resource( "s3", aws_access_key_id=user_reader_key.id, aws_secret_access_key=user_reader_key.secret, ) demo_content = None while demo_content is None: try: demo_object = s3_reader_resource.Bucket(bucket.name).Object(demo_object_key) demo_content = demo_object.get()["Body"].read() print(f"Got object {demo_object.key} using reader user's credentials.") print(f"Object content: {demo_content}") except ClientError as error: if error.response["Error"]["Code"] == "InvalidAccessKeyId": print("Access key not yet available. Waiting...") time.sleep(1) else: raise try: demo_object.delete() except ClientError as error: if error.response["Error"]["Code"] == "AccessDenied": print("-" * 88) print( "Tried to delete the object using the reader user's credentials. " "Got expected AccessDenied error because the reader is not " "allowed to delete objects." ) print("-" * 88) access_key_wrapper.delete_key(user_reader.name, user_reader_key.id) detach_policy(user_reader.name, read_policy.arn) policy_wrapper.delete_policy(read_policy.arn) delete_user(user_reader.name) print(f"Deleted keys, detached and deleted policy, and deleted {user_reader.name}.") access_key_wrapper.delete_key(user_read_writer.name, user_read_writer_key.id) detach_policy(user_read_writer.name, read_write_policy.arn) policy_wrapper.delete_policy(read_write_policy.arn) delete_user(user_read_writer.name) print( f"Deleted keys, detached and deleted policy, and deleted {user_read_writer.name}." ) bucket.objects.delete() bucket.delete() print(f"Emptied and deleted {bucket.name}.") print("Thanks for watching!")
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
다음 코드 예제에서는 액세스 키를 관리하는 방법을 보여줍니다.
주의
보안 위험을 방지하려면 목적별 소프트웨어를 개발하거나 실제 데이터로 작업할 때 IAM 사용자를 인증에 사용하지 마세요. 대신 AWS IAM Identity Center과 같은 보안 인증 공급자를 통한 페더레이션을 사용하십시오.
액세스 키를 생성하고 나열합니다.
액세스 키가 마지막으로 사용된 시기 및 방법을 조회합니다.
액세스 키를 업데이트 및 삭제합니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. IAM 액세스 키 작업을 래핑하는 함수를 생성합니다.
import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def list_keys(user_name): """ Lists the keys owned by the specified user. :param user_name: The name of the user. :return: The list of keys owned by the user. """ try: keys = list(iam.User(user_name).access_keys.all()) logger.info("Got %s access keys for %s.", len(keys), user_name) except ClientError: logger.exception("Couldn't get access keys for %s.", user_name) raise else: return keys def create_key(user_name): """ Creates an access key for the specified user. Each user can have a maximum of two keys. :param user_name: The name of the user. :return: The created access key. """ try: key_pair = iam.User(user_name).create_access_key_pair() logger.info( "Created access key pair for %s. Key ID is %s.", key_pair.user_name, key_pair.id, ) except ClientError: logger.exception("Couldn't create access key pair for %s.", user_name) raise else: return key_pair def get_last_use(key_id): """ Gets information about when and how a key was last used. :param key_id: The ID of the key to look up. :return: Information about the key's last use. """ try: response = iam.meta.client.get_access_key_last_used(AccessKeyId=key_id) last_used_date = response["AccessKeyLastUsed"].get("LastUsedDate", None) last_service = response["AccessKeyLastUsed"].get("ServiceName", None) logger.info( "Key %s was last used by %s on %s to access %s.", key_id, response["UserName"], last_used_date, last_service, ) except ClientError: logger.exception("Couldn't get last use of key %s.", key_id) raise else: return response def update_key(user_name, key_id, activate): """ Updates the status of a key. :param user_name: The user that owns the key. :param key_id: The ID of the key to update. :param activate: When True, the key is activated. Otherwise, the key is deactivated. """ try: key = iam.User(user_name).AccessKey(key_id) if activate: key.activate() else: key.deactivate() logger.info("%s key %s.", "Activated" if activate else "Deactivated", key_id) except ClientError: logger.exception( "Couldn't %s key %s.", "Activate" if activate else "Deactivate", key_id ) raise def delete_key(user_name, key_id): """ Deletes a user's access key. :param user_name: The user that owns the key. :param key_id: The ID of the key to delete. """ try: key = iam.AccessKey(user_name, key_id) key.delete() logger.info("Deleted access key %s for %s.", key.id, key.user_name) except ClientError: logger.exception("Couldn't delete key %s for %s", key_id, user_name) raise
래퍼 함수를 사용하여 현재 사용자에 대한 액세스 키 작업을 수행합니다.
def usage_demo(): """Shows how to create and manage access keys.""" def print_keys(): """Gets and prints the current keys for a user.""" current_keys = list_keys(current_user_name) print("The current user's keys are now:") print(*[f"{key.id}: {key.status}" for key in current_keys], sep="\n") logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management access key demo.") print("-" * 88) current_user_name = iam.CurrentUser().user_name print( f"This demo creates an access key for the current user " f"({current_user_name}), manipulates the key in a few ways, and then " f"deletes it." ) all_keys = list_keys(current_user_name) if len(all_keys) == 2: print( "The current user already has the maximum of 2 access keys. To run " "this demo, either delete one of the access keys or use a user " "that has only 1 access key." ) else: new_key = create_key(current_user_name) print(f"Created a new key with id {new_key.id} and secret {new_key.secret}.") print_keys() existing_key = next(key for key in all_keys if key != new_key) last_use = get_last_use(existing_key.id)["AccessKeyLastUsed"] print( f"Key {all_keys[0].id} was last used to access {last_use['ServiceName']} " f"on {last_use['LastUsedDate']}" ) update_key(current_user_name, new_key.id, False) print(f"Key {new_key.id} is now deactivated.") print_keys() delete_key(current_user_name, new_key.id) print_keys() print("Thanks for watching!")
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.
정책을 생성하고 나열합니다.
정책 버전을 생성하고 가져옵니다.
정책을 이전 버전으로 롤백합니다.
정책을 삭제합니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. IAM 정책 작업을 래핑하는 함수를 생성합니다.
import json import logging import operator import pprint import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_policy(name, description, actions, resource_arn): """ Creates a policy that contains a single statement. :param name: The name of the policy to create. :param description: The description of the policy. :param actions: The actions allowed by the policy. These typically take the form of service:action, such as s3:PutObject. :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy applies to. This ARN can contain wildcards, such as 'arn:aws:s3:::my-bucket/*' to allow actions on all objects in the bucket named 'my-bucket'. :return: The newly created policy. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.create_policy( PolicyName=name, Description=description, PolicyDocument=json.dumps(policy_doc), ) logger.info("Created policy %s.", policy.arn) except ClientError: logger.exception("Couldn't create policy %s.", name) raise else: return policy def list_policies(scope): """ Lists the policies in the current account. :param scope: Limits the kinds of policies that are returned. For example, 'Local' specifies that only locally managed policies are returned. :return: The list of policies. """ try: policies = list(iam.policies.filter(Scope=scope)) logger.info("Got %s policies in scope '%s'.", len(policies), scope) except ClientError: logger.exception("Couldn't get policies for scope '%s'.", scope) raise else: return policies def create_policy_version(policy_arn, actions, resource_arn, set_as_default): """ Creates a policy version. Policies can have up to five versions. The default version is the one that is used for all resources that reference the policy. :param policy_arn: The ARN of the policy. :param actions: The actions to allow in the policy version. :param resource_arn: The ARN of the resource this policy version applies to. :param set_as_default: When True, this policy version is set as the default version for the policy. Otherwise, the default is not changed. :return: The newly created policy version. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.Policy(policy_arn) policy_version = policy.create_version( PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default ) logger.info( "Created policy version %s for policy %s.", policy_version.version_id, policy_version.arn, ) except ClientError: logger.exception("Couldn't create a policy version for %s.", policy_arn) raise else: return policy_version def get_default_policy_statement(policy_arn): """ Gets the statement of the default version of the specified policy. :param policy_arn: The ARN of the policy to look up. :return: The statement of the default policy version. """ try: policy = iam.Policy(policy_arn) # To get an attribute of a policy, the SDK first calls get_policy. policy_doc = policy.default_version.document policy_statement = policy_doc.get("Statement", None) logger.info("Got default policy doc for %s.", policy.policy_name) logger.info(policy_doc) except ClientError: logger.exception("Couldn't get default policy statement for %s.", policy_arn) raise else: return policy_statement def rollback_policy_version(policy_arn): """ Rolls back to the previous default policy, if it exists. 1. Gets the list of policy versions in order by date. 2. Finds the default. 3. Makes the previous policy the default. 4. Deletes the old default version. :param policy_arn: The ARN of the policy to roll back. :return: The default version of the policy after the rollback. """ try: policy_versions = sorted( iam.Policy(policy_arn).versions.all(), key=operator.attrgetter("create_date"), ) logger.info("Got %s versions for %s.", len(policy_versions), policy_arn) except ClientError: logger.exception("Couldn't get versions for %s.", policy_arn) raise default_version = None rollback_version = None try: while default_version is None: ver = policy_versions.pop() if ver.is_default_version: default_version = ver rollback_version = policy_versions.pop() rollback_version.set_as_default() logger.info("Set %s as the default version.", rollback_version.version_id) default_version.delete() logger.info("Deleted original default version %s.", default_version.version_id) except IndexError: if default_version is None: logger.warning("No default version found for %s.", policy_arn) elif rollback_version is None: logger.warning( "Default version %s found for %s, but no previous version exists, so " "nothing to roll back to.", default_version.version_id, policy_arn, ) except ClientError: logger.exception("Couldn't roll back version for %s.", policy_arn) raise else: return rollback_version def delete_policy(policy_arn): """ Deletes a policy. :param policy_arn: The ARN of the policy to delete. """ try: iam.Policy(policy_arn).delete() logger.info("Deleted policy %s.", policy_arn) except ClientError: logger.exception("Couldn't delete policy %s.", policy_arn) raise
래퍼 함수를 사용하여 정책을 생성하고, 버전을 업데이트하고, 정책에 대한 정보를 얻습니다.
def usage_demo(): """Shows how to use the policy functions.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management policy demo.") print("-" * 88) print( "Policies let you define sets of permissions that can be attached to " "other IAM resources, like users and roles." ) bucket_arn = f"arn:aws:s3:::made-up-bucket-name" policy = create_policy( "demo-iam-policy", "Policy for IAM demonstration.", ["s3:ListObjects"], bucket_arn, ) print(f"Created policy {policy.policy_name}.") policies = list_policies("Local") print(f"Your account has {len(policies)} managed policies:") print(*[pol.policy_name for pol in policies], sep=", ") time.sleep(1) policy_version = create_policy_version( policy.arn, ["s3:PutObject"], bucket_arn, True ) print( f"Added policy version {policy_version.version_id} to policy " f"{policy.policy_name}." ) default_statement = get_default_policy_statement(policy.arn) print(f"The default policy statement for {policy.policy_name} is:") pprint.pprint(default_statement) rollback_version = rollback_policy_version(policy.arn) print( f"Rolled back to version {rollback_version.version_id} for " f"{policy.policy_name}." ) default_statement = get_default_policy_statement(policy.arn) print(f"The default policy statement for {policy.policy_name} is now:") pprint.pprint(default_statement) delete_policy(policy.arn) print(f"Deleted policy {policy.policy_name}.") print("Thanks for watching!")
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.
IAM 역할을 생성합니다.
역할에 대한 정책을 연결 및 분리합니다.
역할을 삭제합니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. IAM 역할 작업을 래핑하는 함수를 생성합니다.
import json import logging import pprint import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_role(role_name, allowed_services): """ Creates a role that lets a list of specified services assume the role. :param role_name: The name of the role. :param allowed_services: The services that can assume the role. :return: The newly created role. """ trust_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": service}, "Action": "sts:AssumeRole", } for service in allowed_services ], } try: role = iam.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy) ) logger.info("Created role %s.", role.name) except ClientError: logger.exception("Couldn't create role %s.", role_name) raise else: return role def attach_policy(role_name, policy_arn): """ Attaches a policy to a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to role %s.", policy_arn, role_name) except ClientError: logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name) raise def detach_policy(role_name, policy_arn): """ Detaches a policy from a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from role %s.", policy_arn, role_name) except ClientError: logger.exception( "Couldn't detach policy %s from role %s.", policy_arn, role_name ) raise def delete_role(role_name): """ Deletes a role. :param role_name: The name of the role to delete. """ try: iam.Role(role_name).delete() logger.info("Deleted role %s.", role_name) except ClientError: logger.exception("Couldn't delete role %s.", role_name) raise
래퍼 함수를 사용하여 역할을 생성한 다음 정책을 연결하고 분리합니다.
def usage_demo(): """Shows how to use the role functions.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management role demo.") print("-" * 88) print( "Roles let you define sets of permissions and can be assumed by " "other entities, like users and services." ) print("The first 10 roles currently in your account are:") roles = list_roles(10) print(f"The inline policies for role {roles[0].name} are:") list_policies(roles[0].name) role = create_role( "demo-iam-role", ["lambda.amazonaws.com", "batchoperations.s3.amazonaws.com"] ) print(f"Created role {role.name}, with trust policy:") pprint.pprint(role.assume_role_policy_document) policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess" attach_policy(role.name, policy_arn) print(f"Attached policy {policy_arn} to {role.name}.") print(f"Policies attached to role {role.name} are:") list_attached_policies(role.name) detach_policy(role.name, policy_arn) print(f"Detached policy {policy_arn} from {role.name}.") delete_role(role.name) print(f"Deleted {role.name}.") print("Thanks for watching!")
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.
계정 별칭을 가져오고 업데이트합니다.
사용자 및 보안 인증에 대한 보고서를 생성합니다.
계정 사용량 요약을 가져옵니다.
서로의 관계를 포함하여 계정의 모든 사용자, 그룹, 역할 및 정책에 대한 세부 정보를 가져옵니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. IAM 계정 작업을 래핑하는 함수를 생성합니다.
import logging import pprint import sys import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def list_aliases(): """ Gets the list of aliases for the current account. An account has at most one alias. :return: The list of aliases for the account. """ try: response = iam.meta.client.list_account_aliases() aliases = response["AccountAliases"] if len(aliases) > 0: logger.info("Got aliases for your account: %s.", ",".join(aliases)) else: logger.info("Got no aliases for your account.") except ClientError: logger.exception("Couldn't list aliases for your account.") raise else: return response["AccountAliases"] def create_alias(alias): """ Creates an alias for the current account. The alias can be used in place of the account ID in the sign-in URL. An account can have only one alias. When a new alias is created, it replaces any existing alias. :param alias: The alias to assign to the account. """ try: iam.create_account_alias(AccountAlias=alias) logger.info("Created an alias '%s' for your account.", alias) except ClientError: logger.exception("Couldn't create alias '%s' for your account.", alias) raise def delete_alias(alias): """ Removes the alias from the current account. :param alias: The alias to remove. """ try: iam.meta.client.delete_account_alias(AccountAlias=alias) logger.info("Removed alias '%s' from your account.", alias) except ClientError: logger.exception("Couldn't remove alias '%s' from your account.", alias) raise def generate_credential_report(): """ Starts generation of a credentials report about the current account. After calling this function to generate the report, call get_credential_report to get the latest report. A new report can be generated a minimum of four hours after the last one was generated. """ try: response = iam.meta.client.generate_credential_report() logger.info( "Generating credentials report for your account. " "Current state is %s.", response["State"], ) except ClientError: logger.exception("Couldn't generate a credentials report for your account.") raise else: return response def get_credential_report(): """ Gets the most recently generated credentials report about the current account. :return: The credentials report. """ try: response = iam.meta.client.get_credential_report() logger.debug(response["Content"]) except ClientError: logger.exception("Couldn't get credentials report.") raise else: return response["Content"] def get_summary(): """ Gets a summary of account usage. :return: The summary of account usage. """ try: summary = iam.AccountSummary() logger.debug(summary.summary_map) except ClientError: logger.exception("Couldn't get a summary for your account.") raise else: return summary.summary_map def get_authorization_details(response_filter): """ Gets an authorization detail report for the current account. :param response_filter: A list of resource types to include in the report, such as users or roles. When not specified, all resources are included. :return: The authorization detail report. """ try: account_details = iam.meta.client.get_account_authorization_details( Filter=response_filter ) logger.debug(account_details) except ClientError: logger.exception("Couldn't get details for your account.") raise else: return account_details
래퍼 함수를 호출하여 계정 별칭을 변경하고 계정에 대한 보고서를 가져옵니다.
def usage_demo(): """Shows how to use the account functions.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management account demo.") print("-" * 88) print( "Setting an account alias lets you use the alias in your sign-in URL " "instead of your account number." ) old_aliases = list_aliases() if len(old_aliases) > 0: print(f"Your account currently uses '{old_aliases[0]}' as its alias.") else: print("Your account currently has no alias.") for index in range(1, 3): new_alias = f"alias-{index}-{time.time_ns()}" print(f"Setting your account alias to {new_alias}") create_alias(new_alias) current_aliases = list_aliases() print(f"Your account alias is now {current_aliases}.") delete_alias(current_aliases[0]) print(f"Your account now has no alias.") if len(old_aliases) > 0: print(f"Restoring your original alias back to {old_aliases[0]}...") create_alias(old_aliases[0]) print("-" * 88) print("You can get various reports about your account.") print("Let's generate a credentials report...") report_state = None while report_state != "COMPLETE": cred_report_response = generate_credential_report() old_report_state = report_state report_state = cred_report_response["State"] if report_state != old_report_state: print(report_state, sep="") else: print(".", sep="") sys.stdout.flush() time.sleep(1) print() cred_report = get_credential_report() col_count = 3 print(f"Got credentials report. Showing only the first {col_count} columns.") cred_lines = [ line.split(",")[:col_count] for line in cred_report.decode("utf-8").split("\n") ] col_width = max([len(item) for line in cred_lines for item in line]) + 2 for line in cred_report.decode("utf-8").split("\n"): print( "".join(element.ljust(col_width) for element in line.split(",")[:col_count]) ) print("-" * 88) print("Let's get an account summary.") summary = get_summary() print("Here's your summary:") pprint.pprint(summary) print("-" * 88) print("Let's get authorization details!") details = get_authorization_details([]) see_details = input("These are pretty long, do you want to see them (y/n)? ") if see_details.lower() == "y": pprint.pprint(details) print("-" * 88) pw_policy_created = None see_pw_policy = input("Want to see the password policy for the account (y/n)? ") if see_pw_policy.lower() == "y": while True: if print_password_policy(): break else: answer = input( "Do you want to create a default password policy (y/n)? " ) if answer.lower() == "y": pw_policy_created = iam.create_account_password_policy() else: break if pw_policy_created is not None: answer = input("Do you want to delete the password policy (y/n)? ") if answer.lower() == "y": pw_policy_created.delete() print("Password policy deleted.") print("The SAML providers for your account are:") list_saml_providers(10) print("-" * 88) print("Thanks for watching.")
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-
다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.
정책 버전 목록을 날짜별로 순서대로 가져옵니다.
기본 정책 버전을 찾습니다.
이전 정책 버전을 기본값으로 설정합니다.
이전 기본 버전을 삭제합니다.
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리
에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요. def rollback_policy_version(policy_arn): """ Rolls back to the previous default policy, if it exists. 1. Gets the list of policy versions in order by date. 2. Finds the default. 3. Makes the previous policy the default. 4. Deletes the old default version. :param policy_arn: The ARN of the policy to roll back. :return: The default version of the policy after the rollback. """ try: policy_versions = sorted( iam.Policy(policy_arn).versions.all(), key=operator.attrgetter("create_date"), ) logger.info("Got %s versions for %s.", len(policy_versions), policy_arn) except ClientError: logger.exception("Couldn't get versions for %s.", policy_arn) raise default_version = None rollback_version = None try: while default_version is None: ver = policy_versions.pop() if ver.is_default_version: default_version = ver rollback_version = policy_versions.pop() rollback_version.set_as_default() logger.info("Set %s as the default version.", rollback_version.version_id) default_version.delete() logger.info("Deleted original default version %s.", default_version.version_id) except IndexError: if default_version is None: logger.warning("No default version found for %s.", policy_arn) elif rollback_version is None: logger.warning( "Default version %s found for %s, but no previous version exists, so " "nothing to roll back to.", default_version.version_id, policy_arn, ) except ClientError: logger.exception("Couldn't roll back version for %s.", policy_arn) raise else: return rollback_version
-
API 세부 정보는 AWS SDK for Python (Boto3) API 참조의 다음 주제를 참조하십시오.
-