

文档 AWS SDK 示例 GitHub 存储库中还有更多 [S AWS DK 示例](https://github.com/awsdocs/aws-doc-sdk-examples)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon EMR 的代码示例 AWS SDKs
<a name="emr_code_examples"></a>

以下代码示例向您展示了如何将 Amazon EMR 与 AWS 软件开发套件 (SDK) 配合使用。

*操作*是大型程序的代码摘录，必须在上下文中运行。您可以通过操作了解如何调用单个服务函数，还可以通过函数相关场景的上下文查看操作。

*场景*是向您展示如何通过在一个服务中调用多个函数或与其他 AWS 服务服务结合来完成特定任务的代码示例。

**更多资源**
+  **[ Amazon EMR 管理指南](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html)**——有关 Amazon EMR 的更多信息。
+ **[Amazon EMR API 参考](https://docs.aws.amazon.com/emr/latest/APIReference/Welcome.html)**——有关所有可用的 Amazon EMR 操作的详细信息。
+ **[AWS 开发者中心](https://aws.amazon.com/developer/code-examples/?awsf.sdk-code-examples-product=product%23emr)** — 您可以按类别或全文搜索筛选的代码示例。
+ **[AWS SDK 示例](https://github.com/awsdocs/aws-doc-sdk-examples)** — 包含首选语言完整代码的 GitHub 存储库。包括有关设置和运行代码的说明。

**Contents**
+ [基本功能](emr_code_examples_basics.md)
  + [操作](emr_code_examples_actions.md)
    + [`AddJobFlowSteps`](emr_example_emr_AddJobFlowSteps_section.md)
    + [`DescribeCluster`](emr_example_emr_DescribeCluster_section.md)
    + [`DescribeStep`](emr_example_emr_DescribeStep_section.md)
    + [`ListSteps`](emr_example_emr_ListSteps_section.md)
    + [`RunJobFlow`](emr_example_emr_RunJobFlow_section.md)
    + [`TerminateJobFlows`](emr_example_emr_TerminateJobFlows_section.md)
+ [场景](emr_code_examples_scenarios.md)
  + [创建一个短期 Amazon EMR 集群并运行一个步骤](emr_example_emr_Scenario_ShortLivedEmrCluster_section.md)
  + [运行 Shell 脚本以安装库](emr_example_emr_Usage_InstallLibrariesWithSsm_section.md)

# 使用 Amazon EMR 的基本示例 AWS SDKs
<a name="emr_code_examples_basics"></a>

以下代码示例展示了如何使用 Amazon EMR 的基础知识。 AWS SDKs

**Contents**
+ [操作](emr_code_examples_actions.md)
  + [`AddJobFlowSteps`](emr_example_emr_AddJobFlowSteps_section.md)
  + [`DescribeCluster`](emr_example_emr_DescribeCluster_section.md)
  + [`DescribeStep`](emr_example_emr_DescribeStep_section.md)
  + [`ListSteps`](emr_example_emr_ListSteps_section.md)
  + [`RunJobFlow`](emr_example_emr_RunJobFlow_section.md)
  + [`TerminateJobFlows`](emr_example_emr_TerminateJobFlows_section.md)

# 使用 Amazon EMR 执行的操作 AWS SDKs
<a name="emr_code_examples_actions"></a>

以下代码示例演示了如何使用执行单个 Amazon EMR 操作。 AWS SDKs每个示例都包含一个指向的链接 GitHub，您可以在其中找到有关设置和运行代码的说明。

这些代码节选调用了 Amazon EMR API，是必须在上下文中运行的较大型程序的代码节选。您可以在[使用 Amazon EMR 的场景 AWS SDKs](emr_code_examples_scenarios.md)中结合上下文查看操作。

 以下示例仅包括最常用的操作。有关完整列表，请参阅《[Amazon EMR API Reference](https://docs.aws.amazon.com/emr/latest/APIReference/Welcome.html)》。

**Topics**
+ [`AddJobFlowSteps`](emr_example_emr_AddJobFlowSteps_section.md)
+ [`DescribeCluster`](emr_example_emr_DescribeCluster_section.md)
+ [`DescribeStep`](emr_example_emr_DescribeStep_section.md)
+ [`ListSteps`](emr_example_emr_ListSteps_section.md)
+ [`RunJobFlow`](emr_example_emr_RunJobFlow_section.md)
+ [`TerminateJobFlows`](emr_example_emr_TerminateJobFlows_section.md)

# 与 AWS SDK `AddJobFlowSteps` 配合使用
<a name="emr_example_emr_AddJobFlowSteps_section"></a>

以下代码示例演示如何使用 `AddJobFlowSteps`。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。
添加 Spark 步骤，该步骤在添加后立即由集群运行。  

```
def add_step(cluster_id, name, script_uri, script_args, emr_client):
    """
    Adds a job step to the specified cluster. This example adds a Spark
    step, which is run by the cluster as soon as it is added.

    :param cluster_id: The ID of the cluster.
    :param name: The name of the step.
    :param script_uri: The URI where the Python script is stored.
    :param script_args: Arguments to pass to the Python script.
    :param emr_client: The Boto3 EMR client object.
    :return: The ID of the newly added step.
    """
    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[
                {
                    "Name": name,
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": [
                            "spark-submit",
                            "--deploy-mode",
                            "cluster",
                            script_uri,
                            *script_args,
                        ],
                    },
                }
            ],
        )
        step_id = response["StepIds"][0]
        logger.info("Started step with ID %s", step_id)
    except ClientError:
        logger.exception("Couldn't start step %s with URI %s.", name, script_uri)
        raise
    else:
        return step_id
```
在集群上作为任务步骤运行 Amazon EMR 文件系统 (EMRFS) 命令。这可用于在集群上自动执行 EMRFS 命令，而不是通过 SSH 连接手动运行命令。  

```
import boto3
from botocore.exceptions import ClientError


def add_emrfs_step(command, bucket_url, cluster_id, emr_client):
    """
    Add an EMRFS command as a job flow step to an existing cluster.

    :param command: The EMRFS command to run.
    :param bucket_url: The URL of a bucket that contains tracking metadata.
    :param cluster_id: The ID of the cluster to update.
    :param emr_client: The Boto3 Amazon EMR client object.
    :return: The ID of the added job flow step. Status can be tracked by calling
             the emr_client.describe_step() function.
    """
    job_flow_step = {
        "Name": "Example EMRFS Command Step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/bin/emrfs", command, bucket_url],
        },
    }

    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id, Steps=[job_flow_step]
        )
        step_id = response["StepIds"][0]
        print(f"Added step {step_id} to cluster {cluster_id}.")
    except ClientError:
        print(f"Couldn't add a step to cluster {cluster_id}.")
        raise
    else:
        return step_id


def usage_demo():
    emr_client = boto3.client("emr")
    # Assumes the first waiting cluster has EMRFS enabled and has created metadata
    # with the default name of 'EmrFSMetadata'.
    cluster = emr_client.list_clusters(ClusterStates=["WAITING"])["Clusters"][0]
    add_emrfs_step(
        "sync", "s3://elasticmapreduce/samples/cloudfront", cluster["Id"], emr_client
    )


if __name__ == "__main__":
    usage_demo()
```
+  有关 API 的详细信息，请参阅适用[AddJobFlowSteps](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/AddJobFlowSteps)于 *Python 的AWS SDK (Boto3) API 参考*。

------
#### [ SAP ABAP ]

**适用于 SAP ABAP 的 SDK**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/sap-abap/services/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    TRY.
        " Build args list for Spark submit
        DATA lt_args TYPE /aws1/cl_emrxmlstringlist_w=>tt_xmlstringlist.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( 'spark-submit' ) TO lt_args.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( '--deploy-mode' ) TO lt_args.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( 'cluster' ) TO lt_args.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( iv_script_uri ) TO lt_args.
        APPEND LINES OF it_script_args TO lt_args.

        " Create step configuration
        DATA(lo_hadoop_jar_step) = NEW /aws1/cl_emrhadoopjarstepcfg(
          iv_jar = 'command-runner.jar'
          it_args = lt_args
        ).

        DATA(lo_step_config) = NEW /aws1/cl_emrstepconfig(
          iv_name = iv_name
          iv_actiononfailure = 'CONTINUE'
          io_hadoopjarstep = lo_hadoop_jar_step
        ).

        DATA lt_steps TYPE /aws1/cl_emrstepconfig=>tt_stepconfiglist.
        APPEND lo_step_config TO lt_steps.

        DATA(lo_result) = lo_emr->addjobflowsteps(
          iv_jobflowid = iv_cluster_id
          it_steps = lt_steps
        ).

        " Get first step ID
        DATA(lt_step_ids) = lo_result->get_stepids( ).
        READ TABLE lt_step_ids INDEX 1 INTO DATA(lo_step_id_obj).
        IF sy-subrc = 0.
          ov_step_id = lo_step_id_obj->get_value( ).
          MESSAGE |Step added with ID { ov_step_id }| TYPE 'I'.
        ENDIF.
      CATCH /aws1/cx_emrinternalservererr INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
```
+  有关 API 的详细信息，请参阅适用[AddJobFlowSteps](https://docs.aws.amazon.com/sdk-for-sap-abap/v1/api/latest/index.html)于 S *AP 的AWS SDK ABAP API 参考*。

------

# `DescribeCluster`与 AWS SDK 或 CLI 配合使用
<a name="emr_example_emr_DescribeCluster_section"></a>

以下代码示例演示如何使用 `DescribeCluster`。

------
#### [ CLI ]

**AWS CLI**  
命令:  

```
aws emr describe-cluster --cluster-id j-XXXXXXXX
```
输出：  

```
For release-label based uniform instance groups cluster:

        {
            "Cluster": {
                "Status": {
                    "Timeline": {
                        "ReadyDateTime": 1436475075.199,
                        "CreationDateTime": 1436474656.563,
                    },
                    "State": "WAITING",
                    "StateChangeReason": {
                        "Message": "Waiting for steps to run"
                    }
                },
                "Ec2InstanceAttributes": {
                    "ServiceAccessSecurityGroup": "sg-xxxxxxxx",
                    "EmrManagedMasterSecurityGroup": "sg-xxxxxxxx",
                    "IamInstanceProfile": "EMR_EC2_DefaultRole",
                    "Ec2KeyName": "myKey",
                    "Ec2AvailabilityZone": "us-east-1c",
                    "EmrManagedSlaveSecurityGroup": "sg-yyyyyyyyy"
                },
                "Name": "My Cluster",
                "ServiceRole": "EMR_DefaultRole",
                "Tags": [],
                "TerminationProtected": true,
                "UnhealthyNodeReplacement": true,
                "ReleaseLabel": "emr-4.0.0",
                "NormalizedInstanceHours": 96,
                "InstanceGroups": [
                    {
                        "RequestedInstanceCount": 2,
                        "Status": {
                            "Timeline": {
                                "ReadyDateTime": 1436475074.245,
                                "CreationDateTime": 1436474656.564,
                                "EndDateTime": 1436638158.387
                            },
                            "State": "RUNNING",
                            "StateChangeReason": {
                                "Message": "",
                            }
                        },
                        "Name": "CORE",
                        "InstanceGroupType": "CORE",
                        "Id": "ig-YYYYYYY",
                        "Configurations": [],
                        "InstanceType": "m3.large",
                        "Market": "ON_DEMAND",
                        "RunningInstanceCount": 2
                    },
                    {
                        "RequestedInstanceCount": 1,
                        "Status": {
                            "Timeline": {
                                "ReadyDateTime": 1436475074.245,
                                "CreationDateTime": 1436474656.564,
                                "EndDateTime": 1436638158.387
                            },
                            "State": "RUNNING",
                            "StateChangeReason": {
                                "Message": "",
                            }
                        },
                        "Name": "MASTER",
                        "InstanceGroupType": "MASTER",
                        "Id": "ig-XXXXXXXXX",
                        "Configurations": [],
                        "InstanceType": "m3.large",
                        "Market": "ON_DEMAND",
                        "RunningInstanceCount": 1
                    }
                ],
                "Applications": [
                    {
                        "Name": "Hadoop"
                    }
                ],
                "VisibleToAllUsers": true,
                "BootstrapActions": [],
                "MasterPublicDnsName": "ec2-54-147-144-78.compute-1.amazonaws.com",
                "AutoTerminate": false,
                "Id": "j-XXXXXXXX",
                "Configurations": [
                    {
                        "Properties": {
                            "fs.s3.consistent.retryPeriodSeconds": "20",
                            "fs.s3.enableServerSideEncryption": "true",
                            "fs.s3.consistent": "false",
                            "fs.s3.consistent.retryCount": "2"
                        },
                        "Classification": "emrfs-site"
                    }
                ]
            }
        }


For release-label based instance fleet cluster:
{
    "Cluster": {
        "Status": {
            "Timeline": {
                "ReadyDateTime": 1487897289.705,
                "CreationDateTime": 1487896933.942
            },
            "State": "WAITING",
            "StateChangeReason": {
                "Message": "Waiting for steps to run"
            }
        },
        "Ec2InstanceAttributes": {
            "EmrManagedMasterSecurityGroup": "sg-xxxxx",
            "RequestedEc2AvailabilityZones": [],
            "RequestedEc2SubnetIds": [],
            "IamInstanceProfile": "EMR_EC2_DefaultRole",
            "Ec2AvailabilityZone": "us-east-1a",
            "EmrManagedSlaveSecurityGroup": "sg-xxxxx"
        },
        "Name": "My Cluster",
        "ServiceRole": "EMR_DefaultRole",
        "Tags": [],
        "TerminationProtected": false,
        "UnhealthyNodeReplacement": false,
        "ReleaseLabel": "emr-5.2.0",
        "NormalizedInstanceHours": 472,
        "InstanceCollectionType": "INSTANCE_FLEET",
        "InstanceFleets": [
            {
                "Status": {
                    "Timeline": {
                        "ReadyDateTime": 1487897212.74,
                        "CreationDateTime": 1487896933.948
                    },
                    "State": "RUNNING",
                    "StateChangeReason": {
                        "Message": ""
                    }
                },
                "ProvisionedSpotCapacity": 1,
                "Name": "MASTER",
                "InstanceFleetType": "MASTER",
                "LaunchSpecifications": {
                    "SpotSpecification": {
                        "TimeoutDurationMinutes": 60,
                        "TimeoutAction": "TERMINATE_CLUSTER"
                    }
                },
                "TargetSpotCapacity": 1,
                "ProvisionedOnDemandCapacity": 0,
                "InstanceTypeSpecifications": [
                    {
                        "BidPrice": "0.5",
                        "InstanceType": "m3.xlarge",
                        "WeightedCapacity": 1
                    }
                ],
                "Id": "if-xxxxxxx",
                "TargetOnDemandCapacity": 0
            }
        ],
        "Applications": [
            {
                "Version": "2.7.3",
                "Name": "Hadoop"
            }
        ],
        "ScaleDownBehavior": "TERMINATE_AT_INSTANCE_HOUR",
        "VisibleToAllUsers": true,
        "BootstrapActions": [],
        "MasterPublicDnsName": "ec2-xxx-xx-xxx-xx.compute-1.amazonaws.com",
        "AutoTerminate": false,
        "Id": "j-xxxxx",
        "Configurations": []
    }
}

For ami based uniform instance group cluster:

    {
        "Cluster": {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1399400564.432,
                    "CreationDateTime": 1399400268.62
                },
                "State": "WAITING",
                "StateChangeReason": {
                    "Message": "Waiting for steps to run"
                }
            },
            "Ec2InstanceAttributes": {
                "IamInstanceProfile": "EMR_EC2_DefaultRole",
                "Ec2AvailabilityZone": "us-east-1c"
            },
            "Name": "My Cluster",
            "Tags": [],
            "TerminationProtected": true,
            "UnhealthyNodeReplacement": true,
            "RunningAmiVersion": "2.5.4",
            "InstanceGroups": [
                {
                    "RequestedInstanceCount": 1,
                    "Status": {
                        "Timeline": {
                            "ReadyDateTime": 1399400558.848,
                            "CreationDateTime": 1399400268.621
                        },
                        "State": "RUNNING",
                        "StateChangeReason": {
                            "Message": ""
                        }
                    },
                    "Name": "Master instance group",
                    "InstanceGroupType": "MASTER",
                    "InstanceType": "m1.small",
                    "Id": "ig-ABCD",
                    "Market": "ON_DEMAND",
                    "RunningInstanceCount": 1
                },
                {
                    "RequestedInstanceCount": 2,
                    "Status": {
                        "Timeline": {
                            "ReadyDateTime": 1399400564.439,
                            "CreationDateTime": 1399400268.621
                        },
                        "State": "RUNNING",
                        "StateChangeReason": {
                            "Message": ""
                        }
                    },
                    "Name": "Core instance group",
                    "InstanceGroupType": "CORE",
                    "InstanceType": "m1.small",
                    "Id": "ig-DEF",
                    "Market": "ON_DEMAND",
                    "RunningInstanceCount": 2
                }
            ],
            "Applications": [
                {
                    "Version": "1.0.3",
                    "Name": "hadoop"
                }
            ],
            "BootstrapActions": [],
            "VisibleToAllUsers": false,
            "RequestedAmiVersion": "2.4.2",
            "LogUri": "s3://myLogUri/",
            "AutoTerminate": false,
            "Id": "j-XXXXXXXX"
        }
    }
```
+  有关 API 的详细信息，请参阅*AWS CLI 命令参考[DescribeCluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/emr/describe-cluster.html)*中的。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
def describe_cluster(cluster_id, emr_client):
    """
    Gets detailed information about a cluster.

    :param cluster_id: The ID of the cluster to describe.
    :param emr_client: The Boto3 EMR client object.
    :return: The retrieved cluster information.
    """
    try:
        response = emr_client.describe_cluster(ClusterId=cluster_id)
        cluster = response["Cluster"]
        logger.info("Got data for cluster %s.", cluster["Name"])
    except ClientError:
        logger.exception("Couldn't get data for cluster %s.", cluster_id)
        raise
    else:
        return cluster
```
+  有关 API 的详细信息，请参阅适用[DescribeCluster](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/DescribeCluster)于 *Python 的AWS SDK (Boto3) API 参考*。

------
#### [ SAP ABAP ]

**适用于 SAP ABAP 的 SDK**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/sap-abap/services/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    TRY.
        oo_result = lo_emr->describecluster(
          iv_clusterid = iv_cluster_id
        ).
        DATA(lo_cluster) = oo_result->get_cluster( ).
        DATA(lv_cluster_name) = lo_cluster->get_name( ).
        MESSAGE |Retrieved cluster information for { lv_cluster_name }| TYPE 'I'.
      CATCH /aws1/cx_emrinternalserverex INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
      CATCH /aws1/cx_emrinvalidrequestex INTO DATA(lo_invalid_error).
        lv_error = lo_invalid_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
```
+  有关 API 的详细信息，请参阅适用[DescribeCluster](https://docs.aws.amazon.com/sdk-for-sap-abap/v1/api/latest/index.html)于 S *AP 的AWS SDK ABAP API 参考*。

------

# `DescribeStep`与 AWS SDK 或 CLI 配合使用
<a name="emr_example_emr_DescribeStep_section"></a>

以下代码示例演示如何使用 `DescribeStep`。

------
#### [ CLI ]

**AWS CLI**  
以下命令描述集群中步骤 ID 为 `s-3LZC0QUT43AM` 和集群 ID 为 `j-3SD91U2E1L2QX` 的步骤：  

```
aws emr describe-step --cluster-id j-3SD91U2E1L2QX --step-id s-3LZC0QUT43AM
```
输出：  

```
{
    "Step": {
        "Status": {
            "Timeline": {
                "EndDateTime": 1433200470.481,
                "CreationDateTime": 1433199926.597,
                "StartDateTime": 1433200404.959
            },
            "State": "COMPLETED",
            "StateChangeReason": {}
        },
        "Config": {
            "Args": [
                "s3://us-west-2.elasticmapreduce/libs/hive/hive-script",
                "--base-path",
                "s3://us-west-2.elasticmapreduce/libs/hive/",
                "--install-hive",
                "--hive-versions",
                "0.13.1"
            ],
            "Jar": "s3://us-west-2.elasticmapreduce/libs/script-runner/script-runner.jar",
            "Properties": {}
        },
        "Id": "s-3LZC0QUT43AM",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "Name": "Setup hive"
    }
}
```
+  有关 API 的详细信息，请参阅*AWS CLI 命令参考[DescribeStep](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/emr/describe-step.html)*中的。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
def describe_step(cluster_id, step_id, emr_client):
    """
    Gets detailed information about the specified step, including the current state of
    the step.

    :param cluster_id: The ID of the cluster.
    :param step_id: The ID of the step.
    :param emr_client: The Boto3 EMR client object.
    :return: The retrieved information about the specified step.
    """
    try:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        step = response["Step"]
        logger.info("Got data for step %s.", step_id)
    except ClientError:
        logger.exception("Couldn't get data for step %s.", step_id)
        raise
    else:
        return step
```
+  有关 API 的详细信息，请参阅适用[DescribeStep](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/DescribeStep)于 *Python 的AWS SDK (Boto3) API 参考*。

------
#### [ SAP ABAP ]

**适用于 SAP ABAP 的 SDK**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/sap-abap/services/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    TRY.
        oo_result = lo_emr->describestep(
          iv_clusterid = iv_cluster_id
          iv_stepid = iv_step_id
        ).
        DATA(lo_step) = oo_result->get_step( ).
        DATA(lv_step_name) = lo_step->get_name( ).
        MESSAGE |Retrieved step information for { lv_step_name }| TYPE 'I'.
      CATCH /aws1/cx_emrinternalserverex INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
      CATCH /aws1/cx_emrinvalidrequestex INTO DATA(lo_invalid_error).
        lv_error = lo_invalid_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
```
+  有关 API 的详细信息，请参阅适用[DescribeStep](https://docs.aws.amazon.com/sdk-for-sap-abap/v1/api/latest/index.html)于 S *AP 的AWS SDK ABAP API 参考*。

------

# `ListSteps`与 AWS SDK 或 CLI 配合使用
<a name="emr_example_emr_ListSteps_section"></a>

以下代码示例演示如何使用 `ListSteps`。

------
#### [ CLI ]

**AWS CLI**  
以下命令列出了集群 ID 为 `j-3SD91U2E1L2QX` 的集群的所有步骤：  

```
aws emr list-steps --cluster-id j-3SD91U2E1L2QX
```
+  有关 API 的详细信息，请参阅*AWS CLI 命令参考[ListSteps](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/emr/list-steps.html)*中的。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
def list_steps(cluster_id, emr_client):
    """
    Gets a list of steps for the specified cluster. In this example, all steps are
    returned, including completed and failed steps.

    :param cluster_id: The ID of the cluster.
    :param emr_client: The Boto3 EMR client object.
    :return: The list of steps for the specified cluster.
    """
    try:
        response = emr_client.list_steps(ClusterId=cluster_id)
        steps = response["Steps"]
        logger.info("Got %s steps for cluster %s.", len(steps), cluster_id)
    except ClientError:
        logger.exception("Couldn't get steps for cluster %s.", cluster_id)
        raise
    else:
        return steps
```
+  有关 API 的详细信息，请参阅适用[ListSteps](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/ListSteps)于 *Python 的AWS SDK (Boto3) API 参考*。

------
#### [ SAP ABAP ]

**适用于 SAP ABAP 的 SDK**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/sap-abap/services/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    TRY.
        oo_result = lo_emr->liststeps(
          iv_clusterid = iv_cluster_id
        ).
        DATA(lt_steps) = oo_result->get_steps( ).
        DATA(lv_step_count) = lines( lt_steps ).
        MESSAGE |Retrieved { lv_step_count } steps for cluster| TYPE 'I'.
      CATCH /aws1/cx_emrinternalserverex INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
      CATCH /aws1/cx_emrinvalidrequestex INTO DATA(lo_invalid_error).
        lv_error = lo_invalid_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
```
+  有关 API 的详细信息，请参阅适用[ListSteps](https://docs.aws.amazon.com/sdk-for-sap-abap/v1/api/latest/index.html)于 S *AP 的AWS SDK ABAP API 参考*。

------

# 与 AWS SDK `RunJobFlow` 配合使用
<a name="emr_example_emr_RunJobFlow_section"></a>

以下代码示例演示如何使用 `RunJobFlow`。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
def run_job_flow(
    name,
    log_uri,
    keep_alive,
    applications,
    job_flow_role,
    service_role,
    security_groups,
    steps,
    emr_client,
):
    """
    Runs a job flow with the specified steps. A job flow creates a cluster of
    instances and adds steps to be run on the cluster. Steps added to the cluster
    are run as soon as the cluster is ready.

    This example uses the 'emr-5.30.1' release. A list of recent releases can be
    found here:
        https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html.

    :param name: The name of the cluster.
    :param log_uri: The URI where logs are stored. This can be an Amazon S3 bucket URL,
                    such as 's3://my-log-bucket'.
    :param keep_alive: When True, the cluster is put into a Waiting state after all
                       steps are run. When False, the cluster terminates itself when
                       the step queue is empty.
    :param applications: The applications to install on each instance in the cluster,
                         such as Hive or Spark.
    :param job_flow_role: The IAM role assumed by the cluster.
    :param service_role: The IAM role assumed by the service.
    :param security_groups: The security groups to assign to the cluster instances.
                            Amazon EMR adds all needed rules to these groups, so
                            they can be empty if you require only the default rules.
    :param steps: The job flow steps to add to the cluster. These are run in order
                  when the cluster is ready.
    :param emr_client: The Boto3 EMR client object.
    :return: The ID of the newly created cluster.
    """
    try:
        response = emr_client.run_job_flow(
            Name=name,
            LogUri=log_uri,
            ReleaseLabel="emr-5.30.1",
            Instances={
                "MasterInstanceType": "m5.xlarge",
                "SlaveInstanceType": "m5.xlarge",
                "InstanceCount": 3,
                "KeepJobFlowAliveWhenNoSteps": keep_alive,
                "EmrManagedMasterSecurityGroup": security_groups["manager"].id,
                "EmrManagedSlaveSecurityGroup": security_groups["worker"].id,
            },
            Steps=[
                {
                    "Name": step["name"],
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": [
                            "spark-submit",
                            "--deploy-mode",
                            "cluster",
                            step["script_uri"],
                            *step["script_args"],
                        ],
                    },
                }
                for step in steps
            ],
            Applications=[{"Name": app} for app in applications],
            JobFlowRole=job_flow_role.name,
            ServiceRole=service_role.name,
            EbsRootVolumeSize=10,
            VisibleToAllUsers=True,
        )
        cluster_id = response["JobFlowId"]
        logger.info("Created cluster %s.", cluster_id)
    except ClientError:
        logger.exception("Couldn't create cluster.")
        raise
    else:
        return cluster_id
```
+  有关 API 的详细信息，请参阅适用[RunJobFlow](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/RunJobFlow)于 *Python 的AWS SDK (Boto3) API 参考*。

------
#### [ SAP ABAP ]

**适用于 SAP ABAP 的 SDK**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/sap-abap/services/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    TRY.
        " Create instances configuration
        DATA(lo_instances) = NEW /aws1/cl_emrjobflowinstsconfig(
          iv_masterinstancetype = 'm5.xlarge'
          iv_slaveinstancetype = 'm5.xlarge'
          iv_instancecount = 3
          iv_keepjobflowalivewhennos00 = iv_keep_alive
          iv_emrmanagedmastersecgroup = iv_primary_sec_grp
          iv_emrmanagedslavesecgroup = iv_secondary_sec_grp
        ).

        DATA(lo_result) = lo_emr->runjobflow(
          iv_name = iv_name
          iv_loguri = iv_log_uri
          iv_releaselabel = 'emr-5.30.1'
          io_instances = lo_instances
          it_steps = it_steps
          it_applications = it_applications
          iv_jobflowrole = iv_job_flow_role
          iv_servicerole = iv_service_role
          iv_ebsrootvolumesize = 10
          iv_visibletoallusers = abap_true
        ).

        ov_cluster_id = lo_result->get_jobflowid( ).
        MESSAGE 'EMR cluster created successfully.' TYPE 'I'.
      CATCH /aws1/cx_emrinternalservererr INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
      CATCH /aws1/cx_emrclientexc INTO DATA(lo_client_error).
        lv_error = lo_client_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
```
+  有关 API 的详细信息，请参阅适用[RunJobFlow](https://docs.aws.amazon.com/sdk-for-sap-abap/v1/api/latest/index.html)于 S *AP 的AWS SDK ABAP API 参考*。

------

# 与 AWS SDK `TerminateJobFlows` 配合使用
<a name="emr_example_emr_TerminateJobFlows_section"></a>

以下代码示例演示如何使用 `TerminateJobFlows`。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
def terminate_cluster(cluster_id, emr_client):
    """
    Terminates a cluster. This terminates all instances in the cluster and cannot
    be undone. Any data not saved elsewhere, such as in an Amazon S3 bucket, is lost.

    :param cluster_id: The ID of the cluster to terminate.
    :param emr_client: The Boto3 EMR client object.
    """
    try:
        emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        logger.info("Terminated cluster %s.", cluster_id)
    except ClientError:
        logger.exception("Couldn't terminate cluster %s.", cluster_id)
        raise
```
+  有关 API 的详细信息，请参阅适用[TerminateJobFlows](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/TerminateJobFlows)于 *Python 的AWS SDK (Boto3) API 参考*。

------
#### [ SAP ABAP ]

**适用于 SAP ABAP 的 SDK**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/sap-abap/services/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
    TRY.
        DATA lt_cluster_ids TYPE /aws1/cl_emrxmlstringlist_w=>tt_xmlstringlist.
        APPEND NEW /aws1/cl_emrxmlstringlist_w( iv_cluster_id ) TO lt_cluster_ids.

        lo_emr->terminatejobflows(
          it_jobflowids = lt_cluster_ids
        ).
        MESSAGE 'EMR cluster terminated successfully.' TYPE 'I'.
      CATCH /aws1/cx_emrinternalservererr INTO DATA(lo_internal_error).
        DATA(lv_error) = lo_internal_error->if_message~get_text( ).
        MESSAGE lv_error TYPE 'E'.
    ENDTRY.
```
+  有关 API 的详细信息，请参阅适用[TerminateJobFlows](https://docs.aws.amazon.com/sdk-for-sap-abap/v1/api/latest/index.html)于 S *AP 的AWS SDK ABAP API 参考*。

------

# 使用 Amazon EMR 的场景 AWS SDKs
<a name="emr_code_examples_scenarios"></a>

以下代码示例向您展示了如何使用在 Amazon EMR 中实现常见场景。 AWS SDKs这些场景演示了如何通过调用 Amazon EMR 中的多个函数或与其他 AWS 服务结合来完成特定任务。每个场景都包含完整源代码的链接，您可以在其中找到有关如何设置和运行代码的说明。

场景以中等水平的经验为目标，可帮助您结合具体环境了解服务操作。

**Topics**
+ [创建一个短期 Amazon EMR 集群并运行一个步骤](emr_example_emr_Scenario_ShortLivedEmrCluster_section.md)
+ [运行 Shell 脚本以安装库](emr_example_emr_Usage_InstallLibrariesWithSsm_section.md)

# 创建一个短暂的 Amazon EMR 集群并使用软件开发工具包运行一个步骤 AWS
<a name="emr_example_emr_Scenario_ShortLivedEmrCluster_section"></a>

以下代码示例展示如何创建一个短期 Amazon EMR 集群，该集群将运行一个步骤，并在该步骤完成后自动终止。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 创建一个短期 Amazon EMR 集群，该集群将使用 Apache Spark 估计 pi 的值，以并行处理大量计算。该任务会将输出写入 Amazon EMR 日志中，以及 Amazon Simple Storage Service（Amazon S3）桶中。该集群将在完成该任务后自行终止。  
+ 创建 Amazon S3 桶并上传任务脚本。
+ 创建 AWS Identity and Access Management (IAM) 角色。
+ 创建 Amazon Elastic Compute Cloud (Amazon EC2) 安全组。
+ 创建一个短期集群并运行单个任务步骤。
 最好在上查看此示例 GitHub。有关如何设置和运行的完整源代码和说明，请参阅上的完整示例[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr)。  

**本示例中使用的服务**
+ Amazon EMR

------

# 运行 shell 脚本，使用软件开发工具包在 Amazon EMR 实例上安装库 AWS
<a name="emr_example_emr_Usage_InstallLibrariesWithSsm_section"></a>

以下代码示例展示了 AWS Systems Manager 如何使用在安装其他库的 Amazon EMR 实例上运行 shell 脚本。这样，您就可以自动管理实例，而不必通过 SSH 连接手动运行命令。

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 还有更多相关信息 GitHub。在 [AWS 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/emr#code-examples)中查找完整示例，了解如何进行设置和运行。

```
import argparse
import time
import boto3


def install_libraries_on_core_nodes(cluster_id, script_path, emr_client, ssm_client):
    """
    Copies and runs a shell script on the core nodes in the cluster.

    :param cluster_id: The ID of the cluster.
    :param script_path: The path to the script, typically an Amazon S3 object URL.
    :param emr_client: The Boto3 Amazon EMR client.
    :param ssm_client: The Boto3 AWS Systems Manager client.
    """
    core_nodes = emr_client.list_instances(
        ClusterId=cluster_id, InstanceGroupTypes=["CORE"]
    )["Instances"]
    core_instance_ids = [node["Ec2InstanceId"] for node in core_nodes]
    print(f"Found core instances: {core_instance_ids}.")

    commands = [
        # Copy the shell script from Amazon S3 to each node instance.
        f"aws s3 cp {script_path} /home/hadoop",
        # Run the shell script to install libraries on each node instance.
        "bash /home/hadoop/install_libraries.sh",
    ]
    for command in commands:
        print(f"Sending '{command}' to core instances...")
        command_id = ssm_client.send_command(
            InstanceIds=core_instance_ids,
            DocumentName="AWS-RunShellScript",
            Parameters={"commands": [command]},
            TimeoutSeconds=3600,
        )["Command"]["CommandId"]
        while True:
            # Verify the previous step succeeded before running the next step.
            cmd_result = ssm_client.list_commands(CommandId=command_id)["Commands"][0]
            if cmd_result["StatusDetails"] == "Success":
                print(f"Command succeeded.")
                break
            elif cmd_result["StatusDetails"] in ["Pending", "InProgress"]:
                print(f"Command status is {cmd_result['StatusDetails']}, waiting...")
                time.sleep(10)
            else:
                print(f"Command status is {cmd_result['StatusDetails']}, quitting.")
                raise RuntimeError(
                    f"Command {command} failed to run. "
                    f"Details: {cmd_result['StatusDetails']}"
                )


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("cluster_id", help="The ID of the cluster.")
    parser.add_argument("script_path", help="The path to the script in Amazon S3.")
    args = parser.parse_args()

    emr_client = boto3.client("emr")
    ssm_client = boto3.client("ssm")

    install_libraries_on_core_nodes(
        args.cluster_id, args.script_path, emr_client, ssm_client
    )


if __name__ == "__main__":
    main()
```
+  有关 API 的详细信息，请参阅适用[ListInstances](https://docs.aws.amazon.com/goto/boto3/elasticmapreduce-2009-03-31/ListInstances)于 *Python 的AWS SDK (Boto3) API 参考*。

------