AWS Doc SDK Examples
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
IAM SDK for Python (Boto3) を使用する例
次のコード例は、 AWS SDK for Python (Boto3) で を使用してアクションを実行し、一般的なシナリオを実装する方法を示していますIAM。
「基本」は、重要なオペレーションをサービス内で実行する方法を示すコード例です。
アクションはより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、コンテキスト内のアクションは、関連するシナリオで確認できます。
「シナリオ」は、1 つのサービス内から、または他の AWS のサービスと組み合わせて複数の関数を呼び出し、特定のタスクを実行する方法を示すコード例です。
各例には、完全なソースコードへのリンクが含まれています。ここでは、コンテキストでコードを設定および実行する方法の手順を確認できます。
開始方法
次のコード例は、 の使用を開始する方法を示していますIAM。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 import boto3 def main(): """ Lists the managed policies in your AWS account using the AWS SDK for Python (Boto3). """ iam = boto3.client("iam") try: # Get a paginator for the list_policies operation paginator = iam.get_paginator("list_policies") # Iterate through the pages of results for page in paginator.paginate(Scope="All", OnlyAttached=False): for policy in page["Policies"]: print(f"Policy name: {policy['PolicyName']}") print(f" Policy ARN: {policy['Arn']}") except boto3.exceptions.BotoCoreError as e: print(f"Encountered an error while listing policies: {e}") if __name__ == "__main__": main()
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のListPolicies「」の「」を参照してください。
-
基本
次のコードサンプルは、ユーザーを作成してロールを割り当てる方法を示しています。
警告
セキュリティリスクを回避するため、専用ソフトウェアの開発時や実際のデータの使用時に、認証にIAMユーザーを使用しないでください。代わりに、AWS IAM Identity Centerなどの ID プロバイダーとのフェデレーションを使用してください。
権限のないユーザーを作成します。
指定したアカウントに Amazon S3 バケットへのアクセス権限を付与するロールを作成します。
ユーザーにロールを引き受けさせるポリシーを追加します。
ロールを引き受け、一時的な認証情報を使用して S3 バケットを一覧表示しリソースをクリーンアップします。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 Amazon S3 バケットを一覧表示するアクセス許可を付与するIAMユーザーとロールを作成します。ユーザーには、ロールの引き受けのみ権限があります。ロールを引き受けた後、一時的な認証情報を使用してアカウントのバケットを一覧表示します。
import json import sys import time from uuid import uuid4 import boto3 from botocore.exceptions import ClientError def progress_bar(seconds): """Shows a simple progress bar in the command window.""" for _ in range(seconds): time.sleep(1) print(".", end="") sys.stdout.flush() print() def setup(iam_resource): """ Creates a new user with no permissions. Creates an access key pair for the user. Creates a role with a policy that lets the user assume the role. Creates a policy that allows listing Amazon S3 buckets. Attaches the policy to the role. Creates an inline policy for the user that lets the user assume the role. :param iam_resource: A Boto3 AWS Identity and Access Management (IAM) resource that has permissions to create users, roles, and policies in the account. :return: The newly created user, user key, and role. """ try: user = iam_resource.create_user(UserName=f"demo-user-{uuid4()}") print(f"Created user {user.name}.") except ClientError as error: print( f"Couldn't create a user for the demo. Here's why: " f"{error.response['Error']['Message']}" ) raise try: user_key = user.create_access_key_pair() print(f"Created access key pair for user.") except ClientError as error: print( f"Couldn't create access keys for user {user.name}. Here's why: " f"{error.response['Error']['Message']}" ) raise print(f"Wait for user to be ready.", end="") progress_bar(10) try: role = iam_resource.create_role( RoleName=f"demo-role-{uuid4()}", AssumeRolePolicyDocument=json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"AWS": user.arn}, "Action": "sts:AssumeRole", } ], } ), ) print(f"Created role {role.name}.") except ClientError as error: print( f"Couldn't create a role for the demo. Here's why: " f"{error.response['Error']['Message']}" ) raise try: policy = iam_resource.create_policy( PolicyName=f"demo-policy-{uuid4()}", PolicyDocument=json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "s3:ListAllMyBuckets", "Resource": "arn:aws:s3:::*", } ], } ), ) role.attach_policy(PolicyArn=policy.arn) print(f"Created policy {policy.policy_name} and attached it to the role.") except ClientError as error: print( f"Couldn't create a policy and attach it to role {role.name}. Here's why: " f"{error.response['Error']['Message']}" ) raise try: user.create_policy( PolicyName=f"demo-user-policy-{uuid4()}", PolicyDocument=json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": role.arn, } ], } ), ) print( f"Created an inline policy for {user.name} that lets the user assume " f"the role." ) except ClientError as error: print( f"Couldn't create an inline policy for user {user.name}. Here's why: " f"{error.response['Error']['Message']}" ) raise print("Give AWS time to propagate these new resources and connections.", end="") progress_bar(10) return user, user_key, role def show_access_denied_without_role(user_key): """ Shows that listing buckets without first assuming the role is not allowed. :param user_key: The key of the user created during setup. This user does not have permission to list buckets in the account. """ print(f"Try to list buckets without first assuming the role.") s3_denied_resource = boto3.resource( "s3", aws_access_key_id=user_key.id, aws_secret_access_key=user_key.secret ) try: for bucket in s3_denied_resource.buckets.all(): print(bucket.name) raise RuntimeError("Expected to get AccessDenied error when listing buckets!") except ClientError as error: if error.response["Error"]["Code"] == "AccessDenied": print("Attempt to list buckets with no permissions: AccessDenied.") else: raise def list_buckets_from_assumed_role(user_key, assume_role_arn, session_name): """ Assumes a role that grants permission to list the Amazon S3 buckets in the account. Uses the temporary credentials from the role to list the buckets that are owned by the assumed role's account. :param user_key: The access key of a user that has permission to assume the role. :param assume_role_arn: The Amazon Resource Name (ARN) of the role that grants access to list the other account's buckets. :param session_name: The name of the STS session. """ sts_client = boto3.client( "sts", aws_access_key_id=user_key.id, aws_secret_access_key=user_key.secret ) try: response = sts_client.assume_role( RoleArn=assume_role_arn, RoleSessionName=session_name ) temp_credentials = response["Credentials"] print(f"Assumed role {assume_role_arn} and got temporary credentials.") except ClientError as error: print( f"Couldn't assume role {assume_role_arn}. Here's why: " f"{error.response['Error']['Message']}" ) raise # Create an S3 resource that can access the account with the temporary credentials. s3_resource = boto3.resource( "s3", aws_access_key_id=temp_credentials["AccessKeyId"], aws_secret_access_key=temp_credentials["SecretAccessKey"], aws_session_token=temp_credentials["SessionToken"], ) print(f"Listing buckets for the assumed role's account:") try: for bucket in s3_resource.buckets.all(): print(bucket.name) except ClientError as error: print( f"Couldn't list buckets for the account. Here's why: " f"{error.response['Error']['Message']}" ) raise def teardown(user, role): """ Removes all resources created during setup. :param user: The demo user. :param role: The demo role. """ try: for attached in role.attached_policies.all(): policy_name = attached.policy_name role.detach_policy(PolicyArn=attached.arn) attached.delete() print(f"Detached and deleted {policy_name}.") role.delete() print(f"Deleted {role.name}.") except ClientError as error: print( "Couldn't detach policy, delete policy, or delete role. Here's why: " f"{error.response['Error']['Message']}" ) raise try: for user_pol in user.policies.all(): user_pol.delete() print("Deleted inline user policy.") for key in user.access_keys.all(): key.delete() print("Deleted user's access key.") user.delete() print(f"Deleted {user.name}.") except ClientError as error: print( "Couldn't delete user policy or delete user. Here's why: " f"{error.response['Error']['Message']}" ) def usage_demo(): """Drives the demonstration.""" print("-" * 88) print(f"Welcome to the IAM create user and assume role demo.") print("-" * 88) iam_resource = boto3.resource("iam") user = None role = None try: user, user_key, role = setup(iam_resource) print(f"Created {user.name} and {role.name}.") show_access_denied_without_role(user_key) list_buckets_from_assumed_role(user_key, role.arn, "AssumeRoleDemoSession") except Exception: print("Something went wrong!") finally: if user is not None and role is not None: teardown(user, role) print("Thanks for watching!") if __name__ == "__main__": usage_demo()
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-
アクション
次の例は、AttachRolePolicy
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 Boto3 Policy オブジェクトを使用して、ロールにポリシーをアタッチします。
def attach_to_role(role_name, policy_arn): """ Attaches a policy to a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Policy(policy_arn).attach_role(RoleName=role_name) logger.info("Attached policy %s to role %s.", policy_arn, role_name) except ClientError: logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name) raise
Boto3 Role オブジェクトを使用して、ロールにポリシーをアタッチします。
def attach_policy(role_name, policy_arn): """ Attaches a policy to a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to role %s.", policy_arn, role_name) except ClientError: logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のAttachRolePolicy「」の「」を参照してください。
-
次のコード例は、AttachUserPolicy
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def attach_policy(user_name, policy_arn): """ Attaches a policy to a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to user %s.", policy_arn, user_name) except ClientError: logger.exception("Couldn't attach policy %s to user %s.", policy_arn, user_name) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のAttachUserPolicy「」の「」を参照してください。
-
次の例は、CreateAccessKey
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_key(user_name): """ Creates an access key for the specified user. Each user can have a maximum of two keys. :param user_name: The name of the user. :return: The created access key. """ try: key_pair = iam.User(user_name).create_access_key_pair() logger.info( "Created access key pair for %s. Key ID is %s.", key_pair.user_name, key_pair.id, ) except ClientError: logger.exception("Couldn't create access key pair for %s.", user_name) raise else: return key_pair
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のCreateAccessKey「」の「」を参照してください。
-
次の例は、CreateAccountAlias
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_alias(alias): """ Creates an alias for the current account. The alias can be used in place of the account ID in the sign-in URL. An account can have only one alias. When a new alias is created, it replaces any existing alias. :param alias: The alias to assign to the account. """ try: iam.create_account_alias(AccountAlias=alias) logger.info("Created an alias '%s' for your account.", alias) except ClientError: logger.exception("Couldn't create alias '%s' for your account.", alias) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のCreateAccountAlias「」の「」を参照してください。
-
次の例は、CreateInstanceProfile
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 この例では、ポリシー、ロール、インスタンスプロファイルを作成し、それらをすべてリンクします。
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_instance_profile( self, policy_file: str, policy_name: str, role_name: str, profile_name: str, aws_managed_policies: Tuple[str, ...] = (), ) -> str: """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. """ assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = self.create_policy(policy_file, policy_name) self.create_role(role_name, assume_role_doc) self.attach_policy(role_name, policy_arn, aws_managed_policies) try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) log.error(f"Full error:\n\t{err}") return profile_arn
-
API 詳細については、「 CreateInstanceProfilePython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API
-
次のコード例は、CreatePolicy
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_policy(name, description, actions, resource_arn): """ Creates a policy that contains a single statement. :param name: The name of the policy to create. :param description: The description of the policy. :param actions: The actions allowed by the policy. These typically take the form of service:action, such as s3:PutObject. :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy applies to. This ARN can contain wildcards, such as 'arn:aws:s3:::my-bucket/*' to allow actions on all objects in the bucket named 'my-bucket'. :return: The newly created policy. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.create_policy( PolicyName=name, Description=description, PolicyDocument=json.dumps(policy_doc), ) logger.info("Created policy %s.", policy.arn) except ClientError: logger.exception("Couldn't create policy %s.", name) raise else: return policy
-
API 詳細については、「 CreatePolicyPython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API
-
次の例は、CreatePolicyVersion
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_policy_version(policy_arn, actions, resource_arn, set_as_default): """ Creates a policy version. Policies can have up to five versions. The default version is the one that is used for all resources that reference the policy. :param policy_arn: The ARN of the policy. :param actions: The actions to allow in the policy version. :param resource_arn: The ARN of the resource this policy version applies to. :param set_as_default: When True, this policy version is set as the default version for the policy. Otherwise, the default is not changed. :return: The newly created policy version. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.Policy(policy_arn) policy_version = policy.create_version( PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default ) logger.info( "Created policy version %s for policy %s.", policy_version.version_id, policy_version.arn, ) except ClientError: logger.exception("Couldn't create a policy version for %s.", policy_arn) raise else: return policy_version
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のCreatePolicyVersion「」の「」を参照してください。
-
次の例は、CreateRole
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_role(role_name, allowed_services): """ Creates a role that lets a list of specified services assume the role. :param role_name: The name of the role. :param allowed_services: The services that can assume the role. :return: The newly created role. """ trust_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": service}, "Action": "sts:AssumeRole", } for service in allowed_services ], } try: role = iam.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy) ) logger.info("Created role %s.", role.name) except ClientError: logger.exception("Couldn't create role %s.", role_name) raise else: return role
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のCreateRole「」の「」を参照してください。
-
次のコード例は、CreateServiceLinkedRole
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_service_linked_role(service_name, description): """ Creates a service-linked role. :param service_name: The name of the service that owns the role. :param description: A description to give the role. :return: The newly created role. """ try: response = iam.meta.client.create_service_linked_role( AWSServiceName=service_name, Description=description ) role = iam.Role(response["Role"]["RoleName"]) logger.info("Created service-linked role %s.", role.name) except ClientError: logger.exception("Couldn't create service-linked role for %s.", service_name) raise else: return role
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のCreateServiceLinkedRole「」の「」を参照してください。
-
次のコード例は、CreateUser
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def create_user(user_name): """ Creates a user. By default, a user has no permissions or access keys. :param user_name: The name of the user. :return: The newly created user. """ try: user = iam.create_user(UserName=user_name) logger.info("Created user %s.", user.name) except ClientError: logger.exception("Couldn't create user %s.", user_name) raise else: return user
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のCreateUser「」の「」を参照してください。
-
次のコード例は、DeleteAccessKey
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def delete_key(user_name, key_id): """ Deletes a user's access key. :param user_name: The user that owns the key. :param key_id: The ID of the key to delete. """ try: key = iam.AccessKey(user_name, key_id) key.delete() logger.info("Deleted access key %s for %s.", key.id, key.user_name) except ClientError: logger.exception("Couldn't delete key %s for %s", key_id, user_name) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDeleteAccessKey「」の「」を参照してください。
-
次のコード例は、DeleteAccountAlias
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def delete_alias(alias): """ Removes the alias from the current account. :param alias: The alias to remove. """ try: iam.meta.client.delete_account_alias(AccountAlias=alias) logger.info("Removed alias '%s' from your account.", alias) except ClientError: logger.exception("Couldn't remove alias '%s' from your account.", alias) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDeleteAccountAlias「」の「」を参照してください。
-
次のコード例は、DeleteInstanceProfile
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 この例では、インスタンスプロファイルからロールを削除し、ロールにアタッチされているすべてのポリシーをデタッチし、すべてのリソースを削除します。
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name )
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDeleteInstanceProfile「」の「」を参照してください。
-
次のコード例は、DeletePolicy
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def delete_policy(policy_arn): """ Deletes a policy. :param policy_arn: The ARN of the policy to delete. """ try: iam.Policy(policy_arn).delete() logger.info("Deleted policy %s.", policy_arn) except ClientError: logger.exception("Couldn't delete policy %s.", policy_arn) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDeletePolicy「」の「」を参照してください。
-
次の例は、DeleteRole
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def delete_role(role_name): """ Deletes a role. :param role_name: The name of the role to delete. """ try: iam.Role(role_name).delete() logger.info("Deleted role %s.", role_name) except ClientError: logger.exception("Couldn't delete role %s.", role_name) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDeleteRole「」の「」を参照してください。
-
次のコード例は、DeleteUser
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def delete_user(user_name): """ Deletes a user. Before a user can be deleted, all associated resources, such as access keys and policies, must be deleted or detached. :param user_name: The name of the user. """ try: iam.User(user_name).delete() logger.info("Deleted user %s.", user_name) except ClientError: logger.exception("Couldn't delete user %s.", user_name) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDeleteUser「」の「」を参照してください。
-
次の例は、DetachRolePolicy
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 Boto3 Policy オブジェクトを使用して、ロールからポリシーをデタッチします。
def detach_from_role(role_name, policy_arn): """ Detaches a policy from a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Policy(policy_arn).detach_role(RoleName=role_name) logger.info("Detached policy %s from role %s.", policy_arn, role_name) except ClientError: logger.exception( "Couldn't detach policy %s from role %s.", policy_arn, role_name ) raise
Boto3 Role オブジェクトを使用して、ロールからポリシーをデタッチします。
def detach_policy(role_name, policy_arn): """ Detaches a policy from a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from role %s.", policy_arn, role_name) except ClientError: logger.exception( "Couldn't detach policy %s from role %s.", policy_arn, role_name ) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDetachRolePolicy「」の「」を参照してください。
-
次のコード例は、DetachUserPolicy
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def detach_policy(user_name, policy_arn): """ Detaches a policy from a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from user %s.", policy_arn, user_name) except ClientError: logger.exception( "Couldn't detach policy %s from user %s.", policy_arn, user_name ) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のDetachUserPolicy「」の「」を参照してください。
-
次の例は、GenerateCredentialReport
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def generate_credential_report(): """ Starts generation of a credentials report about the current account. After calling this function to generate the report, call get_credential_report to get the latest report. A new report can be generated a minimum of four hours after the last one was generated. """ try: response = iam.meta.client.generate_credential_report() logger.info( "Generating credentials report for your account. " "Current state is %s.", response["State"], ) except ClientError: logger.exception("Couldn't generate a credentials report for your account.") raise else: return response
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGenerateCredentialReport「」の「」を参照してください。
-
次の例は、GetAccessKeyLastUsed
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_last_use(key_id): """ Gets information about when and how a key was last used. :param key_id: The ID of the key to look up. :return: Information about the key's last use. """ try: response = iam.meta.client.get_access_key_last_used(AccessKeyId=key_id) last_used_date = response["AccessKeyLastUsed"].get("LastUsedDate", None) last_service = response["AccessKeyLastUsed"].get("ServiceName", None) logger.info( "Key %s was last used by %s on %s to access %s.", key_id, response["UserName"], last_used_date, last_service, ) except ClientError: logger.exception("Couldn't get last use of key %s.", key_id) raise else: return response
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetAccessKeyLastUsed「」の「」を参照してください。
-
次のコード例は、GetAccountAuthorizationDetails
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_authorization_details(response_filter): """ Gets an authorization detail report for the current account. :param response_filter: A list of resource types to include in the report, such as users or roles. When not specified, all resources are included. :return: The authorization detail report. """ try: account_details = iam.meta.client.get_account_authorization_details( Filter=response_filter ) logger.debug(account_details) except ClientError: logger.exception("Couldn't get details for your account.") raise else: return account_details
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetAccountAuthorizationDetails「」の「」を参照してください。
-
次の例は、GetAccountPasswordPolicy
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def print_password_policy(): """ Prints the password policy for the account. """ try: pw_policy = iam.AccountPasswordPolicy() print("Current account password policy:") print( f"\tallow_users_to_change_password: {pw_policy.allow_users_to_change_password}" ) print(f"\texpire_passwords: {pw_policy.expire_passwords}") print(f"\thard_expiry: {pw_policy.hard_expiry}") print(f"\tmax_password_age: {pw_policy.max_password_age}") print(f"\tminimum_password_length: {pw_policy.minimum_password_length}") print(f"\tpassword_reuse_prevention: {pw_policy.password_reuse_prevention}") print( f"\trequire_lowercase_characters: {pw_policy.require_lowercase_characters}" ) print(f"\trequire_numbers: {pw_policy.require_numbers}") print(f"\trequire_symbols: {pw_policy.require_symbols}") print( f"\trequire_uppercase_characters: {pw_policy.require_uppercase_characters}" ) printed = True except ClientError as error: if error.response["Error"]["Code"] == "NoSuchEntity": print("The account does not have a password policy set.") else: logger.exception("Couldn't get account password policy.") raise else: return printed
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetAccountPasswordPolicy「」の「」を参照してください。
-
次の例は、GetAccountSummary
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_summary(): """ Gets a summary of account usage. :return: The summary of account usage. """ try: summary = iam.AccountSummary() logger.debug(summary.summary_map) except ClientError: logger.exception("Couldn't get a summary for your account.") raise else: return summary.summary_map
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetAccountSummary「」の「」を参照してください。
-
次の例は、GetCredentialReport
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_credential_report(): """ Gets the most recently generated credentials report about the current account. :return: The credentials report. """ try: response = iam.meta.client.get_credential_report() logger.debug(response["Content"]) except ClientError: logger.exception("Couldn't get credentials report.") raise else: return response["Content"]
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetCredentialReport「」の「」を参照してください。
-
次の例は、GetPolicy
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_default_policy_statement(policy_arn): """ Gets the statement of the default version of the specified policy. :param policy_arn: The ARN of the policy to look up. :return: The statement of the default policy version. """ try: policy = iam.Policy(policy_arn) # To get an attribute of a policy, the SDK first calls get_policy. policy_doc = policy.default_version.document policy_statement = policy_doc.get("Statement", None) logger.info("Got default policy doc for %s.", policy.policy_name) logger.info(policy_doc) except ClientError: logger.exception("Couldn't get default policy statement for %s.", policy_arn) raise else: return policy_statement
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetPolicy「」の「」を参照してください。
-
次の例は、GetPolicyVersion
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_default_policy_statement(policy_arn): """ Gets the statement of the default version of the specified policy. :param policy_arn: The ARN of the policy to look up. :return: The statement of the default policy version. """ try: policy = iam.Policy(policy_arn) # To get an attribute of a policy, the SDK first calls get_policy. policy_doc = policy.default_version.document policy_statement = policy_doc.get("Statement", None) logger.info("Got default policy doc for %s.", policy.policy_name) logger.info(policy_doc) except ClientError: logger.exception("Couldn't get default policy statement for %s.", policy_arn) raise else: return policy_statement
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetPolicyVersion「」の「」を参照してください。
-
次のコード例は、GetRole
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def get_role(role_name): """ Gets a role by name. :param role_name: The name of the role to retrieve. :return: The specified role. """ try: role = iam.Role(role_name) role.load() # calls GetRole to load attributes logger.info("Got role with arn %s.", role.arn) except ClientError: logger.exception("Couldn't get role named %s.", role_name) raise else: return role
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のGetRole「」の「」を参照してください。
-
次のコード例は、ListAccessKeys
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_keys(user_name): """ Lists the keys owned by the specified user. :param user_name: The name of the user. :return: The list of keys owned by the user. """ try: keys = list(iam.User(user_name).access_keys.all()) logger.info("Got %s access keys for %s.", len(keys), user_name) except ClientError: logger.exception("Couldn't get access keys for %s.", user_name) raise else: return keys
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のListAccessKeys「」の「」を参照してください。
-
次のコード例は、ListAccountAliases
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_aliases(): """ Gets the list of aliases for the current account. An account has at most one alias. :return: The list of aliases for the account. """ try: response = iam.meta.client.list_account_aliases() aliases = response["AccountAliases"] if len(aliases) > 0: logger.info("Got aliases for your account: %s.", ",".join(aliases)) else: logger.info("Got no aliases for your account.") except ClientError: logger.exception("Couldn't list aliases for your account.") raise else: return response["AccountAliases"]
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のListAccountAliases「」の「」を参照してください。
-
次のコード例は、ListAttachedRolePolicies
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_attached_policies(role_name): """ Lists policies attached to a role. :param role_name: The name of the role to query. """ try: role = iam.Role(role_name) for policy in role.attached_policies.all(): logger.info("Got policy %s.", policy.arn) except ClientError: logger.exception("Couldn't list attached policies for %s.", role_name) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のListAttachedRolePolicies「」の「」を参照してください。
-
次の例は、ListGroups
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_groups(count): """ Lists the specified number of groups for the account. :param count: The number of groups to list. """ try: for group in iam.groups.limit(count): logger.info("Group: %s", group.name) except ClientError: logger.exception("Couldn't list groups for the account.") raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のListGroups「」の「」を参照してください。
-
次のコード例は、ListPolicies
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_policies(scope): """ Lists the policies in the current account. :param scope: Limits the kinds of policies that are returned. For example, 'Local' specifies that only locally managed policies are returned. :return: The list of policies. """ try: policies = list(iam.policies.filter(Scope=scope)) logger.info("Got %s policies in scope '%s'.", len(policies), scope) except ClientError: logger.exception("Couldn't get policies for scope '%s'.", scope) raise else: return policies
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のListPolicies「」の「」を参照してください。
-
次の例は、ListRolePolicies
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_policies(role_name): """ Lists inline policies for a role. :param role_name: The name of the role to query. """ try: role = iam.Role(role_name) for policy in role.policies.all(): logger.info("Got inline policy %s.", policy.name) except ClientError: logger.exception("Couldn't list inline policies for %s.", role_name) raise
-
API 詳細については、「 ListRolePoliciesPython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API
-
次のコード例は、ListRoles
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_roles(count): """ Lists the specified number of roles for the account. :param count: The number of roles to list. """ try: roles = list(iam.roles.limit(count=count)) for role in roles: logger.info("Role: %s", role.name) except ClientError: logger.exception("Couldn't list roles for the account.") raise else: return roles
-
API 詳細については、「 ListRolesPython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API
-
次の例は、ListSAMLProviders
を使用する方法を説明しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_saml_providers(count): """ Lists the SAML providers for the account. :param count: The maximum number of providers to list. """ try: found = 0 for provider in iam.saml_providers.limit(count): logger.info("Got SAML provider %s.", provider.arn) found += 1 if found == 0: logger.info("Your account has no SAML providers.") except ClientError: logger.exception("Couldn't list SAML providers.") raise
-
API 詳細については、AWS SDK「 for Python (Boto3) APIリファレンス」の「ListSAMLProviders」を参照してください。
-
次のコード例は、ListUsers
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def list_users(): """ Lists the users in the current account. :return: The list of users. """ try: users = list(iam.users.all()) logger.info("Got %s users.", len(users)) except ClientError: logger.exception("Couldn't get users.") raise else: return users
-
API 詳細については、「 ListUsersPython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API
-
次のコード例は、UpdateAccessKey
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def update_key(user_name, key_id, activate): """ Updates the status of a key. :param user_name: The user that owns the key. :param key_id: The ID of the key to update. :param activate: When True, the key is activated. Otherwise, the key is deactivated. """ try: key = iam.User(user_name).AccessKey(key_id) if activate: key.activate() else: key.deactivate() logger.info("%s key %s.", "Activated" if activate else "Deactivated", key_id) except ClientError: logger.exception( "Couldn't %s key %s.", "Activate" if activate else "Deactivate", key_id ) raise
-
API 詳細については、AWS SDKPython (Boto3) APIリファレンス のUpdateAccessKey「」の「」を参照してください。
-
次のコード例は、UpdateUser
を使用する方法を示しています。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def update_user(user_name, new_user_name): """ Updates a user's name. :param user_name: The current name of the user to update. :param new_user_name: The new name to assign to the user. :return: The updated user. """ try: user = iam.User(user_name) user.update(NewUserName=new_user_name) logger.info("Renamed %s to %s.", user_name, new_user_name) except ClientError: logger.exception("Couldn't update name for user %s.", user_name) raise return user
-
API 詳細については、「 UpdateUserPython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API
-
シナリオ
次のコード例は、本、映画、曲のレコメンデーションを返す負荷分散型ウェブサービスの作成方法を示しています。この例は、障害に対するサービスの対応方法と、障害発生時の耐障害性を高めるためにサービスを再構築する方法を示しています。
Amazon EC2 Auto Scaling グループを使用して、起動テンプレートに基づいて Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを作成し、インスタンスの数を指定された範囲内に保持します。
Elastic Load Balancing を使用してHTTPリクエストを処理し、配信します。 Elastic Load Balancing
Auto Scaling グループ内のインスタンスの状態を監視し、正常なインスタンスにのみリクエストを転送します。
各EC2インスタンスで Python ウェブサーバーを実行して、HTTPリクエストを処理します。ウェブサーバーはレコメンデーションとヘルスチェックを返します。
Amazon DynamoDB テーブルを使用してレコメンデーションサービスをシミュレートできます。
AWS Systems Manager パラメータを更新して、リクエストとヘルスチェックに対するウェブサーバーのレスポンスを制御します。
- SDK Python 用 (Boto3)
-
注記
については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 コマンドプロンプトからインタラクティブのシナリオを実行します。
class Runner: """ Manages the deployment, demonstration, and destruction of resources for the resilient service. """ def __init__( self, resource_path: str, recommendation: RecommendationService, autoscaler: AutoScalingWrapper, loadbalancer: ElasticLoadBalancerWrapper, param_helper: ParameterHelper, ): """ Initializes the Runner class with the necessary parameters. :param resource_path: The path to resource files used by this example, such as IAM policies and instance scripts. :param recommendation: An instance of the RecommendationService class. :param autoscaler: An instance of the AutoScaler class. :param loadbalancer: An instance of the LoadBalancer class. :param param_helper: An instance of the ParameterHelper class. """ self.resource_path = resource_path self.recommendation = recommendation self.autoscaler = autoscaler self.loadbalancer = loadbalancer self.param_helper = param_helper self.protocol = "HTTP" self.port = 80 self.ssh_port = 22 prefix = "doc-example-resilience" self.target_group_name = f"{prefix}-tg" self.load_balancer_name = f"{prefix}-lb" def deploy(self) -> None: """ Deploys the resources required for the resilient service, including the DynamoDB table, EC2 instances, Auto Scaling group, and load balancer. """ recommendations_path = f"{self.resource_path}/recommendations.json" startup_script = f"{self.resource_path}/server_startup_script.sh" instance_policy = f"{self.resource_path}/instance_policy.json" logging.info("Starting deployment of resources for the resilient service.") logging.info( "Creating and populating DynamoDB table '%s'.", self.recommendation.table_name, ) self.recommendation.create() self.recommendation.populate(recommendations_path) logging.info( "Creating an EC2 launch template with the startup script '%s'.", startup_script, ) self.autoscaler.create_template(startup_script, instance_policy) logging.info( "Creating an EC2 Auto Scaling group across multiple Availability Zones." ) zones = self.autoscaler.create_autoscaling_group(3) logging.info("Creating variables that control the flow of the demo.") self.param_helper.reset() logging.info("Creating Elastic Load Balancing target group and load balancer.") vpc = self.autoscaler.get_default_vpc() subnets = self.autoscaler.get_subnets(vpc["VpcId"], zones) target_group = self.loadbalancer.create_target_group( self.target_group_name, self.protocol, self.port, vpc["VpcId"] ) self.loadbalancer.create_load_balancer( self.load_balancer_name, [subnet["SubnetId"] for subnet in subnets] ) self.loadbalancer.create_listener(self.load_balancer_name, target_group) self.autoscaler.attach_load_balancer_target_group(target_group) logging.info("Verifying access to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) current_ip_address = requests.get("http://checkip.amazonaws.com").text.strip() if not lb_success: logging.warning( "Couldn't connect to the load balancer. Verifying that the port is open..." ) sec_group, port_is_open = self.autoscaler.verify_inbound_port( vpc, self.port, current_ip_address ) sec_group, ssh_port_is_open = self.autoscaler.verify_inbound_port( vpc, self.ssh_port, current_ip_address ) if not port_is_open: logging.warning( "The default security group for your VPC must allow access from this computer." ) if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.port, current_ip_address ) if not ssh_port_is_open: if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound SSH traffic on port {self.ssh_port} for debugging from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.ssh_port, current_ip_address ) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) if lb_success: logging.info( "Load balancer is ready. Access it at: http://%s", current_ip_address ) else: logging.error( "Couldn't get a successful response from the load balancer endpoint. Please verify your VPC and security group settings." ) def demo_choices(self) -> None: """ Presents choices for interacting with the deployed service, such as sending requests to the load balancer or checking the health of the targets. """ actions = [ "Send a GET request to the load balancer endpoint.", "Check the health of load balancer targets.", "Go to the next part of the demo.", ] choice = 0 while choice != 2: logging.info("Choose an action to interact with the service.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: logging.info("Sending a GET request to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) logging.info("GET http://%s", endpoint) response = requests.get(f"http://{endpoint}") logging.info("Response: %s", response.status_code) if response.headers.get("content-type") == "application/json": pp(response.json()) elif choice == 1: logging.info("Checking the health of load balancer targets.") health = self.loadbalancer.check_target_health(self.target_group_name) for target in health: state = target["TargetHealth"]["State"] logging.info( "Target %s on port %d is %s", target["Target"]["Id"], target["Target"]["Port"], state, ) if state != "healthy": logging.warning( "%s: %s", target["TargetHealth"]["Reason"], target["TargetHealth"]["Description"], ) logging.info( "Note that it can take a minute or two for the health check to update." ) elif choice == 2: logging.info("Proceeding to the next part of the demo.") def demo(self) -> None: """ Runs the demonstration, showing how the service responds to different failure scenarios and how a resilient architecture can keep the service running. """ ssm_only_policy = f"{self.resource_path}/ssm_only_policy.json" logging.info("Resetting parameters to starting values for the demo.") self.param_helper.reset() logging.info( "Starting demonstration of the service's resilience under various failure conditions." ) self.demo_choices() logging.info( "Simulating failure by changing the Systems Manager parameter to a non-existent table." ) self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info("Sending GET requests will now return failure codes.") self.demo_choices() logging.info("Switching to static response mode to mitigate failure.") self.param_helper.put(self.param_helper.failure_response, "static") logging.info("Sending GET requests will now return static responses.") self.demo_choices() logging.info("Restoring normal operation of the recommendation service.") self.param_helper.put(self.param_helper.table, self.recommendation.table_name) logging.info( "Introducing a failure by assigning bad credentials to one of the instances." ) self.autoscaler.create_instance_profile( ssm_only_policy, self.autoscaler.bad_creds_policy_name, self.autoscaler.bad_creds_role_name, self.autoscaler.bad_creds_profile_name, ["AmazonSSMManagedInstanceCore"], ) instances = self.autoscaler.get_instances() bad_instance_id = instances[0] instance_profile = self.autoscaler.get_instance_profile(bad_instance_id) logging.info( "Replacing instance profile with bad credentials for instance %s.", bad_instance_id, ) self.autoscaler.replace_instance_profile( bad_instance_id, self.autoscaler.bad_creds_profile_name, instance_profile["AssociationId"], ) logging.info( "Sending GET requests may return either a valid recommendation or a static response." ) self.demo_choices() logging.info("Implementing deep health checks to detect unhealthy instances.") self.param_helper.put(self.param_helper.health_check, "deep") logging.info("Checking the health of the load balancer targets.") self.demo_choices() logging.info( "Terminating the unhealthy instance to let the auto scaler replace it." ) self.autoscaler.terminate_instance(bad_instance_id) logging.info("The service remains resilient during instance replacement.") self.demo_choices() logging.info("Simulating a complete failure of the recommendation service.") self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info( "All instances will report as unhealthy, but the service will still return static responses." ) self.demo_choices() self.param_helper.reset() def destroy(self, automation=False) -> None: """ Destroys all resources created for the demo, including the load balancer, Auto Scaling group, EC2 instances, and DynamoDB table. """ logging.info( "This concludes the demo. Preparing to clean up all AWS resources created during the demo." ) if automation: cleanup = True else: cleanup = q.ask( "Do you want to clean up all demo resources? (y/n) ", q.is_yesno ) if cleanup: logging.info("Deleting load balancer and related resources.") self.loadbalancer.delete_load_balancer(self.load_balancer_name) self.loadbalancer.delete_target_group(self.target_group_name) self.autoscaler.delete_autoscaling_group(self.autoscaler.group_name) self.autoscaler.delete_key_pair() self.autoscaler.delete_template() self.autoscaler.delete_instance_profile( self.autoscaler.bad_creds_profile_name, self.autoscaler.bad_creds_role_name, ) logging.info("Deleting DynamoDB table and other resources.") self.recommendation.destroy() else: logging.warning( "Resources have not been deleted. Ensure you clean them up manually to avoid unexpected charges." ) def main() -> None: """ Main function to parse arguments and run the appropriate actions for the demo. """ parser = argparse.ArgumentParser() parser.add_argument( "--action", required=True, choices=["all", "deploy", "demo", "destroy"], help="The action to take for the demo. When 'all' is specified, resources are\n" "deployed, the demo is run, and resources are destroyed.", ) parser.add_argument( "--resource_path", default="../../../workflows/resilient_service/resources", help="The path to resource files used by this example, such as IAM policies and\n" "instance scripts.", ) args = parser.parse_args() logging.info("Starting the Resilient Service demo.") prefix = "doc-example-resilience" # Service Clients ddb_client = boto3.client("dynamodb") elb_client = boto3.client("elbv2") autoscaling_client = boto3.client("autoscaling") ec2_client = boto3.client("ec2") ssm_client = boto3.client("ssm") iam_client = boto3.client("iam") # Wrapper instantiations recommendation = RecommendationService( "doc-example-recommendation-service", ddb_client ) autoscaling_wrapper = AutoScalingWrapper( prefix, "t3.micro", "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2", autoscaling_client, ec2_client, ssm_client, iam_client, ) elb_wrapper = ElasticLoadBalancerWrapper(elb_client) param_helper = ParameterHelper(recommendation.table_name, ssm_client) # Demo invocation runner = Runner( args.resource_path, recommendation, autoscaling_wrapper, elb_wrapper, param_helper, ) actions = [args.action] if args.action != "all" else ["deploy", "demo", "destroy"] for action in actions: if action == "deploy": runner.deploy() elif action == "demo": runner.demo() elif action == "destroy": runner.destroy() logging.info("Demo completed successfully.") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") main()
Auto Scaling アクションと Amazon EC2アクションをラップするクラスを作成します。
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_policy(self, policy_file: str, policy_name: str) -> str: """ Creates a new IAM policy or retrieves the ARN of an existing policy. :param policy_file: The path to a JSON file that contains the policy definition. :param policy_name: The name to give the created policy. :return: The ARN of the created or existing policy. """ with open(policy_file) as file: policy_doc = file.read() try: response = self.iam_client.create_policy( PolicyName=policy_name, PolicyDocument=policy_doc ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' created successfully. ARN: {policy_arn}") return policy_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the policy already exists, get its ARN response = self.iam_client.get_policy( PolicyArn=f"arn:aws:iam::{self.account_id}:policy/{policy_name}" ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' already exists. ARN: {policy_arn}") return policy_arn log.error(f"Full error:\n\t{err}") def create_role(self, role_name: str, assume_role_doc: dict) -> str: """ Creates a new IAM role or retrieves the ARN of an existing role. :param role_name: The name to give the created role. :param assume_role_doc: The assume role policy document that specifies which entities can assume the role. :return: The ARN of the created or existing role. """ try: response = self.iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc) ) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' created successfully. ARN: {role_arn}") return role_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the role already exists, get its ARN response = self.iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' already exists. ARN: {role_arn}") return role_arn log.error(f"Full error:\n\t{err}") def attach_policy( self, role_name: str, policy_arn: str, aws_managed_policies: Tuple[str, ...] = (), ) -> None: """ Attaches an IAM policy to a role and optionally attaches additional AWS-managed policies. :param role_name: The name of the role to attach the policy to. :param policy_arn: The ARN of the policy to attach. :param aws_managed_policies: A tuple of AWS-managed policy names to attach to the role. """ try: self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) for aws_policy in aws_managed_policies: self.iam_client.attach_role_policy( RoleName=role_name, PolicyArn=f"arn:aws:iam::aws:policy/{aws_policy}", ) log.info(f"Attached policy {policy_arn} to role {role_name}.") except ClientError as err: log.error(f"Failed to attach policy {policy_arn} to role {role_name}.") log.error(f"Full error:\n\t{err}") def create_instance_profile( self, policy_file: str, policy_name: str, role_name: str, profile_name: str, aws_managed_policies: Tuple[str, ...] = (), ) -> str: """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. """ assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = self.create_policy(policy_file, policy_name) self.create_role(role_name, assume_role_doc) self.attach_policy(role_name, policy_arn, aws_managed_policies) try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) log.error(f"Full error:\n\t{err}") return profile_arn def get_instance_profile(self, instance_id: str) -> Dict[str, Any]: """ Gets data about the profile associated with an instance. :param instance_id: The ID of the instance to look up. :return: The profile data. """ try: response = self.ec2_client.describe_iam_instance_profile_associations( Filters=[{"Name": "instance-id", "Values": [instance_id]}] ) if not response["IamInstanceProfileAssociations"]: log.info(f"No instance profile found for instance {instance_id}.") profile_data = response["IamInstanceProfileAssociations"][0] log.info(f"Retrieved instance profile for instance {instance_id}.") return profile_data except ClientError as err: log.error( f"Failed to retrieve instance profile for instance {instance_id}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidInstanceID.NotFound": log.error(f"The instance ID '{instance_id}' does not exist.") log.error(f"Full error:\n\t{err}") def replace_instance_profile( self, instance_id: str, new_instance_profile_name: str, profile_association_id: str, ) -> None: """ Replaces the profile associated with a running instance. After the profile is replaced, the instance is rebooted to ensure that it uses the new profile. When the instance is ready, Systems Manager is used to restart the Python web server. :param instance_id: The ID of the instance to restart. :param new_instance_profile_name: The name of the new profile to associate with the specified instance. :param profile_association_id: The ID of the existing profile association for the instance. """ try: self.ec2_client.replace_iam_instance_profile_association( IamInstanceProfile={"Name": new_instance_profile_name}, AssociationId=profile_association_id, ) log.info( "Replaced instance profile for association %s with profile %s.", profile_association_id, new_instance_profile_name, ) time.sleep(5) self.ec2_client.reboot_instances(InstanceIds=[instance_id]) log.info("Rebooting instance %s.", instance_id) waiter = self.ec2_client.get_waiter("instance_running") log.info("Waiting for instance %s to be running.", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info("Instance %s is now running.", instance_id) self.ssm_client.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={"commands": ["cd / && sudo python3 server.py 80"]}, ) log.info(f"Restarted the Python web server on instance '{instance_id}'.") except ClientError as err: log.error("Failed to replace instance profile.") error_code = err.response["Error"]["Code"] if error_code == "InvalidAssociationID.NotFound": log.error( f"Association ID '{profile_association_id}' does not exist." "Please check the association ID and try again." ) if error_code == "InvalidInstanceId": log.error( f"The specified instance ID '{instance_id}' does not exist or is not available for SSM. " f"Please verify the instance ID and try again." ) log.error(f"Full error:\n\t{err}") def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name ) def create_key_pair(self, key_pair_name: str) -> None: """ Creates a new key pair. :param key_pair_name: The name of the key pair to create. """ try: response = self.ec2_client.create_key_pair(KeyName=key_pair_name) with open(f"{key_pair_name}.pem", "w") as file: file.write(response["KeyMaterial"]) chmod(f"{key_pair_name}.pem", 0o600) log.info("Created key pair %s.", key_pair_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to create key pair {key_pair_name}.") if error_code == "InvalidKeyPair.Duplicate": log.error(f"A key pair with the name '{key_pair_name}' already exists.") log.error(f"Full error:\n\t{err}") def delete_key_pair(self) -> None: """ Deletes a key pair. """ try: self.ec2_client.delete_key_pair(KeyName=self.key_pair_name) remove(f"{self.key_pair_name}.pem") log.info("Deleted key pair %s.", self.key_pair_name) except ClientError as err: log.error(f"Couldn't delete key pair '{self.key_pair_name}'.") log.error(f"Full error:\n\t{err}") except FileNotFoundError as err: log.info("Key pair %s doesn't exist, nothing to do.", self.key_pair_name) log.error(f"Full error:\n\t{err}") def create_template( self, server_startup_script_file: str, instance_policy_file: str ) -> Dict[str, Any]: """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The launch template specifies a Bash script in its user data field that runs after the instance is started. This script installs Python packages and starts a Python web server on the instance. :param server_startup_script_file: The path to a Bash script file that is run when an instance starts. :param instance_policy_file: The path to a file that defines a permissions policy to create and attach to the instance profile. :return: Information about the newly created template. """ template = {} try: # Create key pair and instance profile self.create_key_pair(self.key_pair_name) self.create_instance_profile( instance_policy_file, self.instance_policy_name, self.instance_role_name, self.instance_profile_name, ) # Read the startup script with open(server_startup_script_file) as file: start_server_script = file.read() # Get the latest AMI ID ami_latest = self.ssm_client.get_parameter(Name=self.ami_param) ami_id = ami_latest["Parameter"]["Value"] # Create the launch template lt_response = self.ec2_client.create_launch_template( LaunchTemplateName=self.launch_template_name, LaunchTemplateData={ "InstanceType": self.inst_type, "ImageId": ami_id, "IamInstanceProfile": {"Name": self.instance_profile_name}, "UserData": base64.b64encode( start_server_script.encode(encoding="utf-8") ).decode(encoding="utf-8"), "KeyName": self.key_pair_name, }, ) template = lt_response["LaunchTemplate"] log.info( f"Created launch template {self.launch_template_name} for AMI {ami_id} on {self.inst_type}." ) except ClientError as err: log.error(f"Failed to create launch template {self.launch_template_name}.") error_code = err.response["Error"]["Code"] if error_code == "InvalidLaunchTemplateName.AlreadyExistsException": log.info( f"Launch template {self.launch_template_name} already exists, nothing to do." ) log.error(f"Full error:\n\t{err}") return template def delete_template(self): """ Deletes a launch template. """ try: self.ec2_client.delete_launch_template( LaunchTemplateName=self.launch_template_name ) self.delete_instance_profile( self.instance_profile_name, self.instance_role_name ) log.info("Launch template %s deleted.", self.launch_template_name) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): log.info( "Launch template %s does not exist, nothing to do.", self.launch_template_name, ) log.error(f"Full error:\n\t{err}") def get_availability_zones(self) -> List[str]: """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] log.info(f"Retrieved {len(zones)} availability zones: {zones}.") except ClientError as err: log.error("Failed to retrieve availability zones.") log.error(f"Full error:\n\t{err}") else: return zones def create_autoscaling_group(self, group_size: int) -> List[str]: """ Creates an EC2 Auto Scaling group with the specified size. :param group_size: The number of instances to set for the minimum and maximum in the group. :return: The list of Availability Zones specified for the group. """ try: zones = self.get_availability_zones() self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=self.group_name, AvailabilityZones=zones, LaunchTemplate={ "LaunchTemplateName": self.launch_template_name, "Version": "$Default", }, MinSize=group_size, MaxSize=group_size, ) log.info( f"Created EC2 Auto Scaling group {self.group_name} with availability zones {zones}." ) except ClientError as err: error_code = err.response["Error"]["Code"] if error_code == "AlreadyExists": log.info( f"EC2 Auto Scaling group {self.group_name} already exists, nothing to do." ) else: log.error(f"Failed to create EC2 Auto Scaling group {self.group_name}.") log.error(f"Full error:\n\t{err}") else: return zones def get_instances(self) -> List[str]: """ Gets data about the instances in the EC2 Auto Scaling group. :return: A list of instance IDs in the Auto Scaling group. """ try: as_response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) instance_ids = [ i["InstanceId"] for i in as_response["AutoScalingGroups"][0]["Instances"] ] log.info( f"Retrieved {len(instance_ids)} instances for Auto Scaling group {self.group_name}." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to retrieve instances for Auto Scaling group {self.group_name}." ) if error_code == "ResourceNotFound": log.error(f"The Auto Scaling group '{self.group_name}' does not exist.") log.error(f"Full error:\n\t{err}") else: return instance_ids def terminate_instance(self, instance_id: str, decrementsetting=False) -> None: """ Terminates an instance in an EC2 Auto Scaling group. After an instance is terminated, it can no longer be accessed. :param instance_id: The ID of the instance to terminate. :param decrementsetting: If True, do not replace terminated instances. """ try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrementsetting, ) log.info("Terminated instance %s.", instance_id) # Adding a waiter to ensure the instance is terminated waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for instance %s to be terminated...", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info( f"Instance '{instance_id}' has been terminated and will be replaced." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to terminate instance '{instance_id}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to terminate the instance again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) log.error(f"Full error:\n\t{err}") def attach_load_balancer_target_group( self, lb_target_group: Dict[str, Any] ) -> None: """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forwards requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to attach load balancer target group '{lb_target_group['TargetGroupName']}'." ) if error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) elif error_code == "ServiceLinkedRoleFailure": log.error( "The operation failed because the service-linked role is not ready or does not exist. " "Check that the service-linked role exists and is correctly configured." ) log.error(f"Full error:\n\t{err}") def delete_autoscaling_group(self, group_name: str) -> None: """ Terminates all instances in the group, then deletes the EC2 Auto Scaling group. :param group_name: The name of the group to delete. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self.terminate_instance(inst_id) # Wait for all instances to be terminated if instance_ids: waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for all instances to be terminated...") waiter.wait(InstanceIds=instance_ids) log.info("All instances have been terminated.") else: log.info(f"No groups found named '{group_name}'! Nothing to do.") except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete Auto Scaling group '{group_name}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to delete the group again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the group." ) log.error(f"Full error:\n\t{err}") def get_default_vpc(self) -> Dict[str, Any]: """ Gets the default VPC for the account. :return: Data about the default VPC. """ try: response = self.ec2_client.describe_vpcs( Filters=[{"Name": "is-default", "Values": ["true"]}] ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error("Failed to retrieve the default VPC.") if error_code == "UnauthorizedOperation": log.error( "You do not have the necessary permissions to describe VPCs. " "Ensure that your AWS IAM user or role has the correct permissions." ) elif error_code == "InvalidParameterValue": log.error( "One or more parameters are invalid. Check the request parameters." ) log.error(f"Full error:\n\t{err}") else: if "Vpcs" in response and response["Vpcs"]: log.info(f"Retrieved default VPC: {response['Vpcs'][0]['VpcId']}") return response["Vpcs"][0] else: pass def verify_inbound_port( self, vpc: Dict[str, Any], port: int, ip_address: str ) -> Tuple[Dict[str, Any], bool]: """ Verify the default security group of the specified VPC allows ingress from this computer. This can be done by allowing ingress from this computer's IP address. In some situations, such as connecting from a corporate network, you must instead specify a prefix list ID. You can also temporarily open the port to any IP address while running this example. If you do, be sure to remove public access when you're done. :param vpc: The VPC used by this example. :param port: The port to verify. :param ip_address: This computer's IP address. :return: The default security group of the specified VPC, and a value that indicates whether the specified port is open. """ try: response = self.ec2_client.describe_security_groups( Filters=[ {"Name": "group-name", "Values": ["default"]}, {"Name": "vpc-id", "Values": [vpc["VpcId"]]}, ] ) sec_group = response["SecurityGroups"][0] port_is_open = False log.info(f"Found default security group {sec_group['GroupId']}.") for ip_perm in sec_group["IpPermissions"]: if ip_perm.get("FromPort", 0) == port: log.info(f"Found inbound rule: {ip_perm}") for ip_range in ip_perm["IpRanges"]: cidr = ip_range.get("CidrIp", "") if cidr.startswith(ip_address) or cidr == "0.0.0.0/0": port_is_open = True if ip_perm["PrefixListIds"]: port_is_open = True if not port_is_open: log.info( f"The inbound rule does not appear to be open to either this computer's IP " f"address of {ip_address}, to all IP addresses (0.0.0.0/0), or to a prefix list ID." ) else: break except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to verify inbound rule for port {port} for VPC {vpc['VpcId']}." ) if error_code == "InvalidVpcID.NotFound": log.error( f"The specified VPC ID '{vpc['VpcId']}' does not exist. Please check the VPC ID." ) log.error(f"Full error:\n\t{err}") else: return sec_group, port_is_open def open_inbound_port(self, sec_group_id: str, port: int, ip_address: str) -> None: """ Add an ingress rule to the specified security group that allows access on the specified port from the specified IP address. :param sec_group_id: The ID of the security group to modify. :param port: The port to open. :param ip_address: The IP address that is granted access. """ try: self.ec2_client.authorize_security_group_ingress( GroupId=sec_group_id, CidrIp=f"{ip_address}/32", FromPort=port, ToPort=port, IpProtocol="tcp", ) log.info( "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to authorize ingress to security group '{sec_group_id}' on port {port} from {ip_address}." ) if error_code == "InvalidGroupId.Malformed": log.error( "The security group ID is malformed. " "Please verify that the security group ID is correct." ) elif error_code == "InvalidPermission.Duplicate": log.error( "The specified rule already exists in the security group. " "Check the existing rules for this security group." ) log.error(f"Full error:\n\t{err}") def get_subnets(self, vpc_id: str, zones: List[str] = None) -> List[Dict[str, Any]]: """ Gets the default subnets in a VPC for a specified list of Availability Zones. :param vpc_id: The ID of the VPC to look up. :param zones: The list of Availability Zones to look up. :return: The list of subnets found. """ # Ensure that 'zones' is a list, even if None is passed if zones is None: zones = [] try: paginator = self.ec2_client.get_paginator("describe_subnets") page_iterator = paginator.paginate( Filters=[ {"Name": "vpc-id", "Values": [vpc_id]}, {"Name": "availability-zone", "Values": zones}, {"Name": "default-for-az", "Values": ["true"]}, ] ) subnets = [] for page in page_iterator: subnets.extend(page["Subnets"]) log.info("Found %s subnets for the specified zones.", len(subnets)) return subnets except ClientError as err: log.error( f"Failed to retrieve subnets for VPC '{vpc_id}' in zones {zones}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidVpcID.NotFound": log.error( "The specified VPC ID does not exist. " "Please check the VPC ID and try again." ) # Add more error-specific handling as needed log.error(f"Full error:\n\t{err}")
Elastic Load Balancing のアクションをラップするクラスを作成します。
class ElasticLoadBalancerWrapper: """Encapsulates Elastic Load Balancing (ELB) actions.""" def __init__(self, elb_client: boto3.client): """ Initializes the LoadBalancer class with the necessary parameters. """ self.elb_client = elb_client def create_target_group( self, target_group_name: str, protocol: str, port: int, vpc_id: str ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing target group. The target group specifies how the load balancer forwards requests to instances in the group and how instance health is checked. To speed up this demo, the health check is configured with shortened times and lower thresholds. In production, you might want to decrease the sensitivity of your health checks to avoid unwanted failures. :param target_group_name: The name of the target group to create. :param protocol: The protocol to use to forward requests, such as 'HTTP'. :param port: The port to use to forward requests, such as 80. :param vpc_id: The ID of the VPC in which the load balancer exists. :return: Data about the newly created target group. """ try: response = self.elb_client.create_target_group( Name=target_group_name, Protocol=protocol, Port=port, HealthCheckPath="/healthcheck", HealthCheckIntervalSeconds=10, HealthCheckTimeoutSeconds=5, HealthyThresholdCount=2, UnhealthyThresholdCount=2, VpcId=vpc_id, ) target_group = response["TargetGroups"][0] log.info(f"Created load balancing target group '{target_group_name}'.") return target_group except ClientError as err: log.error( f"Couldn't create load balancing target group '{target_group_name}'." ) error_code = err.response["Error"]["Code"] if error_code == "DuplicateTargetGroupName": log.error( f"Target group name {target_group_name} already exists. " "Check if the target group already exists." "Consider using a different name or deleting the existing target group if appropriate." ) elif error_code == "TooManyTargetGroups": log.error( "Too many target groups exist in the account. " "Consider deleting unused target groups to create space for new ones." ) log.error(f"Full error:\n\t{err}") def delete_target_group(self, target_group_name) -> None: """ Deletes the target group. """ try: # Describe the target group to get its ARN response = self.elb_client.describe_target_groups(Names=[target_group_name]) tg_arn = response["TargetGroups"][0]["TargetGroupArn"] # Delete the target group self.elb_client.delete_target_group(TargetGroupArn=tg_arn) log.info("Deleted load balancing target group %s.", target_group_name) # Use a custom waiter to wait until the target group is no longer available self.wait_for_target_group_deletion(self.elb_client, tg_arn) log.info("Target group %s successfully deleted.", target_group_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete target group '{target_group_name}'.") if error_code == "TargetGroupNotFound": log.error( "Load balancer target group either already deleted or never existed. " "Verify the name and check that the resource exists in the AWS Console." ) elif error_code == "ResourceInUseException": log.error( "Target group still in use by another resource. " "Ensure that the target group is no longer associated with any load balancers or resources.", ) log.error(f"Full error:\n\t{err}") def wait_for_target_group_deletion( self, elb_client, target_group_arn, max_attempts=10, delay=30 ): for attempt in range(max_attempts): try: elb_client.describe_target_groups(TargetGroupArns=[target_group_arn]) print( f"Attempt {attempt + 1}: Target group {target_group_arn} still exists." ) except ClientError as e: if e.response["Error"]["Code"] == "TargetGroupNotFound": print( f"Target group {target_group_arn} has been successfully deleted." ) return else: raise time.sleep(delay) raise TimeoutError( f"Target group {target_group_arn} was not deleted after {max_attempts * delay} seconds." ) def create_load_balancer( self, load_balancer_name: str, subnet_ids: List[str], ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing load balancer that uses the specified subnets and forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create. :param subnet_ids: A list of subnets to associate with the load balancer. :return: Data about the newly created load balancer. """ try: response = self.elb_client.create_load_balancer( Name=load_balancer_name, Subnets=subnet_ids ) load_balancer = response["LoadBalancers"][0] log.info(f"Created load balancer '{load_balancer_name}'.") waiter = self.elb_client.get_waiter("load_balancer_available") log.info( f"Waiting for load balancer '{load_balancer_name}' to be available..." ) waiter.wait(Names=[load_balancer_name]) log.info(f"Load balancer '{load_balancer_name}' is now available!") except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to create load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "DuplicateLoadBalancerNameException": log.error( f"A load balancer with the name '{load_balancer_name}' already exists. " "Load balancer names must be unique within the AWS region. " "Please choose a different name and try again." ) if error_code == "TooManyLoadBalancersException": log.error( "The maximum number of load balancers has been reached in this account and region. " "You can delete unused load balancers or request an increase in the service quota from AWS Support." ) log.error(f"Full error:\n\t{err}") else: return load_balancer def create_listener( self, load_balancer_name: str, target_group: Dict[str, Any], ) -> Dict[str, Any]: """ Creates a listener for the specified load balancer that forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create a listener for. :param target_group: An existing target group that is added as a listener to the load balancer. :return: Data about the newly created listener. """ try: # Retrieve the load balancer ARN load_balancer_response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) load_balancer_arn = load_balancer_response["LoadBalancers"][0][ "LoadBalancerArn" ] # Create the listener response = self.elb_client.create_listener( LoadBalancerArn=load_balancer_arn, Protocol=target_group["Protocol"], Port=target_group["Port"], DefaultActions=[ { "Type": "forward", "TargetGroupArn": target_group["TargetGroupArn"], } ], ) log.info( f"Created listener to forward traffic from load balancer '{load_balancer_name}' to target group '{target_group['TargetGroupName']}'." ) return response["Listeners"][0] except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to add a listener on '{load_balancer_name}' for target group '{target_group['TargetGroupName']}'." ) if error_code == "ListenerNotFoundException": log.error( f"The listener could not be found for the load balancer '{load_balancer_name}'. " "Please check the load balancer name and target group configuration." ) if error_code == "InvalidConfigurationRequestException": log.error( f"The configuration provided for the listener on load balancer '{load_balancer_name}' is invalid. " "Please review the provided protocol, port, and target group settings." ) log.error(f"Full error:\n\t{err}") def delete_load_balancer(self, load_balancer_name) -> None: """ Deletes a load balancer. :param load_balancer_name: The name of the load balancer to delete. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"] self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn) log.info("Deleted load balancer %s.", load_balancer_name) waiter = self.elb_client.get_waiter("load_balancers_deleted") log.info("Waiting for load balancer to be deleted...") waiter.wait(Names=[load_balancer_name]) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Couldn't delete load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "LoadBalancerNotFoundException": log.error( f"The load balancer '{load_balancer_name}' does not exist. " "Please check the name and try again." ) log.error(f"Full error:\n\t{err}") def get_endpoint(self, load_balancer_name) -> str: """ Gets the HTTP endpoint of the load balancer. :return: The endpoint. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) return response["LoadBalancers"][0]["DNSName"] except ClientError as err: log.error( f"Couldn't get the endpoint for load balancer {load_balancer_name}" ) error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Verify load balancer name and ensure it exists in the AWS console." ) log.error(f"Full error:\n\t{err}") @staticmethod def verify_load_balancer_endpoint(endpoint) -> bool: """ Verify this computer can successfully send a GET request to the load balancer endpoint. :param endpoint: The endpoint to verify. :return: True if the GET request is successful, False otherwise. """ retries = 3 verified = False while not verified and retries > 0: try: lb_response = requests.get(f"http://{endpoint}") log.info( "Got response %s from load balancer endpoint.", lb_response.status_code, ) if lb_response.status_code == 200: verified = True else: retries = 0 except requests.exceptions.ConnectionError: log.info( "Got connection error from load balancer endpoint, retrying..." ) retries -= 1 time.sleep(10) return verified def check_target_health(self, target_group_name: str) -> List[Dict[str, Any]]: """ Checks the health of the instances in the target group. :return: The health status of the target group. """ try: tg_response = self.elb_client.describe_target_groups( Names=[target_group_name] ) health_response = self.elb_client.describe_target_health( TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"] ) except ClientError as err: log.error(f"Couldn't check health of {target_group_name} target(s).") error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Load balancer associated with the target group was not found. " "Ensure the load balancer exists, is in the correct AWS region, and " "that you have the necessary permissions to access it.", ) elif error_code == "TargetGroupNotFoundException": log.error( "Target group was not found. " "Verify the target group name, check that it exists in the correct region, " "and ensure it has not been deleted or created in a different account.", ) log.error(f"Full error:\n\t{err}") else: return health_response["TargetHealthDescriptions"]
DynamoDB を使用してレコメンデーションサービスをシミュレートするクラスを作成します。
class RecommendationService: """ Encapsulates a DynamoDB table to use as a service that recommends books, movies, and songs. """ def __init__(self, table_name: str, dynamodb_client: boto3.client): """ Initializes the RecommendationService class with the necessary parameters. :param table_name: The name of the DynamoDB recommendations table. :param dynamodb_client: A Boto3 DynamoDB client. """ self.table_name = table_name self.dynamodb_client = dynamodb_client def create(self) -> Dict[str, Any]: """ Creates a DynamoDB table to use as a recommendation service. The table has a hash key named 'MediaType' that defines the type of media recommended, such as Book or Movie, and a range key named 'ItemId' that, combined with the MediaType, forms a unique identifier for the recommended item. :return: Data about the newly created table. :raises RecommendationServiceError: If the table creation fails. """ try: response = self.dynamodb_client.create_table( TableName=self.table_name, AttributeDefinitions=[ {"AttributeName": "MediaType", "AttributeType": "S"}, {"AttributeName": "ItemId", "AttributeType": "N"}, ], KeySchema=[ {"AttributeName": "MediaType", "KeyType": "HASH"}, {"AttributeName": "ItemId", "KeyType": "RANGE"}, ], ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}, ) log.info("Creating table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_exists") waiter.wait(TableName=self.table_name) log.info("Table %s created.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceInUseException": log.info("Table %s exists, nothing to be done.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when creating table: {err}." ) else: return response def populate(self, data_file: str) -> None: """ Populates the recommendations table from a JSON file. :param data_file: The path to the data file. :raises RecommendationServiceError: If the table population fails. """ try: with open(data_file) as data: items = json.load(data) batch = [{"PutRequest": {"Item": item}} for item in items] self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch}) log.info( "Populated table %s with items from %s.", self.table_name, data_file ) except ClientError as err: raise RecommendationServiceError( self.table_name, f"Couldn't populate table from {data_file}: {err}" ) def destroy(self) -> None: """ Deletes the recommendations table. :raises RecommendationServiceError: If the table deletion fails. """ try: self.dynamodb_client.delete_table(TableName=self.table_name) log.info("Deleting table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_not_exists") waiter.wait(TableName=self.table_name) log.info("Table %s deleted.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": log.info("Table %s does not exist, nothing to do.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when deleting table: {err}." )
Systems Manager のアクションをラップするクラスを作成します。
class ParameterHelper: """ Encapsulates Systems Manager parameters. This example uses these parameters to drive the demonstration of resilient architecture, such as failure of a dependency or how the service responds to a health check. """ table: str = "doc-example-resilient-architecture-table" failure_response: str = "doc-example-resilient-architecture-failure-response" health_check: str = "doc-example-resilient-architecture-health-check" def __init__(self, table_name: str, ssm_client: boto3.client): """ Initializes the ParameterHelper class with the necessary parameters. :param table_name: The name of the DynamoDB table that is used as a recommendation service. :param ssm_client: A Boto3 Systems Manager client. """ self.ssm_client = ssm_client self.table_name = table_name def reset(self) -> None: """ Resets the Systems Manager parameters to starting values for the demo. These are the name of the DynamoDB recommendation table, no response when a dependency fails, and shallow health checks. """ self.put(self.table, self.table_name) self.put(self.failure_response, "none") self.put(self.health_check, "shallow") def put(self, name: str, value: str) -> None: """ Sets the value of a named Systems Manager parameter. :param name: The name of the parameter. :param value: The new value of the parameter. :raises ParameterHelperError: If the parameter value cannot be set. """ try: self.ssm_client.put_parameter( Name=name, Value=value, Overwrite=True, Type="String" ) log.info("Setting parameter %s to '%s'.", name, value) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to set parameter {name}.") if error_code == "ParameterLimitExceeded": log.error( "The parameter limit has been exceeded. " "Consider deleting unused parameters or request a limit increase." ) elif error_code == "ParameterAlreadyExists": log.error( "The parameter already exists and overwrite is set to False. " "Use Overwrite=True to update the parameter." ) log.error(f"Full error:\n\t{err}")
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-
次のコード例は、ユーザーを作成し、そのユーザーにポリシーをアタッチする方法を示します。
警告
セキュリティリスクを回避するには、専用ソフトウェアの開発時や実際のデータを操作する際に、認証にIAMユーザーを使用しないでください。代わりに、AWS IAM Identity Center などの ID プロバイダーとのフェデレーションを使用してください。
2 人のIAMユーザーを作成します。
Amazon S3 バケット内のオブジェクトを取得して格納するポリシーを 1 人のユーザーに割り当てます。
このバケットからオブジェクトを取得することを許可するポリシーを、セカンドユーザーにアタッチします。
ユーザーの認証情報に基づいて、バケットについてのさまざまなアクセス許可を取得します。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 IAM ユーザーアクションをラップする関数を作成します。
import logging import time import boto3 from botocore.exceptions import ClientError import access_key_wrapper import policy_wrapper logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_user(user_name): """ Creates a user. By default, a user has no permissions or access keys. :param user_name: The name of the user. :return: The newly created user. """ try: user = iam.create_user(UserName=user_name) logger.info("Created user %s.", user.name) except ClientError: logger.exception("Couldn't create user %s.", user_name) raise else: return user def update_user(user_name, new_user_name): """ Updates a user's name. :param user_name: The current name of the user to update. :param new_user_name: The new name to assign to the user. :return: The updated user. """ try: user = iam.User(user_name) user.update(NewUserName=new_user_name) logger.info("Renamed %s to %s.", user_name, new_user_name) except ClientError: logger.exception("Couldn't update name for user %s.", user_name) raise return user def list_users(): """ Lists the users in the current account. :return: The list of users. """ try: users = list(iam.users.all()) logger.info("Got %s users.", len(users)) except ClientError: logger.exception("Couldn't get users.") raise else: return users def delete_user(user_name): """ Deletes a user. Before a user can be deleted, all associated resources, such as access keys and policies, must be deleted or detached. :param user_name: The name of the user. """ try: iam.User(user_name).delete() logger.info("Deleted user %s.", user_name) except ClientError: logger.exception("Couldn't delete user %s.", user_name) raise def attach_policy(user_name, policy_arn): """ Attaches a policy to a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to user %s.", policy_arn, user_name) except ClientError: logger.exception("Couldn't attach policy %s to user %s.", policy_arn, user_name) raise def detach_policy(user_name, policy_arn): """ Detaches a policy from a user. :param user_name: The name of the user. :param policy_arn: The Amazon Resource Name (ARN) of the policy. """ try: iam.User(user_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from user %s.", policy_arn, user_name) except ClientError: logger.exception( "Couldn't detach policy %s from user %s.", policy_arn, user_name ) raise
IAM ポリシーアクションをラップする関数を作成します。
import json import logging import operator import pprint import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_policy(name, description, actions, resource_arn): """ Creates a policy that contains a single statement. :param name: The name of the policy to create. :param description: The description of the policy. :param actions: The actions allowed by the policy. These typically take the form of service:action, such as s3:PutObject. :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy applies to. This ARN can contain wildcards, such as 'arn:aws:s3:::my-bucket/*' to allow actions on all objects in the bucket named 'my-bucket'. :return: The newly created policy. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.create_policy( PolicyName=name, Description=description, PolicyDocument=json.dumps(policy_doc), ) logger.info("Created policy %s.", policy.arn) except ClientError: logger.exception("Couldn't create policy %s.", name) raise else: return policy def delete_policy(policy_arn): """ Deletes a policy. :param policy_arn: The ARN of the policy to delete. """ try: iam.Policy(policy_arn).delete() logger.info("Deleted policy %s.", policy_arn) except ClientError: logger.exception("Couldn't delete policy %s.", policy_arn) raise
IAM アクセスキーアクションをラップする関数を作成します。
import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_key(user_name): """ Creates an access key for the specified user. Each user can have a maximum of two keys. :param user_name: The name of the user. :return: The created access key. """ try: key_pair = iam.User(user_name).create_access_key_pair() logger.info( "Created access key pair for %s. Key ID is %s.", key_pair.user_name, key_pair.id, ) except ClientError: logger.exception("Couldn't create access key pair for %s.", user_name) raise else: return key_pair def delete_key(user_name, key_id): """ Deletes a user's access key. :param user_name: The user that owns the key. :param key_id: The ID of the key to delete. """ try: key = iam.AccessKey(user_name, key_id) key.delete() logger.info("Deleted access key %s for %s.", key.id, key.user_name) except ClientError: logger.exception("Couldn't delete key %s for %s", key_id, user_name) raise
ラッパー関数を使用して、異なるポリシーを持つユーザーを作成し、その認証情報を使用して Amazon S3 バケットにアクセスします。
def usage_demo(): """ Shows how to manage users, keys, and policies. This demonstration creates two users: one user who can put and get objects in an Amazon S3 bucket, and another user who can only get objects from the bucket. The demo then shows how the users can perform only the actions they are permitted to perform. """ logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management user demo.") print("-" * 88) print( "Users can have policies and roles attached to grant them specific " "permissions." ) s3 = boto3.resource("s3") bucket = s3.create_bucket( Bucket=f"demo-iam-bucket-{time.time_ns()}", CreateBucketConfiguration={ "LocationConstraint": s3.meta.client.meta.region_name }, ) print(f"Created an Amazon S3 bucket named {bucket.name}.") user_read_writer = create_user("demo-iam-read-writer") user_reader = create_user("demo-iam-reader") print(f"Created two IAM users: {user_read_writer.name} and {user_reader.name}") update_user(user_read_writer.name, "demo-iam-creator") update_user(user_reader.name, "demo-iam-getter") users = list_users() user_read_writer = next( user for user in users if user.user_id == user_read_writer.user_id ) user_reader = next(user for user in users if user.user_id == user_reader.user_id) print( f"Changed the names of the users to {user_read_writer.name} " f"and {user_reader.name}." ) read_write_policy = policy_wrapper.create_policy( "demo-iam-read-write-policy", "Grants rights to create and get an object in the demo bucket.", ["s3:PutObject", "s3:GetObject"], f"arn:aws:s3:::{bucket.name}/*", ) print( f"Created policy {read_write_policy.policy_name} with ARN: {read_write_policy.arn}" ) print(read_write_policy.description) read_policy = policy_wrapper.create_policy( "demo-iam-read-policy", "Grants rights to get an object from the demo bucket.", "s3:GetObject", f"arn:aws:s3:::{bucket.name}/*", ) print(f"Created policy {read_policy.policy_name} with ARN: {read_policy.arn}") print(read_policy.description) attach_policy(user_read_writer.name, read_write_policy.arn) print(f"Attached {read_write_policy.policy_name} to {user_read_writer.name}.") attach_policy(user_reader.name, read_policy.arn) print(f"Attached {read_policy.policy_name} to {user_reader.name}.") user_read_writer_key = access_key_wrapper.create_key(user_read_writer.name) print(f"Created access key pair for {user_read_writer.name}.") user_reader_key = access_key_wrapper.create_key(user_reader.name) print(f"Created access key pair for {user_reader.name}.") s3_read_writer_resource = boto3.resource( "s3", aws_access_key_id=user_read_writer_key.id, aws_secret_access_key=user_read_writer_key.secret, ) demo_object_key = f"object-{time.time_ns()}" demo_object = None while demo_object is None: try: demo_object = s3_read_writer_resource.Bucket(bucket.name).put_object( Key=demo_object_key, Body=b"AWS IAM demo object content!" ) except ClientError as error: if error.response["Error"]["Code"] == "InvalidAccessKeyId": print("Access key not yet available. Waiting...") time.sleep(1) else: raise print( f"Put {demo_object_key} into {bucket.name} using " f"{user_read_writer.name}'s credentials." ) read_writer_object = s3_read_writer_resource.Bucket(bucket.name).Object( demo_object_key ) read_writer_content = read_writer_object.get()["Body"].read() print(f"Got object {read_writer_object.key} using read-writer user's credentials.") print(f"Object content: {read_writer_content}") s3_reader_resource = boto3.resource( "s3", aws_access_key_id=user_reader_key.id, aws_secret_access_key=user_reader_key.secret, ) demo_content = None while demo_content is None: try: demo_object = s3_reader_resource.Bucket(bucket.name).Object(demo_object_key) demo_content = demo_object.get()["Body"].read() print(f"Got object {demo_object.key} using reader user's credentials.") print(f"Object content: {demo_content}") except ClientError as error: if error.response["Error"]["Code"] == "InvalidAccessKeyId": print("Access key not yet available. Waiting...") time.sleep(1) else: raise try: demo_object.delete() except ClientError as error: if error.response["Error"]["Code"] == "AccessDenied": print("-" * 88) print( "Tried to delete the object using the reader user's credentials. " "Got expected AccessDenied error because the reader is not " "allowed to delete objects." ) print("-" * 88) access_key_wrapper.delete_key(user_reader.name, user_reader_key.id) detach_policy(user_reader.name, read_policy.arn) policy_wrapper.delete_policy(read_policy.arn) delete_user(user_reader.name) print(f"Deleted keys, detached and deleted policy, and deleted {user_reader.name}.") access_key_wrapper.delete_key(user_read_writer.name, user_read_writer_key.id) detach_policy(user_read_writer.name, read_write_policy.arn) policy_wrapper.delete_policy(read_write_policy.arn) delete_user(user_read_writer.name) print( f"Deleted keys, detached and deleted policy, and deleted {user_read_writer.name}." ) bucket.objects.delete() bucket.delete() print(f"Emptied and deleted {bucket.name}.") print("Thanks for watching!")
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-
次のコード例は、アクセスキーの管理方法を示しています。
警告
セキュリティリスクを回避するには、専用ソフトウェアの開発時や実際のデータを操作する際に、認証にIAMユーザーを使用しないでください。代わりに、AWS IAM Identity Center などの ID プロバイダーとのフェデレーションを使用してください。
アクセスキーを作成して一覧表示します。
アクセスキーが最後に使用された時刻とその方法を検索します。
アクセスキーの更新や削除を行います。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 IAM アクセスキーアクションをラップする関数を作成します。
import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def list_keys(user_name): """ Lists the keys owned by the specified user. :param user_name: The name of the user. :return: The list of keys owned by the user. """ try: keys = list(iam.User(user_name).access_keys.all()) logger.info("Got %s access keys for %s.", len(keys), user_name) except ClientError: logger.exception("Couldn't get access keys for %s.", user_name) raise else: return keys def create_key(user_name): """ Creates an access key for the specified user. Each user can have a maximum of two keys. :param user_name: The name of the user. :return: The created access key. """ try: key_pair = iam.User(user_name).create_access_key_pair() logger.info( "Created access key pair for %s. Key ID is %s.", key_pair.user_name, key_pair.id, ) except ClientError: logger.exception("Couldn't create access key pair for %s.", user_name) raise else: return key_pair def get_last_use(key_id): """ Gets information about when and how a key was last used. :param key_id: The ID of the key to look up. :return: Information about the key's last use. """ try: response = iam.meta.client.get_access_key_last_used(AccessKeyId=key_id) last_used_date = response["AccessKeyLastUsed"].get("LastUsedDate", None) last_service = response["AccessKeyLastUsed"].get("ServiceName", None) logger.info( "Key %s was last used by %s on %s to access %s.", key_id, response["UserName"], last_used_date, last_service, ) except ClientError: logger.exception("Couldn't get last use of key %s.", key_id) raise else: return response def update_key(user_name, key_id, activate): """ Updates the status of a key. :param user_name: The user that owns the key. :param key_id: The ID of the key to update. :param activate: When True, the key is activated. Otherwise, the key is deactivated. """ try: key = iam.User(user_name).AccessKey(key_id) if activate: key.activate() else: key.deactivate() logger.info("%s key %s.", "Activated" if activate else "Deactivated", key_id) except ClientError: logger.exception( "Couldn't %s key %s.", "Activate" if activate else "Deactivate", key_id ) raise def delete_key(user_name, key_id): """ Deletes a user's access key. :param user_name: The user that owns the key. :param key_id: The ID of the key to delete. """ try: key = iam.AccessKey(user_name, key_id) key.delete() logger.info("Deleted access key %s for %s.", key.id, key.user_name) except ClientError: logger.exception("Couldn't delete key %s for %s", key_id, user_name) raise
ラッパー関数を使用して、現在のユーザーのアクセスキーアクションを実行します。
def usage_demo(): """Shows how to create and manage access keys.""" def print_keys(): """Gets and prints the current keys for a user.""" current_keys = list_keys(current_user_name) print("The current user's keys are now:") print(*[f"{key.id}: {key.status}" for key in current_keys], sep="\n") logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management access key demo.") print("-" * 88) current_user_name = iam.CurrentUser().user_name print( f"This demo creates an access key for the current user " f"({current_user_name}), manipulates the key in a few ways, and then " f"deletes it." ) all_keys = list_keys(current_user_name) if len(all_keys) == 2: print( "The current user already has the maximum of 2 access keys. To run " "this demo, either delete one of the access keys or use a user " "that has only 1 access key." ) else: new_key = create_key(current_user_name) print(f"Created a new key with id {new_key.id} and secret {new_key.secret}.") print_keys() existing_key = next(key for key in all_keys if key != new_key) last_use = get_last_use(existing_key.id)["AccessKeyLastUsed"] print( f"Key {all_keys[0].id} was last used to access {last_use['ServiceName']} " f"on {last_use['LastUsedDate']}" ) update_key(current_user_name, new_key.id, False) print(f"Key {new_key.id} is now deactivated.") print_keys() delete_key(current_user_name, new_key.id) print_keys() print("Thanks for watching!")
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-
次のコードサンプルは、以下の操作方法を示しています。
ポリシーを作成して一覧表示します。
ポリシーバージョンを作成して取得します。
ポリシーを以前のバージョンにロールバックします。
ポリシーを削除します。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 IAM ポリシーアクションをラップする関数を作成します。
import json import logging import operator import pprint import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_policy(name, description, actions, resource_arn): """ Creates a policy that contains a single statement. :param name: The name of the policy to create. :param description: The description of the policy. :param actions: The actions allowed by the policy. These typically take the form of service:action, such as s3:PutObject. :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy applies to. This ARN can contain wildcards, such as 'arn:aws:s3:::my-bucket/*' to allow actions on all objects in the bucket named 'my-bucket'. :return: The newly created policy. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.create_policy( PolicyName=name, Description=description, PolicyDocument=json.dumps(policy_doc), ) logger.info("Created policy %s.", policy.arn) except ClientError: logger.exception("Couldn't create policy %s.", name) raise else: return policy def list_policies(scope): """ Lists the policies in the current account. :param scope: Limits the kinds of policies that are returned. For example, 'Local' specifies that only locally managed policies are returned. :return: The list of policies. """ try: policies = list(iam.policies.filter(Scope=scope)) logger.info("Got %s policies in scope '%s'.", len(policies), scope) except ClientError: logger.exception("Couldn't get policies for scope '%s'.", scope) raise else: return policies def create_policy_version(policy_arn, actions, resource_arn, set_as_default): """ Creates a policy version. Policies can have up to five versions. The default version is the one that is used for all resources that reference the policy. :param policy_arn: The ARN of the policy. :param actions: The actions to allow in the policy version. :param resource_arn: The ARN of the resource this policy version applies to. :param set_as_default: When True, this policy version is set as the default version for the policy. Otherwise, the default is not changed. :return: The newly created policy version. """ policy_doc = { "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}], } try: policy = iam.Policy(policy_arn) policy_version = policy.create_version( PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default ) logger.info( "Created policy version %s for policy %s.", policy_version.version_id, policy_version.arn, ) except ClientError: logger.exception("Couldn't create a policy version for %s.", policy_arn) raise else: return policy_version def get_default_policy_statement(policy_arn): """ Gets the statement of the default version of the specified policy. :param policy_arn: The ARN of the policy to look up. :return: The statement of the default policy version. """ try: policy = iam.Policy(policy_arn) # To get an attribute of a policy, the SDK first calls get_policy. policy_doc = policy.default_version.document policy_statement = policy_doc.get("Statement", None) logger.info("Got default policy doc for %s.", policy.policy_name) logger.info(policy_doc) except ClientError: logger.exception("Couldn't get default policy statement for %s.", policy_arn) raise else: return policy_statement def rollback_policy_version(policy_arn): """ Rolls back to the previous default policy, if it exists. 1. Gets the list of policy versions in order by date. 2. Finds the default. 3. Makes the previous policy the default. 4. Deletes the old default version. :param policy_arn: The ARN of the policy to roll back. :return: The default version of the policy after the rollback. """ try: policy_versions = sorted( iam.Policy(policy_arn).versions.all(), key=operator.attrgetter("create_date"), ) logger.info("Got %s versions for %s.", len(policy_versions), policy_arn) except ClientError: logger.exception("Couldn't get versions for %s.", policy_arn) raise default_version = None rollback_version = None try: while default_version is None: ver = policy_versions.pop() if ver.is_default_version: default_version = ver rollback_version = policy_versions.pop() rollback_version.set_as_default() logger.info("Set %s as the default version.", rollback_version.version_id) default_version.delete() logger.info("Deleted original default version %s.", default_version.version_id) except IndexError: if default_version is None: logger.warning("No default version found for %s.", policy_arn) elif rollback_version is None: logger.warning( "Default version %s found for %s, but no previous version exists, so " "nothing to roll back to.", default_version.version_id, policy_arn, ) except ClientError: logger.exception("Couldn't roll back version for %s.", policy_arn) raise else: return rollback_version def delete_policy(policy_arn): """ Deletes a policy. :param policy_arn: The ARN of the policy to delete. """ try: iam.Policy(policy_arn).delete() logger.info("Deleted policy %s.", policy_arn) except ClientError: logger.exception("Couldn't delete policy %s.", policy_arn) raise
ラッパー関数を使用して、ポリシーを作成し、バージョンを更新し、それらに関する情報を取得します。
def usage_demo(): """Shows how to use the policy functions.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management policy demo.") print("-" * 88) print( "Policies let you define sets of permissions that can be attached to " "other IAM resources, like users and roles." ) bucket_arn = f"arn:aws:s3:::made-up-bucket-name" policy = create_policy( "demo-iam-policy", "Policy for IAM demonstration.", ["s3:ListObjects"], bucket_arn, ) print(f"Created policy {policy.policy_name}.") policies = list_policies("Local") print(f"Your account has {len(policies)} managed policies:") print(*[pol.policy_name for pol in policies], sep=", ") time.sleep(1) policy_version = create_policy_version( policy.arn, ["s3:PutObject"], bucket_arn, True ) print( f"Added policy version {policy_version.version_id} to policy " f"{policy.policy_name}." ) default_statement = get_default_policy_statement(policy.arn) print(f"The default policy statement for {policy.policy_name} is:") pprint.pprint(default_statement) rollback_version = rollback_policy_version(policy.arn) print( f"Rolled back to version {rollback_version.version_id} for " f"{policy.policy_name}." ) default_statement = get_default_policy_statement(policy.arn) print(f"The default policy statement for {policy.policy_name} is now:") pprint.pprint(default_statement) delete_policy(policy.arn) print(f"Deleted policy {policy.policy_name}.") print("Thanks for watching!")
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-
次のコードサンプルは、以下の操作方法を示しています。
IAM ロールを作成します。
ロールにポリシーをアタッチおよびデタッチします
ロールを削除します。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 IAM ロールアクションをラップする関数を作成します。
import json import logging import pprint import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def create_role(role_name, allowed_services): """ Creates a role that lets a list of specified services assume the role. :param role_name: The name of the role. :param allowed_services: The services that can assume the role. :return: The newly created role. """ trust_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": service}, "Action": "sts:AssumeRole", } for service in allowed_services ], } try: role = iam.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy) ) logger.info("Created role %s.", role.name) except ClientError: logger.exception("Couldn't create role %s.", role_name) raise else: return role def attach_policy(role_name, policy_arn): """ Attaches a policy to a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).attach_policy(PolicyArn=policy_arn) logger.info("Attached policy %s to role %s.", policy_arn, role_name) except ClientError: logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name) raise def detach_policy(role_name, policy_arn): """ Detaches a policy from a role. :param role_name: The name of the role. **Note** this is the name, not the ARN. :param policy_arn: The ARN of the policy. """ try: iam.Role(role_name).detach_policy(PolicyArn=policy_arn) logger.info("Detached policy %s from role %s.", policy_arn, role_name) except ClientError: logger.exception( "Couldn't detach policy %s from role %s.", policy_arn, role_name ) raise def delete_role(role_name): """ Deletes a role. :param role_name: The name of the role to delete. """ try: iam.Role(role_name).delete() logger.info("Deleted role %s.", role_name) except ClientError: logger.exception("Couldn't delete role %s.", role_name) raise
ラッパー関数を使用してロールを作成し、ポリシーをアタッチおよびデタッチします。
def usage_demo(): """Shows how to use the role functions.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management role demo.") print("-" * 88) print( "Roles let you define sets of permissions and can be assumed by " "other entities, like users and services." ) print("The first 10 roles currently in your account are:") roles = list_roles(10) print(f"The inline policies for role {roles[0].name} are:") list_policies(roles[0].name) role = create_role( "demo-iam-role", ["lambda.amazonaws.com", "batchoperations.s3.amazonaws.com"] ) print(f"Created role {role.name}, with trust policy:") pprint.pprint(role.assume_role_policy_document) policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess" attach_policy(role.name, policy_arn) print(f"Attached policy {policy_arn} to {role.name}.") print(f"Policies attached to role {role.name} are:") list_attached_policies(role.name) detach_policy(role.name, policy_arn) print(f"Detached policy {policy_arn} from {role.name}.") delete_role(role.name) print(f"Deleted {role.name}.") print("Thanks for watching!")
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-
次のコードサンプルは、以下の操作方法を示しています。
アカウントエイリアスを取得して更新します。
ユーザーと認証情報に関するレポートを生成します。
アカウントの使用状況の概要を取得します。
アカウント内のユーザー、グループ、ロール、ポリシーについて、相互の関連付けなどの詳細を取得します。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 IAM アカウントアクションをラップする関数を作成します。
import logging import pprint import sys import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) iam = boto3.resource("iam") def list_aliases(): """ Gets the list of aliases for the current account. An account has at most one alias. :return: The list of aliases for the account. """ try: response = iam.meta.client.list_account_aliases() aliases = response["AccountAliases"] if len(aliases) > 0: logger.info("Got aliases for your account: %s.", ",".join(aliases)) else: logger.info("Got no aliases for your account.") except ClientError: logger.exception("Couldn't list aliases for your account.") raise else: return response["AccountAliases"] def create_alias(alias): """ Creates an alias for the current account. The alias can be used in place of the account ID in the sign-in URL. An account can have only one alias. When a new alias is created, it replaces any existing alias. :param alias: The alias to assign to the account. """ try: iam.create_account_alias(AccountAlias=alias) logger.info("Created an alias '%s' for your account.", alias) except ClientError: logger.exception("Couldn't create alias '%s' for your account.", alias) raise def delete_alias(alias): """ Removes the alias from the current account. :param alias: The alias to remove. """ try: iam.meta.client.delete_account_alias(AccountAlias=alias) logger.info("Removed alias '%s' from your account.", alias) except ClientError: logger.exception("Couldn't remove alias '%s' from your account.", alias) raise def generate_credential_report(): """ Starts generation of a credentials report about the current account. After calling this function to generate the report, call get_credential_report to get the latest report. A new report can be generated a minimum of four hours after the last one was generated. """ try: response = iam.meta.client.generate_credential_report() logger.info( "Generating credentials report for your account. " "Current state is %s.", response["State"], ) except ClientError: logger.exception("Couldn't generate a credentials report for your account.") raise else: return response def get_credential_report(): """ Gets the most recently generated credentials report about the current account. :return: The credentials report. """ try: response = iam.meta.client.get_credential_report() logger.debug(response["Content"]) except ClientError: logger.exception("Couldn't get credentials report.") raise else: return response["Content"] def get_summary(): """ Gets a summary of account usage. :return: The summary of account usage. """ try: summary = iam.AccountSummary() logger.debug(summary.summary_map) except ClientError: logger.exception("Couldn't get a summary for your account.") raise else: return summary.summary_map def get_authorization_details(response_filter): """ Gets an authorization detail report for the current account. :param response_filter: A list of resource types to include in the report, such as users or roles. When not specified, all resources are included. :return: The authorization detail report. """ try: account_details = iam.meta.client.get_account_authorization_details( Filter=response_filter ) logger.debug(account_details) except ClientError: logger.exception("Couldn't get details for your account.") raise else: return account_details
ラッパー関数を呼び出して、アカウントのエイリアスを変更し、アカウントに関するレポートを取得します。
def usage_demo(): """Shows how to use the account functions.""" logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Identity and Account Management account demo.") print("-" * 88) print( "Setting an account alias lets you use the alias in your sign-in URL " "instead of your account number." ) old_aliases = list_aliases() if len(old_aliases) > 0: print(f"Your account currently uses '{old_aliases[0]}' as its alias.") else: print("Your account currently has no alias.") for index in range(1, 3): new_alias = f"alias-{index}-{time.time_ns()}" print(f"Setting your account alias to {new_alias}") create_alias(new_alias) current_aliases = list_aliases() print(f"Your account alias is now {current_aliases}.") delete_alias(current_aliases[0]) print(f"Your account now has no alias.") if len(old_aliases) > 0: print(f"Restoring your original alias back to {old_aliases[0]}...") create_alias(old_aliases[0]) print("-" * 88) print("You can get various reports about your account.") print("Let's generate a credentials report...") report_state = None while report_state != "COMPLETE": cred_report_response = generate_credential_report() old_report_state = report_state report_state = cred_report_response["State"] if report_state != old_report_state: print(report_state, sep="") else: print(".", sep="") sys.stdout.flush() time.sleep(1) print() cred_report = get_credential_report() col_count = 3 print(f"Got credentials report. Showing only the first {col_count} columns.") cred_lines = [ line.split(",")[:col_count] for line in cred_report.decode("utf-8").split("\n") ] col_width = max([len(item) for line in cred_lines for item in line]) + 2 for line in cred_report.decode("utf-8").split("\n"): print( "".join(element.ljust(col_width) for element in line.split(",")[:col_count]) ) print("-" * 88) print("Let's get an account summary.") summary = get_summary() print("Here's your summary:") pprint.pprint(summary) print("-" * 88) print("Let's get authorization details!") details = get_authorization_details([]) see_details = input("These are pretty long, do you want to see them (y/n)? ") if see_details.lower() == "y": pprint.pprint(details) print("-" * 88) pw_policy_created = None see_pw_policy = input("Want to see the password policy for the account (y/n)? ") if see_pw_policy.lower() == "y": while True: if print_password_policy(): break else: answer = input( "Do you want to create a default password policy (y/n)? " ) if answer.lower() == "y": pw_policy_created = iam.create_account_password_policy() else: break if pw_policy_created is not None: answer = input("Do you want to delete the password policy (y/n)? ") if answer.lower() == "y": pw_policy_created.delete() print("Password policy deleted.") print("The SAML providers for your account are:") list_saml_providers(10) print("-" * 88) print("Thanks for watching.")
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンス の の以下のトピックを参照してください。
-
次のコードサンプルは、以下の操作方法を示しています。
ポリシーのバージョンの日付順リストを取得します。
デフォルトのポリシーバージョンを検索します。
デフォルト値を以前のポリシーバージョンにします。
古いデフォルトバージョンを削除します。
- SDK Python 用 (Boto3)
-
注記
の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 def rollback_policy_version(policy_arn): """ Rolls back to the previous default policy, if it exists. 1. Gets the list of policy versions in order by date. 2. Finds the default. 3. Makes the previous policy the default. 4. Deletes the old default version. :param policy_arn: The ARN of the policy to roll back. :return: The default version of the policy after the rollback. """ try: policy_versions = sorted( iam.Policy(policy_arn).versions.all(), key=operator.attrgetter("create_date"), ) logger.info("Got %s versions for %s.", len(policy_versions), policy_arn) except ClientError: logger.exception("Couldn't get versions for %s.", policy_arn) raise default_version = None rollback_version = None try: while default_version is None: ver = policy_versions.pop() if ver.is_default_version: default_version = ver rollback_version = policy_versions.pop() rollback_version.set_as_default() logger.info("Set %s as the default version.", rollback_version.version_id) default_version.delete() logger.info("Deleted original default version %s.", default_version.version_id) except IndexError: if default_version is None: logger.warning("No default version found for %s.", policy_arn) elif rollback_version is None: logger.warning( "Default version %s found for %s, but no previous version exists, so " "nothing to roll back to.", default_version.version_id, policy_arn, ) except ClientError: logger.exception("Couldn't roll back version for %s.", policy_arn) raise else: return rollback_version
-
API 詳細については、 AWS SDK for Python (Boto3) APIリファレンスの の以下のトピックを参照してください。
-