EMRExemples d'utilisation d'Amazon SDK pour Python (Boto3) - Exemples de code de l'AWS SDK

D'autres AWS SDK exemples sont disponibles dans le GitHub dépôt AWS Doc SDK Examples.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

EMRExemples d'utilisation d'Amazon SDK pour Python (Boto3)

Les exemples de code suivants vous montrent comment effectuer des actions et implémenter des scénarios courants à l' AWS SDK for Python (Boto3) aide d'AmazonEMR.

Les actions sont des extraits de code de programmes plus larges et doivent être exécutées dans leur contexte. Alors que les actions vous montrent comment appeler des fonctions de service individuelles, vous pouvez les visualiser dans leur contexte dans leurs scénarios associés.

Les scénarios sont des exemples de code qui vous montrent comment accomplir des tâches spécifiques en appelant plusieurs fonctions au sein d'un service ou en les combinant à d'autres Services AWS.

Chaque exemple inclut un lien vers le code source complet, où vous trouverez des instructions sur la façon de configurer et d'exécuter le code en contexte.

Actions

L'exemple de code suivant montre comment utiliserAddJobFlowSteps.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

Ajoutez une étape Spark, qui est exécutée par le cluster dès son ajout.

def add_step(cluster_id, name, script_uri, script_args, emr_client): """ Adds a job step to the specified cluster. This example adds a Spark step, which is run by the cluster as soon as it is added. :param cluster_id: The ID of the cluster. :param name: The name of the step. :param script_uri: The URI where the Python script is stored. :param script_args: Arguments to pass to the Python script. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly added step. """ try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[ { "Name": name, "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", script_uri, *script_args, ], }, } ], ) step_id = response["StepIds"][0] logger.info("Started step with ID %s", step_id) except ClientError: logger.exception("Couldn't start step %s with URI %s.", name, script_uri) raise else: return step_id

Exécutez une commande Amazon EMR File System (EMRFS) en tant qu'étape de travail sur un cluster. Cela peut être utilisé pour automatiser des EMRFS commandes sur un cluster au lieu d'exécuter des commandes manuellement via une SSH connexion.

import boto3 from botocore.exceptions import ClientError def add_emrfs_step(command, bucket_url, cluster_id, emr_client): """ Add an EMRFS command as a job flow step to an existing cluster. :param command: The EMRFS command to run. :param bucket_url: The URL of a bucket that contains tracking metadata. :param cluster_id: The ID of the cluster to update. :param emr_client: The Boto3 Amazon EMR client object. :return: The ID of the added job flow step. Status can be tracked by calling the emr_client.describe_step() function. """ job_flow_step = { "Name": "Example EMRFS Command Step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": ["/usr/bin/emrfs", command, bucket_url], }, } try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[job_flow_step] ) step_id = response["StepIds"][0] print(f"Added step {step_id} to cluster {cluster_id}.") except ClientError: print(f"Couldn't add a step to cluster {cluster_id}.") raise else: return step_id def usage_demo(): emr_client = boto3.client("emr") # Assumes the first waiting cluster has EMRFS enabled and has created metadata # with the default name of 'EmrFSMetadata'. cluster = emr_client.list_clusters(ClusterStates=["WAITING"])["Clusters"][0] add_emrfs_step( "sync", "s3://elasticmapreduce/samples/cloudfront", cluster["Id"], emr_client ) if __name__ == "__main__": usage_demo()
  • Pour API plus de détails, reportez-vous AddJobFlowStepsà la section AWS SDKpour la référence Python (Boto3). API

L'exemple de code suivant montre comment utiliserDescribeCluster.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

def describe_cluster(cluster_id, emr_client): """ Gets detailed information about a cluster. :param cluster_id: The ID of the cluster to describe. :param emr_client: The Boto3 EMR client object. :return: The retrieved cluster information. """ try: response = emr_client.describe_cluster(ClusterId=cluster_id) cluster = response["Cluster"] logger.info("Got data for cluster %s.", cluster["Name"]) except ClientError: logger.exception("Couldn't get data for cluster %s.", cluster_id) raise else: return cluster
  • Pour API plus de détails, reportez-vous DescribeClusterà la section AWS SDKpour la référence Python (Boto3). API

L'exemple de code suivant montre comment utiliserDescribeStep.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

def describe_step(cluster_id, step_id, emr_client): """ Gets detailed information about the specified step, including the current state of the step. :param cluster_id: The ID of the cluster. :param step_id: The ID of the step. :param emr_client: The Boto3 EMR client object. :return: The retrieved information about the specified step. """ try: response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id) step = response["Step"] logger.info("Got data for step %s.", step_id) except ClientError: logger.exception("Couldn't get data for step %s.", step_id) raise else: return step
  • Pour API plus de détails, reportez-vous DescribeStepà la section AWS SDKpour la référence Python (Boto3). API

L'exemple de code suivant montre comment utiliserListSteps.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

def list_steps(cluster_id, emr_client): """ Gets a list of steps for the specified cluster. In this example, all steps are returned, including completed and failed steps. :param cluster_id: The ID of the cluster. :param emr_client: The Boto3 EMR client object. :return: The list of steps for the specified cluster. """ try: response = emr_client.list_steps(ClusterId=cluster_id) steps = response["Steps"] logger.info("Got %s steps for cluster %s.", len(steps), cluster_id) except ClientError: logger.exception("Couldn't get steps for cluster %s.", cluster_id) raise else: return steps
  • Pour API plus de détails, reportez-vous ListStepsà la section AWS SDKpour la référence Python (Boto3). API

L'exemple de code suivant montre comment utiliserRunJobFlow.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

def run_job_flow( name, log_uri, keep_alive, applications, job_flow_role, service_role, security_groups, steps, emr_client, ): """ Runs a job flow with the specified steps. A job flow creates a cluster of instances and adds steps to be run on the cluster. Steps added to the cluster are run as soon as the cluster is ready. This example uses the 'emr-5.30.1' release. A list of recent releases can be found here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html. :param name: The name of the cluster. :param log_uri: The URI where logs are stored. This can be an Amazon S3 bucket URL, such as 's3://my-log-bucket'. :param keep_alive: When True, the cluster is put into a Waiting state after all steps are run. When False, the cluster terminates itself when the step queue is empty. :param applications: The applications to install on each instance in the cluster, such as Hive or Spark. :param job_flow_role: The IAM role assumed by the cluster. :param service_role: The IAM role assumed by the service. :param security_groups: The security groups to assign to the cluster instances. Amazon EMR adds all needed rules to these groups, so they can be empty if you require only the default rules. :param steps: The job flow steps to add to the cluster. These are run in order when the cluster is ready. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly created cluster. """ try: response = emr_client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel="emr-5.30.1", Instances={ "MasterInstanceType": "m5.xlarge", "SlaveInstanceType": "m5.xlarge", "InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": keep_alive, "EmrManagedMasterSecurityGroup": security_groups["manager"].id, "EmrManagedSlaveSecurityGroup": security_groups["worker"].id, }, Steps=[ { "Name": step["name"], "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", step["script_uri"], *step["script_args"], ], }, } for step in steps ], Applications=[{"Name": app} for app in applications], JobFlowRole=job_flow_role.name, ServiceRole=service_role.name, EbsRootVolumeSize=10, VisibleToAllUsers=True, ) cluster_id = response["JobFlowId"] logger.info("Created cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't create cluster.") raise else: return cluster_id
  • Pour API plus de détails, reportez-vous RunJobFlowà la section AWS SDKpour la référence Python (Boto3). API

L'exemple de code suivant montre comment utiliserTerminateJobFlows.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

def terminate_cluster(cluster_id, emr_client): """ Terminates a cluster. This terminates all instances in the cluster and cannot be undone. Any data not saved elsewhere, such as in an Amazon S3 bucket, is lost. :param cluster_id: The ID of the cluster to terminate. :param emr_client: The Boto3 EMR client object. """ try: emr_client.terminate_job_flows(JobFlowIds=[cluster_id]) logger.info("Terminated cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't terminate cluster %s.", cluster_id) raise
  • Pour API plus de détails, reportez-vous TerminateJobFlowsà la section AWS SDKpour la référence Python (Boto3). API

