本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Step Functions 创建和管理亚马逊EMR集群
学习如何集成 AWS Step Functions EMR使用亚马逊提供的亚马逊EMR服务集成APIs。服务集成与相应APIs的 Amazon 类似 EMRAPIs,但传递的字段和返回的响应有所不同。
要了解如何与集成 AWS Step Functions 中的服务,参见集成 服务和。在 Step Functions API 中向服务传递参数
优化 Amazon EMR 集成的主要功能
优化的亚马逊EMR服务集成有一组自定义的APIs包裹底层的亚马逊 EMRAPIs,如下所述。因此,它与Amazon有很大不同 EMR AWS SDK服务集成。
-
支持运行作业 (.sync) 集成模式。
如果执行停止,Step Functions 不会自动终止亚马逊EMR集群。如果您的状态机在您的 Amazon EMR 集群终止之前停止,则您的集群可能会无限期地继续运行,并且可能会产生额外费用。为避免这种情况,请确保正确终止您创建的任何 Amazon EMR 集群。有关更多信息,请参阅:
-
在 Amazon EMR 用户指南中@@ 控制集群终止。
-
服务集成模式运行作业 (.sync) 部分。
注意
自 emr-5.28.0
起,您可以在创建集群时指定参数 StepConcurrencyLevel
,以允许在单个集群上并行运行多个步骤。您可以使用 Step Functions Map
和 Parallel
状态将工作并行提交到集群。
亚马逊EMR服务集成的可用性视亚马逊的可用性而定EMRAPIs。有关特殊地区的限制,请参阅 Amazon EMR 文档。
注意
为了与亚马逊集成EMR,Step Functions在之后的前10分300秒内具有硬编码的60秒作业轮询频率。
支持 Amazon EMR APIs
下表描述了每个亚马逊EMR服务集成API与相应亚马逊服务之间的区别EMRAPIs。
亚马逊EMR服务集成 API | 相应的 EMR API | 差异 |
---|---|---|
createCluster
创建并开始运行集群(作业流程)。 EMRAmazon 与一种称为服务相关IAM角色的独特角色直接关联。要使 |
runJobFlow | createCluster 使用与相同的请求语法 runJobFlow,但以下语法除外:
Amazon EMR 使用这个:
|
createCluster.sync 创建并开始运行集群(作业流程)。 |
runJobFlow | 与 createCluster 相同,但等待集群达到 WAITING 状态。 |
setClusterTermination保护 锁定集群(任务流),这样集群中的EC2实例就不会因为用户干预、API呼叫或任务流错误而终止。 |
setTerminationProtection | 请求使用: Amazon EMR 使用这个:
|
terminateCluster
关闭集群(作业流程)。 |
terminateJobFlows | 请求使用: Amazon EMR 使用这个:
|
terminateCluster.sync 关闭集群(作业流程)。 |
terminateJobFlows | 与 terminateCluster 相同,但等待集群终止。 |
addStep
向正在运行的集群添加新步骤。 或者,您也可以在使用此 |
请求使用密钥 "ClusterId" 。亚马逊EMR使用"JobFlowId" 。请求使用单一步骤。 Amazon EMR 使用这个: 响应如下: Amazon EMR 退回了这个:
|
|
addStep.sync 向正在运行的集群添加新步骤。 或者,您也可以在使用此 |
与 addStep 相同,但等待步骤完成。 |
|
cancelStep
取消正在运行的集群中的一个待处理步骤。 |
cancelSteps | 请求使用: Amazon EMR 使用这个: 响应如下: Amazon EMR 使用这个:
|
modifyInstanceFleetByName
使用指定的 |
modifyInstanceFleet | 请求与 modifyInstanceFleet 相同,但以下情况除外:
|
modifyInstanceGroupByName
修改实例组的节点数和配置设置。 |
modifyInstanceGroups | 请求如下: Amazon EMR 使用清单:
在 已添加一个新字段 |
工作流程示例
以下内容包含一个创建集群的 Task
状态。
"Create_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Name": "MyWorkflowCluster",
"VisibleToAllUsers": true,
"ReleaseLabel": "emr-5.28.0",
"Applications": [
{
"Name": "Hive"
}
],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/",
"Instances": {
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"Name": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
},
{
"InstanceFleetType": "CORE",
"Name": "CORE",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
}
]
}
},
"End": true
}
以下内容包括启用终止保护的 Task
状态。
"Enable_Termination_Protection": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"TerminationProtected": true
},
"End": true
}
以下内容包括向集群提交步骤的 Task
状态。
"Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"ExecutionRoleArn": "arn:aws:iam::123456789012
:role/myEMR-execution-role
",
"Step": {
"Name": "The first step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"hive-script",
"--run-hive-script",
"--args",
"-f",
"s3://<region>
.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
"-d",
"INPUT=s3://<region>
.elasticmapreduce.samples",
"-d",
"OUTPUT=s3://<amzn-s3-demo-bucket>
/MyHiveQueryResults/"
]
}
}
},
"End": true
}
以下内容包括取消步骤的 Task
状态。
"Cancel_Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"StepId.$": "$.AddStepsResult.StepId"
},
"End": true
}
以下内容包括终止集群的 Task
状态。
"Terminate_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
以下内容包括为实例组向上或向下扩展集群的 Task
状态。
"ModifyInstanceGroupByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceGroupName": "MyCoreGroup",
"InstanceGroup": {
"InstanceCount": 8
}
},
"End": true
}
以下内容包括为实例队列向上或向下扩展集群的 Task
状态。
"ModifyInstanceFleetByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceFleetName": "MyCoreFleet",
"InstanceFleet": {
"TargetOnDemandCapacity": 8,
"TargetSpotCapacity": 0
}
},
"End": true
}
IAM致电 Amazon 的政策 EMR
以下示例模板演示了如何操作 AWS Step Functions 根据状态机定义中的资源生成IAM策略。有关更多信息,请参阅Step Functions 如何为集成服务生成IAM策略 和在 Step Functions 中探索服务集成模式。
addStep
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:CancelSteps"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]"
]
}
]
}
动态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:CancelSteps"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
cancelStep
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:CancelSteps",
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
动态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:CancelSteps",
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
createCluster
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:RunJobFlow",
"elasticmapreduce:DescribeCluster",
"elasticmapreduce:TerminateJobFlows"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::{{account}}
:role/[[roleName]]
"
]
}
]
}
setClusterTerminationProtection
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:SetTerminationProtection",
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
动态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:SetTerminationProtection",
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
modifyInstanceFleetByName
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceFleet",
"elasticmapreduce:ListInstanceFleets"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
动态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceFleet",
"elasticmapreduce:ListInstanceFleets"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
modifyInstanceGroupByName
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceGroups",
"elasticmapreduce:ListInstanceGroups"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
动态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceGroups",
"elasticmapreduce:ListInstanceGroups"
],
"Resource": "*"
}
]
}
terminateCluster
静态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:DescribeCluster"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
动态资源
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:DescribeCluster"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}