選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

使用 Step Functions 建立和管理 Amazon EMR 叢集

焦點模式
使用 Step Functions 建立和管理 Amazon EMR 叢集 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

了解如何使用提供的 Amazon EMR 服務整合 APIs AWS Step Functions 與 Amazon EMR 整合。服務整合 APIs 類似於對應的 Amazon EMR APIs,其中傳遞的欄位和傳回的回應有一些差異。

若要了解如何在 Step Functions 中整合 AWS 服務,請參閱 整合 服務在 Step Functions 中將參數傳遞至服務 API

Optimized Amazon EMR 整合的主要功能
  • Optimized Amazon EMR 服務整合具有一組自訂 APIs,可包裝基礎 Amazon EMR APIs,如下所述。因此,它與 Amazon EMR AWS SDK 服務整合有很大的差異。

  • 支援執行任務 (.sync)整合模式。

如果停止執行,Step Functions 不會自動終止 Amazon EMR 叢集。如果您的狀態機器在 Amazon EMR 叢集終止之前停止,您的叢集可能會無限期地繼續執行,並且可能會產生額外費用。若要避免這種情況,請確定您建立的任何 Amazon EMR 叢集都已正確終止。如需詳細資訊,請參閱:

注意

從 開始emr-5.28.0,您可以在建立叢集StepConcurrencyLevel時指定 參數,以允許多個步驟在單一叢集上平行執行。您可以使用 Step Functions MapParallel 狀態,將工作平行提交至叢集。

Amazon EMR 服務整合的可用性取決於 Amazon EMR APIs的可用性。如需特殊區域的限制,請參閱 Amazon EMR 文件。

注意

為了與 Amazon EMR 整合,Step Functions 在前 10 分鐘和之後 300 秒有硬式編碼的 60 秒任務輪詢頻率。

最佳化 Amazon EMR APIs

下表說明每個 Amazon EMR 服務整合 API 與對應 Amazon EMR APIs 之間的差異。

Amazon EMR Service Integration API 對應 EMR API 差異
createCluster

建立並開始執行叢集 (任務流程)。

Amazon EMR 會直接連結到稱為服務連結角色的唯一 IAM 角色類型。為了讓 createClustercreateCluster.sync 運作,您必須設定必要的許可來建立服務連結角色 AWSServiceRoleForEMRCleanup。如需詳細資訊,包括您可以新增至 IAM 許可政策的陳述式,請參閱使用 Amazon EMR 的服務連結角色

RunJobFlow createCluster 使用與 runJobFlow 相同的請求語法,除了:
  • Instances.KeepJobFlowAliveWhenNoSteps 欄位是強制性的,且必須具有布林值 TRUE

  • 不允許 Steps 欄位。

  • 如果使用選用的 modifyInstanceFleetByName 連接器 API,則應提供 Instances.InstanceFleets[index].Name 欄位,且必須是唯一的。

  • 如果使用選用的 modifyInstanceGroupByName API,則應提供 Instances.InstanceGroups[index].Name 欄位,且必須是唯一的。

回應為:
{ "ClusterId": "string" }
Amazon EMR 使用此項目:
{ "JobFlowId": "string" }
createCluster.sync

建立並開始執行叢集 (任務流程)。

RunJobFlow createCluster 一樣,但會等待叢集到達 WAITING 狀態。
setClusterTerminationProtection

鎖定叢集 (任務流程),使叢集中的 EC2 執行個體無法藉由使用者介入、API 呼叫或任務流程錯誤而終止。

setTerminationProtection 請求會使用:
{ "ClusterId": "string" }
Amazon EMR 使用此項目:
{ "JobFlowIds": ["string"] }
terminateCluster

關閉叢集 (任務流程)。

terminateJobFlows 請求會使用:
{ "ClusterId": "string" }
Amazon EMR 使用此項目:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

關閉叢集 (任務流程)。

terminateJobFlows terminateCluster 相同,但會等待叢集終止。
addStep

新增步驟至執行中叢集。

您也可以選擇性地在使用此 API 時指定 ExecutionRoleArn 參數。

addJobFlowSteps

請求使用金鑰 "ClusterId"。Amazon EMR 使用 "JobFlowId"。請求會使用單一步驟。
{ "Step": <"StepConfig object"> }
Amazon EMR 使用此項目:
{ "Steps": [<StepConfig objects>] }
回應為:
{ "StepId": "string" }
Amazon EMR 傳回此項目:
{ "StepIds": [<strings>] }
addStep.sync

新增步驟至執行中叢集。

您也可以選擇性地在使用此 API 時指定 ExecutionRoleArn 參數。

addJobFlowSteps

addStep 相同,但會等待步驟完成。
cancelStep

取消執行中叢集中的擱置步驟。

cancelSteps 請求會使用:
{ "StepId": "string" }
Amazon EMR 使用此項目:
{ "StepIds": [<strings>] }
回應為:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR 使用此項目:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

針對具有所指定 InstanceFleetName 的執行個體機群,修改其目標隨需容量和目標 Spot 容量。

modifyInstanceFleet 請求與 modifyInstanceFleet 相同,除了:
  • 不允許 Instance.InstanceFleetId 欄位。

  • 在執行時間,InstanceFleetId 是由服務整合自動決定,方法為呼叫 ListInstanceFleets 並剖析結果。

modifyInstanceGroupByName

修改執行個體群組的節點數量和組態設定。

modifyInstanceGroups 請求為:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR 使用清單:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig 物件中,不允許 InstanceGroupId 欄位。

已新增新欄位 InstanceGroupName。在執行時間,InstanceGroupId 是由服務整合自動決定,方法為呼叫 ListInstanceGroups 並剖析結果。

工作流程範例

以下包含建立叢集的 Task 狀態。

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

以下包含啟用終止保護的 Task 狀態。

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

以下包含提交步驟至叢集的 Task 狀態。

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

以下包含取消步驟的 Task 狀態。

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

以下包含終止叢集的 Task 狀態。

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

以下包含一種可對執行個體群組的叢集進行擴增或縮減的 Task 狀態。

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

以下包含一種可對執行個體機群的叢集進行擴增或縮減的 Task 狀態。

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

呼叫 Amazon EMR 的 IAM 政策

下列範例範本顯示 如何根據您狀態機器定義中的資源 AWS Step Functions 產生 IAM 政策。如需詳細資訊,請參閱 Step Functions 如何為整合服務產生 IAM 政策探索 Step Functions 中的服務整合模式

addStep

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

在本頁面

下一個主題:

Amazon EMR on EKS

上一個主題:

Amazon EKS
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。