Amazon EMR-Cluster mit Step Functions erstellen und verwalten - AWS Step Functions

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Amazon EMR-Cluster mit Step Functions erstellen und verwalten

Erfahren Sie, wie Sie mithilfe der bereitgestellten Amazon EMR-Serviceintegration eine Integration AWS Step Functions mit Amazon EMR durchführen. APIs Die Serviceintegration APIs ähnelt der entsprechenden Amazon EMR APIs, mit einigen Unterschieden in den Feldern, die übergeben werden, und in den Antworten, die zurückgegeben werden.

Informationen zur Integration mit AWS Diensten in Step Functions finden Sie unter Integrieren von -Services undÜbergeben von Parametern an eine Service-API in Step Functions.

Hauptmerkmale der optimierten Amazon EMR-Integration
  • Die optimierte Amazon EMR-Serviceintegration umfasst ein benutzerdefiniertes Set APIs , das das zugrunde liegende Amazon EMR umschließt APIs, wie unten beschrieben. Aus diesem Grund unterscheidet es sich erheblich von der Amazon EMR AWS SDK-Serviceintegration.

  • Das Ausführen einer Aufgabe (.sync) Integrationsmuster wird unterstützt.

Step Functions beendet einen Amazon EMR-Cluster nicht automatisch, wenn die Ausführung gestoppt wird. Wenn Ihr State Machine stoppt, bevor Ihr Amazon EMR-Cluster beendet wurde, läuft Ihr Cluster möglicherweise auf unbestimmte Zeit weiter und es können zusätzliche Gebühren anfallen. Um dies zu vermeiden, stellen Sie sicher, dass jeder Amazon EMR-Cluster, den Sie erstellen, ordnungsgemäß beendet wird. Weitere Informationen finden Sie unter:

Anmerkung

Ab sofort emr-5.28.0 können Sie den Parameter StepConcurrencyLevel beim Erstellen eines Clusters angeben, damit mehrere Schritte parallel auf einem einzelnen Cluster ausgeführt werden können. Sie können die Step-Funktionen Map und Parallel -Status verwenden, um Arbeiten parallel an den Cluster einzureichen.

Die Verfügbarkeit der Amazon EMR-Serviceintegration hängt von der Verfügbarkeit von Amazon APIs EMR ab. Informationen zu Einschränkungen in speziellen Regionen finden Sie in der Amazon EMR-Dokumentation.

Anmerkung

Für die Integration mit Amazon EMR verfügt Step Functions über eine fest codierte Jobabfragefrequenz von 60 Sekunden für die ersten 10 Minuten und danach 300 Sekunden.

Optimiertes Amazon EMR APIs

In der folgenden Tabelle werden die Unterschiede zwischen den einzelnen Amazon EMR-Serviceintegrations-APIs und den entsprechenden Amazon APIs EMR-APIs beschrieben.

API für die Amazon EMR-Serviceintegration Entsprechende EMR-API Unterschiede
createCluster

Erstellt und startet einen Cluster (Auftragsverlauf).

Amazon EMR ist direkt mit einer speziellen Art von IAM-Rolle verknüpft, die als serviceverknüpfte Rolle bezeichnet wird. Damit createCluster und createCluster.sync funktionieren, müssen Sie die erforderlichen Berechtigungen zum Erstellen der serviceverknüpften Rolle AWSServiceRoleForEMRCleanup konfiguriert haben. Weitere Informationen dazu, einschließlich einer Erklärung, die Sie zu Ihrer IAM-Berechtigungsrichtlinie hinzufügen können, finden Sie unter Verwenden der serviceverknüpften Rolle für Amazon EMR.

runJobFlow createClusterverwendet dieselbe Anfragesyntax wie runJobFlow, mit Ausnahme der folgenden:
  • Das Feld Instances.KeepJobFlowAliveWhenNoSteps ist obligatorisch und muss den booleschen Wert TRUE aufweisen.

  • Das Feld Steps ist nicht zulässig.

  • Das Feld Instances.InstanceFleets[index].Name sollte angegeben werden und muss eindeutig sein, wenn die optionale Connector-API modifyInstanceFleetByName verwendet wird.

  • Das Feld Instances.InstanceGroups[index].Name sollte angegeben werden und muss eindeutig sein, wenn die optionale modifyInstanceGroupByName-API verwendet wird.

Die Antwort lautet:
{ "ClusterId": "string" }
Amazon EMR verwendet Folgendes:
{ "JobFlowId": "string" }
createCluster.sync

Erstellt und startet einen Cluster (Auftragsverlauf).

runJobFlow Entspricht createCluster, wartet aber darauf, dass der Cluster den Zustand WAITING erreicht.
setClusterTerminationSchutz

