本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Step Functions 呼叫 Amazon EMR
Step Functions 可以直接從Amazon States Language(ASL)控制某些 AWS 服務。如需了解詳細資訊,請參閱 使用其他 服務 和 將參數傳遞至服務 API。
最佳化的 Amazon EMR 整合與 Amazon EMR AWS 開發套件整合有何不同
最佳化的 Amazon EMR 服務整合具有一組自訂的 API,用於包含基礎 Amazon EMR API,如下所述。因此,它與 Amazon EMR AWS 開發套件服務整合有很大不同。此外,也支援執行任務 (.sync)整合模式。
若要 AWS Step Functions 與 Amazon EMR 整合,您可以使用提供的 Amazon EMR 服務整合 API。服務整合 API 與對應的 Amazon EMR API 類似,傳遞的欄位和傳回的回應有些許差異。
如果停止執行,Step Functions 不會自動終止 Amazon EMR 叢集。如果您的狀態機器在 Amazon EMR 叢集終止之前停止,您的叢集可能會無限期地繼續執行,並可能產生額外費用。為避免這種情況,請確保您建立的任何 Amazon EMR 叢集都已正確終止。如需詳細資訊,請參閱:
-
Amazon EMR 使用者指南中的控制叢集終止。
-
「服務整合模式執行任務 (.sync)」區段。
注意
截至目前emr-5.28.0
,您可以在建立叢集StepConcurrencyLevel
時指定參數,以允許在單一叢集上 parallel 執行多個步驟。您可以使用 Step Functions Map
和Parallel
狀態來提交與叢集 parallel 的工作。
Amazon EMR 服務整合的可用性取決於 Amazon EMR API 的可用性。如需特殊區域的限制,請參閱 Amazon EMR 文件。
注意
為了與 Amazon EMR 整合,Step Functions 在此之後的前 10 分鐘和 300 秒內具有 60 秒的硬式編碼任務輪詢頻率。
下表說明每個服務整合 API 與其對應的 Amazon EMR API 之間的差異。
Amazon EMR 服務整合 API 和對應的 Amazon EMR API | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Amazon EMR 服務整合 API | 對應 EMR API | 差異 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
createCluster 建立並開始執行叢集 (任務流程)。 Amazon EMR 會直接連結至稱為服務連結角色的唯一 IAM 角色類型。為了讓 |
runJobFlow | createCluster 使用與下列項目相同的要求語法:runJobFlow
Amazon EMR 使用這個:
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
createCluster.sync 建立並開始執行叢集 (任務流程)。 |
runJobFlow | 與 createCluster 一樣,但會等待叢集到達 WAITING 狀態。 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
setClusterTermination保護 鎖定叢集 (任務流程),使叢集中的 EC2 執行個體無法藉由使用者介入、API 呼叫或任務流程錯誤而終止。 |
setTerminationProtection | 請求會使用: Amazon EMR 使用這個:
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
terminateCluster 關閉叢集 (任務流程)。 |
terminateJobFlows | 請求會使用: Amazon EMR 使用這個:
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
terminateCluster.sync 關閉叢集 (任務流程)。 |
terminateJobFlows | 與 terminateCluster 相同,但會等待叢集終止。 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addStep 新增步驟至執行中叢集。 或者,您也可以在使用此 API 時指定 |
請求使用密鑰"ClusterId" 。Amazon EMR 使用"JobFlowId" 。請求會使用單一步驟。 Amazon EMR 使用這個: 回應為: Amazon EMR 返回這個:
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addStep.sync 新增步驟至執行中叢集。 或者,您也可以在使用此 API 時指定 |
與 addStep 相同,但會等待步驟完成。 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cancelStep 取消執行中叢集中的擱置步驟。 |
cancelSteps | 請求會使用: Amazon EMR 使用這個: 回應為: Amazon EMR 使用這個:
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
modifyInstanceFleetByName
針對具有所指定 |
modifyInstanceFleet | 請求與 modifyInstanceFleet 相同,除了:
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
modifyInstanceGroupByName
修改執行個體群組的節點數量和組態設定。 |
modifyInstanceGroups | 請求為: Amazon EMR 使用一個列表:
在 已新增新欄位 |
以下包含建立叢集的 Task
狀態。
"Create_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Name": "MyWorkflowCluster",
"VisibleToAllUsers": true,
"ReleaseLabel": "emr-5.28.0",
"Applications": [
{
"Name": "Hive"
}
],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/",
"Instances": {
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"Name": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
},
{
"InstanceFleetType": "CORE",
"Name": "CORE",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
}
]
}
},
"End": true
}
以下包含啟用終止保護的 Task
狀態。
"Enable_Termination_Protection": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"TerminationProtected": true
},
"End": true
}
以下包含提交步驟至叢集的 Task
狀態。
"Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"ExecutionRoleArn": "arn:aws:iam::123456789012
:role/myEMR-execution-role
",
"Step": {
"Name": "The first step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"hive-script",
"--run-hive-script",
"--args",
"-f",
"s3://<region>
.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
"-d",
"INPUT=s3://<region>
.elasticmapreduce.samples",
"-d",
"OUTPUT=s3://<mybucket>
/MyHiveQueryResults/"
]
}
}
},
"End": true
}
以下包含取消步驟的 Task
狀態。
"Cancel_Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"StepId.$": "$.AddStepsResult.StepId"
},
"End": true
}
以下包含終止叢集的 Task
狀態。
"Terminate_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
以下包含一種可對執行個體群組的叢集進行擴增或縮減的 Task
狀態。
"ModifyInstanceGroupByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceGroupName": "MyCoreGroup",
"InstanceGroup": {
"InstanceCount": 8
}
},
"End": true
}
以下包含一種可對執行個體機群的叢集進行擴增或縮減的 Task
狀態。
"ModifyInstanceFleetByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceFleetName": "MyCoreFleet",
"InstanceFleet": {
"TargetOnDemandCapacity": 8,
"TargetSpotCapacity": 0
}
},
"End": true
}
如需Step Functions與其他 AWS 服務搭配使用時如何設定IAM權限的相關資訊,請參閱整合式服務的 IAM 政策。