翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Step Functions を使用して Amazon EMRクラスターを作成および管理します。
の統合方法を学ぶ AWS Step Functions が提供する Amazon EMRサービス統合 EMRを使用して Amazon と 。 APIsサービス統合APIsは対応する Amazon EMR と似ていますがAPIs、渡されるフィールドと返されるレスポンスにはいくつかの違いがあります。
との統合について学ぶには AWS Step Functions の サービスについては、 サービスとの統合「」および「」を参照してくださいStep Functions APIのサービスへのパラメータの受け渡し。
最適化された Amazon EMR統合の主な機能
Optimized Amazon EMRサービス統合には、以下でAPIs説明する基盤となる Amazon EMR をラップAPIsするカスタマイズされた のセットがあります。このため、Amazon とは大きく異なります。 EMR AWS SDK サービス統合。
-
ジョブの実行 (.sync) 統合パターンがサポートされています。
実行が停止しても、Step Functions は Amazon EMRクラスターを自動的に終了しません。Amazon EMRクラスターが終了する前にステートマシンが停止した場合、クラスターは無期限に実行され続け、追加料金が発生する可能性があります。これを回避するには、作成した Amazon EMRクラスターが正しく終了していることを確認します。詳細については、以下を参照してください。
-
サービス統合パターン ジョブの実行 (.sync) セクション。
注記
emr-5.28.0
の時点で、クラスターの作成時に StepConcurrencyLevel
パラメータを指定して、単一のクラスターで複数のステップを並行して実行することを許可します。Step Functions Map
および Parallel
状態を使用して、並行して作業をクラスターに送信できます。
Amazon EMRサービス統合の可用性は、Amazon EMR の可用性によって異なりますAPIs。特別なリージョンの制限については、Amazon EMR のドキュメントを参照してください。
注記
Amazon との統合の場合EMR、Step Functions には、最初の 10 分間とその後 300 秒間、ハードコードされた 60 秒のジョブポーリング頻度があります。
サポートされている Amazon EMR APIs
次の表に、各 Amazon EMRサービス統合APIと対応する Amazon EMR の違いを示しますAPIs。
Amazon EMR Service の統合 API | 対応する EMR API | 差異 |
---|---|---|
createCluster
新しいクラスター (ジョブフロー) を作成して実行を開始します。 Amazon EMR は、サービスにリンクされたIAMロールと呼ばれる一意のタイプのロールに直接リンクされます。 |
runJobFlow | createCluster は、以下を除いてrunJobFlow、 と同じリクエスト構文を使用します。
Amazon はこれEMRを使用します。
|
createCluster同期 新しいクラスター (ジョブフロー) を作成して実行を開始します。 |
runJobFlow | createCluster と同じですが、クラスターが WAITING 状態になるまで待機します。 |
setClusterTermination保護 クラスター (ジョブフロー) をロックして、ユーザーの介入、 API 呼び出し、またはジョブフローエラーによってクラスター内のEC2インスタンスを終了できないようにします。 |
setTerminationProtection | リクエストは以下を使用します。 Amazon はこれEMRを使用します。
|
terminateCluster
クラスター (ジョブフロー) をシャットダウンします。 |
terminateJobFlows | リクエストは以下を使用します。 Amazon はこれEMRを使用します。
|
terminateCluster同期 クラスター (ジョブフロー) をシャットダウンします。 |
terminateJobFlows | terminateCluster と同じですが、クラスターが終了するまで待機します。 |
addStep
実行中のクラスターに新しいステップを追加します。 オプションで、この の使用中に |
リクエストはキー "ClusterId" を使用します。Amazon EMRは を使用します"JobFlowId" 。リクエストは 1 つのステップを使用します。 Amazon はこれEMRを使用します。 レスポンスは次のとおりです。 Amazon は以下EMRを返します。
|
|
addStep同期 実行中のクラスターに新しいステップを追加します。 オプションで、この の使用中に |
addStep と同じですが、ステップが完了するまで待機します。 |
|
cancelStep
実行中のクラスターで保留中のステップを取り消します。 |
cancelSteps | リクエストは以下を使用します。 Amazon はこれEMRを使用します。 レスポンスは次のとおりです。 Amazon はこれEMRを使用します。
|
modifyInstanceFleetByName
指定された |
modifyInstanceFleet | リクエストは modifyInstanceFleet の場合と同じですが、以下が異なります。
|
modifyInstanceGroupByName
インスタンスグループのノード数と構成設定を変更します。 |
modifyInstanceGroups | リクエストは次のとおりです。 Amazon はリストEMRを使用します。
新しいフィールド |
ワークフローの例
以下にはクラスターを作成する Task
状態が含まれています。
"Create_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Name": "MyWorkflowCluster",
"VisibleToAllUsers": true,
"ReleaseLabel": "emr-5.28.0",
"Applications": [
{
"Name": "Hive"
}
],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/",
"Instances": {
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"Name": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
},
{
"InstanceFleetType": "CORE",
"Name": "CORE",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
}
]
}
},
"End": true
}
以下には終了保護を有効にする Task
状態が含まれています。
"Enable_Termination_Protection": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"TerminationProtected": true
},
"End": true
}
以下にはクラスターにステップを送信する Task
状態が含まれています。
"Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"ExecutionRoleArn": "arn:aws:iam::123456789012
:role/myEMR-execution-role
",
"Step": {
"Name": "The first step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"hive-script",
"--run-hive-script",
"--args",
"-f",
"s3://<region>
.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
"-d",
"INPUT=s3://<region>
.elasticmapreduce.samples",
"-d",
"OUTPUT=s3://<amzn-s3-demo-bucket>
/MyHiveQueryResults/"
]
}
}
},
"End": true
}
以下には、ステップをキャンセルする Task
状態が含まれます。
"Cancel_Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"StepId.$": "$.AddStepsResult.StepId"
},
"End": true
}
以下には、クラスターを終了する Task
状態を示します。
"Terminate_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
以下には、インスタンスグループに合わせてクラスターをスケールアップまたはスケールダウンする Task
状態が含まれています。
"ModifyInstanceGroupByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceGroupName": "MyCoreGroup",
"InstanceGroup": {
"InstanceCount": 8
}
},
"End": true
}
以下には、インスタンスフリートに合わせてクラスターをスケールアップまたはスケールダウンする Task
状態が含まれています。
"ModifyInstanceFleetByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceFleetName": "MyCoreFleet",
"InstanceFleet": {
"TargetOnDemandCapacity": 8,
"TargetSpotCapacity": 0
}
},
"End": true
}
IAM Amazon を呼び出すための ポリシー EMR
次のサンプルテンプレートは、 AWS Step Functions は、ステートマシン定義のリソースに基づいてIAMポリシーを生成します。詳細については、「Step Functions が統合サービスのIAMポリシーを生成する方法」および「Step Functions でサービス統合パターンを検出する」を参照してください。
addStep
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:CancelSteps"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]"
]
}
]
}
動的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:CancelSteps"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
cancelStep
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:CancelSteps",
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
動的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:CancelSteps",
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
createCluster
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:RunJobFlow",
"elasticmapreduce:DescribeCluster",
"elasticmapreduce:TerminateJobFlows"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::{{account}}
:role/[[roleName]]
"
]
}
]
}
setClusterTerminationProtection
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:SetTerminationProtection",
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
動的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:SetTerminationProtection",
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
modifyInstanceFleetByName
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceFleet",
"elasticmapreduce:ListInstanceFleets"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
動的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceFleet",
"elasticmapreduce:ListInstanceFleets"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
modifyInstanceGroupByName
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceGroups",
"elasticmapreduce:ListInstanceGroups"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
動的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceGroups",
"elasticmapreduce:ListInstanceGroups"
],
"Resource": "*"
}
]
}
terminateCluster
静的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:DescribeCluster"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
動的リソース
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:DescribeCluster"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}