Amazon EMR 클러스터 인스턴스 플릿 크기 조정 제한 시간 이벤트에 대한 대응
개요
Amazon EMR 클러스터는 인스턴스 플릿 클러스터의 크기 조정 작업을 실행하는 동안 이벤트를 생성합니다. 프로비저닝 제한 시간 이벤트는 제한 시간이 만료된 후 Amazon EMR이 플릿에 대한 스팟 또는 온디맨드 용량 프로비저닝을 중지할 때 발생합니다. 제한 시간은 인스턴스 플릿의 크기 조정 사양의 일부로 사용자가 구성할 수 있습니다. 동일한 인스턴스 플릿의 크기가 연속적으로 조정되는 시나리오에서 Amazon EMR은 현재 크기 조정 작업의 제한 시간이 만료되면 Spot
provisioning timeout - continuing resize
또는 On-Demand provisioning
timeout - continuing resize
이벤트를 생성합니다. 그런 다음 플릿의 다음 크기 조정 작업을 위한 용량 프로비저닝을 시작합니다.
인스턴스 플릿 크기 조정 제한 시간 이벤트에 대한 대응
프로비저닝 제한 시간 이벤트에 대한 다음 방법 중 하나를 사용하여 대응하는 것이 좋습니다.
-
크기 조정 사양을 다시 확인하고 크기 조정 작업을 다시 시도합니다. 용량이 자주 변경되므로 Amazon EC2 용량이 확보되면 즉시 클러스터의 크기가 성공적으로 조정됩니다. 더 엄격한 SLA가 필요한 작업의 경우 제한 시간을 더 낮은 값으로 구성하는 것이 좋습니다.
-
또는 다음 중 하나를 수행할 수 있습니다.
-
인스턴스 및 가용 영역 유연성에 대한 모범 사례를 기반으로 다양한 인스턴스 유형으로 새 클러스터 시작 또는
-
온디맨드 용량으로 클러스터 시작
-
-
프로비저닝 제한 시간 - 크기 조정 계속 이벤트의 경우 크기 조정 작업이 처리될 때까지 더 기다릴 수 있습니다. Amazon EMR은 구성된 크기 조정 사양에 따라 플릿에 대해 트리거된 크기 조정 작업을 계속 순차적으로 처리합니다.
다음 섹션의 설명에 따라 이 이벤트에 대한 규칙이나 자동 대응을 설정할 수도 있습니다.
프로비저닝 제한 시간 이벤트에서 자동 복구
Spot
Provisioning timeout
이벤트 코드의 Amazon EMR 이벤트에 대한 대응으로 자동화를 구축할 수 있습니다. 예를 들어 다음 AWS Lambda 함수는 태스크 노드에 대해 스팟 인스턴스를 사용하는 인스턴스 플릿이 있는 EMR 클러스터를 종료한 다음, 원래 요청과 다른 인스턴스 유형을 포함하는 인스턴스 플릿으로 새 EMR 클러스터를 생성합니다. 이 예제에서 태스크 노드에 대해 생성된 Spot Provisioning timeout
이벤트는 Lambda 함수의 실행을 트리거합니다.
예 Spot Provisioning timeout
이벤트에 대응하는 예제 함수
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")