Elastic Load Balancing - Version 2 examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.


Elastic Load Balancing - Version 2 examples using SDK for Python (Boto3)

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Elastic Load Balancing - Version 2.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or combined with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code example shows how to get started using Elastic Load Balancing.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import boto3


def hello_elbv2(elbv2_client):
    """
    Use the AWS SDK for Python (Boto3) to create an Elastic Load Balancing V2 client and list
    up to ten of the load balancers for your account. This example uses the default settings
    specified in your shared credentials and config files.

    :param elbv2_client: A Boto3 Elastic Load Balancing V2 client object.
    """
    print("Hello, Elastic Load Balancing! Let's list some of your load balancers:")
    load_balancers = elbv2_client.describe_load_balancers(PageSize=10).get(
        "LoadBalancers", []
    )
    if load_balancers:
        for lb in load_balancers:
            print(f"\t{lb['LoadBalancerName']}: {lb['DNSName']}")
    else:
        print("Your account doesn't have any load balancers.")


if __name__ == "__main__":
    hello_elbv2(boto3.client("elbv2"))
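The Hello example reads only the first page of results (up to ten load balancers). As a minimal sketch, assuming you want every load balancer in the account, the Boto3 paginator for `describe_load_balancers` can follow the pagination markers for you. The function name `list_all_load_balancer_names` is hypothetical and is not part of the AWS example.

```python
def list_all_load_balancer_names(elbv2_client):
    """Return the names of all load balancers, following pagination.

    :param elbv2_client: A Boto3 Elastic Load Balancing V2 client, for
                         example boto3.client("elbv2").
    """
    paginator = elbv2_client.get_paginator("describe_load_balancers")
    names = []
    # Each page is a regular DescribeLoadBalancers response dictionary.
    for page in paginator.paginate(PaginationConfig={"PageSize": 10}):
        for lb in page.get("LoadBalancers", []):
            names.append(lb["LoadBalancerName"])
    return names
```

Because the function only needs an object with `get_paginator`, it can be exercised with a stand-in client in tests and with a real `elbv2` client otherwise.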

Actions

The following code example shows how to use CreateListener.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging
from typing import Any, Dict

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def create_listener(
        self,
        load_balancer_name: str,
        target_group: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Creates a listener for the specified load balancer that forwards requests to the
        specified target group.

        :param load_balancer_name: The name of the load balancer to create a listener for.
        :param target_group: An existing target group that is added as a listener to the
                             load balancer.
        :return: Data about the newly created listener.
        """
        try:
            # Retrieve the load balancer ARN
            load_balancer_response = self.elb_client.describe_load_balancers(
                Names=[load_balancer_name]
            )
            load_balancer_arn = load_balancer_response["LoadBalancers"][0][
                "LoadBalancerArn"
            ]

            # Create the listener
            response = self.elb_client.create_listener(
                LoadBalancerArn=load_balancer_arn,
                Protocol=target_group["Protocol"],
                Port=target_group["Port"],
                DefaultActions=[
                    {
                        "Type": "forward",
                        "TargetGroupArn": target_group["TargetGroupArn"],
                    }
                ],
            )
            log.info(
                f"Created listener to forward traffic from load balancer '{load_balancer_name}' to target group '{target_group['TargetGroupName']}'."
            )
            return response["Listeners"][0]
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            log.error(
                f"Failed to add a listener on '{load_balancer_name}' for target group '{target_group['TargetGroupName']}'."
            )

            if error_code == "ListenerNotFoundException":
                log.error(
                    f"The listener could not be found for the load balancer '{load_balancer_name}'. "
                    "Please check the load balancer name and target group configuration."
                )
            if error_code == "InvalidConfigurationRequestException":
                log.error(
                    f"The configuration provided for the listener on load balancer '{load_balancer_name}' is invalid. "
                    "Please review the provided protocol, port, and target group settings."
                )
            log.error(f"Full error:\n\t{err}")

The following code example shows how to use CreateLoadBalancer.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging
from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def create_load_balancer(
        self,
        load_balancer_name: str,
        subnet_ids: List[str],
    ) -> Dict[str, Any]:
        """
        Creates an Elastic Load Balancing load balancer that uses the specified subnets
        and forwards requests to the specified target group.

        :param load_balancer_name: The name of the load balancer to create.
        :param subnet_ids: A list of subnets to associate with the load balancer.
        :return: Data about the newly created load balancer.
        """
        try:
            response = self.elb_client.create_load_balancer(
                Name=load_balancer_name, Subnets=subnet_ids
            )
            load_balancer = response["LoadBalancers"][0]
            log.info(f"Created load balancer '{load_balancer_name}'.")

            # Block until the load balancer reports that it is available.
            waiter = self.elb_client.get_waiter("load_balancer_available")
            log.info(
                f"Waiting for load balancer '{load_balancer_name}' to be available..."
            )
            waiter.wait(Names=[load_balancer_name])
            log.info(f"Load balancer '{load_balancer_name}' is now available!")

        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            log.error(
                f"Failed to create load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}"
            )

            if error_code == "DuplicateLoadBalancerNameException":
                log.error(
                    f"A load balancer with the name '{load_balancer_name}' already exists. "
                    "Load balancer names must be unique within the AWS region. "
                    "Please choose a different name and try again."
                )
            if error_code == "TooManyLoadBalancersException":
                log.error(
                    "The maximum number of load balancers has been reached in this account and region. "
                    "You can delete unused load balancers or request an increase in the service quota from AWS Support."
                )
            log.error(f"Full error:\n\t{err}")
        else:
            return load_balancer
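The example above waits with the default settings of the `load_balancer_available` waiter. As a sketch, assuming you want to bound how long the call can block, Boto3 waiters accept a `WaiterConfig` dictionary with `Delay` (seconds between polls) and `MaxAttempts`. The helper name `wait_until_available` and the default values are illustrative assumptions, not part of the AWS example.

```python
def wait_until_available(elbv2_client, load_balancer_name, delay=15, max_attempts=20):
    """Wait for a load balancer to become available with explicit limits.

    :param elbv2_client: A Boto3 Elastic Load Balancing V2 client.
    :param load_balancer_name: The name of the load balancer to wait for.
    :param delay: Seconds to wait between polling attempts (assumed value).
    :param max_attempts: Number of polls before the waiter gives up (assumed value).
    """
    waiter = elbv2_client.get_waiter("load_balancer_available")
    # WaiterConfig overrides the waiter's built-in delay and attempt count.
    waiter.wait(
        Names=[load_balancer_name],
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
```

With a real client, the waiter raises `botocore.exceptions.WaiterError` if the load balancer is still not available after the last attempt, so callers may want to catch that alongside `ClientError`.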

The following code example shows how to use CreateTargetGroup.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging
from typing import Any, Dict

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def create_target_group(
        self, target_group_name: str, protocol: str, port: int, vpc_id: str
    ) -> Dict[str, Any]:
        """
        Creates an Elastic Load Balancing target group. The target group specifies how
        the load balancer forwards requests to instances in the group and how instance
        health is checked.

        To speed up this demo, the health check is configured with shortened times and
        lower thresholds. In production, you might want to decrease the sensitivity of
        your health checks to avoid unwanted failures.

        :param target_group_name: The name of the target group to create.
        :param protocol: The protocol to use to forward requests, such as 'HTTP'.
        :param port: The port to use to forward requests, such as 80.
        :param vpc_id: The ID of the VPC in which the load balancer exists.
        :return: Data about the newly created target group.
        """
        try:
            response = self.elb_client.create_target_group(
                Name=target_group_name,
                Protocol=protocol,
                Port=port,
                HealthCheckPath="/healthcheck",
                HealthCheckIntervalSeconds=10,
                HealthCheckTimeoutSeconds=5,
                HealthyThresholdCount=2,
                UnhealthyThresholdCount=2,
                VpcId=vpc_id,
            )
            target_group = response["TargetGroups"][0]
            log.info(f"Created load balancing target group '{target_group_name}'.")
            return target_group
        except ClientError as err:
            log.error(
                f"Couldn't create load balancing target group '{target_group_name}'."
            )
            error_code = err.response["Error"]["Code"]

            if error_code == "DuplicateTargetGroupName":
                log.error(
                    f"Target group name {target_group_name} already exists. "
                    "Check if the target group already exists. "
                    "Consider using a different name or deleting the existing target group if appropriate."
                )
            elif error_code == "TooManyTargetGroups":
                log.error(
                    "Too many target groups exist in the account. "
                    "Consider deleting unused target groups to create space for new ones."
                )
            log.error(f"Full error:\n\t{err}")

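To make the "shortened times and lower thresholds" in the CreateTargetGroup docstring concrete, here is a rough back-of-the-envelope sketch. It assumes that a target changes state after the threshold number of consecutive checks, spaced one interval apart, and it ignores the health check timeout and request latency. The function name and the "production-like" values are hypothetical.

```python
def approx_seconds_to_state_change(interval_seconds, threshold_count):
    """Rough estimate of how long a health state change takes to register.

    Assumes `threshold_count` consecutive checks spaced `interval_seconds`
    apart. Timeouts and network latency are deliberately ignored.
    """
    return interval_seconds * threshold_count


# Demo settings from the example: 10-second interval, threshold of 2.
demo_estimate = approx_seconds_to_state_change(10, 2)

# Hypothetical, less sensitive settings for comparison.
relaxed_estimate = approx_seconds_to_state_change(30, 5)

print(f"Demo settings: about {demo_estimate} seconds")
print(f"Relaxed settings: about {relaxed_estimate} seconds")
```

The demo settings react in roughly 20 seconds, which keeps the walkthrough fast, while less sensitive settings trade slower detection for fewer false alarms.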
The following code example shows how to use DeleteLoadBalancer.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def delete_load_balancer(self, load_balancer_name) -> None:
        """
        Deletes a load balancer.

        :param load_balancer_name: The name of the load balancer to delete.
        """
        try:
            # Look up the ARN, because DeleteLoadBalancer requires it.
            response = self.elb_client.describe_load_balancers(
                Names=[load_balancer_name]
            )
            lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"]
            self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn)
            log.info("Deleted load balancer %s.", load_balancer_name)
            waiter = self.elb_client.get_waiter("load_balancers_deleted")
            log.info("Waiting for load balancer to be deleted...")
            waiter.wait(Names=[load_balancer_name])
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            log.error(
                f"Couldn't delete load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}"
            )

            if error_code == "LoadBalancerNotFoundException":
                log.error(
                    f"The load balancer '{load_balancer_name}' does not exist. "
                    "Please check the name and try again."
                )
            log.error(f"Full error:\n\t{err}")

The following code example shows how to use DeleteTargetGroup.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging
import time

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def delete_target_group(self, target_group_name) -> None:
        """
        Deletes the target group.
        """
        try:
            # Describe the target group to get its ARN
            response = self.elb_client.describe_target_groups(Names=[target_group_name])
            tg_arn = response["TargetGroups"][0]["TargetGroupArn"]

            # Delete the target group
            self.elb_client.delete_target_group(TargetGroupArn=tg_arn)
            log.info("Deleted load balancing target group %s.", target_group_name)

            # Use a custom waiter to wait until the target group is no longer available
            self.wait_for_target_group_deletion(self.elb_client, tg_arn)
            log.info("Target group %s successfully deleted.", target_group_name)

        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            log.error(f"Failed to delete target group '{target_group_name}'.")
            if error_code == "TargetGroupNotFound":
                log.error(
                    "Load balancer target group either already deleted or never existed. "
                    "Verify the name and check that the resource exists in the AWS Console."
                )
            elif error_code == "ResourceInUseException":
                log.error(
                    "Target group still in use by another resource. "
                    "Ensure that the target group is no longer associated with any load balancers or resources.",
                )
            log.error(f"Full error:\n\t{err}")

    def wait_for_target_group_deletion(
        self, elb_client, target_group_arn, max_attempts=10, delay=30
    ):
        # Poll until describing the target group fails with TargetGroupNotFound.
        for attempt in range(max_attempts):
            try:
                elb_client.describe_target_groups(TargetGroupArns=[target_group_arn])
                print(
                    f"Attempt {attempt + 1}: Target group {target_group_arn} still exists."
                )
            except ClientError as e:
                if e.response["Error"]["Code"] == "TargetGroupNotFound":
                    print(
                        f"Target group {target_group_arn} has been successfully deleted."
                    )
                    return
                else:
                    raise
            time.sleep(delay)
        raise TimeoutError(
            f"Target group {target_group_arn} was not deleted after {max_attempts * delay} seconds."
        )
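The custom waiter above mixes the polling loop with the AWS call. As a sketch of the same idea in a reusable form, the hypothetical helper below polls any zero-argument predicate and accepts an injectable `sleep` function so that it can be tested without real delays. The name `wait_until` and its parameters are assumptions made for illustration and do not appear in the AWS example.

```python
import time


def wait_until(predicate, max_attempts=10, delay=30, sleep=time.sleep):
    """Poll predicate() until it returns True or attempts run out.

    :param predicate: A zero-argument callable that returns True when done.
    :param max_attempts: The maximum number of times to call the predicate.
    :param delay: Seconds to pause between attempts.
    :param sleep: The function used to pause; replaceable in tests.
    :return: The attempt number on which the predicate succeeded.
    """
    for attempt in range(1, max_attempts + 1):
        if predicate():
            return attempt
        # Skip the pause after the final attempt; there is nothing left to wait for.
        if attempt < max_attempts:
            sleep(delay)
    raise TimeoutError(f"Condition not met after {max_attempts} attempts.")
```

For target group deletion, the predicate could wrap `describe_target_groups` and return True when the call raises a `ClientError` whose code is `TargetGroupNotFound`.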

The following code example shows how to use DescribeLoadBalancers.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def get_endpoint(self, load_balancer_name) -> str:
        """
        Gets the HTTP endpoint of the load balancer.

        :return: The endpoint.
        """
        try:
            response = self.elb_client.describe_load_balancers(
                Names=[load_balancer_name]
            )
            return response["LoadBalancers"][0]["DNSName"]
        except ClientError as err:
            log.error(
                f"Couldn't get the endpoint for load balancer {load_balancer_name}"
            )
            error_code = err.response["Error"]["Code"]
            if error_code == "LoadBalancerNotFoundException":
                log.error(
                    "Verify load balancer name and ensure it exists in the AWS console."
                )
            log.error(f"Full error:\n\t{err}")

The following code example shows how to use DescribeTargetHealth.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import logging
from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)


class ElasticLoadBalancerWrapper:
    """Encapsulates Elastic Load Balancing (ELB) actions."""

    def __init__(self, elb_client: boto3.client):
        """
        Initializes the LoadBalancer class with the necessary parameters.
        """
        self.elb_client = elb_client

    def check_target_health(self, target_group_name: str) -> List[Dict[str, Any]]:
        """
        Checks the health of the instances in the target group.

        :return: The health status of the target group.
        """
        try:
            # Resolve the target group name to its ARN, then query target health.
            tg_response = self.elb_client.describe_target_groups(
                Names=[target_group_name]
            )
            health_response = self.elb_client.describe_target_health(
                TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"]
            )
        except ClientError as err:
            log.error(f"Couldn't check health of {target_group_name} target(s).")
            error_code = err.response["Error"]["Code"]
            if error_code == "LoadBalancerNotFoundException":
                log.error(
                    "Load balancer associated with the target group was not found. "
                    "Ensure the load balancer exists, is in the correct AWS region, and "
                    "that you have the necessary permissions to access it.",
                )
            elif error_code == "TargetGroupNotFoundException":
                log.error(
                    "Target group was not found. "
                    "Verify the target group name, check that it exists in the correct region, "
                    "and ensure it has not been deleted or created in a different account.",
                )
            log.error(f"Full error:\n\t{err}")
        else:
            return health_response["TargetHealthDescriptions"]
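The list returned by `check_target_health` contains one description per target, each with a `TargetHealth` dictionary that includes a `State` value. As a small sketch, assuming you only want a count of targets per state, the hypothetical helper below summarizes that list. The instance IDs in the sample data are made up for illustration.

```python
from collections import Counter


def summarize_target_health(descriptions):
    """Count targets by health state.

    :param descriptions: A list shaped like the TargetHealthDescriptions
                         field of a DescribeTargetHealth response.
    :return: A dictionary that maps each state to the number of targets in it.
    """
    return dict(Counter(d["TargetHealth"]["State"] for d in descriptions))


# Hypothetical sample data in the shape of TargetHealthDescriptions.
sample = [
    {"Target": {"Id": "i-example1", "Port": 80}, "TargetHealth": {"State": "healthy"}},
    {"Target": {"Id": "i-example2", "Port": 80}, "TargetHealth": {"State": "healthy"}},
    {"Target": {"Id": "i-example3", "Port": 80}, "TargetHealth": {"State": "unhealthy"}},
]
summary = summarize_target_health(sample)
```

A summary like this can be logged after each health check to show at a glance how many targets are still receiving traffic.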

Scenarios

The following code example shows how to create a load-balanced web service that returns book, movie, and song recommendations. The example shows how the service responds to failures, and how to restructure the service for more resilience when failures occur.

  • Use an Amazon EC2 Auto Scaling group to create Amazon Elastic Compute Cloud (Amazon EC2) instances based on a launch template and to keep the number of instances in a specified range.

  • Handle and distribute HTTP requests with Elastic Load Balancing.

  • Monitor the health of instances in an Auto Scaling group and forward requests only to healthy instances.

  • Run a Python web server on each EC2 instance to handle HTTP requests. The web server responds with recommendations and health checks.

  • Simulate a recommendation service with an Amazon DynamoDB table.

  • Control web server response to requests and health checks by updating AWS Systems Manager parameters.
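The last two items describe a web server whose health check behavior is switched by a parameter. The server code itself is not shown on this page, so the following is only a sketch of the idea under stated assumptions: a "shallow" check (an assumed mode name) always reports healthy if the server is running, while a "deep" check also requires that the server's dependency, such as the DynamoDB table, is reachable. The function name and the 503 status code are illustrative choices.

```python
def health_check_status(mode, dependency_ok):
    """Return the HTTP status code a health check endpoint might send.

    :param mode: "deep" to include dependencies in the check; any other
                 value is treated as a shallow check (an assumption).
    :param dependency_ok: Whether the server can reach its dependency.
    :return: 200 when the instance should receive traffic, otherwise 503.
    """
    if mode == "deep" and not dependency_ok:
        # A deep check fails when the dependency is unavailable, so the
        # load balancer can stop routing requests to this instance.
        return 503
    return 200
```

With a shallow check, an instance that cannot reach its table still looks healthy to the load balancer, which is the failure the scenario demonstrates before it switches to deep health checks.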

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Run the interactive scenario at a command prompt.

import argparse
import logging
from pprint import pp

import boto3
import requests

# RecommendationService, AutoScalingWrapper, ElasticLoadBalancerWrapper, ParameterHelper,
# and the question helper `q` are defined in the complete example on GitHub.


class Runner:
    """
    Manages the deployment, demonstration, and destruction of resources for the resilient service.
    """

    def __init__(
        self,
        resource_path: str,
        recommendation: RecommendationService,
        autoscaler: AutoScalingWrapper,
        loadbalancer: ElasticLoadBalancerWrapper,
        param_helper: ParameterHelper,
    ):
        """
        Initializes the Runner class with the necessary parameters.

        :param resource_path: The path to resource files used by this example, such as
                              IAM policies and instance scripts.
        :param recommendation: An instance of the RecommendationService class.
        :param autoscaler: An instance of the AutoScaler class.
        :param loadbalancer: An instance of the LoadBalancer class.
        :param param_helper: An instance of the ParameterHelper class.
        """
        self.resource_path = resource_path
        self.recommendation = recommendation
        self.autoscaler = autoscaler
        self.loadbalancer = loadbalancer
        self.param_helper = param_helper
        self.protocol = "HTTP"
        self.port = 80
        self.ssh_port = 22

        prefix = "doc-example-resilience"
        self.target_group_name = f"{prefix}-tg"
        self.load_balancer_name = f"{prefix}-lb"

    def deploy(self) -> None:
        """
        Deploys the resources required for the resilient service, including the DynamoDB
        table, EC2 instances, Auto Scaling group, and load balancer.
        """
        recommendations_path = f"{self.resource_path}/recommendations.json"
        startup_script = f"{self.resource_path}/server_startup_script.sh"
        instance_policy = f"{self.resource_path}/instance_policy.json"

        logging.info("Starting deployment of resources for the resilient service.")

        logging.info(
            "Creating and populating DynamoDB table '%s'.",
            self.recommendation.table_name,
        )
        self.recommendation.create()
        self.recommendation.populate(recommendations_path)

        logging.info(
            "Creating an EC2 launch template with the startup script '%s'.",
            startup_script,
        )
        self.autoscaler.create_template(startup_script, instance_policy)

        logging.info(
            "Creating an EC2 Auto Scaling group across multiple Availability Zones."
        )
        zones = self.autoscaler.create_autoscaling_group(3)

        logging.info("Creating variables that control the flow of the demo.")
        self.param_helper.reset()

        logging.info("Creating Elastic Load Balancing target group and load balancer.")

        vpc = self.autoscaler.get_default_vpc()
        subnets = self.autoscaler.get_subnets(vpc["VpcId"], zones)
        target_group = self.loadbalancer.create_target_group(
            self.target_group_name, self.protocol, self.port, vpc["VpcId"]
        )
        self.loadbalancer.create_load_balancer(
            self.load_balancer_name, [subnet["SubnetId"] for subnet in subnets]
        )
        self.loadbalancer.create_listener(self.load_balancer_name, target_group)

        self.autoscaler.attach_load_balancer_target_group(target_group)

        logging.info("Verifying access to the load balancer endpoint.")
        endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name)
        lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint)
        current_ip_address = requests.get("http://checkip.amazonaws.com").text.strip()

        if not lb_success:
            logging.warning(
                "Couldn't connect to the load balancer. Verifying that the port is open..."
            )
            sec_group, port_is_open = self.autoscaler.verify_inbound_port(
                vpc, self.port, current_ip_address
            )
            sec_group, ssh_port_is_open = self.autoscaler.verify_inbound_port(
                vpc, self.ssh_port, current_ip_address
            )
            if not port_is_open:
                logging.warning(
                    "The default security group for your VPC must allow access from this computer."
                )
                if q.ask(
                    f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n"
                    f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? (y/n) ",
                    q.is_yesno,
                ):
                    self.autoscaler.open_inbound_port(
                        sec_group["GroupId"], self.port, current_ip_address
                    )
            if not ssh_port_is_open:
                if q.ask(
                    f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n"
                    f"inbound SSH traffic on port {self.ssh_port} for debugging from your computer's IP address of {current_ip_address}? (y/n) ",
                    q.is_yesno,
                ):
                    self.autoscaler.open_inbound_port(
                        sec_group["GroupId"], self.ssh_port, current_ip_address
                    )
            lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint)

        if lb_success:
            # Log the load balancer endpoint, not the caller's IP address.
            logging.info("Load balancer is ready. Access it at: http://%s", endpoint)
        else:
            logging.error(
                "Couldn't get a successful response from the load balancer endpoint. Please verify your VPC and security group settings."
            )

    def demo_choices(self) -> None:
        """
        Presents choices for interacting with the deployed service, such as sending requests to
        the load balancer or checking the health of the targets.
        """
        actions = [
            "Send a GET request to the load balancer endpoint.",
            "Check the health of load balancer targets.",
            "Go to the next part of the demo.",
        ]
        choice = 0

        while choice != 2:
            logging.info("Choose an action to interact with the service.")
            choice = q.choose("Which action would you like to take? ", actions)
            if choice == 0:
                logging.info("Sending a GET request to the load balancer endpoint.")
                endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name)
                logging.info("GET http://%s", endpoint)
                response = requests.get(f"http://{endpoint}")
                logging.info("Response: %s", response.status_code)
                if response.headers.get("content-type") == "application/json":
                    pp(response.json())
            elif choice == 1:
                logging.info("Checking the health of load balancer targets.")
                health = self.loadbalancer.check_target_health(self.target_group_name)
                for target in health:
                    state = target["TargetHealth"]["State"]
                    logging.info(
                        "Target %s on port %d is %s",
                        target["Target"]["Id"],
                        target["Target"]["Port"],
                        state,
                    )
                    if state != "healthy":
                        logging.warning(
                            "%s: %s",
                            target["TargetHealth"]["Reason"],
                            target["TargetHealth"]["Description"],
                        )
                logging.info(
                    "Note that it can take a minute or two for the health check to update."
                )
            elif choice == 2:
                logging.info("Proceeding to the next part of the demo.")

    def demo(self) -> None:
        """
        Runs the demonstration, showing how the service responds to different failure scenarios
        and how a resilient architecture can keep the service running.
        """
        ssm_only_policy = f"{self.resource_path}/ssm_only_policy.json"

        logging.info("Resetting parameters to starting values for the demo.")
        self.param_helper.reset()

        logging.info(
            "Starting demonstration of the service's resilience under various failure conditions."
        )
        self.demo_choices()

        logging.info(
            "Simulating failure by changing the Systems Manager parameter to a non-existent table."
        )
        self.param_helper.put(self.param_helper.table, "this-is-not-a-table")
        logging.info("Sending GET requests will now return failure codes.")
        self.demo_choices()

        logging.info("Switching to static response mode to mitigate failure.")
        self.param_helper.put(self.param_helper.failure_response, "static")
        logging.info("Sending GET requests will now return static responses.")
        self.demo_choices()

        logging.info("Restoring normal operation of the recommendation service.")
        self.param_helper.put(self.param_helper.table, self.recommendation.table_name)

        logging.info(
            "Introducing a failure by assigning bad credentials to one of the instances."
        )
        self.autoscaler.create_instance_profile(
            ssm_only_policy,
            self.autoscaler.bad_creds_policy_name,
            self.autoscaler.bad_creds_role_name,
            self.autoscaler.bad_creds_profile_name,
            ["AmazonSSMManagedInstanceCore"],
        )
        instances = self.autoscaler.get_instances()
        bad_instance_id = instances[0]
        instance_profile = self.autoscaler.get_instance_profile(bad_instance_id)
        logging.info(
            "Replacing instance profile with bad credentials for instance %s.",
            bad_instance_id,
        )
        self.autoscaler.replace_instance_profile(
            bad_instance_id,
            self.autoscaler.bad_creds_profile_name,
            instance_profile["AssociationId"],
        )
        logging.info(
            "Sending GET requests may return either a valid recommendation or a static response."
        )
        self.demo_choices()

        logging.info("Implementing deep health checks to detect unhealthy instances.")
        self.param_helper.put(self.param_helper.health_check, "deep")
        logging.info("Checking the health of the load balancer targets.")
        self.demo_choices()

        logging.info(
            "Terminating the unhealthy instance to let the auto scaler replace it."
        )
        self.autoscaler.terminate_instance(bad_instance_id)
        logging.info("The service remains resilient during instance replacement.")
        self.demo_choices()

        logging.info("Simulating a complete failure of the recommendation service.")
        self.param_helper.put(self.param_helper.table, "this-is-not-a-table")
        logging.info(
            "All instances will report as unhealthy, but the service will still return static responses."
        )
        self.demo_choices()
        self.param_helper.reset()

    def destroy(self, automation=False) -> None:
        """
        Destroys all resources created for the demo, including the load balancer, Auto Scaling
        group, EC2 instances, and DynamoDB table.
        """
        logging.info(
            "This concludes the demo. Preparing to clean up all AWS resources created during the demo."
        )
        if automation:
            cleanup = True
        else:
            cleanup = q.ask(
                "Do you want to clean up all demo resources? (y/n) ", q.is_yesno
            )

        if cleanup:
            logging.info("Deleting load balancer and related resources.")
            self.loadbalancer.delete_load_balancer(self.load_balancer_name)
            self.loadbalancer.delete_target_group(self.target_group_name)
            self.autoscaler.delete_autoscaling_group(self.autoscaler.group_name)
            self.autoscaler.delete_key_pair()
            self.autoscaler.delete_template()
            self.autoscaler.delete_instance_profile(
                self.autoscaler.bad_creds_profile_name,
                self.autoscaler.bad_creds_role_name,
            )
            logging.info("Deleting DynamoDB table and other resources.")
            self.recommendation.destroy()
        else:
            logging.warning(
                "Resources have not been deleted. Ensure you clean them up manually to avoid unexpected charges."
            )


def main() -> None:
    """
    Main function to parse arguments and run the appropriate actions for the demo.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--action",
        required=True,
        choices=["all", "deploy", "demo", "destroy"],
        help="The action to take for the demo. When 'all' is specified, resources are\n"
        "deployed, the demo is run, and resources are destroyed.",
    )
    parser.add_argument(
        "--resource_path",
        default="../../../workflows/resilient_service/resources",
        help="The path to resource files used by this example, such as IAM policies and\n"
        "instance scripts.",
    )
    args = parser.parse_args()

    logging.info("Starting the Resilient Service demo.")

    prefix = "doc-example-resilience"

    # Service Clients
    ddb_client = boto3.client("dynamodb")
    elb_client = boto3.client("elbv2")
    autoscaling_client = boto3.client("autoscaling")
    ec2_client = boto3.client("ec2")
    ssm_client = boto3.client("ssm")
    iam_client = boto3.client("iam")

    # Wrapper instantiations
    recommendation = RecommendationService(
        "doc-example-recommendation-service", ddb_client
    )
    autoscaling_wrapper = AutoScalingWrapper(
        prefix,
        "t3.micro",
        "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2",
        autoscaling_client,
        ec2_client,
        ssm_client,
        iam_client,
    )
    elb_wrapper = ElasticLoadBalancerWrapper(elb_client)
    param_helper = ParameterHelper(recommendation.table_name, ssm_client)

    # Demo invocation
    runner = Runner(
        args.resource_path,
        recommendation,
        autoscaling_wrapper,
        elb_wrapper,
        param_helper,
    )
    actions = [args.action] if args.action != "all" else ["deploy", "demo", "destroy"]
    for action in actions:
        if action == "deploy":
            runner.deploy()
        elif action == "demo":
            runner.demo()
        elif action == "destroy":
            runner.destroy()

    logging.info("Demo completed successfully.")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    main()

Create a class that wraps Auto Scaling and Amazon EC2 actions.

class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_policy(self, policy_file: str, policy_name: str) -> str: """ Creates a new IAM policy or retrieves the ARN of an existing policy. :param policy_file: The path to a JSON file that contains the policy definition. :param policy_name: The name to give the created policy. :return: The ARN of the created or existing policy. 
""" with open(policy_file) as file: policy_doc = file.read() try: response = self.iam_client.create_policy( PolicyName=policy_name, PolicyDocument=policy_doc ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' created successfully. ARN: {policy_arn}") return policy_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the policy already exists, get its ARN response = self.iam_client.get_policy( PolicyArn=f"arn:aws:iam::{self.account_id}:policy/{policy_name}" ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' already exists. ARN: {policy_arn}") return policy_arn log.error(f"Full error:\n\t{err}") def create_role(self, role_name: str, assume_role_doc: dict) -> str: """ Creates a new IAM role or retrieves the ARN of an existing role. :param role_name: The name to give the created role. :param assume_role_doc: The assume role policy document that specifies which entities can assume the role. :return: The ARN of the created or existing role. """ try: response = self.iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc) ) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' created successfully. ARN: {role_arn}") return role_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the role already exists, get its ARN response = self.iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' already exists. ARN: {role_arn}") return role_arn log.error(f"Full error:\n\t{err}") def attach_policy( self, role_name: str, policy_arn: str, aws_managed_policies: Tuple[str, ...] = (), ) -> None: """ Attaches an IAM policy to a role and optionally attaches additional AWS-managed policies. :param role_name: The name of the role to attach the policy to. :param policy_arn: The ARN of the policy to attach. 
:param aws_managed_policies: A tuple of AWS-managed policy names to attach to the role. """ try: self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) for aws_policy in aws_managed_policies: self.iam_client.attach_role_policy( RoleName=role_name, PolicyArn=f"arn:aws:iam::aws:policy/{aws_policy}", ) log.info(f"Attached policy {policy_arn} to role {role_name}.") except ClientError as err: log.error(f"Failed to attach policy {policy_arn} to role {role_name}.") log.error(f"Full error:\n\t{err}") def create_instance_profile( self, policy_file: str, policy_name: str, role_name: str, profile_name: str, aws_managed_policies: Tuple[str, ...] = (), ) -> str: """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. 
""" assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = self.create_policy(policy_file, policy_name) self.create_role(role_name, assume_role_doc) self.attach_policy(role_name, policy_arn, aws_managed_policies) try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) log.error(f"Full error:\n\t{err}") return profile_arn def get_instance_profile(self, instance_id: str) -> Dict[str, Any]: """ Gets data about the profile associated with an instance. :param instance_id: The ID of the instance to look up. :return: The profile data. """ try: response = self.ec2_client.describe_iam_instance_profile_associations( Filters=[{"Name": "instance-id", "Values": [instance_id]}] ) if not response["IamInstanceProfileAssociations"]: log.info(f"No instance profile found for instance {instance_id}.") profile_data = response["IamInstanceProfileAssociations"][0] log.info(f"Retrieved instance profile for instance {instance_id}.") return profile_data except ClientError as err: log.error( f"Failed to retrieve instance profile for instance {instance_id}." 
) error_code = err.response["Error"]["Code"] if error_code == "InvalidInstanceID.NotFound": log.error(f"The instance ID '{instance_id}' does not exist.") log.error(f"Full error:\n\t{err}") def replace_instance_profile( self, instance_id: str, new_instance_profile_name: str, profile_association_id: str, ) -> None: """ Replaces the profile associated with a running instance. After the profile is replaced, the instance is rebooted to ensure that it uses the new profile. When the instance is ready, Systems Manager is used to restart the Python web server. :param instance_id: The ID of the instance to restart. :param new_instance_profile_name: The name of the new profile to associate with the specified instance. :param profile_association_id: The ID of the existing profile association for the instance. """ try: self.ec2_client.replace_iam_instance_profile_association( IamInstanceProfile={"Name": new_instance_profile_name}, AssociationId=profile_association_id, ) log.info( "Replaced instance profile for association %s with profile %s.", profile_association_id, new_instance_profile_name, ) time.sleep(5) self.ec2_client.reboot_instances(InstanceIds=[instance_id]) log.info("Rebooting instance %s.", instance_id) waiter = self.ec2_client.get_waiter("instance_running") log.info("Waiting for instance %s to be running.", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info("Instance %s is now running.", instance_id) self.ssm_client.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={"commands": ["cd / && sudo python3 server.py 80"]}, ) log.info(f"Restarted the Python web server on instance '{instance_id}'.") except ClientError as err: log.error("Failed to replace instance profile.") error_code = err.response["Error"]["Code"] if error_code == "InvalidAssociationID.NotFound": log.error( f"Association ID '{profile_association_id}' does not exist." "Please check the association ID and try again." 
) if error_code == "InvalidInstanceId": log.error( f"The specified instance ID '{instance_id}' does not exist or is not available for SSM. " f"Please verify the instance ID and try again." ) log.error(f"Full error:\n\t{err}") def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. """ try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name ) def create_key_pair(self, key_pair_name: str) -> None: """ Creates a new key pair. :param key_pair_name: The name of the key pair to create. 
""" try: response = self.ec2_client.create_key_pair(KeyName=key_pair_name) with open(f"{key_pair_name}.pem", "w") as file: file.write(response["KeyMaterial"]) chmod(f"{key_pair_name}.pem", 0o600) log.info("Created key pair %s.", key_pair_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to create key pair {key_pair_name}.") if error_code == "InvalidKeyPair.Duplicate": log.error(f"A key pair with the name '{key_pair_name}' already exists.") log.error(f"Full error:\n\t{err}") def delete_key_pair(self) -> None: """ Deletes a key pair. """ try: self.ec2_client.delete_key_pair(KeyName=self.key_pair_name) remove(f"{self.key_pair_name}.pem") log.info("Deleted key pair %s.", self.key_pair_name) except ClientError as err: log.error(f"Couldn't delete key pair '{self.key_pair_name}'.") log.error(f"Full error:\n\t{err}") except FileNotFoundError as err: log.info("Key pair %s doesn't exist, nothing to do.", self.key_pair_name) log.error(f"Full error:\n\t{err}") def create_template( self, server_startup_script_file: str, instance_policy_file: str ) -> Dict[str, Any]: """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The launch template specifies a Bash script in its user data field that runs after the instance is started. This script installs Python packages and starts a Python web server on the instance. :param server_startup_script_file: The path to a Bash script file that is run when an instance starts. :param instance_policy_file: The path to a file that defines a permissions policy to create and attach to the instance profile. :return: Information about the newly created template. 
""" template = {} try: # Create key pair and instance profile self.create_key_pair(self.key_pair_name) self.create_instance_profile( instance_policy_file, self.instance_policy_name, self.instance_role_name, self.instance_profile_name, ) # Read the startup script with open(server_startup_script_file) as file: start_server_script = file.read() # Get the latest AMI ID ami_latest = self.ssm_client.get_parameter(Name=self.ami_param) ami_id = ami_latest["Parameter"]["Value"] # Create the launch template lt_response = self.ec2_client.create_launch_template( LaunchTemplateName=self.launch_template_name, LaunchTemplateData={ "InstanceType": self.inst_type, "ImageId": ami_id, "IamInstanceProfile": {"Name": self.instance_profile_name}, "UserData": base64.b64encode( start_server_script.encode(encoding="utf-8") ).decode(encoding="utf-8"), "KeyName": self.key_pair_name, }, ) template = lt_response["LaunchTemplate"] log.info( f"Created launch template {self.launch_template_name} for AMI {ami_id} on {self.inst_type}." ) except ClientError as err: log.error(f"Failed to create launch template {self.launch_template_name}.") error_code = err.response["Error"]["Code"] if error_code == "InvalidLaunchTemplateName.AlreadyExistsException": log.info( f"Launch template {self.launch_template_name} already exists, nothing to do." ) log.error(f"Full error:\n\t{err}") return template def delete_template(self): """ Deletes a launch template. 
""" try: self.ec2_client.delete_launch_template( LaunchTemplateName=self.launch_template_name ) self.delete_instance_profile( self.instance_profile_name, self.instance_role_name ) log.info("Launch template %s deleted.", self.launch_template_name) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): log.info( "Launch template %s does not exist, nothing to do.", self.launch_template_name, ) log.error(f"Full error:\n\t{err}") def get_availability_zones(self) -> List[str]: """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] log.info(f"Retrieved {len(zones)} availability zones: {zones}.") except ClientError as err: log.error("Failed to retrieve availability zones.") log.error(f"Full error:\n\t{err}") else: return zones def create_autoscaling_group(self, group_size: int) -> List[str]: """ Creates an EC2 Auto Scaling group with the specified size. :param group_size: The number of instances to set for the minimum and maximum in the group. :return: The list of Availability Zones specified for the group. """ try: zones = self.get_availability_zones() self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=self.group_name, AvailabilityZones=zones, LaunchTemplate={ "LaunchTemplateName": self.launch_template_name, "Version": "$Default", }, MinSize=group_size, MaxSize=group_size, ) log.info( f"Created EC2 Auto Scaling group {self.group_name} with availability zones {zones}." ) except ClientError as err: error_code = err.response["Error"]["Code"] if error_code == "AlreadyExists": log.info( f"EC2 Auto Scaling group {self.group_name} already exists, nothing to do." 
) else: log.error(f"Failed to create EC2 Auto Scaling group {self.group_name}.") log.error(f"Full error:\n\t{err}") else: return zones def get_instances(self) -> List[str]: """ Gets data about the instances in the EC2 Auto Scaling group. :return: A list of instance IDs in the Auto Scaling group. """ try: as_response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) instance_ids = [ i["InstanceId"] for i in as_response["AutoScalingGroups"][0]["Instances"] ] log.info( f"Retrieved {len(instance_ids)} instances for Auto Scaling group {self.group_name}." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to retrieve instances for Auto Scaling group {self.group_name}." ) if error_code == "ResourceNotFound": log.error(f"The Auto Scaling group '{self.group_name}' does not exist.") log.error(f"Full error:\n\t{err}") else: return instance_ids def terminate_instance(self, instance_id: str, decrementsetting=False) -> None: """ Terminates an instance in an EC2 Auto Scaling group. After an instance is terminated, it can no longer be accessed. :param instance_id: The ID of the instance to terminate. :param decrementsetting: If True, do not replace terminated instances. """ try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrementsetting, ) log.info("Terminated instance %s.", instance_id) # Adding a waiter to ensure the instance is terminated waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for instance %s to be terminated...", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info( f"Instance '{instance_id}' has been terminated and will be replaced." 
) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to terminate instance '{instance_id}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to terminate the instance again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) log.error(f"Full error:\n\t{err}") def attach_load_balancer_target_group( self, lb_target_group: Dict[str, Any] ) -> None: """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forwards requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to attach load balancer target group '{lb_target_group['TargetGroupName']}'." ) if error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) elif error_code == "ServiceLinkedRoleFailure": log.error( "The operation failed because the service-linked role is not ready or does not exist. " "Check that the service-linked role exists and is correctly configured." ) log.error(f"Full error:\n\t{err}") def delete_autoscaling_group(self, group_name: str) -> None: """ Terminates all instances in the group, then deletes the EC2 Auto Scaling group. 
:param group_name: The name of the group to delete. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self.terminate_instance(inst_id) # Wait for all instances to be terminated if instance_ids: waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for all instances to be terminated...") waiter.wait(InstanceIds=instance_ids) log.info("All instances have been terminated.") else: log.info(f"No groups found named '{group_name}'! Nothing to do.") except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete Auto Scaling group '{group_name}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to delete the group again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the group." ) log.error(f"Full error:\n\t{err}") def get_default_vpc(self) -> Dict[str, Any]: """ Gets the default VPC for the account. :return: Data about the default VPC. """ try: response = self.ec2_client.describe_vpcs( Filters=[{"Name": "is-default", "Values": ["true"]}] ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error("Failed to retrieve the default VPC.") if error_code == "UnauthorizedOperation": log.error( "You do not have the necessary permissions to describe VPCs. " "Ensure that your AWS IAM user or role has the correct permissions." ) elif error_code == "InvalidParameterValue": log.error( "One or more parameters are invalid. 
Check the request parameters." ) log.error(f"Full error:\n\t{err}") else: if "Vpcs" in response and response["Vpcs"]: log.info(f"Retrieved default VPC: {response['Vpcs'][0]['VpcId']}") return response["Vpcs"][0] else: pass def verify_inbound_port( self, vpc: Dict[str, Any], port: int, ip_address: str ) -> Tuple[Dict[str, Any], bool]: """ Verify the default security group of the specified VPC allows ingress from this computer. This can be done by allowing ingress from this computer's IP address. In some situations, such as connecting from a corporate network, you must instead specify a prefix list ID. You can also temporarily open the port to any IP address while running this example. If you do, be sure to remove public access when you're done. :param vpc: The VPC used by this example. :param port: The port to verify. :param ip_address: This computer's IP address. :return: The default security group of the specified VPC, and a value that indicates whether the specified port is open. """ try: response = self.ec2_client.describe_security_groups( Filters=[ {"Name": "group-name", "Values": ["default"]}, {"Name": "vpc-id", "Values": [vpc["VpcId"]]}, ] ) sec_group = response["SecurityGroups"][0] port_is_open = False log.info(f"Found default security group {sec_group['GroupId']}.") for ip_perm in sec_group["IpPermissions"]: if ip_perm.get("FromPort", 0) == port: log.info(f"Found inbound rule: {ip_perm}") for ip_range in ip_perm["IpRanges"]: cidr = ip_range.get("CidrIp", "") if cidr.startswith(ip_address) or cidr == "0.0.0.0/0": port_is_open = True if ip_perm["PrefixListIds"]: port_is_open = True if not port_is_open: log.info( f"The inbound rule does not appear to be open to either this computer's IP " f"address of {ip_address}, to all IP addresses (0.0.0.0/0), or to a prefix list ID." ) else: break except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to verify inbound rule for port {port} for VPC {vpc['VpcId']}." 
) if error_code == "InvalidVpcID.NotFound": log.error( f"The specified VPC ID '{vpc['VpcId']}' does not exist. Please check the VPC ID." ) log.error(f"Full error:\n\t{err}") else: return sec_group, port_is_open def open_inbound_port(self, sec_group_id: str, port: int, ip_address: str) -> None: """ Add an ingress rule to the specified security group that allows access on the specified port from the specified IP address. :param sec_group_id: The ID of the security group to modify. :param port: The port to open. :param ip_address: The IP address that is granted access. """ try: self.ec2_client.authorize_security_group_ingress( GroupId=sec_group_id, CidrIp=f"{ip_address}/32", FromPort=port, ToPort=port, IpProtocol="tcp", ) log.info( "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to authorize ingress to security group '{sec_group_id}' on port {port} from {ip_address}." ) if error_code == "InvalidGroupId.Malformed": log.error( "The security group ID is malformed. " "Please verify that the security group ID is correct." ) elif error_code == "InvalidPermission.Duplicate": log.error( "The specified rule already exists in the security group. " "Check the existing rules for this security group." ) log.error(f"Full error:\n\t{err}") def get_subnets(self, vpc_id: str, zones: List[str] = None) -> List[Dict[str, Any]]: """ Gets the default subnets in a VPC for a specified list of Availability Zones. :param vpc_id: The ID of the VPC to look up. :param zones: The list of Availability Zones to look up. :return: The list of subnets found. 
""" # Ensure that 'zones' is a list, even if None is passed if zones is None: zones = [] try: paginator = self.ec2_client.get_paginator("describe_subnets") page_iterator = paginator.paginate( Filters=[ {"Name": "vpc-id", "Values": [vpc_id]}, {"Name": "availability-zone", "Values": zones}, {"Name": "default-for-az", "Values": ["true"]}, ] ) subnets = [] for page in page_iterator: subnets.extend(page["Subnets"]) log.info("Found %s subnets for the specified zones.", len(subnets)) return subnets except ClientError as err: log.error( f"Failed to retrieve subnets for VPC '{vpc_id}' in zones {zones}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidVpcID.NotFound": log.error( "The specified VPC ID does not exist. " "Please check the VPC ID and try again." ) # Add more error-specific handling as needed log.error(f"Full error:\n\t{err}")
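The `verify_inbound_port` method above matches CIDR ranges with `cidr.startswith(ip_address)`. A plain prefix match can also accept an unrelated range, for example `10.0.0.10/32` when the address is `10.0.0.1`. The following is a minimal sketch of a stricter membership check, assuming the rules have the same shape as the `IpPermissions` entries returned by `describe_security_groups`. The function name `port_open_for` is hypothetical and is not part of the example class; it uses only the standard `ipaddress` module.

```python
import ipaddress
from typing import Any, Dict, List


def port_open_for(
    ip_permissions: List[Dict[str, Any]], port: int, ip_address: str
) -> bool:
    """Return True if a rule for `port` admits `ip_address` or uses a prefix list.

    Hypothetical helper: `ip_permissions` is assumed to look like the
    IpPermissions list of a security group description.
    """
    address = ipaddress.ip_address(ip_address)
    for perm in ip_permissions:
        if perm.get("FromPort") != port:
            continue
        # A prefix list on the rule is treated as open, as in the method above.
        if perm.get("PrefixListIds"):
            return True
        for ip_range in perm.get("IpRanges", []):
            cidr = ip_range.get("CidrIp")
            # Real network membership test instead of a string prefix match.
            if cidr and address in ipaddress.ip_network(cidr, strict=False):
                return True
    return False


rules = [
    {
        "FromPort": 80,
        "ToPort": 80,
        "IpRanges": [{"CidrIp": "10.0.0.10/32"}],
        "PrefixListIds": [],
    }
]
exact_match = port_open_for(rules, 80, "10.0.0.10")
prefix_only = port_open_for(rules, 80, "10.0.0.1")
```

Here `exact_match` is true because the address falls inside the rule's range, while `prefix_only` is false even though the strings share a prefix.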

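In `create_template`, the startup script is passed to the launch template as base64 text: the script is encoded to UTF-8 bytes, base64-encoded, and decoded back to a string. The round trip can be sketched on its own as follows. The script contents and the helper name `encode_user_data` are made up for illustration.

```python
import base64


def encode_user_data(script: str) -> str:
    """Encode a startup script the same way create_template does for UserData."""
    return base64.b64encode(script.encode("utf-8")).decode("utf-8")


# Illustrative script only; the real example reads it from a file.
script = "#!/bin/bash\necho hello\n"
encoded = encode_user_data(script)

# Decoding recovers the original script text.
decoded = base64.b64decode(encoded).decode("utf-8")
```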
Create a class that wraps Elastic Load Balancing actions.

class ElasticLoadBalancerWrapper: """Encapsulates Elastic Load Balancing (ELB) actions.""" def __init__(self, elb_client: boto3.client): """ Initializes the LoadBalancer class with the necessary parameters. """ self.elb_client = elb_client def create_target_group( self, target_group_name: str, protocol: str, port: int, vpc_id: str ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing target group. The target group specifies how the load balancer forwards requests to instances in the group and how instance health is checked. To speed up this demo, the health check is configured with shortened times and lower thresholds. In production, you might want to decrease the sensitivity of your health checks to avoid unwanted failures. :param target_group_name: The name of the target group to create. :param protocol: The protocol to use to forward requests, such as 'HTTP'. :param port: The port to use to forward requests, such as 80. :param vpc_id: The ID of the VPC in which the load balancer exists. :return: Data about the newly created target group. """ try: response = self.elb_client.create_target_group( Name=target_group_name, Protocol=protocol, Port=port, HealthCheckPath="/healthcheck", HealthCheckIntervalSeconds=10, HealthCheckTimeoutSeconds=5, HealthyThresholdCount=2, UnhealthyThresholdCount=2, VpcId=vpc_id, ) target_group = response["TargetGroups"][0] log.info(f"Created load balancing target group '{target_group_name}'.") return target_group except ClientError as err: log.error( f"Couldn't create load balancing target group '{target_group_name}'." ) error_code = err.response["Error"]["Code"] if error_code == "DuplicateTargetGroupName": log.error( f"Target group name {target_group_name} already exists. " "Check if the target group already exists." "Consider using a different name or deleting the existing target group if appropriate." ) elif error_code == "TooManyTargetGroups": log.error( "Too many target groups exist in the account. 
" "Consider deleting unused target groups to create space for new ones." ) log.error(f"Full error:\n\t{err}") def delete_target_group(self, target_group_name) -> None: """ Deletes the target group. """ try: # Describe the target group to get its ARN response = self.elb_client.describe_target_groups(Names=[target_group_name]) tg_arn = response["TargetGroups"][0]["TargetGroupArn"] # Delete the target group self.elb_client.delete_target_group(TargetGroupArn=tg_arn) log.info("Deleted load balancing target group %s.", target_group_name) # Use a custom waiter to wait until the target group is no longer available self.wait_for_target_group_deletion(self.elb_client, tg_arn) log.info("Target group %s successfully deleted.", target_group_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete target group '{target_group_name}'.") if error_code == "TargetGroupNotFound": log.error( "Load balancer target group either already deleted or never existed. " "Verify the name and check that the resource exists in the AWS Console." ) elif error_code == "ResourceInUseException": log.error( "Target group still in use by another resource. " "Ensure that the target group is no longer associated with any load balancers or resources.", ) log.error(f"Full error:\n\t{err}") def wait_for_target_group_deletion( self, elb_client, target_group_arn, max_attempts=10, delay=30 ): for attempt in range(max_attempts): try: elb_client.describe_target_groups(TargetGroupArns=[target_group_arn]) print( f"Attempt {attempt + 1}: Target group {target_group_arn} still exists." ) except ClientError as e: if e.response["Error"]["Code"] == "TargetGroupNotFound": print( f"Target group {target_group_arn} has been successfully deleted." ) return else: raise time.sleep(delay) raise TimeoutError( f"Target group {target_group_arn} was not deleted after {max_attempts * delay} seconds." 
) def create_load_balancer( self, load_balancer_name: str, subnet_ids: List[str], ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing load balancer that uses the specified subnets and forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create. :param subnet_ids: A list of subnets to associate with the load balancer. :return: Data about the newly created load balancer. """ try: response = self.elb_client.create_load_balancer( Name=load_balancer_name, Subnets=subnet_ids ) load_balancer = response["LoadBalancers"][0] log.info(f"Created load balancer '{load_balancer_name}'.") waiter = self.elb_client.get_waiter("load_balancer_available") log.info( f"Waiting for load balancer '{load_balancer_name}' to be available..." ) waiter.wait(Names=[load_balancer_name]) log.info(f"Load balancer '{load_balancer_name}' is now available!") except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to create load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "DuplicateLoadBalancerNameException": log.error( f"A load balancer with the name '{load_balancer_name}' already exists. " "Load balancer names must be unique within the AWS region. " "Please choose a different name and try again." ) if error_code == "TooManyLoadBalancersException": log.error( "The maximum number of load balancers has been reached in this account and region. " "You can delete unused load balancers or request an increase in the service quota from AWS Support." ) log.error(f"Full error:\n\t{err}") else: return load_balancer def create_listener( self, load_balancer_name: str, target_group: Dict[str, Any], ) -> Dict[str, Any]: """ Creates a listener for the specified load balancer that forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create a listener for. 
:param target_group: An existing target group that is added as a listener to the load balancer. :return: Data about the newly created listener. """ try: # Retrieve the load balancer ARN load_balancer_response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) load_balancer_arn = load_balancer_response["LoadBalancers"][0][ "LoadBalancerArn" ] # Create the listener response = self.elb_client.create_listener( LoadBalancerArn=load_balancer_arn, Protocol=target_group["Protocol"], Port=target_group["Port"], DefaultActions=[ { "Type": "forward", "TargetGroupArn": target_group["TargetGroupArn"], } ], ) log.info( f"Created listener to forward traffic from load balancer '{load_balancer_name}' to target group '{target_group['TargetGroupName']}'." ) return response["Listeners"][0] except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to add a listener on '{load_balancer_name}' for target group '{target_group['TargetGroupName']}'." ) if error_code == "ListenerNotFoundException": log.error( f"The listener could not be found for the load balancer '{load_balancer_name}'. " "Please check the load balancer name and target group configuration." ) if error_code == "InvalidConfigurationRequestException": log.error( f"The configuration provided for the listener on load balancer '{load_balancer_name}' is invalid. " "Please review the provided protocol, port, and target group settings." ) log.error(f"Full error:\n\t{err}") def delete_load_balancer(self, load_balancer_name) -> None: """ Deletes a load balancer. :param load_balancer_name: The name of the load balancer to delete. 
""" try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"] self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn) log.info("Deleted load balancer %s.", load_balancer_name) waiter = self.elb_client.get_waiter("load_balancers_deleted") log.info("Waiting for load balancer to be deleted...") waiter.wait(Names=[load_balancer_name]) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Couldn't delete load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "LoadBalancerNotFoundException": log.error( f"The load balancer '{load_balancer_name}' does not exist. " "Please check the name and try again." ) log.error(f"Full error:\n\t{err}") def get_endpoint(self, load_balancer_name) -> str: """ Gets the HTTP endpoint of the load balancer. :return: The endpoint. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) return response["LoadBalancers"][0]["DNSName"] except ClientError as err: log.error( f"Couldn't get the endpoint for load balancer {load_balancer_name}" ) error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Verify load balancer name and ensure it exists in the AWS console." ) log.error(f"Full error:\n\t{err}") @staticmethod def verify_load_balancer_endpoint(endpoint) -> bool: """ Verify this computer can successfully send a GET request to the load balancer endpoint. :param endpoint: The endpoint to verify. :return: True if the GET request is successful, False otherwise. 
""" retries = 3 verified = False while not verified and retries > 0: try: lb_response = requests.get(f"http://{endpoint}") log.info( "Got response %s from load balancer endpoint.", lb_response.status_code, ) if lb_response.status_code == 200: verified = True else: retries = 0 except requests.exceptions.ConnectionError: log.info( "Got connection error from load balancer endpoint, retrying..." ) retries -= 1 time.sleep(10) return verified def check_target_health(self, target_group_name: str) -> List[Dict[str, Any]]: """ Checks the health of the instances in the target group. :return: The health status of the target group. """ try: tg_response = self.elb_client.describe_target_groups( Names=[target_group_name] ) health_response = self.elb_client.describe_target_health( TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"] ) except ClientError as err: log.error(f"Couldn't check health of {target_group_name} target(s).") error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Load balancer associated with the target group was not found. " "Ensure the load balancer exists, is in the correct AWS region, and " "that you have the necessary permissions to access it.", ) elif error_code == "TargetGroupNotFoundException": log.error( "Target group was not found. " "Verify the target group name, check that it exists in the correct region, " "and ensure it has not been deleted or created in a different account.", ) log.error(f"Full error:\n\t{err}") else: return health_response["TargetHealthDescriptions"]
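The `wait_for_target_group_deletion` method above is a hand-written polling loop: check whether the resource still exists, sleep, and give up after a fixed number of attempts. The sketch below shows the same pattern in a form that can be exercised without AWS. The name `wait_until_gone` and the injectable `sleep` parameter are assumptions added for illustration. In the class above, the check would wrap `describe_target_groups` and treat the `TargetGroupNotFound` error code as "gone".

```python
import time
from typing import Callable


def wait_until_gone(
    still_exists: Callable[[], bool],
    max_attempts: int = 10,
    delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until still_exists() returns False and return the number of checks made.

    Raises TimeoutError if the resource still exists after max_attempts checks.
    """
    for attempt in range(1, max_attempts + 1):
        if not still_exists():
            return attempt
        sleep(delay)
    raise TimeoutError(f"Resource still exists after {max_attempts} checks.")


# Simulated resource that disappears on the third check; no real waiting.
answers = iter([True, True, False])
checks = wait_until_gone(lambda: next(answers), delay=0, sleep=lambda _: None)
```

Passing the sleep function as a parameter keeps the loop easy to test, and the default still uses `time.sleep` with the same 10 attempts and 30 second delay as the method above.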

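The list returned by check_target_health can be reduced to a quick per-state summary. The following is a minimal sketch, assuming each entry follows the TargetHealthDescriptions shape returned by describe_target_health (a Target with an Id, and a TargetHealth with a State). The helper names summarize_target_health and unhealthy_target_ids are hypothetical and not part of the example source, and the sample data is invented for illustration.

```python
from collections import Counter
from typing import Any, Dict, List, Optional


def summarize_target_health(
    descriptions: Optional[List[Dict[str, Any]]],
) -> Dict[str, int]:
    """Count targets per health state (hypothetical helper, not in the source)."""
    counts: Counter = Counter()
    # check_target_health returns None when the API call fails, so a
    # missing list is treated as empty here.
    for description in descriptions or []:
        state = description.get("TargetHealth", {}).get("State", "unknown")
        counts[state] += 1
    return dict(counts)


def unhealthy_target_ids(
    descriptions: Optional[List[Dict[str, Any]]],
) -> List[str]:
    """Return the IDs of targets whose state is anything other than 'healthy'."""
    return [
        d["Target"]["Id"]
        for d in descriptions or []
        if d.get("TargetHealth", {}).get("State") != "healthy"
    ]


# Invented sample data in the assumed response shape.
sample_descriptions = [
    {"Target": {"Id": "i-0aaa", "Port": 80}, "TargetHealth": {"State": "healthy"}},
    {"Target": {"Id": "i-0bbb", "Port": 80}, "TargetHealth": {"State": "unhealthy"}},
    {"Target": {"Id": "i-0ccc", "Port": 80}, "TargetHealth": {"State": "healthy"}},
]

print(summarize_target_health(sample_descriptions))
print(unhealthy_target_ids(sample_descriptions))
```

Because the helpers accept None, they can be called directly on the result of check_target_health even when that method logged an error and returned nothing.
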
Create a class that uses DynamoDB to simulate a recommendation service.

class RecommendationService:
    """
    Encapsulates a DynamoDB table to use as a service that recommends books,
    movies, and songs.
    """

    def __init__(self, table_name: str, dynamodb_client: boto3.client):
        """
        Initializes the RecommendationService class with the necessary parameters.

        :param table_name: The name of the DynamoDB recommendations table.
        :param dynamodb_client: A Boto3 DynamoDB client.
        """
        self.table_name = table_name
        self.dynamodb_client = dynamodb_client

    def create(self) -> Dict[str, Any]:
        """
        Creates a DynamoDB table to use as a recommendation service. The table has a
        hash key named 'MediaType' that defines the type of media recommended, such as
        Book or Movie, and a range key named 'ItemId' that, combined with the MediaType,
        forms a unique identifier for the recommended item.

        :return: Data about the newly created table.
        :raises RecommendationServiceError: If the table creation fails.
        """
        try:
            response = self.dynamodb_client.create_table(
                TableName=self.table_name,
                AttributeDefinitions=[
                    {"AttributeName": "MediaType", "AttributeType": "S"},
                    {"AttributeName": "ItemId", "AttributeType": "N"},
                ],
                KeySchema=[
                    {"AttributeName": "MediaType", "KeyType": "HASH"},
                    {"AttributeName": "ItemId", "KeyType": "RANGE"},
                ],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            )
            log.info("Creating table %s...", self.table_name)
            waiter = self.dynamodb_client.get_waiter("table_exists")
            waiter.wait(TableName=self.table_name)
            log.info("Table %s created.", self.table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceInUseException":
                log.info("Table %s exists, nothing to be done.", self.table_name)
            else:
                raise RecommendationServiceError(
                    self.table_name, f"ClientError when creating table: {err}."
                )
        else:
            return response

    def populate(self, data_file: str) -> None:
        """
        Populates the recommendations table from a JSON file.

        :param data_file: The path to the data file.
        :raises RecommendationServiceError: If the table population fails.
        """
        try:
            with open(data_file) as data:
                items = json.load(data)
            batch = [{"PutRequest": {"Item": item}} for item in items]
            self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch})
            log.info(
                "Populated table %s with items from %s.", self.table_name, data_file
            )
        except ClientError as err:
            raise RecommendationServiceError(
                self.table_name, f"Couldn't populate table from {data_file}: {err}"
            )

    def destroy(self) -> None:
        """
        Deletes the recommendations table.

        :raises RecommendationServiceError: If the table deletion fails.
        """
        try:
            self.dynamodb_client.delete_table(TableName=self.table_name)
            log.info("Deleting table %s...", self.table_name)
            waiter = self.dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName=self.table_name)
            log.info("Table %s deleted.", self.table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                log.info("Table %s does not exist, nothing to do.", self.table_name)
            else:
                raise RecommendationServiceError(
                    self.table_name, f"ClientError when deleting table: {err}."
                )

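The populate method sends every item from the data file in a single batch_write_item call. DynamoDB's BatchWriteItem operation accepts at most 25 put or delete requests per call, so a larger data file would need to be split. The following is a minimal sketch of that splitting step, assuming the items are already in DynamoDB attribute-value format as the low-level client requires. The helper name chunk_put_requests and the sample items are hypothetical and not part of the example source.

```python
from typing import Any, Dict, Iterator, List

# BatchWriteItem accepts at most 25 put or delete requests per call.
MAX_BATCH_SIZE = 25


def chunk_put_requests(
    items: List[Dict[str, Any]], batch_size: int = MAX_BATCH_SIZE
) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of PutRequest entries, each no longer than batch_size.

    Hypothetical helper: wraps each item the same way populate does, but
    splits the result into chunks small enough for one batch_write_item call.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield [
            {"PutRequest": {"Item": item}}
            for item in items[start : start + batch_size]
        ]


# Invented sample items in DynamoDB attribute-value format.
sample_items = [
    {"MediaType": {"S": "Book"}, "ItemId": {"N": str(i)}} for i in range(60)
]

batches = list(chunk_put_requests(sample_items))
print([len(batch) for batch in batches])
```

Each yielded chunk could then be passed as RequestItems={table_name: chunk}. A fuller version would also resend anything the service returns under UnprocessedItems in the response.
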
Create a class that wraps Systems Manager actions.

class ParameterHelper:
    """
    Encapsulates Systems Manager parameters. This example uses these parameters to drive
    the demonstration of resilient architecture, such as failure of a dependency or
    how the service responds to a health check.
    """

    table: str = "doc-example-resilient-architecture-table"
    failure_response: str = "doc-example-resilient-architecture-failure-response"
    health_check: str = "doc-example-resilient-architecture-health-check"

    def __init__(self, table_name: str, ssm_client: boto3.client):
        """
        Initializes the ParameterHelper class with the necessary parameters.

        :param table_name: The name of the DynamoDB table that is used as a recommendation
                           service.
        :param ssm_client: A Boto3 Systems Manager client.
        """
        self.ssm_client = ssm_client
        self.table_name = table_name

    def reset(self) -> None:
        """
        Resets the Systems Manager parameters to starting values for the demo.
        These are the name of the DynamoDB recommendation table, no response when a
        dependency fails, and shallow health checks.
        """
        self.put(self.table, self.table_name)
        self.put(self.failure_response, "none")
        self.put(self.health_check, "shallow")

    def put(self, name: str, value: str) -> None:
        """
        Sets the value of a named Systems Manager parameter.

        :param name: The name of the parameter.
        :param value: The new value of the parameter.
        :raises ParameterHelperError: If the parameter value cannot be set.
        """
        try:
            self.ssm_client.put_parameter(
                Name=name, Value=value, Overwrite=True, Type="String"
            )
            log.info("Setting parameter %s to '%s'.", name, value)
        except ClientError as err:
            error_code = err.response["Error"]["Code"]
            log.error(f"Failed to set parameter {name}.")
            if error_code == "ParameterLimitExceeded":
                log.error(
                    "The parameter limit has been exceeded. "
                    "Consider deleting unused parameters or request a limit increase."
                )
            elif error_code == "ParameterAlreadyExists":
                log.error(
                    "The parameter already exists and overwrite is set to False. "
                    "Use Overwrite=True to update the parameter."
                )
            log.error(f"Full error:\n\t{err}")