Sperrt einen Cluster (Auftragsablauf), sodass die EC2 Instances im Cluster nicht durch Benutzereingriffe, einen API-Aufruf oder einen Job-Flow-Fehler beendet werden können.

setTerminationProtection Anforderung verwendet Folgendes:
{ "ClusterId": "string" }
Amazon EMR verwendet Folgendes:
{ "JobFlowIds": ["string"] }
terminateCluster

Beendet einen Cluster (Auftragsverlauf).

terminateJobFlows Anforderung verwendet Folgendes:
{ "ClusterId": "string" }
Amazon EMR verwendet Folgendes:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

Beendet einen Cluster (Auftragsverlauf).

terminateJobFlows Entspricht terminateCluster, wartet aber auf den Abschluss des Clusters.
addStep

Fügt einem ausgeführten Cluster einen neuen Schritt hinzu.

Optional können Sie den ExecutionRoleArn Parameter auch angeben, während Sie diese API verwenden.

addJobFlowSchritte

Die Anfrage verwendet den Schlüssel"ClusterId". Amazon EMR verwendet"JobFlowId". Anforderung verwendet einen einzelnen Schritt.
{ "Step": <"StepConfig object"> }
Amazon EMR verwendet Folgendes:
{ "Steps": [<StepConfig objects>] }
Die Antwort lautet:
{ "StepId": "string" }
Amazon EMR gibt Folgendes zurück:
{ "StepIds": [<strings>] }
addStep.sync

Fügt einem ausgeführten Cluster einen neuen Schritt hinzu.

Optional können Sie den ExecutionRoleArn Parameter auch angeben, während Sie diese API verwenden.

addJobFlowSchritte

Wie addStep, wartet aber auf den Abschluss des Schrittes.
cancelStep

Bricht einen ausstehenden Schritt in einem laufenden Cluster ab.

cancelSteps Anforderung verwendet Folgendes:
{ "StepId": "string" }
Amazon EMR verwendet Folgendes:
{ "StepIds": [<strings>] }
Die Antwort lautet:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR verwendet Folgendes:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

Ändert die Ziel-On-Demand- und Ziel-Spot-Kapazitäten für die Instance-Flotte mit dem angegebenen InstanceFleetName.

modifyInstanceFleet Die Anforderung entspricht der für modifyInstanceFleet, mit Ausnahme von Folgendem:
  • Das Feld Instance.InstanceFleetId ist nicht zulässig.

  • Zur Laufzeit wird die InstanceFleetId automatisch anhand der Serviceintegration bestimmt, indem ListInstanceFleets aufgerufen und das Ergebnis analysiert wird.

modifyInstanceGroupByName

Ändert die Anzahl der Knoten und Konfigurationseinstellungen einer Instance-Gruppe.

modifyInstanceGroups Anforderung lautet:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR verwendet eine Liste:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

Innerhalb des Objekts InstanceGroupModifyConfig ist das Feld InstanceGroupId nicht zulässig.

Das neue Feld InstanceGroupName wurde hinzugefügt. Zur Laufzeit wird die InstanceGroupId automatisch anhand der Serviceintegration bestimmt, indem ListInstanceGroups aufgerufen und das Ergebnis analysiert wird.

Beispiel für einen Arbeitsablauf

Im Folgenden finden Sie einen Task-Zustand, der einen Cluster erstellt.

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

Im Folgenden finden Sie einen Task-Zustand, der den Kündigungsschutz ermöglicht.

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

Im Folgenden finden Sie einen Task-Zustand, der einen Schritt an einen Cluster sendet.

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

Im Folgenden finden Sie einen Task-Zustand, der einen Schritt abbricht.

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

Im Folgenden finden Sie einen Task-Zustand, der einen Cluster beendet.

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

Im Folgenden finden Sie einen Task-Zustand, der einen Cluster für eine Instance-Gruppe nach oben oder unten skaliert.

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

Im Folgenden finden Sie einen Task-Zustand, der einen Cluster für eine Instance-Flotte nach oben oder unten skaliert.

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

IAM-Richtlinien für das Aufrufen von Amazon EMR

Die folgenden Beispielvorlagen zeigen, wie IAM-Richtlinien auf der Grundlage der Ressourcen in Ihrer State-Machine-Definition AWS Step Functions generiert werden. Weitere Informationen erhalten Sie unter So generiert Step Functions IAM-Richtlinien für integrierte Dienste und Entdecken Sie Serviceintegrationsmuster in Step Functions.

addStep

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Dynamische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Dynamische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Dynamische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Dynamische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Dynamische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

Statische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Dynamische Ressourcen

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }