Responder a eventos de tempo limite de redimensionamento da frota de instâncias de cluster do Amazon EMR - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Responder a eventos de tempo limite de redimensionamento da frota de instâncias de cluster do Amazon EMR

Visão geral

Os clusters do Amazon EMR emitem eventos enquanto executam a operação de redimensionamento para clusters de frotas de instâncias. Os eventos de tempo limite de provisionamento são emitidos quando o Amazon EMR interrompe o provisionamento de capacidade spot ou sob demanda da frota após o tempo limite expirar. O usuário pode configurar a duração do tempo limite como parte das especificações de redimensionamento das frotas de instâncias. Em cenários de redimensionamento consecutivo para a mesma frota de instâncias, o Amazon EMR emite os eventos Spot provisioning timeout - continuing resize ou On-Demand provisioning timeout - continuing resize quando o tempo limite da operação de redimensionamento atual expira. Em seguida, começa a provisionar capacidade para a próxima operação de redimensionamento da frota.

Responder a eventos de tempo limite de redimensionamento da frota de instâncias

Recomendamos responder a um evento de tempo limite de aprovisionamento de uma das seguintes maneiras:

  • Revisite as especificações de redimensionamento e repita a operação de redimensionamento. Como a capacidade muda com frequência, seus clusters serão redimensionados com sucesso assim que a EC2 capacidade da Amazon estiver disponível. Recomendamos que os clientes configurem valores mais baixos para a duração do tempo limite dos trabalhos que exigem mais rigorSLAs.

  • Como alternativa, você pode:

  • Para o evento de tempo limite de provisionamento e redimensionamento contínuo, você também pode aguardar o processamento das operações de redimensionamento. O Amazon EMR continuará processando sequencialmente as operações de redimensionamento acionadas para a frota, atendendo às especificações de redimensionamento configuradas.

Também é possível configurar regras ou respostas automatizadas para este evento, conforme descrito na próxima seção.

Recuperação automatizada de um evento de tempo limite de provisionamento

É possível criar automação em resposta aos eventos do Amazon EMR com código de evento Spot Provisioning timeout. Por exemplo, a função do AWS Lambda a seguir desativa um cluster do EMR com uma frota de instâncias que usa instâncias spot para nós de tarefa e cria um novo cluster do EMR com uma frota de instâncias que contém tipos de instância mais diversificados do que a solicitação original. Neste exemplo, o evento Spot Provisioning timeout emitido para os nós de tarefa acionará a execução da função do Lambda.

exemplo Exemplo de função para responder ao evento Spot Provisioning timeout
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")