Scénarios

L'exemple de code suivant montre comment créer un EMR cluster Amazon de courte durée qui exécute une étape et s'arrête automatiquement une fois l'étape terminée.

SDKpour Python (Boto3)

Créez un EMR cluster Amazon de courte durée qui estime la valeur de pi à l'aide d'Apache Spark pour paralléliser un grand nombre de calculs. La tâche écrit le résultat dans EMR les journaux Amazon et dans un compartiment Amazon Simple Storage Service (Amazon S3). Le cluster se termine automatiquement une fois la tâche terminée.

  • Créez un compartiment Amazon S3 et chargez un script de tâche.

  • Créez AWS Identity and Access Management (IAM) des rôles.

  • Créez des groupes de sécurité Amazon Elastic Compute Cloud (AmazonEC2).

  • Créez un cluster de courte durée et exécuter une seule étape de tâche.

Il est préférable de visionner cet exemple sur GitHub. Pour obtenir le code source complet et les instructions de configuration et d'exécution, consultez l'exemple complet sur GitHub.

Les services utilisés dans cet exemple
  • Amazon EMR

L'exemple de code suivant montre comment AWS Systems Manager exécuter un script shell sur des EMR instances Amazon qui installe des bibliothèques supplémentaires. Ainsi, vous pouvez automatiser la gestion des instances au lieu d'exécuter des commandes manuellement via une SSH connexion.

SDKpour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples de code AWS.

import argparse import time import boto3 def install_libraries_on_core_nodes(cluster_id, script_path, emr_client, ssm_client): """ Copies and runs a shell script on the core nodes in the cluster. :param cluster_id: The ID of the cluster. :param script_path: The path to the script, typically an Amazon S3 object URL. :param emr_client: The Boto3 Amazon EMR client. :param ssm_client: The Boto3 AWS Systems Manager client. """ core_nodes = emr_client.list_instances( ClusterId=cluster_id, InstanceGroupTypes=["CORE"] )["Instances"] core_instance_ids = [node["Ec2InstanceId"] for node in core_nodes] print(f"Found core instances: {core_instance_ids}.") commands = [ # Copy the shell script from Amazon S3 to each node instance. f"aws s3 cp {script_path} /home/hadoop", # Run the shell script to install libraries on each node instance. "bash /home/hadoop/install_libraries.sh", ] for command in commands: print(f"Sending '{command}' to core instances...") command_id = ssm_client.send_command( InstanceIds=core_instance_ids, DocumentName="AWS-RunShellScript", Parameters={"commands": [command]}, TimeoutSeconds=3600, )["Command"]["CommandId"] while True: # Verify the previous step succeeded before running the next step. cmd_result = ssm_client.list_commands(CommandId=command_id)["Commands"][0] if cmd_result["StatusDetails"] == "Success": print(f"Command succeeded.") break elif cmd_result["StatusDetails"] in ["Pending", "InProgress"]: print(f"Command status is {cmd_result['StatusDetails']}, waiting...") time.sleep(10) else: print(f"Command status is {cmd_result['StatusDetails']}, quitting.") raise RuntimeError( f"Command {command} failed to run. " f"Details: {cmd_result['StatusDetails']}" ) def main(): parser = argparse.ArgumentParser() parser.add_argument("cluster_id", help="The ID of the cluster.") parser.add_argument("script_path", help="The path to the script in Amazon S3.") args = parser.parse_args() emr_client = boto3.client("emr") ssm_client = boto3.client("ssm") install_libraries_on_core_nodes( args.cluster_id, args.script_path, emr_client, ssm_client ) if __name__ == "__main__": main()
  • Pour API plus de détails, reportez-vous ListInstancesà la section AWS SDKpour la référence Python (Boto3). API