Hay más AWS SDK ejemplos disponibles en el GitHub repositorio de AWS Doc SDK Examples
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
EMREjemplos de Amazon que utilizan SDK Python (Boto3)
Los siguientes ejemplos de código muestran cómo realizar acciones e implementar escenarios comunes AWS SDK for Python (Boto3) mediante AmazonEMR.
Las acciones son extractos de código de programas más grandes y deben ejecutarse en contexto. Mientras las acciones muestran cómo llamar a las funciones de servicio individuales, es posible ver las acciones en contexto en los escenarios relacionados.
Los escenarios son ejemplos de código que muestran cómo llevar a cabo una tarea específica a través de llamadas a varias funciones dentro del servicio o combinado con otros Servicios de AWS.
Cada ejemplo incluye un enlace al código fuente completo, donde puede encontrar instrucciones sobre cómo configurar y ejecutar el código en su contexto.
Temas
Acciones
En el siguiente ejemplo de código se muestra cómo usar AddJobFlowSteps
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Agregue un paso de Spark, que el clúster ejecuta en cuanto se agrega.
def add_step(cluster_id, name, script_uri, script_args, emr_client): """ Adds a job step to the specified cluster. This example adds a Spark step, which is run by the cluster as soon as it is added. :param cluster_id: The ID of the cluster. :param name: The name of the step. :param script_uri: The URI where the Python script is stored. :param script_args: Arguments to pass to the Python script. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly added step. """ try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[ { "Name": name, "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", script_uri, *script_args, ], }, } ], ) step_id = response["StepIds"][0] logger.info("Started step with ID %s", step_id) except ClientError: logger.exception("Couldn't start step %s with URI %s.", name, script_uri) raise else: return step_id
Ejecute un comando de Amazon EMR File System (EMRFS) como paso de trabajo en un clúster. Esto se puede usar para automatizar EMRFS los comandos en un clúster en lugar de ejecutarlos manualmente a través de una SSH conexión.
import boto3 from botocore.exceptions import ClientError def add_emrfs_step(command, bucket_url, cluster_id, emr_client): """ Add an EMRFS command as a job flow step to an existing cluster. :param command: The EMRFS command to run. :param bucket_url: The URL of a bucket that contains tracking metadata. :param cluster_id: The ID of the cluster to update. :param emr_client: The Boto3 Amazon EMR client object. :return: The ID of the added job flow step. Status can be tracked by calling the emr_client.describe_step() function. """ job_flow_step = { "Name": "Example EMRFS Command Step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": ["/usr/bin/emrfs", command, bucket_url], }, } try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[job_flow_step] ) step_id = response["StepIds"][0] print(f"Added step {step_id} to cluster {cluster_id}.") except ClientError: print(f"Couldn't add a step to cluster {cluster_id}.") raise else: return step_id def usage_demo(): emr_client = boto3.client("emr") # Assumes the first waiting cluster has EMRFS enabled and has created metadata # with the default name of 'EmrFSMetadata'. cluster = emr_client.list_clusters(ClusterStates=["WAITING"])["Clusters"][0] add_emrfs_step( "sync", "s3://elasticmapreduce/samples/cloudfront", cluster["Id"], emr_client ) if __name__ == "__main__": usage_demo()
-
Para API obtener más información, consulte AddJobFlowStepsla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar DescribeCluster
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. def describe_cluster(cluster_id, emr_client): """ Gets detailed information about a cluster. :param cluster_id: The ID of the cluster to describe. :param emr_client: The Boto3 EMR client object. :return: The retrieved cluster information. """ try: response = emr_client.describe_cluster(ClusterId=cluster_id) cluster = response["Cluster"] logger.info("Got data for cluster %s.", cluster["Name"]) except ClientError: logger.exception("Couldn't get data for cluster %s.", cluster_id) raise else: return cluster
-
Para API obtener más información, consulte DescribeClusterla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar DescribeStep
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. def describe_step(cluster_id, step_id, emr_client): """ Gets detailed information about the specified step, including the current state of the step. :param cluster_id: The ID of the cluster. :param step_id: The ID of the step. :param emr_client: The Boto3 EMR client object. :return: The retrieved information about the specified step. """ try: response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id) step = response["Step"] logger.info("Got data for step %s.", step_id) except ClientError: logger.exception("Couldn't get data for step %s.", step_id) raise else: return step
-
Para API obtener más información, consulte DescribeStepla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar ListSteps
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. def list_steps(cluster_id, emr_client): """ Gets a list of steps for the specified cluster. In this example, all steps are returned, including completed and failed steps. :param cluster_id: The ID of the cluster. :param emr_client: The Boto3 EMR client object. :return: The list of steps for the specified cluster. """ try: response = emr_client.list_steps(ClusterId=cluster_id) steps = response["Steps"] logger.info("Got %s steps for cluster %s.", len(steps), cluster_id) except ClientError: logger.exception("Couldn't get steps for cluster %s.", cluster_id) raise else: return steps
-
Para API obtener más información, consulte ListStepsla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar RunJobFlow
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. def run_job_flow( name, log_uri, keep_alive, applications, job_flow_role, service_role, security_groups, steps, emr_client, ): """ Runs a job flow with the specified steps. A job flow creates a cluster of instances and adds steps to be run on the cluster. Steps added to the cluster are run as soon as the cluster is ready. This example uses the 'emr-5.30.1' release. A list of recent releases can be found here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html. :param name: The name of the cluster. :param log_uri: The URI where logs are stored. This can be an Amazon S3 bucket URL, such as 's3://my-log-bucket'. :param keep_alive: When True, the cluster is put into a Waiting state after all steps are run. When False, the cluster terminates itself when the step queue is empty. :param applications: The applications to install on each instance in the cluster, such as Hive or Spark. :param job_flow_role: The IAM role assumed by the cluster. :param service_role: The IAM role assumed by the service. :param security_groups: The security groups to assign to the cluster instances. Amazon EMR adds all needed rules to these groups, so they can be empty if you require only the default rules. :param steps: The job flow steps to add to the cluster. These are run in order when the cluster is ready. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly created cluster. """ try: response = emr_client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel="emr-5.30.1", Instances={ "MasterInstanceType": "m5.xlarge", "SlaveInstanceType": "m5.xlarge", "InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": keep_alive, "EmrManagedMasterSecurityGroup": security_groups["manager"].id, "EmrManagedSlaveSecurityGroup": security_groups["worker"].id, }, Steps=[ { "Name": step["name"], "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", step["script_uri"], *step["script_args"], ], }, } for step in steps ], Applications=[{"Name": app} for app in applications], JobFlowRole=job_flow_role.name, ServiceRole=service_role.name, EbsRootVolumeSize=10, VisibleToAllUsers=True, ) cluster_id = response["JobFlowId"] logger.info("Created cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't create cluster.") raise else: return cluster_id
-
Para API obtener más información, consulte RunJobFlowla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar TerminateJobFlows
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. def terminate_cluster(cluster_id, emr_client): """ Terminates a cluster. This terminates all instances in the cluster and cannot be undone. Any data not saved elsewhere, such as in an Amazon S3 bucket, is lost. :param cluster_id: The ID of the cluster to terminate. :param emr_client: The Boto3 EMR client object. """ try: emr_client.terminate_job_flows(JobFlowIds=[cluster_id]) logger.info("Terminated cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't terminate cluster %s.", cluster_id) raise
-
Para API obtener más información, consulte TerminateJobFlowsla AWS SDKreferencia de Python (Boto3). API
-
Escenarios
El siguiente ejemplo de código muestra cómo crear un EMR clúster de Amazon de corta duración que ejecute un paso y finalice automáticamente una vez finalizado el paso.
- SDKpara Python (Boto3)
-
Cree un EMR clúster de Amazon de corta duración que estime el valor de pi con Apache Spark para paralelizar una gran cantidad de cálculos. El trabajo escribe los resultados en EMR los registros de Amazon y en un bucket de Amazon Simple Storage Service (Amazon S3). El clúster se termina solo después de finalizar el trabajo.
Crear un bucket de Amazon S3 y cargar un script de trabajo.
Funciones de Create AWS Identity and Access Management (IAM).
Cree grupos de seguridad de Amazon Elastic Compute Cloud (AmazonEC2).
Crear un clúster de corta duración y ejecutar un solo paso de trabajo.
Este ejemplo se ve mejor en GitHub. Para obtener el código fuente completo y las instrucciones sobre cómo configurarlo y ejecutarlo, consulte el ejemplo completo en GitHub
. Servicios utilizados en este ejemplo
Amazon EMR
El siguiente ejemplo de código muestra AWS Systems Manager cómo ejecutar un script de shell en EMR instancias de Amazon que instala bibliotecas adicionales. De esta forma, puedes automatizar la administración de instancias en lugar de ejecutar los comandos manualmente a través de una SSH conexión.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import argparse import time import boto3 def install_libraries_on_core_nodes(cluster_id, script_path, emr_client, ssm_client): """ Copies and runs a shell script on the core nodes in the cluster. :param cluster_id: The ID of the cluster. :param script_path: The path to the script, typically an Amazon S3 object URL. :param emr_client: The Boto3 Amazon EMR client. :param ssm_client: The Boto3 AWS Systems Manager client. """ core_nodes = emr_client.list_instances( ClusterId=cluster_id, InstanceGroupTypes=["CORE"] )["Instances"] core_instance_ids = [node["Ec2InstanceId"] for node in core_nodes] print(f"Found core instances: {core_instance_ids}.") commands = [ # Copy the shell script from Amazon S3 to each node instance. f"aws s3 cp {script_path} /home/hadoop", # Run the shell script to install libraries on each node instance. "bash /home/hadoop/install_libraries.sh", ] for command in commands: print(f"Sending '{command}' to core instances...") command_id = ssm_client.send_command( InstanceIds=core_instance_ids, DocumentName="AWS-RunShellScript", Parameters={"commands": [command]}, TimeoutSeconds=3600, )["Command"]["CommandId"] while True: # Verify the previous step succeeded before running the next step. cmd_result = ssm_client.list_commands(CommandId=command_id)["Commands"][0] if cmd_result["StatusDetails"] == "Success": print(f"Command succeeded.") break elif cmd_result["StatusDetails"] in ["Pending", "InProgress"]: print(f"Command status is {cmd_result['StatusDetails']}, waiting...") time.sleep(10) else: print(f"Command status is {cmd_result['StatusDetails']}, quitting.") raise RuntimeError( f"Command {command} failed to run. " f"Details: {cmd_result['StatusDetails']}" ) def main(): parser = argparse.ArgumentParser() parser.add_argument("cluster_id", help="The ID of the cluster.") parser.add_argument("script_path", help="The path to the script in Amazon S3.") args = parser.parse_args() emr_client = boto3.client("emr") ssm_client = boto3.client("ssm") install_libraries_on_core_nodes( args.cluster_id, args.script_path, emr_client, ssm_client ) if __name__ == "__main__": main()
-
Para API obtener más información, consulte ListInstancesla AWS SDKreferencia de Python (Boto3). API
-