Crea e gestisci cluster Amazon EMR con Step Functions - AWS Step Functions

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Crea e gestisci cluster Amazon EMR con Step Functions

Scopri come integrarti AWS Step Functions con Amazon EMR utilizzando l'integrazione del servizio Amazon EMR fornito. APIs L'integrazione APIs del servizio è simile a quella del corrispondente Amazon EMR APIs, con alcune differenze nei campi che vengono passati e nelle risposte restituite.

Per ulteriori informazioni sull'integrazione con AWS i servizi in Step Functions, vedere Integrazione dei servizi ePassaggio di parametri a un'API di servizio in Step Functions.

Caratteristiche principali dell'integrazione ottimizzata con Amazon EMR
  • L'integrazione ottimizzata del servizio Amazon EMR dispone di un set personalizzato APIs che racchiude l'Amazon EMR APIs sottostante, descritto di seguito. Per questo motivo, si differenzia in modo significativo dall'integrazione del servizio SDK Amazon AWS EMR.

  • Il modello di Esegui un processo (.sync) integrazione è supportato.

Step Functions non termina automaticamente un cluster Amazon EMR se l'esecuzione viene interrotta. Se la macchina a stati si arresta prima della chiusura del cluster Amazon EMR, il cluster potrebbe continuare a funzionare a tempo indeterminato e potrebbe comportare costi aggiuntivi. Per evitare ciò, assicurati che tutti i cluster Amazon EMR che crei siano terminati correttamente. Per ulteriori informazioni, consultare:

Nota

A partire da emr-5.28.0 ora, puoi specificare il parametro StepConcurrencyLevel durante la creazione di un cluster per consentire l'esecuzione di più passaggi in parallelo su un singolo cluster. È possibile utilizzare Step Functions Map e Parallel gli stati per inviare il lavoro in parallelo al cluster.

La disponibilità dell'integrazione del servizio Amazon EMR è soggetta alla disponibilità di Amazon EMR. APIs Consulta la documentazione di Amazon EMR per le limitazioni nelle regioni speciali.

Nota

Per l'integrazione con Amazon EMR, Step Functions utilizza una frequenza di polling dei job codificata di 60 secondi per i primi 10 minuti e per i 300 secondi successivi.

Amazon EMR ottimizzato APIs

La tabella seguente descrive le differenze tra ogni API di integrazione del servizio Amazon EMR e Amazon EMR corrispondente. APIs

API di integrazione dei servizi Amazon EMR API EMR corrispondente Differenze
createCluster

Crea e avvia l'esecuzione di un cluster (flusso di lavoro).

Amazon EMR è collegato direttamente a un tipo unico di ruolo IAM noto come ruolo collegato ai servizi. Perché createCluster e createCluster.sync funzionino, è necessario aver configurato le autorizzazioni necessarie per creare il ruolo collegato al servizio AWSServiceRoleForEMRCleanup. Per ulteriori informazioni su questo argomento, inclusa una dichiarazione che puoi aggiungere alla tua politica di autorizzazioni IAM, consulta Using the Service-Linked Role for Amazon EMR.

runJobFlow createClusterutilizza la stessa sintassi di richiesta di runJobFlow, ad eccezione di quanto segue:
  • Il campo Instances.KeepJobFlowAliveWhenNoSteps è obbligatorio e deve avere il valore booleano TRUE.

  • Il campo Steps non è consentito.

  • Deve essere fornito il campo Instances.InstanceFleets[index].Name, che deve essere univoco se viene utilizzata l'API del connettore modifyInstanceFleetByName opzionale.

  • Deve essere fornito il campo Instances.InstanceGroups[index].Name, che deve essere univoco se viene utilizzata l'API modifyInstanceGroupByName opzionale.

La risposta è:
{ "ClusterId": "string" }
Amazon EMR utilizza questo:
{ "JobFlowId": "string" }
createCluster.sync

Crea e avvia l'esecuzione di un cluster (flusso di lavoro).

runJobFlow Lo stesso di createCluster, ma attende che il cluster raggiunga lo stato WAITING.
setClusterTerminationProtezione

