Responder a eventos de capacidad de instancias insuficiente en el clúster de Amazon EMR - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Responder a eventos de capacidad de instancias insuficiente en el clúster de Amazon EMR

Descripción general

Los clústeres de Amazon EMR devuelven el código del evento EC2 provisioning - Insufficient Instance Capacity cuando la zona de disponibilidad seleccionada no tiene suficiente capacidad para cumplir con la solicitud de inicio o cambio de tamaño del clúster. El evento se emite periódicamente tanto con los grupos de instancias como con las flotas de instancias si Amazon EMR encuentra repetidamente excepciones de capacidad insuficiente y no puede cumplir con su solicitud de aprovisionamiento para una operación de inicio o cambio de tamaño del clúster.

En esta página, se describe la mejor manera de responder a este tipo de evento cuando se produce en su clúster de EMR.

Respuesta recomendada a un evento de capacidad insuficiente

Le recomendamos que responda a un evento de capacidad insuficiente de una de las siguientes maneras:

  • Espere a que se recupere la capacidad. La capacidad cambia con frecuencia, por lo que una excepción de capacidad insuficiente puede recuperarse por sí sola. El tamaño de tus clústeres empezará o terminará en cuanto Amazon disponga de EC2 capacidad.

  • Como alternativa, puede terminar el clúster, modificar las configuraciones del tipo de instancia y crear un nuevo clúster con la solicitud de configuración del clúster actualizada. Para obtener más información, consulte Flexibilidad de zona de disponibilidad para un clúster de Amazon EMR.

También puede configurar reglas o respuestas automatizadas a un evento de capacidad insuficiente, como se describe en la siguiente sección.

Recuperación automática de un evento de capacidad insuficiente

Puede crear una automatización en respuesta a los eventos de Amazon EMR, como los que tienen el código de evento EC2 provisioning - Insufficient Instance Capacity. Por ejemplo, la siguiente AWS Lambda función termina un clúster de EMR con un grupo de instancias que usa instancias bajo demanda y, a continuación, crea un nuevo clúster de EMR con un grupo de instancias que contiene tipos de instancias diferentes a los de la solicitud original.

Las siguientes condiciones activan el proceso automatizado:

  • El evento de capacidad insuficiente se ha estado emitiendo en los nodos principales o de núcleo durante más de 20 minutos.

  • El clúster no está en estado LISTO o EN ESPERA. Para obtener más información acerca de los estados del clúster de EMR, consulte Descripción del ciclo de vida del clúster.

nota

Al crear un proceso automatizado para una excepción de capacidad insuficiente, debe tener en cuenta que el evento de capacidad insuficiente es recuperable. La capacidad suele cambiar y tus clústeres reanudarán el cambio de tamaño o comenzarán a funcionar tan pronto como la EC2 capacidad de Amazon esté disponible.

ejemplo función para responder a un evento de capacidad insuficiente
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning" INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = ( "EC2 provisioning - Insufficient Instance Capacity" ) ALLOWED_INSTANCE_TYPES_TO_USE = [ "m5.xlarge", "c5.xlarge", "m5.4xlarge", "m5.2xlarge", "t3.xlarge", ] CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"] CLUSTER_START_SLA = 20 CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity' def is_insufficient_capacity_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceGroupType could be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][ "CreationDateTime" ] clusterState = describeClusterResponse["Cluster"]["Status"]["State"] now = datetime.datetime.now() now = now.replace(tzinfo=timezone.utc) isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta( minutes=CLUSTER_START_SLA ) # Check if instance group receiving Insufficient capacity exception is CORE or PRIMARY (MASTER), # and it's been more than 20 minutes since cluster was created but the cluster state and the cluster state is not updated to RUNNING or WAITING if ( (instanceGroupType == "CORE" or instanceGroupType == "MASTER") and isClusterStartSlaBreached and clusterState not in CLUSTER_START_ACCEPTABLE_STATES ): return True else: return False # Choose item from the list except the exempt value def choice_excluding(exempt): for i in ALLOWED_INSTANCE_TYPES_TO_USE: if i != exempt: return i # Create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceGroupType cloud be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] # Following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # Select new instance types to include in the new createCluster request instanceTypeForMaster = ( instanceTypesFromOriginalRequestMaster if instanceGroupType != "MASTER" else choice_excluding(instanceTypesFromOriginalRequestMaster) ) instanceTypeForCore = ( instanceTypesFromOriginalRequestCore if instanceGroupType != "CORE" else choice_excluding(instanceTypesFromOriginalRequestCore) ) print("Starting to create cluster...") instances = { "InstanceGroups": [ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": instanceTypeForMaster, "Market": "ON_DEMAND", "Name": "Master", }, { "InstanceRole": "CORE", "InstanceCount": 1, "InstanceType": instanceTypeForCore, "Market": "ON_DEMAND", "Name": "Core", }, ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # Terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_insufficient_capacity_event(event): print( "Received insufficient capacity event for instanceGroup, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not insufficient capacity event, skipping")