More AWS SDK examples are available in the AWS Doc SDK Examples repository on GitHub.
Auto Scaling examples using the SDK for Python (Boto3)
The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Auto Scaling.
Basics are code examples that show you how to perform the essential operations within a service.
Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.
Scenarios are code examples that show you how to accomplish a specific task by calling multiple functions within the same service or combined with other AWS services.
Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.
Get started
The following code example shows how to get started using Auto Scaling.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
import boto3


def hello_autoscaling(autoscaling_client):
    """
    Use the AWS SDK for Python (Boto3) to create an Amazon EC2 Auto Scaling client
    and list some of the Auto Scaling groups in your account.
    This example uses the default settings specified in your shared credentials
    and config files.

    :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client object.
    """
    print(
        "Hello, Amazon EC2 Auto Scaling! Let's list up to ten of your Auto Scaling groups:"
    )
    response = autoscaling_client.describe_auto_scaling_groups()
    groups = response.get("AutoScalingGroups", [])
    if groups:
        for group in groups:
            print(f"\t{group['AutoScalingGroupName']}: {group['AvailabilityZones']}")
    else:
        print("There are no Auto Scaling groups in your account.")


if __name__ == "__main__":
    hello_autoscaling(boto3.client("autoscaling"))
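The hello example prints only the first page of results that a single describe_auto_scaling_groups call returns. If your account has more groups than fit in one response, a Boto3 paginator can walk every page; the following is a minimal sketch that assumes your shared credentials and default Region are configured.

import boto3

# Minimal sketch: list every Auto Scaling group in the account by paginating
# through describe_auto_scaling_groups results.
autoscaling_client = boto3.client("autoscaling")
paginator = autoscaling_client.get_paginator("describe_auto_scaling_groups")
for page in paginator.paginate():
    for group in page.get("AutoScalingGroups", []):
        print(f"{group['AutoScalingGroupName']}: {group['AvailabilityZones']}")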
For API details, see DescribeAutoScalingGroups in the AWS SDK for Python (Boto3) API Reference.
Basics
The following code example shows how to:
Create an Amazon EC2 Auto Scaling group with a launch template and Availability Zones, and get information about running instances.
Enable Amazon CloudWatch metrics collection.
Update the group's desired capacity and wait for an instance to start.
Terminate an instance in the group.
List scaling activities that occur in response to user requests and capacity changes.
Get statistics for CloudWatch metrics, then clean up resources.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
Run an interactive scenario at a command prompt.
def run_scenario(as_wrapper: AutoScalingWrapper, svc_helper: ServiceHelper) -> None: """ Runs the scenario demonstrating the management of Auto Scaling groups and instances. :param as_wrapper: An instance of the AutoScalingWrapper that manages Auto Scaling groups. :param svc_helper: An instance of the ServiceHelper that interacts with AWS services. :return: None """ logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") logger.info("Starting the Amazon EC2 Auto Scaling demo.") print("-" * 88) print( "Welcome to the Amazon EC2 Auto Scaling demo for managing groups and instances." ) print("-" * 88) print( "This example requires a launch template that specifies how to create " "EC2 instances. You can use an existing template or create a new one." ) template_name = q.ask( "Enter the name of an existing launch template or press Enter to create a new one: " ) template = None if template_name: template = svc_helper.get_template(template_name) if template is None: inst_type = "t1.micro" ami_id = "ami-0ca285d4c2cda3300" print("Let's create a launch template with the following specifications:") print(f"\tInstanceType: {inst_type}") print(f"\tAMI ID: {ami_id}") template_name = q.ask("Enter a name for the template: ", q.non_empty) template = svc_helper.create_template(template_name, inst_type, ami_id) print("-" * 88) print("Let's create an Auto Scaling group.") group_name = q.ask("Enter a name for the group: ", q.non_empty) zones = svc_helper.get_availability_zones() print("EC2 instances can be created in the following Availability Zones:") for index, zone in enumerate(zones): print(f"\t{index+1}. {zone}") print(f"\t{len(zones)+1}. All zones") zone_sel = q.ask( "Which zone do you want to use? ", q.is_int, q.in_range(1, len(zones) + 1) ) group_zones = [zones[zone_sel - 1]] if zone_sel <= len(zones) else zones print(f"Creating group {group_name}...") as_wrapper.create_autoscaling_group(group_name, group_zones, template_name, 1, 1) wait(10) group = as_wrapper.describe_group(group_name) logger.info("Created Auto Scaling group %s.", group_name) print("Created group:") pp(group) print("Waiting for instance to start...") wait_for_group(group_name, as_wrapper) print("-" * 88) use_metrics = q.ask( "Do you want to collect metrics about Amazon EC2 Auto Scaling during this demo (y/n)? 
", q.is_yesno, ) if use_metrics: as_wrapper.enable_metrics( group_name, [ "GroupMinSize", "GroupMaxSize", "GroupDesiredCapacity", "GroupInServiceInstances", "GroupTotalInstances", ], ) logger.info("Enabled metrics for Auto Scaling group %s.", group_name) print(f"Metrics enabled for {group_name}.") print("-" * 88) print(f"Let's update the maximum number of instances in {group_name} from 1 to 3.") q.ask("Press Enter when you're ready.") as_wrapper.update_group(group_name, MaxSize=3) group = as_wrapper.describe_group(group_name) logger.info("Updated maximum size for group %s to 3.", group_name) print("The group still has one running instance, but can have up to three:") print_simplified_group(group) print("-" * 88) print(f"Let's update the desired capacity of {group_name} from 1 to 2.") q.ask("Press Enter when you're ready.") as_wrapper.set_desired_capacity(group_name, 2) wait(10) group = as_wrapper.describe_group(group_name) logger.info("Set desired capacity for group %s to 2.", group_name) print("Here's the current state of the group:") print_simplified_group(group) print("-" * 88) print("Waiting for the new instance to start...") instance_ids = wait_for_group(group_name, as_wrapper) print("-" * 88) print(f"Let's terminate one of the instances in {group_name}.") print("Because the desired capacity is 2, another instance will start.") print("The currently running instances are:") for index, inst_id in enumerate(instance_ids): print(f"\t{index+1}. {inst_id}") inst_sel = q.ask( "Which instance do you want to stop? ", q.is_int, q.in_range(1, len(instance_ids) + 1), ) print(f"Stopping {instance_ids[inst_sel-1]}...") as_wrapper.terminate_instance(instance_ids[inst_sel - 1], False) wait(10) group = as_wrapper.describe_group(group_name) logger.info( "Terminated instance %s in group %s.", instance_ids[inst_sel - 1], group_name ) print(f"Here's the state of {group_name}:") print_simplified_group(group) print("Waiting for the scaling activities to complete...") wait_for_group(group_name, as_wrapper) print("-" * 88) print(f"Let's get a report of scaling activities for {group_name}.") q.ask("Press Enter when you're ready.") activities = as_wrapper.describe_scaling_activities(group_name) logger.info( "Retrieved %d scaling activities for group %s.", len(activities), group_name ) print( f"Found {len(activities)} activities.\n" f"Activities are ordered with the most recent one first:" ) for act in activities: pp(act) print("-" * 88) if use_metrics: print("Let's look at CloudWatch metrics.") metric_namespace = "AWS/AutoScaling" metric_dimensions = [{"Name": "AutoScalingGroupName", "Value": group_name}] print(f"The following metrics are enabled for {group_name}:") done = False while not done: metrics = svc_helper.get_metrics(metric_namespace, metric_dimensions) for index, metric in enumerate(metrics): print(f"\t{index+1}. {metric.name}") print(f"\t{len(metrics)+1}. None") metric_sel = q.ask( "Which metric do you want to see? ", q.is_int, q.in_range(1, len(metrics) + 1), ) if metric_sel < len(metrics) + 1: span = 5 metric = metrics[metric_sel - 1] print(f"Over the last {span} minutes, {metric.name} recorded:") # CloudWatch metric times are in the UTC+0 time zone. now = datetime.now(timezone.utc) metric_data = svc_helper.get_metric_statistics( metric_dimensions, metric, now - timedelta(minutes=span), now ) pp(metric_data) if not q.ask("Do you want to see another metric (y/n)? 
", q.is_yesno): done = True else: done = True print(f"Let's clean up.") q.ask("Press Enter when you're ready.") if use_metrics: print(f"Stopping metrics collection for {group_name}.") as_wrapper.disable_metrics(group_name) logger.info("Disabled metrics collection for group %s.", group_name) print( "You must terminate all instances in the group before you can delete the group." ) print("Set minimum size to 0.") as_wrapper.update_group(group_name, MinSize=0) group = as_wrapper.describe_group(group_name) instance_ids = [inst["InstanceId"] for inst in group["Instances"]] for inst_id in instance_ids: print(f"Stopping {inst_id}.") as_wrapper.terminate_instance(inst_id, True) logger.info("Terminated instance %s in group %s.", inst_id, group_name) print("Waiting for instances to stop...") wait_for_instances(instance_ids, as_wrapper) print(f"Deleting {group_name}.") as_wrapper.delete_autoscaling_group(group_name) logger.info("Deleted Auto Scaling group %s.", group_name) print("-" * 88) if template is not None: if q.ask( f"Do you want to delete launch template {template_name} used in this demo (y/n)? " ): svc_helper.delete_template(template_name) logger.info("Deleted launch template %s.", template_name) print("Template deleted.") print("\nThanks for watching!") print("-" * 88) if __name__ == "__main__": try: wrapper = AutoScalingWrapper(boto3.client("autoscaling")) helper = ServiceHelper(boto3.client("ec2"), boto3.resource("cloudwatch")) run_scenario(wrapper, helper) except Exception: logger.exception("Something went wrong with the demo!")
Define functions that are called by the scenario to manage launch templates and metrics. These functions wrap Amazon EC2 and CloudWatch actions.
class ServiceHelper: """Encapsulates Amazon EC2 and CloudWatch actions for the example.""" def __init__(self, ec2_client, cloudwatch_resource): """ :param ec2_client: A Boto3 Amazon EC2 client. :param cloudwatch_resource: A Boto3 CloudWatch resource. """ self.ec2_client = ec2_client self.cloudwatch_resource = cloudwatch_resource def get_template(self, template_name: str) -> dict: """ Gets a launch template. Launch templates specify configuration for instances that are launched by Amazon EC2 Auto Scaling. :param template_name: The name of the template to look up. :return: The template, if it exists. :raises ClientError: If there is an error retrieving the launch template. """ try: response = self.ec2_client.describe_launch_templates( LaunchTemplateNames=[template_name] ) template = response["LaunchTemplates"][0] logger.info("Launch template %s retrieved successfully.", template_name) return template except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): logger.warning("Launch template %s does not exist.", template_name) else: logger.error( "Couldn't verify launch template %s. Error: %s: %s", template_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def create_template(self, template_name: str, inst_type: str, ami_id: str) -> dict: """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. :param template_name: The name to give to the template. :param inst_type: The type of the instance, such as t1.micro. :param ami_id: The ID of the Amazon Machine Image (AMI) to use when creating an instance. :return: Information about the newly created template. :raises ClientError: If there is an error creating the launch template. """ try: response = self.ec2_client.create_launch_template( LaunchTemplateName=template_name, LaunchTemplateData={"InstanceType": inst_type, "ImageId": ami_id}, ) template = response["LaunchTemplate"] logger.info( "Created launch template %s with instance type %s and AMI ID %s.", template_name, inst_type, ami_id, ) return template except ClientError as err: logger.error( "Couldn't create launch template %s. Error: %s: %s", template_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_template(self, template_name: str) -> None: """ Deletes a launch template. :param template_name: The name of the template to delete. :raises ClientError: If there is an error deleting the launch template. """ try: self.ec2_client.delete_launch_template(LaunchTemplateName=template_name) logger.info("Deleted launch template %s.", template_name) except ClientError as err: logger.error( "Couldn't delete launch template %s. Error: %s: %s", template_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_availability_zones(self) -> list: """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. :raises ClientError: If there is an error retrieving availability zones. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] logger.info("Retrieved availability zones: %s.", ", ".join(zones)) return zones except ClientError as err: logger.error( "Couldn't get availability zones. 
Error: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_metrics(self, namespace: str, dimensions: list) -> list: """ Gets a list of CloudWatch metrics filtered by namespace and dimensions. :param namespace: The namespace of the metrics to look up. :param dimensions: The dimensions of the metrics to look up. :return: The list of metrics. :raises ClientError: If there is an error retrieving CloudWatch metrics. """ try: metrics = list( self.cloudwatch_resource.metrics.filter( Namespace=namespace, Dimensions=dimensions ) ) logger.info( "Retrieved metrics for namespace %s with dimensions %s.", namespace, dimensions, ) return metrics except ClientError as err: logger.error( "Couldn't get metrics for %s, %s. Error: %s: %s", namespace, dimensions, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise @staticmethod def get_metric_statistics( dimensions: list, metric, start: datetime, end: datetime ) -> list: """ Gets statistics for a CloudWatch metric within a specified time span. :param dimensions: The dimensions of the metric. :param metric: The metric to look up. :param start: The start of the time span for retrieved metrics. :param end: The end of the time span for retrieved metrics. :return: The list of data points found for the specified metric. :raises ClientError: If there is an error retrieving metric statistics. """ try: response = metric.get_statistics( Dimensions=dimensions, StartTime=start, EndTime=end, Period=60, Statistics=["Sum"], ) data = response["Datapoints"] logger.info("Retrieved statistics for metric %s.", metric.name) return data except ClientError as err: logger.error( "Couldn't get statistics for metric %s. Error: %s: %s", metric.name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def print_simplified_group(group: dict) -> None: """ Prints a subset of data for an Auto Scaling group. :param group: The Auto Scaling group data to print. :return: None """ print(group["AutoScalingGroupName"]) print(f"\tLaunch template: {group['LaunchTemplate']['LaunchTemplateName']}") print( f"\tMin: {group['MinSize']}, Max: {group['MaxSize']}, Desired: {group['DesiredCapacity']}" ) if group["Instances"]: print(f"\tInstances:") for inst in group["Instances"]: print(f"\t\t{inst['InstanceId']}: {inst['LifecycleState']}") def wait_for_group(group_name: str, as_wrapper: AutoScalingWrapper) -> list: """ Waits for instances to start or stop in an Auto Scaling group. Prints the data for each instance after scaling activities are complete. :param group_name: The name of the Auto Scaling group. :param as_wrapper: The AutoScalingWrapper that manages Auto Scaling groups. :return: A list of instance IDs in the group. """ group = as_wrapper.describe_group(group_name) instance_ids = [i["InstanceId"] for i in group["Instances"]] return wait_for_instances(instance_ids, as_wrapper) def wait_for_instances(instance_ids: list, as_wrapper: AutoScalingWrapper) -> list: """ Waits for instances to start or stop in an Auto Scaling group. Prints the data for each instance after scaling activities are complete. :param instance_ids: A list of instance IDs to wait for. :param as_wrapper: The AutoScalingWrapper that manages Auto Scaling groups. :return: A list of instance IDs that were waited on. 
""" ready = False instances = [] while not ready: instances = as_wrapper.describe_instances(instance_ids) if instance_ids else [] if all([x["LifecycleState"] in ["Terminated", "InService"] for x in instances]): ready = True else: wait(10) if instances: print( f"Here are the details of the instance{'s' if len(instances) > 1 else ''}:" ) for instance in instances: pp(instance) return instance_ids
For API details, see the following topics in the AWS SDK for Python (Boto3) API Reference.
Actions
The following code example shows how to use AttachLoadBalancerTargetGroups.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def attach_load_balancer_target_group( self, lb_target_group: Dict[str, Any] ) -> None: """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forwards requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to attach load balancer target group '{lb_target_group['TargetGroupName']}'." ) if error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) elif error_code == "ServiceLinkedRoleFailure": log.error( "The operation failed because the service-linked role is not ready or does not exist. " "Check that the service-linked role exists and is correctly configured." ) log.error(f"Full error:\n\t{err}")
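A hypothetical usage sketch follows, assuming the AutoScalingWrapper class above is in scope and that an Elastic Load Balancing target group already exists; the resource names are placeholders borrowed from the resilient service scenario's defaults.

import boto3

# Hypothetical usage sketch: resource names are placeholders.
wrapper = AutoScalingWrapper(
    resource_prefix="doc-example-resilience",
    inst_type="t3.micro",
    ami_param="/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2",
    autoscaling_client=boto3.client("autoscaling"),
    ec2_client=boto3.client("ec2"),
    ssm_client=boto3.client("ssm"),
    iam_client=boto3.client("iam"),
)

# Look up an existing target group by name, then attach it to the wrapper's group.
elb_client = boto3.client("elbv2")
response = elb_client.describe_target_groups(Names=["doc-example-resilience-tg"])
wrapper.attach_load_balancer_target_group(response["TargetGroups"][0])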
For API details, see AttachLoadBalancerTargetGroups in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use CreateAutoScalingGroup.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def create_group( self, group_name: str, group_zones: List[str], launch_template_name: str, min_size: int, max_size: int, ) -> None: """ Creates an Auto Scaling group. :param group_name: The name to give to the group. :param group_zones: The Availability Zones in which instances can be created. :param launch_template_name: The name of an existing Amazon EC2 launch template. The launch template specifies the configuration of instances that are created by auto scaling activities. :param min_size: The minimum number of active instances in the group. :param max_size: The maximum number of active instances in the group. :return: None :raises ClientError: If there is an error creating the Auto Scaling group. """ try: self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=group_name, AvailabilityZones=group_zones, LaunchTemplate={ "LaunchTemplateName": launch_template_name, "Version": "$Default", }, MinSize=min_size, MaxSize=max_size, ) # Wait for the group to exist. waiter = self.autoscaling_client.get_waiter("group_exists") waiter.wait(AutoScalingGroupNames=[group_name]) logger.info(f"Successfully created Auto Scaling group {group_name}.") except ClientError as err: error_code = err.response["Error"]["Code"] logger.error(f"Failed to create Auto Scaling group {group_name}.") if error_code == "AlreadyExistsFault": logger.error( f"An Auto Scaling group with the name '{group_name}' already exists. " "Please use a different name or update the existing group.", ) elif error_code == "LimitExceededFault": logger.error( "The request failed because you have reached the limit " "on the number of Auto Scaling groups or launch configurations. " "Consider deleting unused resources or request a limit increase. " "\nSee Auto Scaling Service Quota documentation here:" "\n\thttps://docs.aws.amazon.com/autoscaling/ec2/userguide/ec2-auto-scaling-quotas.html" ) logger.error(f"Full error:\n\t{err}") raise
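A minimal usage sketch, assuming the wrapper class above is in scope; the group, Availability Zone, and launch template names are placeholders.

import boto3

# Minimal usage sketch with placeholder names. The launch template must already exist.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
wrapper.create_group(
    group_name="my-asg",
    group_zones=["us-west-2a"],
    launch_template_name="my-launch-template",
    min_size=1,
    max_size=2,
)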
For API details, see CreateAutoScalingGroup in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DeleteAutoScalingGroup.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
Update the minimum size of an Auto Scaling group to zero, terminate all instances in the group, and delete the group.
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def delete_autoscaling_group(self, group_name: str) -> None: """ Terminates all instances in the group, then deletes the EC2 Auto Scaling group. :param group_name: The name of the group to delete. """ try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self.terminate_instance(inst_id) # Wait for all instances to be terminated if instance_ids: waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for all instances to be terminated...") waiter.wait(InstanceIds=instance_ids) log.info("All instances have been terminated.") else: log.info(f"No groups found named '{group_name}'! Nothing to do.") except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete Auto Scaling group '{group_name}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to delete the group again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the group." ) log.error(f"Full error:\n\t{err}")
For API details, see DeleteAutoScalingGroup in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DescribeAutoScalingGroups.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def describe_group(self, group_name: str) -> Optional[Dict[str, Any]]: """ Gets information about an Auto Scaling group. :param group_name: The name of the group to look up. :return: A dictionary with information about the group if found, otherwise None. :raises ClientError: If there is an error describing the Auto Scaling group. """ try: paginator = self.autoscaling_client.get_paginator( "describe_auto_scaling_groups" ) response_iterator = paginator.paginate(AutoScalingGroupNames=[group_name]) groups = [] for response in response_iterator: groups.extend(response.get("AutoScalingGroups", [])) logger.info( f"Successfully retrieved information for Auto Scaling group {group_name}." ) except ClientError as err: error_code = err.response["Error"]["Code"] logger.error(f"Failed to describe Auto Scaling group {group_name}.") if error_code == "ResourceContentionFault": logger.error( "There is a conflict with another operation that is modifying the " f"Auto Scaling group '{group_name}' Please try again later." ) logger.error(f"Full error:\n\t{err}") raise else: return groups[0] if len(groups) > 0 else None
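A usage sketch, assuming the wrapper class above is in scope; the group name is a placeholder.

import boto3

# Usage sketch: look up a group and print its size settings, or report that it doesn't exist.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
group = wrapper.describe_group("my-asg")
if group is None:
    print("No Auto Scaling group named 'my-asg' was found.")
else:
    print(
        f"Min: {group['MinSize']}, Max: {group['MaxSize']}, "
        f"Desired: {group['DesiredCapacity']}"
    )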
For API details, see DescribeAutoScalingGroups in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DescribeAutoScalingInstances.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def describe_instances(self, instance_ids: List[str]) -> List[Dict[str, Any]]: """ Gets information about instances. :param instance_ids: A list of instance IDs to look up. :return: A list of dictionaries with information about each instance, or an empty list if none are found. :raises ClientError: If there is an error describing the instances. """ try: paginator = self.autoscaling_client.get_paginator( "describe_auto_scaling_instances" ) response_iterator = paginator.paginate(InstanceIds=instance_ids) instances = [] for response in response_iterator: instances.extend(response.get("AutoScalingInstances", [])) logger.info(f"Successfully described instances: {instance_ids}") except ClientError as err: error_code = err.response["Error"]["Code"] logger.error( f"Couldn't describe instances {instance_ids}. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) raise else: return instances
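A usage sketch, assuming the wrapper class above is in scope; the instance ID is a placeholder.

import boto3

# Usage sketch: print the lifecycle state of each instance that Auto Scaling reports.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
for instance in wrapper.describe_instances(["i-0123456789abcdef0"]):
    print(f"{instance['InstanceId']}: {instance['LifecycleState']}")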
For API details, see DescribeAutoScalingInstances in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DescribeScalingActivities.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def describe_scaling_activities(self, group_name: str) -> List[Dict[str, Any]]: """ Gets information about scaling activities for the group. Scaling activities are things like instances stopping or starting in response to user requests or capacity changes. :param group_name: The name of the group to look up. :return: A list of dictionaries representing the scaling activities for the group, ordered with the most recent activity first. :raises ClientError: If there is an error describing the scaling activities. """ try: paginator = self.autoscaling_client.get_paginator( "describe_scaling_activities" ) response_iterator = paginator.paginate(AutoScalingGroupName=group_name) activities = [] for response in response_iterator: activities.extend(response.get("Activities", [])) logger.info( f"Successfully described scaling activities for group '{group_name}'." ) except ClientError as err: error_code = err.response["Error"]["Code"] logger.error( f"Couldn't describe scaling activities for group '{group_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "ResourceContentionFault": logger.error( f"There is a conflict with another operation that is modifying the Auto Scaling group '{group_name}'. " "Please try again later." ) raise else: return activities
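A usage sketch, assuming the wrapper class above is in scope; the group name is a placeholder. Activities are returned with the most recent one first.

import boto3

# Usage sketch: report the status of the most recent scaling activity, if any.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
activities = wrapper.describe_scaling_activities("my-asg")
if activities:
    latest = activities[0]  # Most recent activity first.
    print(f"{latest['StatusCode']}: {latest.get('Description', '')}")
else:
    print("No scaling activities found for 'my-asg'.")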
For API details, see DescribeScalingActivities in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DisableMetricsCollection.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def disable_metrics(self, group_name: str) -> Dict[str, Any]: """ Stops CloudWatch metric collection for the Auto Scaling group. :param group_name: The name of the group. :return: A dictionary with the response from disabling the metrics collection. :raises ClientError: If there is an error disabling metrics collection. """ try: response = self.autoscaling_client.disable_metrics_collection( AutoScalingGroupName=group_name ) logger.info( f"Successfully disabled metrics collection for group '{group_name}'." ) return response except ClientError as err: error_code = err.response["Error"]["Code"] logger.error( f"Couldn't disable metrics for group '{group_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "ResourceContentionFault": logger.error( f"There is a conflict with another operation that is modifying the Auto Scaling group '{group_name}'. " "Please try again later." ) raise
For API details, see DisableMetricsCollection in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use EnableMetricsCollection.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def enable_metrics(self, group_name: str, metrics: List[str]) -> Dict[str, Any]: """ Enables CloudWatch metric collection for Amazon EC2 Auto Scaling activities. :param group_name: The name of the group to enable. :param metrics: A list of metrics to collect. :return: A dictionary with the response from enabling the metrics collection. :raises ClientError: If there is an error enabling metrics collection. """ try: response = self.autoscaling_client.enable_metrics_collection( AutoScalingGroupName=group_name, Metrics=metrics, Granularity="1Minute" ) logger.info( f"Successfully enabled metrics for Auto Scaling group '{group_name}'." ) except ClientError as err: error_code = err.response["Error"]["Code"] logger.error( f"Couldn't enable metrics on '{group_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "ResourceContentionFault": logger.error( f"There is a conflict with another operation that is modifying the Auto Scaling group '{group_name}'. " "Please try again later." ) elif error_code == "InvalidParameterCombination": logger.error( f"The combination of parameters provided for enabling metrics on '{group_name}' is not valid. " "Please check the parameters and try again." ) raise else: return response
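A usage sketch, assuming the wrapper class above is in scope; it enables the same group-level metrics that the basics scenario collects, for a placeholder group name.

import boto3

# Usage sketch: turn on one-minute granularity collection for common group metrics.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
wrapper.enable_metrics(
    "my-asg",
    [
        "GroupMinSize",
        "GroupMaxSize",
        "GroupDesiredCapacity",
        "GroupInServiceInstances",
        "GroupTotalInstances",
    ],
)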
For API details, see EnableMetricsCollection in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use SetDesiredCapacity.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def set_desired_capacity(self, group_name: str, capacity: int) -> None: """ Sets the desired capacity of the group. Amazon EC2 Auto Scaling tries to keep the number of running instances equal to the desired capacity. :param group_name: The name of the group to update. :param capacity: The desired number of running instances. :return: None :raises ClientError: If there is an error setting the desired capacity. """ try: self.autoscaling_client.set_desired_capacity( AutoScalingGroupName=group_name, DesiredCapacity=capacity, HonorCooldown=False, ) logger.info( f"Successfully set desired capacity of {capacity} for Auto Scaling group '{group_name}'." ) except ClientError as err: error_code = err.response["Error"]["Code"] logger.error( f"Failed to set desired capacity for Auto Scaling group '{group_name}'." ) if error_code == "ScalingActivityInProgress": logger.error( f"A scaling activity is currently in progress for the Auto Scaling group '{group_name}'. " "Please wait for the activity to complete before attempting to set the desired capacity." ) logger.error(f"Full error:\n\t{err}") raise
For API details, see SetDesiredCapacity in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use TerminateInstanceInAutoScalingGroup.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def terminate_instance( self, instance_id: str, decrease_capacity: bool ) -> Dict[str, Any]: """ Stops an instance. :param instance_id: The ID of the instance to stop. :param decrease_capacity: Specifies whether to decrease the desired capacity of the group. When passing True for this parameter, you can stop an instance without having a replacement instance start when the desired capacity threshold is crossed. :return: A dictionary containing details of the scaling activity that occurs in response to this action. :raises ClientError: If there is an error terminating the instance. """ try: response = self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrease_capacity ) logger.info(f"Successfully terminated instance {instance_id}.") return response["Activity"] except ClientError as err: error_code = err.response["Error"]["Code"] logger.error(f"Failed to terminate instance {instance_id}.") if error_code == "ScalingActivityInProgress": logger.error( "A scaling activity is currently in progress for the Auto Scaling group " f"associated with instance '{instance_id}'. " "Please wait for the activity to complete before attempting to terminate the instance." ) elif error_code == "ResourceInUse": logger.error( f"The instance '{instance_id}' or an associated resource is currently in use " "and cannot be terminated. " "Ensure the instance is not involved in any ongoing processes and try again." ) logger.error(f"Full error:\n\t{err}") raise
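A usage sketch, assuming the wrapper class above is in scope; the instance ID is a placeholder. Passing False leaves the desired capacity unchanged, so Auto Scaling starts a replacement instance.

import boto3

# Usage sketch: terminate one instance without decrementing desired capacity.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
activity = wrapper.terminate_instance("i-0123456789abcdef0", False)
print(activity["Description"])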
For API details, see TerminateInstanceInAutoScalingGroup in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use UpdateAutoScalingGroup.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class AutoScalingWrapper: """Encapsulates Amazon EC2 Auto Scaling actions.""" def __init__(self, autoscaling_client): """ :param autoscaling_client: A Boto3 Amazon EC2 Auto Scaling client. """ self.autoscaling_client = autoscaling_client def update_group(self, group_name: str, **kwargs: Any) -> None: """ Updates an Auto Scaling group. :param group_name: The name of the group to update. :param kwargs: Keyword arguments to pass through to the service. :return: None :raises ClientError: If there is an error updating the Auto Scaling group. """ try: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, **kwargs ) logger.info(f"Successfully updated Auto Scaling group {group_name}.") except ClientError as err: error_code = err.response["Error"]["Code"] logger.error(f"Failed to update Auto Scaling group {group_name}.") if error_code == "ResourceInUse": logger.error( "The Auto Scaling group '%s' is currently in use and cannot be modified. Please try again later.", group_name, ) elif error_code == "ScalingActivityInProgress": logger.error( f"A scaling activity is currently in progress for the Auto Scaling group '{group_name}'." "Please wait for the activity to complete before attempting to update the group." ) logger.error(f"Full error:\n\t{err}") raise
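A usage sketch, assuming the wrapper class above is in scope; because keyword arguments are passed straight through to update_auto_scaling_group, any parameter of that operation can be supplied. The group name is a placeholder.

import boto3

# Usage sketch: allow the group to shrink to zero instances and grow to three.
wrapper = AutoScalingWrapper(boto3.client("autoscaling"))
wrapper.update_group("my-asg", MinSize=0, MaxSize=3)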
For API details, see UpdateAutoScalingGroup in the AWS SDK for Python (Boto3) API Reference.
Scenarios
The following code example shows how to create a load-balanced web service that returns book, movie, and song recommendations. The example shows how the service responds to failures, and how to restructure the service for more resilience when failures occur.
Use an Amazon EC2 Auto Scaling group to create Amazon Elastic Compute Cloud (Amazon EC2) instances based on a launch template and to keep the number of instances in a specified range.
Handle and distribute HTTP requests with Elastic Load Balancing.
Monitor the health of instances in an Auto Scaling group and forward requests only to healthy instances.
Run a Python web server on each EC2 instance to handle HTTP requests. The web server responds with recommendations and health checks.
Simulate a recommendation service with an Amazon DynamoDB table.
Control web server response to requests and health checks by updating AWS Systems Manager parameters.
- SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
Run the interactive scenario at a command prompt.
class Runner: """ Manages the deployment, demonstration, and destruction of resources for the resilient service. """ def __init__( self, resource_path: str, recommendation: RecommendationService, autoscaler: AutoScalingWrapper, loadbalancer: ElasticLoadBalancerWrapper, param_helper: ParameterHelper, ): """ Initializes the Runner class with the necessary parameters. :param resource_path: The path to resource files used by this example, such as IAM policies and instance scripts. :param recommendation: An instance of the RecommendationService class. :param autoscaler: An instance of the AutoScaler class. :param loadbalancer: An instance of the LoadBalancer class. :param param_helper: An instance of the ParameterHelper class. """ self.resource_path = resource_path self.recommendation = recommendation self.autoscaler = autoscaler self.loadbalancer = loadbalancer self.param_helper = param_helper self.protocol = "HTTP" self.port = 80 self.ssh_port = 22 prefix = "doc-example-resilience" self.target_group_name = f"{prefix}-tg" self.load_balancer_name = f"{prefix}-lb" def deploy(self) -> None: """ Deploys the resources required for the resilient service, including the DynamoDB table, EC2 instances, Auto Scaling group, and load balancer. """ recommendations_path = f"{self.resource_path}/recommendations.json" startup_script = f"{self.resource_path}/server_startup_script.sh" instance_policy = f"{self.resource_path}/instance_policy.json" logging.info("Starting deployment of resources for the resilient service.") logging.info( "Creating and populating DynamoDB table '%s'.", self.recommendation.table_name, ) self.recommendation.create() self.recommendation.populate(recommendations_path) logging.info( "Creating an EC2 launch template with the startup script '%s'.", startup_script, ) self.autoscaler.create_template(startup_script, instance_policy) logging.info( "Creating an EC2 Auto Scaling group across multiple Availability Zones." ) zones = self.autoscaler.create_autoscaling_group(3) logging.info("Creating variables that control the flow of the demo.") self.param_helper.reset() logging.info("Creating Elastic Load Balancing target group and load balancer.") vpc = self.autoscaler.get_default_vpc() subnets = self.autoscaler.get_subnets(vpc["VpcId"], zones) target_group = self.loadbalancer.create_target_group( self.target_group_name, self.protocol, self.port, vpc["VpcId"] ) self.loadbalancer.create_load_balancer( self.load_balancer_name, [subnet["SubnetId"] for subnet in subnets] ) self.loadbalancer.create_listener(self.load_balancer_name, target_group) self.autoscaler.attach_load_balancer_target_group(target_group) logging.info("Verifying access to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) current_ip_address = requests.get("http://checkip.amazonaws.com").text.strip() if not lb_success: logging.warning( "Couldn't connect to the load balancer. Verifying that the port is open..." ) sec_group, port_is_open = self.autoscaler.verify_inbound_port( vpc, self.port, current_ip_address ) sec_group, ssh_port_is_open = self.autoscaler.verify_inbound_port( vpc, self.ssh_port, current_ip_address ) if not port_is_open: logging.warning( "The default security group for your VPC must allow access from this computer." ) if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? 
(y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.port, current_ip_address ) if not ssh_port_is_open: if q.ask( f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n" f"inbound SSH traffic on port {self.ssh_port} for debugging from your computer's IP address of {current_ip_address}? (y/n) ", q.is_yesno, ): self.autoscaler.open_inbound_port( sec_group["GroupId"], self.ssh_port, current_ip_address ) lb_success = self.loadbalancer.verify_load_balancer_endpoint(endpoint) if lb_success: logging.info( "Load balancer is ready. Access it at: http://%s", current_ip_address ) else: logging.error( "Couldn't get a successful response from the load balancer endpoint. Please verify your VPC and security group settings." ) def demo_choices(self) -> None: """ Presents choices for interacting with the deployed service, such as sending requests to the load balancer or checking the health of the targets. """ actions = [ "Send a GET request to the load balancer endpoint.", "Check the health of load balancer targets.", "Go to the next part of the demo.", ] choice = 0 while choice != 2: logging.info("Choose an action to interact with the service.") choice = q.choose("Which action would you like to take? ", actions) if choice == 0: logging.info("Sending a GET request to the load balancer endpoint.") endpoint = self.loadbalancer.get_endpoint(self.load_balancer_name) logging.info("GET http://%s", endpoint) response = requests.get(f"http://{endpoint}") logging.info("Response: %s", response.status_code) if response.headers.get("content-type") == "application/json": pp(response.json()) elif choice == 1: logging.info("Checking the health of load balancer targets.") health = self.loadbalancer.check_target_health(self.target_group_name) for target in health: state = target["TargetHealth"]["State"] logging.info( "Target %s on port %d is %s", target["Target"]["Id"], target["Target"]["Port"], state, ) if state != "healthy": logging.warning( "%s: %s", target["TargetHealth"]["Reason"], target["TargetHealth"]["Description"], ) logging.info( "Note that it can take a minute or two for the health check to update." ) elif choice == 2: logging.info("Proceeding to the next part of the demo.") def demo(self) -> None: """ Runs the demonstration, showing how the service responds to different failure scenarios and how a resilient architecture can keep the service running. """ ssm_only_policy = f"{self.resource_path}/ssm_only_policy.json" logging.info("Resetting parameters to starting values for the demo.") self.param_helper.reset() logging.info( "Starting demonstration of the service's resilience under various failure conditions." ) self.demo_choices() logging.info( "Simulating failure by changing the Systems Manager parameter to a non-existent table." ) self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info("Sending GET requests will now return failure codes.") self.demo_choices() logging.info("Switching to static response mode to mitigate failure.") self.param_helper.put(self.param_helper.failure_response, "static") logging.info("Sending GET requests will now return static responses.") self.demo_choices() logging.info("Restoring normal operation of the recommendation service.") self.param_helper.put(self.param_helper.table, self.recommendation.table_name) logging.info( "Introducing a failure by assigning bad credentials to one of the instances." 
) self.autoscaler.create_instance_profile( ssm_only_policy, self.autoscaler.bad_creds_policy_name, self.autoscaler.bad_creds_role_name, self.autoscaler.bad_creds_profile_name, ["AmazonSSMManagedInstanceCore"], ) instances = self.autoscaler.get_instances() bad_instance_id = instances[0] instance_profile = self.autoscaler.get_instance_profile(bad_instance_id) logging.info( "Replacing instance profile with bad credentials for instance %s.", bad_instance_id, ) self.autoscaler.replace_instance_profile( bad_instance_id, self.autoscaler.bad_creds_profile_name, instance_profile["AssociationId"], ) logging.info( "Sending GET requests may return either a valid recommendation or a static response." ) self.demo_choices() logging.info("Implementing deep health checks to detect unhealthy instances.") self.param_helper.put(self.param_helper.health_check, "deep") logging.info("Checking the health of the load balancer targets.") self.demo_choices() logging.info( "Terminating the unhealthy instance to let the auto scaler replace it." ) self.autoscaler.terminate_instance(bad_instance_id) logging.info("The service remains resilient during instance replacement.") self.demo_choices() logging.info("Simulating a complete failure of the recommendation service.") self.param_helper.put(self.param_helper.table, "this-is-not-a-table") logging.info( "All instances will report as unhealthy, but the service will still return static responses." ) self.demo_choices() self.param_helper.reset() def destroy(self, automation=False) -> None: """ Destroys all resources created for the demo, including the load balancer, Auto Scaling group, EC2 instances, and DynamoDB table. """ logging.info( "This concludes the demo. Preparing to clean up all AWS resources created during the demo." ) if automation: cleanup = True else: cleanup = q.ask( "Do you want to clean up all demo resources? (y/n) ", q.is_yesno ) if cleanup: logging.info("Deleting load balancer and related resources.") self.loadbalancer.delete_load_balancer(self.load_balancer_name) self.loadbalancer.delete_target_group(self.target_group_name) self.autoscaler.delete_autoscaling_group(self.autoscaler.group_name) self.autoscaler.delete_key_pair() self.autoscaler.delete_template() self.autoscaler.delete_instance_profile( self.autoscaler.bad_creds_profile_name, self.autoscaler.bad_creds_role_name, ) logging.info("Deleting DynamoDB table and other resources.") self.recommendation.destroy() else: logging.warning( "Resources have not been deleted. Ensure you clean them up manually to avoid unexpected charges." ) def main() -> None: """ Main function to parse arguments and run the appropriate actions for the demo. """ parser = argparse.ArgumentParser() parser.add_argument( "--action", required=True, choices=["all", "deploy", "demo", "destroy"], help="The action to take for the demo. 
When 'all' is specified, resources are\n" "deployed, the demo is run, and resources are destroyed.", ) parser.add_argument( "--resource_path", default="../../../workflows/resilient_service/resources", help="The path to resource files used by this example, such as IAM policies and\n" "instance scripts.", ) args = parser.parse_args() logging.info("Starting the Resilient Service demo.") prefix = "doc-example-resilience" # Service Clients ddb_client = boto3.client("dynamodb") elb_client = boto3.client("elbv2") autoscaling_client = boto3.client("autoscaling") ec2_client = boto3.client("ec2") ssm_client = boto3.client("ssm") iam_client = boto3.client("iam") # Wrapper instantiations recommendation = RecommendationService( "doc-example-recommendation-service", ddb_client ) autoscaling_wrapper = AutoScalingWrapper( prefix, "t3.micro", "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2", autoscaling_client, ec2_client, ssm_client, iam_client, ) elb_wrapper = ElasticLoadBalancerWrapper(elb_client) param_helper = ParameterHelper(recommendation.table_name, ssm_client) # Demo invocation runner = Runner( args.resource_path, recommendation, autoscaling_wrapper, elb_wrapper, param_helper, ) actions = [args.action] if args.action != "all" else ["deploy", "demo", "destroy"] for action in actions: if action == "deploy": runner.deploy() elif action == "demo": runner.demo() elif action == "destroy": runner.destroy() logging.info("Demo completed successfully.") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") main()
Create a class that wraps Amazon EC2 Auto Scaling and Amazon EC2 actions.
class AutoScalingWrapper: """ Encapsulates Amazon EC2 Auto Scaling and EC2 management actions. """ def __init__( self, resource_prefix: str, inst_type: str, ami_param: str, autoscaling_client: boto3.client, ec2_client: boto3.client, ssm_client: boto3.client, iam_client: boto3.client, ): """ Initializes the AutoScaler class with the necessary parameters. :param resource_prefix: The prefix for naming AWS resources that are created by this class. :param inst_type: The type of EC2 instance to create, such as t3.micro. :param ami_param: The Systems Manager parameter used to look up the AMI that is created. :param autoscaling_client: A Boto3 EC2 Auto Scaling client. :param ec2_client: A Boto3 EC2 client. :param ssm_client: A Boto3 Systems Manager client. :param iam_client: A Boto3 IAM client. """ self.inst_type = inst_type self.ami_param = ami_param self.autoscaling_client = autoscaling_client self.ec2_client = ec2_client self.ssm_client = ssm_client self.iam_client = iam_client sts_client = boto3.client("sts") self.account_id = sts_client.get_caller_identity()["Account"] self.key_pair_name = f"{resource_prefix}-key-pair" self.launch_template_name = f"{resource_prefix}-template-" self.group_name = f"{resource_prefix}-group" # Happy path self.instance_policy_name = f"{resource_prefix}-pol" self.instance_role_name = f"{resource_prefix}-role" self.instance_profile_name = f"{resource_prefix}-prof" # Failure mode self.bad_creds_policy_name = f"{resource_prefix}-bc-pol" self.bad_creds_role_name = f"{resource_prefix}-bc-role" self.bad_creds_profile_name = f"{resource_prefix}-bc-prof" def create_policy(self, policy_file: str, policy_name: str) -> str: """ Creates a new IAM policy or retrieves the ARN of an existing policy. :param policy_file: The path to a JSON file that contains the policy definition. :param policy_name: The name to give the created policy. :return: The ARN of the created or existing policy. """ with open(policy_file) as file: policy_doc = file.read() try: response = self.iam_client.create_policy( PolicyName=policy_name, PolicyDocument=policy_doc ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' created successfully. ARN: {policy_arn}") return policy_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the policy already exists, get its ARN response = self.iam_client.get_policy( PolicyArn=f"arn:aws:iam::{self.account_id}:policy/{policy_name}" ) policy_arn = response["Policy"]["Arn"] log.info(f"Policy '{policy_name}' already exists. ARN: {policy_arn}") return policy_arn log.error(f"Full error:\n\t{err}") def create_role(self, role_name: str, assume_role_doc: dict) -> str: """ Creates a new IAM role or retrieves the ARN of an existing role. :param role_name: The name to give the created role. :param assume_role_doc: The assume role policy document that specifies which entities can assume the role. :return: The ARN of the created or existing role. """ try: response = self.iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc) ) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' created successfully. ARN: {role_arn}") return role_arn except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": # If the role already exists, get its ARN response = self.iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] log.info(f"Role '{role_name}' already exists. 
ARN: {role_arn}") return role_arn log.error(f"Full error:\n\t{err}") def attach_policy( self, role_name: str, policy_arn: str, aws_managed_policies: Tuple[str, ...] = (), ) -> None: """ Attaches an IAM policy to a role and optionally attaches additional AWS-managed policies. :param role_name: The name of the role to attach the policy to. :param policy_arn: The ARN of the policy to attach. :param aws_managed_policies: A tuple of AWS-managed policy names to attach to the role. """ try: self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) for aws_policy in aws_managed_policies: self.iam_client.attach_role_policy( RoleName=role_name, PolicyArn=f"arn:aws:iam::aws:policy/{aws_policy}", ) log.info(f"Attached policy {policy_arn} to role {role_name}.") except ClientError as err: log.error(f"Failed to attach policy {policy_arn} to role {role_name}.") log.error(f"Full error:\n\t{err}") def create_instance_profile( self, policy_file: str, policy_name: str, role_name: str, profile_name: str, aws_managed_policies: Tuple[str, ...] = (), ) -> str: """ Creates a policy, role, and profile that is associated with instances created by this class. An instance's associated profile defines a role that is assumed by the instance. The role has attached policies that specify the AWS permissions granted to clients that run on the instance. :param policy_file: The name of a JSON file that contains the policy definition to create and attach to the role. :param policy_name: The name to give the created policy. :param role_name: The name to give the created role. :param profile_name: The name to the created profile. :param aws_managed_policies: Additional AWS-managed policies that are attached to the role, such as AmazonSSMManagedInstanceCore to grant use of Systems Manager to send commands to the instance. :return: The ARN of the profile that is created. """ assume_role_doc = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = self.create_policy(policy_file, policy_name) self.create_role(role_name, assume_role_doc) self.attach_policy(role_name, policy_arn, aws_managed_policies) try: profile_response = self.iam_client.create_instance_profile( InstanceProfileName=profile_name ) waiter = self.iam_client.get_waiter("instance_profile_exists") waiter.wait(InstanceProfileName=profile_name) time.sleep(10) # wait a little longer profile_arn = profile_response["InstanceProfile"]["Arn"] self.iam_client.add_role_to_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) log.info("Created profile %s and added role %s.", profile_name, role_name) except ClientError as err: if err.response["Error"]["Code"] == "EntityAlreadyExists": prof_response = self.iam_client.get_instance_profile( InstanceProfileName=profile_name ) profile_arn = prof_response["InstanceProfile"]["Arn"] log.info( "Instance profile %s already exists, nothing to do.", profile_name ) log.error(f"Full error:\n\t{err}") return profile_arn def get_instance_profile(self, instance_id: str) -> Dict[str, Any]: """ Gets data about the profile associated with an instance. :param instance_id: The ID of the instance to look up. :return: The profile data. 
""" try: response = self.ec2_client.describe_iam_instance_profile_associations( Filters=[{"Name": "instance-id", "Values": [instance_id]}] ) if not response["IamInstanceProfileAssociations"]: log.info(f"No instance profile found for instance {instance_id}.") profile_data = response["IamInstanceProfileAssociations"][0] log.info(f"Retrieved instance profile for instance {instance_id}.") return profile_data except ClientError as err: log.error( f"Failed to retrieve instance profile for instance {instance_id}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidInstanceID.NotFound": log.error(f"The instance ID '{instance_id}' does not exist.") log.error(f"Full error:\n\t{err}") def replace_instance_profile( self, instance_id: str, new_instance_profile_name: str, profile_association_id: str, ) -> None: """ Replaces the profile associated with a running instance. After the profile is replaced, the instance is rebooted to ensure that it uses the new profile. When the instance is ready, Systems Manager is used to restart the Python web server. :param instance_id: The ID of the instance to restart. :param new_instance_profile_name: The name of the new profile to associate with the specified instance. :param profile_association_id: The ID of the existing profile association for the instance. """ try: self.ec2_client.replace_iam_instance_profile_association( IamInstanceProfile={"Name": new_instance_profile_name}, AssociationId=profile_association_id, ) log.info( "Replaced instance profile for association %s with profile %s.", profile_association_id, new_instance_profile_name, ) time.sleep(5) self.ec2_client.reboot_instances(InstanceIds=[instance_id]) log.info("Rebooting instance %s.", instance_id) waiter = self.ec2_client.get_waiter("instance_running") log.info("Waiting for instance %s to be running.", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info("Instance %s is now running.", instance_id) self.ssm_client.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={"commands": ["cd / && sudo python3 server.py 80"]}, ) log.info(f"Restarted the Python web server on instance '{instance_id}'.") except ClientError as err: log.error("Failed to replace instance profile.") error_code = err.response["Error"]["Code"] if error_code == "InvalidAssociationID.NotFound": log.error( f"Association ID '{profile_association_id}' does not exist." "Please check the association ID and try again." ) if error_code == "InvalidInstanceId": log.error( f"The specified instance ID '{instance_id}' does not exist or is not available for SSM. " f"Please verify the instance ID and try again." ) log.error(f"Full error:\n\t{err}") def delete_instance_profile(self, profile_name: str, role_name: str) -> None: """ Detaches a role from an instance profile, detaches policies from the role, and deletes all the resources. :param profile_name: The name of the profile to delete. :param role_name: The name of the role to delete. 
""" try: self.iam_client.remove_role_from_instance_profile( InstanceProfileName=profile_name, RoleName=role_name ) self.iam_client.delete_instance_profile(InstanceProfileName=profile_name) log.info("Deleted instance profile %s.", profile_name) attached_policies = self.iam_client.list_attached_role_policies( RoleName=role_name ) for pol in attached_policies["AttachedPolicies"]: self.iam_client.detach_role_policy( RoleName=role_name, PolicyArn=pol["PolicyArn"] ) if not pol["PolicyArn"].startswith("arn:aws:iam::aws"): self.iam_client.delete_policy(PolicyArn=pol["PolicyArn"]) log.info("Detached and deleted policy %s.", pol["PolicyName"]) self.iam_client.delete_role(RoleName=role_name) log.info("Deleted role %s.", role_name) except ClientError as err: log.error( f"Couldn't delete instance profile {profile_name} or detach " f"policies and delete role {role_name}: {err}" ) if err.response["Error"]["Code"] == "NoSuchEntity": log.info( "Instance profile %s doesn't exist, nothing to do.", profile_name ) def create_key_pair(self, key_pair_name: str) -> None: """ Creates a new key pair. :param key_pair_name: The name of the key pair to create. """ try: response = self.ec2_client.create_key_pair(KeyName=key_pair_name) with open(f"{key_pair_name}.pem", "w") as file: file.write(response["KeyMaterial"]) chmod(f"{key_pair_name}.pem", 0o600) log.info("Created key pair %s.", key_pair_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to create key pair {key_pair_name}.") if error_code == "InvalidKeyPair.Duplicate": log.error(f"A key pair with the name '{key_pair_name}' already exists.") log.error(f"Full error:\n\t{err}") def delete_key_pair(self) -> None: """ Deletes a key pair. """ try: self.ec2_client.delete_key_pair(KeyName=self.key_pair_name) remove(f"{self.key_pair_name}.pem") log.info("Deleted key pair %s.", self.key_pair_name) except ClientError as err: log.error(f"Couldn't delete key pair '{self.key_pair_name}'.") log.error(f"Full error:\n\t{err}") except FileNotFoundError as err: log.info("Key pair %s doesn't exist, nothing to do.", self.key_pair_name) log.error(f"Full error:\n\t{err}") def create_template( self, server_startup_script_file: str, instance_policy_file: str ) -> Dict[str, Any]: """ Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The launch template specifies a Bash script in its user data field that runs after the instance is started. This script installs Python packages and starts a Python web server on the instance. :param server_startup_script_file: The path to a Bash script file that is run when an instance starts. :param instance_policy_file: The path to a file that defines a permissions policy to create and attach to the instance profile. :return: Information about the newly created template. 
""" template = {} try: # Create key pair and instance profile self.create_key_pair(self.key_pair_name) self.create_instance_profile( instance_policy_file, self.instance_policy_name, self.instance_role_name, self.instance_profile_name, ) # Read the startup script with open(server_startup_script_file) as file: start_server_script = file.read() # Get the latest AMI ID ami_latest = self.ssm_client.get_parameter(Name=self.ami_param) ami_id = ami_latest["Parameter"]["Value"] # Create the launch template lt_response = self.ec2_client.create_launch_template( LaunchTemplateName=self.launch_template_name, LaunchTemplateData={ "InstanceType": self.inst_type, "ImageId": ami_id, "IamInstanceProfile": {"Name": self.instance_profile_name}, "UserData": base64.b64encode( start_server_script.encode(encoding="utf-8") ).decode(encoding="utf-8"), "KeyName": self.key_pair_name, }, ) template = lt_response["LaunchTemplate"] log.info( f"Created launch template {self.launch_template_name} for AMI {ami_id} on {self.inst_type}." ) except ClientError as err: log.error(f"Failed to create launch template {self.launch_template_name}.") error_code = err.response["Error"]["Code"] if error_code == "InvalidLaunchTemplateName.AlreadyExistsException": log.info( f"Launch template {self.launch_template_name} already exists, nothing to do." ) log.error(f"Full error:\n\t{err}") return template def delete_template(self): """ Deletes a launch template. """ try: self.ec2_client.delete_launch_template( LaunchTemplateName=self.launch_template_name ) self.delete_instance_profile( self.instance_profile_name, self.instance_role_name ) log.info("Launch template %s deleted.", self.launch_template_name) except ClientError as err: if ( err.response["Error"]["Code"] == "InvalidLaunchTemplateName.NotFoundException" ): log.info( "Launch template %s does not exist, nothing to do.", self.launch_template_name, ) log.error(f"Full error:\n\t{err}") def get_availability_zones(self) -> List[str]: """ Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client. :return: The list of Availability Zones for the client Region. """ try: response = self.ec2_client.describe_availability_zones() zones = [zone["ZoneName"] for zone in response["AvailabilityZones"]] log.info(f"Retrieved {len(zones)} availability zones: {zones}.") except ClientError as err: log.error("Failed to retrieve availability zones.") log.error(f"Full error:\n\t{err}") else: return zones def create_autoscaling_group(self, group_size: int) -> List[str]: """ Creates an EC2 Auto Scaling group with the specified size. :param group_size: The number of instances to set for the minimum and maximum in the group. :return: The list of Availability Zones specified for the group. """ try: zones = self.get_availability_zones() self.autoscaling_client.create_auto_scaling_group( AutoScalingGroupName=self.group_name, AvailabilityZones=zones, LaunchTemplate={ "LaunchTemplateName": self.launch_template_name, "Version": "$Default", }, MinSize=group_size, MaxSize=group_size, ) log.info( f"Created EC2 Auto Scaling group {self.group_name} with availability zones {zones}." ) except ClientError as err: error_code = err.response["Error"]["Code"] if error_code == "AlreadyExists": log.info( f"EC2 Auto Scaling group {self.group_name} already exists, nothing to do." ) else: log.error(f"Failed to create EC2 Auto Scaling group {self.group_name}.") log.error(f"Full error:\n\t{err}") else: return zones def get_instances(self) -> List[str]: """ Gets data about the instances in the EC2 Auto Scaling group. 
:return: A list of instance IDs in the Auto Scaling group. """ try: as_response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[self.group_name] ) instance_ids = [ i["InstanceId"] for i in as_response["AutoScalingGroups"][0]["Instances"] ] log.info( f"Retrieved {len(instance_ids)} instances for Auto Scaling group {self.group_name}." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to retrieve instances for Auto Scaling group {self.group_name}." ) if error_code == "ResourceNotFound": log.error(f"The Auto Scaling group '{self.group_name}' does not exist.") log.error(f"Full error:\n\t{err}") else: return instance_ids def terminate_instance(self, instance_id: str, decrementsetting=False) -> None: """ Terminates an instance in an EC2 Auto Scaling group. After an instance is terminated, it can no longer be accessed. :param instance_id: The ID of the instance to terminate. :param decrementsetting: If True, do not replace terminated instances. """ try: self.autoscaling_client.terminate_instance_in_auto_scaling_group( InstanceId=instance_id, ShouldDecrementDesiredCapacity=decrementsetting, ) log.info("Terminated instance %s.", instance_id) # Adding a waiter to ensure the instance is terminated waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for instance %s to be terminated...", instance_id) waiter.wait(InstanceIds=[instance_id]) log.info( f"Instance '{instance_id}' has been terminated and will be replaced." ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to terminate instance '{instance_id}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to terminate the instance again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) log.error(f"Full error:\n\t{err}") def attach_load_balancer_target_group( self, lb_target_group: Dict[str, Any] ) -> None: """ Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group. The target group specifies how the load balancer forwards requests to the instances in the group. :param lb_target_group: Data about the ELB target group to attach. """ try: self.autoscaling_client.attach_load_balancer_target_groups( AutoScalingGroupName=self.group_name, TargetGroupARNs=[lb_target_group["TargetGroupArn"]], ) log.info( "Attached load balancer target group %s to auto scaling group %s.", lb_target_group["TargetGroupName"], self.group_name, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to attach load balancer target group '{lb_target_group['TargetGroupName']}'." ) if error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the resource." ) elif error_code == "ServiceLinkedRoleFailure": log.error( "The operation failed because the service-linked role is not ready or does not exist. " "Check that the service-linked role exists and is correctly configured." ) log.error(f"Full error:\n\t{err}") def delete_autoscaling_group(self, group_name: str) -> None: """ Terminates all instances in the group, then deletes the EC2 Auto Scaling group. :param group_name: The name of the group to delete. 
""" try: response = self.autoscaling_client.describe_auto_scaling_groups( AutoScalingGroupNames=[group_name] ) groups = response.get("AutoScalingGroups", []) if len(groups) > 0: self.autoscaling_client.update_auto_scaling_group( AutoScalingGroupName=group_name, MinSize=0 ) instance_ids = [inst["InstanceId"] for inst in groups[0]["Instances"]] for inst_id in instance_ids: self.terminate_instance(inst_id) # Wait for all instances to be terminated if instance_ids: waiter = self.ec2_client.get_waiter("instance_terminated") log.info("Waiting for all instances to be terminated...") waiter.wait(InstanceIds=instance_ids) log.info("All instances have been terminated.") else: log.info(f"No groups found named '{group_name}'! Nothing to do.") except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete Auto Scaling group '{group_name}'.") if error_code == "ScalingActivityInProgressFault": log.error( "Scaling activity is currently in progress. " "Wait for the scaling activity to complete before attempting to delete the group again." ) elif error_code == "ResourceContentionFault": log.error( "The request failed due to a resource contention issue. " "Ensure that no conflicting operations are being performed on the group." ) log.error(f"Full error:\n\t{err}") def get_default_vpc(self) -> Dict[str, Any]: """ Gets the default VPC for the account. :return: Data about the default VPC. """ try: response = self.ec2_client.describe_vpcs( Filters=[{"Name": "is-default", "Values": ["true"]}] ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error("Failed to retrieve the default VPC.") if error_code == "UnauthorizedOperation": log.error( "You do not have the necessary permissions to describe VPCs. " "Ensure that your AWS IAM user or role has the correct permissions." ) elif error_code == "InvalidParameterValue": log.error( "One or more parameters are invalid. Check the request parameters." ) log.error(f"Full error:\n\t{err}") else: if "Vpcs" in response and response["Vpcs"]: log.info(f"Retrieved default VPC: {response['Vpcs'][0]['VpcId']}") return response["Vpcs"][0] else: pass def verify_inbound_port( self, vpc: Dict[str, Any], port: int, ip_address: str ) -> Tuple[Dict[str, Any], bool]: """ Verify the default security group of the specified VPC allows ingress from this computer. This can be done by allowing ingress from this computer's IP address. In some situations, such as connecting from a corporate network, you must instead specify a prefix list ID. You can also temporarily open the port to any IP address while running this example. If you do, be sure to remove public access when you're done. :param vpc: The VPC used by this example. :param port: The port to verify. :param ip_address: This computer's IP address. :return: The default security group of the specified VPC, and a value that indicates whether the specified port is open. 
""" try: response = self.ec2_client.describe_security_groups( Filters=[ {"Name": "group-name", "Values": ["default"]}, {"Name": "vpc-id", "Values": [vpc["VpcId"]]}, ] ) sec_group = response["SecurityGroups"][0] port_is_open = False log.info(f"Found default security group {sec_group['GroupId']}.") for ip_perm in sec_group["IpPermissions"]: if ip_perm.get("FromPort", 0) == port: log.info(f"Found inbound rule: {ip_perm}") for ip_range in ip_perm["IpRanges"]: cidr = ip_range.get("CidrIp", "") if cidr.startswith(ip_address) or cidr == "0.0.0.0/0": port_is_open = True if ip_perm["PrefixListIds"]: port_is_open = True if not port_is_open: log.info( f"The inbound rule does not appear to be open to either this computer's IP " f"address of {ip_address}, to all IP addresses (0.0.0.0/0), or to a prefix list ID." ) else: break except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to verify inbound rule for port {port} for VPC {vpc['VpcId']}." ) if error_code == "InvalidVpcID.NotFound": log.error( f"The specified VPC ID '{vpc['VpcId']}' does not exist. Please check the VPC ID." ) log.error(f"Full error:\n\t{err}") else: return sec_group, port_is_open def open_inbound_port(self, sec_group_id: str, port: int, ip_address: str) -> None: """ Add an ingress rule to the specified security group that allows access on the specified port from the specified IP address. :param sec_group_id: The ID of the security group to modify. :param port: The port to open. :param ip_address: The IP address that is granted access. """ try: self.ec2_client.authorize_security_group_ingress( GroupId=sec_group_id, CidrIp=f"{ip_address}/32", FromPort=port, ToPort=port, IpProtocol="tcp", ) log.info( "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address, ) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to authorize ingress to security group '{sec_group_id}' on port {port} from {ip_address}." ) if error_code == "InvalidGroupId.Malformed": log.error( "The security group ID is malformed. " "Please verify that the security group ID is correct." ) elif error_code == "InvalidPermission.Duplicate": log.error( "The specified rule already exists in the security group. " "Check the existing rules for this security group." ) log.error(f"Full error:\n\t{err}") def get_subnets(self, vpc_id: str, zones: List[str] = None) -> List[Dict[str, Any]]: """ Gets the default subnets in a VPC for a specified list of Availability Zones. :param vpc_id: The ID of the VPC to look up. :param zones: The list of Availability Zones to look up. :return: The list of subnets found. """ # Ensure that 'zones' is a list, even if None is passed if zones is None: zones = [] try: paginator = self.ec2_client.get_paginator("describe_subnets") page_iterator = paginator.paginate( Filters=[ {"Name": "vpc-id", "Values": [vpc_id]}, {"Name": "availability-zone", "Values": zones}, {"Name": "default-for-az", "Values": ["true"]}, ] ) subnets = [] for page in page_iterator: subnets.extend(page["Subnets"]) log.info("Found %s subnets for the specified zones.", len(subnets)) return subnets except ClientError as err: log.error( f"Failed to retrieve subnets for VPC '{vpc_id}' in zones {zones}." ) error_code = err.response["Error"]["Code"] if error_code == "InvalidVpcID.NotFound": log.error( "The specified VPC ID does not exist. " "Please check the VPC ID and try again." ) # Add more error-specific handling as needed log.error(f"Full error:\n\t{err}")
Create a class that wraps Elastic Load Balancing actions.
class ElasticLoadBalancerWrapper: """Encapsulates Elastic Load Balancing (ELB) actions.""" def __init__(self, elb_client: boto3.client): """ Initializes the LoadBalancer class with the necessary parameters. """ self.elb_client = elb_client def create_target_group( self, target_group_name: str, protocol: str, port: int, vpc_id: str ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing target group. The target group specifies how the load balancer forwards requests to instances in the group and how instance health is checked. To speed up this demo, the health check is configured with shortened times and lower thresholds. In production, you might want to decrease the sensitivity of your health checks to avoid unwanted failures. :param target_group_name: The name of the target group to create. :param protocol: The protocol to use to forward requests, such as 'HTTP'. :param port: The port to use to forward requests, such as 80. :param vpc_id: The ID of the VPC in which the load balancer exists. :return: Data about the newly created target group. """ try: response = self.elb_client.create_target_group( Name=target_group_name, Protocol=protocol, Port=port, HealthCheckPath="/healthcheck", HealthCheckIntervalSeconds=10, HealthCheckTimeoutSeconds=5, HealthyThresholdCount=2, UnhealthyThresholdCount=2, VpcId=vpc_id, ) target_group = response["TargetGroups"][0] log.info(f"Created load balancing target group '{target_group_name}'.") return target_group except ClientError as err: log.error( f"Couldn't create load balancing target group '{target_group_name}'." ) error_code = err.response["Error"]["Code"] if error_code == "DuplicateTargetGroupName": log.error( f"Target group name {target_group_name} already exists. " "Check if the target group already exists." "Consider using a different name or deleting the existing target group if appropriate." ) elif error_code == "TooManyTargetGroups": log.error( "Too many target groups exist in the account. " "Consider deleting unused target groups to create space for new ones." ) log.error(f"Full error:\n\t{err}") def delete_target_group(self, target_group_name) -> None: """ Deletes the target group. """ try: # Describe the target group to get its ARN response = self.elb_client.describe_target_groups(Names=[target_group_name]) tg_arn = response["TargetGroups"][0]["TargetGroupArn"] # Delete the target group self.elb_client.delete_target_group(TargetGroupArn=tg_arn) log.info("Deleted load balancing target group %s.", target_group_name) # Use a custom waiter to wait until the target group is no longer available self.wait_for_target_group_deletion(self.elb_client, tg_arn) log.info("Target group %s successfully deleted.", target_group_name) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to delete target group '{target_group_name}'.") if error_code == "TargetGroupNotFound": log.error( "Load balancer target group either already deleted or never existed. " "Verify the name and check that the resource exists in the AWS Console." ) elif error_code == "ResourceInUseException": log.error( "Target group still in use by another resource. 
" "Ensure that the target group is no longer associated with any load balancers or resources.", ) log.error(f"Full error:\n\t{err}") def wait_for_target_group_deletion( self, elb_client, target_group_arn, max_attempts=10, delay=30 ): for attempt in range(max_attempts): try: elb_client.describe_target_groups(TargetGroupArns=[target_group_arn]) print( f"Attempt {attempt + 1}: Target group {target_group_arn} still exists." ) except ClientError as e: if e.response["Error"]["Code"] == "TargetGroupNotFound": print( f"Target group {target_group_arn} has been successfully deleted." ) return else: raise time.sleep(delay) raise TimeoutError( f"Target group {target_group_arn} was not deleted after {max_attempts * delay} seconds." ) def create_load_balancer( self, load_balancer_name: str, subnet_ids: List[str], ) -> Dict[str, Any]: """ Creates an Elastic Load Balancing load balancer that uses the specified subnets and forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create. :param subnet_ids: A list of subnets to associate with the load balancer. :return: Data about the newly created load balancer. """ try: response = self.elb_client.create_load_balancer( Name=load_balancer_name, Subnets=subnet_ids ) load_balancer = response["LoadBalancers"][0] log.info(f"Created load balancer '{load_balancer_name}'.") waiter = self.elb_client.get_waiter("load_balancer_available") log.info( f"Waiting for load balancer '{load_balancer_name}' to be available..." ) waiter.wait(Names=[load_balancer_name]) log.info(f"Load balancer '{load_balancer_name}' is now available!") except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to create load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "DuplicateLoadBalancerNameException": log.error( f"A load balancer with the name '{load_balancer_name}' already exists. " "Load balancer names must be unique within the AWS region. " "Please choose a different name and try again." ) if error_code == "TooManyLoadBalancersException": log.error( "The maximum number of load balancers has been reached in this account and region. " "You can delete unused load balancers or request an increase in the service quota from AWS Support." ) log.error(f"Full error:\n\t{err}") else: return load_balancer def create_listener( self, load_balancer_name: str, target_group: Dict[str, Any], ) -> Dict[str, Any]: """ Creates a listener for the specified load balancer that forwards requests to the specified target group. :param load_balancer_name: The name of the load balancer to create a listener for. :param target_group: An existing target group that is added as a listener to the load balancer. :return: Data about the newly created listener. """ try: # Retrieve the load balancer ARN load_balancer_response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) load_balancer_arn = load_balancer_response["LoadBalancers"][0][ "LoadBalancerArn" ] # Create the listener response = self.elb_client.create_listener( LoadBalancerArn=load_balancer_arn, Protocol=target_group["Protocol"], Port=target_group["Port"], DefaultActions=[ { "Type": "forward", "TargetGroupArn": target_group["TargetGroupArn"], } ], ) log.info( f"Created listener to forward traffic from load balancer '{load_balancer_name}' to target group '{target_group['TargetGroupName']}'." 
) return response["Listeners"][0] except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Failed to add a listener on '{load_balancer_name}' for target group '{target_group['TargetGroupName']}'." ) if error_code == "ListenerNotFoundException": log.error( f"The listener could not be found for the load balancer '{load_balancer_name}'. " "Please check the load balancer name and target group configuration." ) if error_code == "InvalidConfigurationRequestException": log.error( f"The configuration provided for the listener on load balancer '{load_balancer_name}' is invalid. " "Please review the provided protocol, port, and target group settings." ) log.error(f"Full error:\n\t{err}") def delete_load_balancer(self, load_balancer_name) -> None: """ Deletes a load balancer. :param load_balancer_name: The name of the load balancer to delete. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) lb_arn = response["LoadBalancers"][0]["LoadBalancerArn"] self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn) log.info("Deleted load balancer %s.", load_balancer_name) waiter = self.elb_client.get_waiter("load_balancers_deleted") log.info("Waiting for load balancer to be deleted...") waiter.wait(Names=[load_balancer_name]) except ClientError as err: error_code = err.response["Error"]["Code"] log.error( f"Couldn't delete load balancer '{load_balancer_name}'. Error code: {error_code}, Message: {err.response['Error']['Message']}" ) if error_code == "LoadBalancerNotFoundException": log.error( f"The load balancer '{load_balancer_name}' does not exist. " "Please check the name and try again." ) log.error(f"Full error:\n\t{err}") def get_endpoint(self, load_balancer_name) -> str: """ Gets the HTTP endpoint of the load balancer. :return: The endpoint. """ try: response = self.elb_client.describe_load_balancers( Names=[load_balancer_name] ) return response["LoadBalancers"][0]["DNSName"] except ClientError as err: log.error( f"Couldn't get the endpoint for load balancer {load_balancer_name}" ) error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Verify load balancer name and ensure it exists in the AWS console." ) log.error(f"Full error:\n\t{err}") @staticmethod def verify_load_balancer_endpoint(endpoint) -> bool: """ Verify this computer can successfully send a GET request to the load balancer endpoint. :param endpoint: The endpoint to verify. :return: True if the GET request is successful, False otherwise. """ retries = 3 verified = False while not verified and retries > 0: try: lb_response = requests.get(f"http://{endpoint}") log.info( "Got response %s from load balancer endpoint.", lb_response.status_code, ) if lb_response.status_code == 200: verified = True else: retries = 0 except requests.exceptions.ConnectionError: log.info( "Got connection error from load balancer endpoint, retrying..." ) retries -= 1 time.sleep(10) return verified def check_target_health(self, target_group_name: str) -> List[Dict[str, Any]]: """ Checks the health of the instances in the target group. :return: The health status of the target group. 
""" try: tg_response = self.elb_client.describe_target_groups( Names=[target_group_name] ) health_response = self.elb_client.describe_target_health( TargetGroupArn=tg_response["TargetGroups"][0]["TargetGroupArn"] ) except ClientError as err: log.error(f"Couldn't check health of {target_group_name} target(s).") error_code = err.response["Error"]["Code"] if error_code == "LoadBalancerNotFoundException": log.error( "Load balancer associated with the target group was not found. " "Ensure the load balancer exists, is in the correct AWS region, and " "that you have the necessary permissions to access it.", ) elif error_code == "TargetGroupNotFoundException": log.error( "Target group was not found. " "Verify the target group name, check that it exists in the correct region, " "and ensure it has not been deleted or created in a different account.", ) log.error(f"Full error:\n\t{err}") else: return health_response["TargetHealthDescriptions"]
Create a class that uses DynamoDB to simulate a recommendation service.
class RecommendationService: """ Encapsulates a DynamoDB table to use as a service that recommends books, movies, and songs. """ def __init__(self, table_name: str, dynamodb_client: boto3.client): """ Initializes the RecommendationService class with the necessary parameters. :param table_name: The name of the DynamoDB recommendations table. :param dynamodb_client: A Boto3 DynamoDB client. """ self.table_name = table_name self.dynamodb_client = dynamodb_client def create(self) -> Dict[str, Any]: """ Creates a DynamoDB table to use as a recommendation service. The table has a hash key named 'MediaType' that defines the type of media recommended, such as Book or Movie, and a range key named 'ItemId' that, combined with the MediaType, forms a unique identifier for the recommended item. :return: Data about the newly created table. :raises RecommendationServiceError: If the table creation fails. """ try: response = self.dynamodb_client.create_table( TableName=self.table_name, AttributeDefinitions=[ {"AttributeName": "MediaType", "AttributeType": "S"}, {"AttributeName": "ItemId", "AttributeType": "N"}, ], KeySchema=[ {"AttributeName": "MediaType", "KeyType": "HASH"}, {"AttributeName": "ItemId", "KeyType": "RANGE"}, ], ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}, ) log.info("Creating table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_exists") waiter.wait(TableName=self.table_name) log.info("Table %s created.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceInUseException": log.info("Table %s exists, nothing to be done.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when creating table: {err}." ) else: return response def populate(self, data_file: str) -> None: """ Populates the recommendations table from a JSON file. :param data_file: The path to the data file. :raises RecommendationServiceError: If the table population fails. """ try: with open(data_file) as data: items = json.load(data) batch = [{"PutRequest": {"Item": item}} for item in items] self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch}) log.info( "Populated table %s with items from %s.", self.table_name, data_file ) except ClientError as err: raise RecommendationServiceError( self.table_name, f"Couldn't populate table from {data_file}: {err}" ) def destroy(self) -> None: """ Deletes the recommendations table. :raises RecommendationServiceError: If the table deletion fails. """ try: self.dynamodb_client.delete_table(TableName=self.table_name) log.info("Deleting table %s...", self.table_name) waiter = self.dynamodb_client.get_waiter("table_not_exists") waiter.wait(TableName=self.table_name) log.info("Table %s deleted.", self.table_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": log.info("Table %s does not exist, nothing to do.", self.table_name) else: raise RecommendationServiceError( self.table_name, f"ClientError when deleting table: {err}." )
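The following is a minimal usage sketch of the recommendation service class above. The table name and the data file name are placeholders; the data file is assumed to contain a JSON list of DynamoDB items.

import boto3

# A minimal sketch, assuming the RecommendationService class above is importable.
service = RecommendationService(
    "my-recommendations-table", boto3.client("dynamodb")
)

service.create()                          # Create the DynamoDB table and wait for it to exist.
service.populate("recommendations.json")  # Batch-write items from a JSON file (placeholder name).
# ... run the demo against the table ...
service.destroy()                         # Delete the table and wait for it to be removed.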
Create a class that wraps Systems Manager actions.
class ParameterHelper: """ Encapsulates Systems Manager parameters. This example uses these parameters to drive the demonstration of resilient architecture, such as failure of a dependency or how the service responds to a health check. """ table: str = "doc-example-resilient-architecture-table" failure_response: str = "doc-example-resilient-architecture-failure-response" health_check: str = "doc-example-resilient-architecture-health-check" def __init__(self, table_name: str, ssm_client: boto3.client): """ Initializes the ParameterHelper class with the necessary parameters. :param table_name: The name of the DynamoDB table that is used as a recommendation service. :param ssm_client: A Boto3 Systems Manager client. """ self.ssm_client = ssm_client self.table_name = table_name def reset(self) -> None: """ Resets the Systems Manager parameters to starting values for the demo. These are the name of the DynamoDB recommendation table, no response when a dependency fails, and shallow health checks. """ self.put(self.table, self.table_name) self.put(self.failure_response, "none") self.put(self.health_check, "shallow") def put(self, name: str, value: str) -> None: """ Sets the value of a named Systems Manager parameter. :param name: The name of the parameter. :param value: The new value of the parameter. :raises ParameterHelperError: If the parameter value cannot be set. """ try: self.ssm_client.put_parameter( Name=name, Value=value, Overwrite=True, Type="String" ) log.info("Setting parameter %s to '%s'.", name, value) except ClientError as err: error_code = err.response["Error"]["Code"] log.error(f"Failed to set parameter {name}.") if error_code == "ParameterLimitExceeded": log.error( "The parameter limit has been exceeded. " "Consider deleting unused parameters or request a limit increase." ) elif error_code == "ParameterAlreadyExists": log.error( "The parameter already exists and overwrite is set to False. " "Use Overwrite=True to update the parameter." ) log.error(f"Full error:\n\t{err}")
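The following is a minimal usage sketch of the parameter helper above. The table name is a placeholder, and the alternate parameter values ("this-is-not-a-table", "deep") are illustrative assumptions about how a dependency failure and a deeper health check might be simulated; they are not defined in this section.

import boto3

# A minimal sketch, assuming the ParameterHelper class above is importable.
params = ParameterHelper("my-recommendations-table", boto3.client("ssm"))

# Reset the demo parameters: real table name, no failure response, shallow health checks.
params.reset()

# Illustrative only: point the service at a nonexistent table to simulate a
# failing dependency, and change the health check mode (assumed value "deep").
params.put(ParameterHelper.table, "this-is-not-a-table")
params.put(ParameterHelper.health_check, "deep")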
-
For API details, see the following topics in the AWS SDK for Python (Boto3) API Reference.
-