Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Aprenda a realizar la integración AWS Step Functions con Amazon EMR mediante la integración del servicio Amazon EMR proporcionada. APIs La integración de servicios APIs es similar a la del Amazon EMR correspondiente APIs, con algunas diferencias en los campos que se pasan y en las respuestas que se devuelven.
Para obtener más información sobre la integración con AWS los servicios de Step Functions, consulte Integración de los servicios de yCómo pasar parámetros a una API de servicio en Step Functions.
Características principales de la integración optimizada de Amazon EMR
La integración optimizada del servicio Amazon EMR tiene un conjunto personalizado APIs que envuelve el Amazon EMR APIs subyacente, que se describe a continuación. Por ello, difiere considerablemente de la integración del servicio Amazon EMR AWS SDK.
-
Se admite el patrón de integración Ejecutar un trabajo (.sync).
Step Functions no finaliza automáticamente un clúster de Amazon EMR si se detiene la ejecución. Si su máquina de estado se detiene antes de que su clúster de Amazon EMR finalice, es posible que el clúster siga ejecutándose indefinidamente y que se acumulen cargos adicionales. Para evitarlo, asegúrese de que cualquier clúster de Amazon EMR que cree finalice correctamente. Para obtener más información, consulte:
-
Control de la terminación de los clústeres en la guía del usuario de Amazon EMR.
-
La sección Ejecutar un trabajo (.sync) de Patrones de integración de servicios.
nota
A partir de emr-5.28.0
, puede especificar el parámetro StepConcurrencyLevel
al crear un clúster para permitir que diferentes pasos se ejecuten en paralelo en un único clúster. Puede utilizar los estados Map
y Parallel
de Step Functions para enviar trabajo en paralelo al clúster.
La disponibilidad de la integración del servicio Amazon EMR está sujeta a la disponibilidad de Amazon EMR. APIs Consulte la documentación de Amazon EMR para conocer las limitaciones en regiones especiales.
nota
Para su integración con Amazon EMR, Step Functions tiene una frecuencia de sondeo de trabajo codificada de 60 segundos durante los primeros 10 minutos y de 300 segundos después.
Amazon EMR optimizado APIs
En la siguiente tabla se describen las diferencias entre cada API de integración de servicios de Amazon EMR y la correspondiente Amazon EMR. APIs
API de integración de servicios de Amazon EMR | API de EMR correspondiente | Diferencias |
---|---|---|
createCluster Crea y comienza a ejecutar un clúster (flujo de trabajo). Amazon EMR está vinculado directamente a un tipo de rol de IAM único conocido como rol vinculado a servicio. Para que |
runJobFlow | createCluster utiliza la misma sintaxis de solicitud que runJobFlow, excepto en lo siguiente:
Amazon EMR utiliza lo siguiente:
|
createCluster.sync Crea y comienza a ejecutar un clúster (flujo de trabajo). |
runJobFlow | Lo mismo que createCluster , pero espera a que el clúster alcance el estado WAITING . |
setClusterTerminationProtección Bloquea un clúster (flujo de tareas) para que las EC2 instancias del clúster no se puedan cerrar mediante la intervención del usuario, una llamada a la API o un error en el flujo de tareas. |
setTerminationProtection | La solicitud utiliza: Amazon EMR utiliza lo siguiente:
|
terminateCluster Cierra un clúster (flujo de trabajo). |
terminateJobFlows | La solicitud utiliza: Amazon EMR utiliza lo siguiente:
|
terminateCluster.sync Cierra un clúster (flujo de trabajo). |
terminateJobFlows | Lo mismo que terminateCluster , pero espera a que el clúster finalice. |
addStep Agrega un nuevo paso a un clúster en ejecución. Si lo desea, también puede especificar el parámetro |
La solicitud utiliza la clave "ClusterId" . Amazon EMR utiliza "JobFlowId" . La solicitud utiliza un solo paso. Amazon EMR utiliza lo siguiente: La respuesta es la siguiente: Amazon EMR devuelve lo siguiente:
|
|
addStep.sync Agrega un nuevo paso a un clúster en ejecución. Si lo desea, también puede especificar el parámetro |
Lo mismo que addStep , pero espera a que el paso se complete. |
|
cancelStep Cancela un paso pendiente en un clúster en ejecución. |
cancelSteps | La solicitud utiliza: Amazon EMR utiliza lo siguiente: La respuesta es la siguiente: Amazon EMR utiliza lo siguiente:
|
modifyInstanceFleetByName
Modifica las capacidades de Spot de destino y de destino bajo demanda para la flota de instancias con el |
modifyInstanceFleet | La solicitud es la misma que para modifyInstanceFleet , excepto por lo siguiente:
|
modifyInstanceGroupByName
Modifica el número de nodos y las opciones de configuración de un grupo de instancias. |
modifyInstanceGroups | La solicitud es la siguiente: Amazon EMR utiliza una lista:
En el objeto Se ha añadido un nuevo campo, |
Ejemplo de flujo de trabajo
El código siguiente incluye un estado Task
que crea un clúster.
"Create_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Name": "MyWorkflowCluster",
"VisibleToAllUsers": true,
"ReleaseLabel": "emr-5.28.0",
"Applications": [
{
"Name": "Hive"
}
],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/",
"Instances": {
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"Name": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
},
{
"InstanceFleetType": "CORE",
"Name": "CORE",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge"
}
]
}
]
}
},
"End": true
}
El código siguiente incluye un estado Task
que habilita la protección de la terminación.
"Enable_Termination_Protection": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"TerminationProtected": true
},
"End": true
}
El código siguiente incluye un estado Task
que envía un paso a un clúster.
"Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"ExecutionRoleArn": "arn:aws:iam::123456789012
:role/myEMR-execution-role
",
"Step": {
"Name": "The first step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"hive-script",
"--run-hive-script",
"--args",
"-f",
"s3://<region>
.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
"-d",
"INPUT=s3://<region>
.elasticmapreduce.samples",
"-d",
"OUTPUT=s3://<amzn-s3-demo-bucket>
/MyHiveQueryResults/"
]
}
}
},
"End": true
}
El código siguiente incluye un estado Task
que cancela un paso.
"Cancel_Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"StepId.$": "$.AddStepsResult.StepId"
},
"End": true
}
El código siguiente incluye un estado Task
que termina un clúster.
"Terminate_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
El código siguiente incluye un estado Task
que escala un clúster hacia arriba o hacia abajo para un grupo de instancias.
"ModifyInstanceGroupByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceGroupName": "MyCoreGroup",
"InstanceGroup": {
"InstanceCount": 8
}
},
"End": true
}
El código siguiente incluye un estado Task
que escala un clúster hacia arriba o hacia abajo para una flota de instancias.
"ModifyInstanceFleetByName": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName",
"Parameters": {
"ClusterId": "j-1234567890123",
"InstanceFleetName": "MyCoreFleet",
"InstanceFleet": {
"TargetOnDemandCapacity": 8,
"TargetSpotCapacity": 0
}
},
"End": true
}
Políticas de IAM para llamar a Amazon EMR
En las siguientes plantillas de ejemplo, se muestra cómo se AWS Step Functions generan las políticas de IAM en función de los recursos de la definición de su máquina de estado. Para obtener más información, consulte Generación de políticas de IAM para servicios integrados por Steps Functions y Descubrimiento de los patrones de integración de servicios en Step Functions.
addStep
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:CancelSteps"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]"
]
}
]
}
Recursos dinámicos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:CancelSteps"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
cancelStep
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:CancelSteps",
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
Recursos dinámicos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:CancelSteps",
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
createCluster
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:RunJobFlow",
"elasticmapreduce:DescribeCluster",
"elasticmapreduce:TerminateJobFlows"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::{{account}}
:role/[[roleName]]
"
]
}
]
}
setClusterTerminationProtection
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:SetTerminationProtection",
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
Recursos dinámicos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "elasticmapreduce:SetTerminationProtection",
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
modifyInstanceFleetByName
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceFleet",
"elasticmapreduce:ListInstanceFleets"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
Recursos dinámicos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceFleet",
"elasticmapreduce:ListInstanceFleets"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}
modifyInstanceGroupByName
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceGroups",
"elasticmapreduce:ListInstanceGroups"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
Recursos dinámicos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ModifyInstanceGroups",
"elasticmapreduce:ListInstanceGroups"
],
"Resource": "*"
}
]
}
terminateCluster
Recursos estáticos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:DescribeCluster"
],
"Resource": [
"arn:aws:elasticmapreduce:[[region]]
:[[accountId]]
:cluster/[[clusterId]]
"
]
}
]
}
Recursos dinámicos
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:TerminateJobFlows",
"elasticmapreduce:DescribeCluster"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
}
]
}