Amazon EMR クラスターのインスタンスフリートのサイズ変更タイムアウトイベントに対応する
概要
Amazon EMR クラスターは、インスタンスフリートクラスターのサイズ変更操作の実行中にイベントを発行します。プロビジョニングのタイムアウトイベントは、タイムアウト時間が過ぎた後に Amazon EMR がフリートのスポット容量またはオンデマンド容量のプロビジョニングを停止した場合に発生します。タイムアウト時間は、インスタンスフリートのサイズ変更仕様の一部としてユーザーが設定できます。同じインスタンスフリートに対して連続でサイズ変更するシナリオでは、Amazon EMR は現在のサイズ変更操作のタイムアウト時間が過ぎると、Spot
provisioning timeout - continuing resize
または On-Demand provisioning
timeout - continuing resize
イベントを発行します。その後、フリートの次のサイズ変更操作のために容量のプロビジョニングを開始します。
インスタンスフリートのサイズ変更のタイムアウトイベントに対応する
プロビジョニングのタイムアウトイベントには、次のいずれかの方法で対応することをお勧めします。
-
サイズ変更の仕様を再確認し、サイズ変更操作を再試行します。容量は頻繁に変化するため、Amazon EC2 の容量が利用可能になり次第、クラスターは正常にサイズ変更されます。厳格な SLA が求められるジョブでは、タイムアウト時間を低い値に設定することをお勧めします。
-
または、次のいずれかの方法を取ります。
-
インスタンスやアベイラビリティーゾーンの柔軟性に関するベストプラクティスに基づいて、さまざまなインスタンスタイプを使用して新しいクラスターを起動、または
-
オンデマンド容量でクラスターを起動
-
-
provisioning timeout - continuing resize イベントでは、サイズ変更操作が処理されるのをさらに待つことができます。Amazon EMR は設定されたサイズ変更仕様に従いながら、フリートに対してトリガーされたサイズ変更操作を順次処理し続けます。
次のセクションで説明するように、このイベントに対するルールや自動応答を設定することも可能です。
プロビジョニングのタイムアウトイベントからの自動回復
Spot
Provisioning timeout
イベントコードを含む Amazon EMR イベントに対応する自動化機能を構築できます。例えば、次の AWS Lambda 関数は、タスクノードにスポットインスタンスを使用するインスタンスフリートを持つ EMR クラスターを終了し、元のリクエストよりも多様なインスタンスタイプを含むインスタンスフリートで新しい EMR クラスターを作成します。この例では、タスクノードに対する Spot Provisioning timeout
イベントが発行されると、Lambda 関数の実行がトリガーされます。
例 Spot Provisioning timeout
イベントに対応する関数の例
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")