Blocca un cluster (flusso di lavoro) in modo che le EC2 istanze nel cluster non possano essere terminate dall'intervento dell'utente, da una chiamata API o da un errore del flusso di lavoro.

setTerminationProtection La richiesta utilizza questo:
{ "ClusterId": "string" }
Amazon EMR utilizza questo:
{ "JobFlowIds": ["string"] }
terminateCluster

Arresta un cluster (flusso di lavoro).

terminateJobFlows La richiesta utilizza questo:
{ "ClusterId": "string" }
Amazon EMR utilizza questo:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

Arresta un cluster (flusso di lavoro).

terminateJobFlows Lo stesso di terminateCluster, ma attende che il cluster venga terminato.
addStep

Aggiunge un nuovo passaggio a un cluster in esecuzione.

Facoltativamente, puoi anche specificare il ExecutionRoleArn parametro durante l'utilizzo di questa API.

addJobFlowFasi

La richiesta utilizza la chiave"ClusterId". Usi di Amazon EMR. "JobFlowId" La richiesta utilizza un singolo passaggio.
{ "Step": <"StepConfig object"> }
Amazon EMR utilizza questo:
{ "Steps": [<StepConfig objects>] }
La risposta è:
{ "StepId": "string" }
Amazon EMR restituisce questo:
{ "StepIds": [<strings>] }
addStep.sync

Aggiunge un nuovo passaggio a un cluster in esecuzione.

Facoltativamente, puoi anche specificare il ExecutionRoleArn parametro durante l'utilizzo di questa API.

addJobFlowFasi

Come addStep, ma attende il completamento del passaggio.
cancelStep

Annulla un passaggio in sospeso in un cluster in esecuzione.

cancelSteps La richiesta utilizza questo:
{ "StepId": "string" }
Amazon EMR utilizza questo:
{ "StepIds": [<strings>] }
La risposta è:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR utilizza questo:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

Modifica le capacità Spot target e on demand per il parco di istanze con il InstanceFleetName specificato.

modifyInstanceFleet La richiesta è la stessa di quella di modifyInstanceFleet, ad eccezione di quanto segue:
  • Il campo Instance.InstanceFleetId non è consentito.

  • In fase di esecuzione InstanceFleetId viene determinato automaticamente dall'integrazione del servizio chiamando ListInstanceFleets e analizzando il risultato.

modifyInstanceGroupByName

Modifica il numero di nodi e le impostazioni di configurazione di un gruppo di istanze.

modifyInstanceGroups La richiesta è questa:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR utilizza un elenco:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

All'interno dell'oggetto InstanceGroupModifyConfig il campo InstanceGroupId non è consentito.

È stato aggiunto un nuovo campo, InstanceGroupName. In fase di esecuzione InstanceGroupId viene determinato automaticamente dall'integrazione del servizio chiamando ListInstanceGroups e analizzando il risultato.

Esempio di workflow

L'esempio seguente include uno stato Task che crea un cluster.

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

Di seguito è riportato uno stato Task che consente la protezione di terminazione.

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

Di seguito è riportato uno stato Task che invia un passaggio a un cluster.

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

Di seguito è riportato uno stato Task che annulla un passaggio.

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

Di seguito è riportato uno stato Task che termina un cluster.

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

Di seguito è riportato uno stato Task che consente di scalare un cluster verso l'alto o verso il basso per un gruppo di istanze.

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

Di seguito è riportato uno stato Task che consente di scalare un cluster verso l'alto o verso il basso per un parco di istanze.

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

Politiche IAM per le chiamate ad Amazon EMR

I seguenti modelli di esempio mostrano come AWS Step Functions generare le politiche IAM in base alle risorse nella definizione della macchina a stati. Per ulteriori informazioni, consulta In che modo Step Functions genera policy IAM per servizi integrati e Scopri i modelli di integrazione dei servizi in Step Functions.

addStep

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Risorse dinamiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Risorse dinamiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Risorse dinamiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Risorse dinamiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Risorse dinamiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

Risorse statiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Risorse dinamiche

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }