Ada lebih banyak AWS SDK contoh yang tersedia di GitHub repo SDKContoh AWS Dokumen
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
EMRContoh Amazon menggunakan SDK untuk Python (Boto3)
Contoh kode berikut menunjukkan cara melakukan tindakan dan mengimplementasikan skenario umum dengan menggunakan AWS SDK for Python (Boto3) with AmazonEMR.
Tindakan adalah kutipan kode dari program yang lebih besar dan harus dijalankan dalam konteks. Sementara tindakan menunjukkan cara memanggil fungsi layanan individual, Anda dapat melihat tindakan dalam konteks dalam skenario terkait.
Skenario adalah contoh kode yang menunjukkan kepada Anda bagaimana menyelesaikan tugas tertentu dengan memanggil beberapa fungsi dalam layanan atau dikombinasikan dengan yang lain Layanan AWS.
Setiap contoh menyertakan tautan ke kode sumber lengkap, di mana Anda dapat menemukan instruksi tentang cara mengatur dan menjalankan kode dalam konteks.
Tindakan
Contoh kode berikut menunjukkan cara menggunakanAddJobFlowSteps
.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. Tambahkan langkah Spark, yang dijalankan oleh cluster segera setelah ditambahkan.
def add_step(cluster_id, name, script_uri, script_args, emr_client): """ Adds a job step to the specified cluster. This example adds a Spark step, which is run by the cluster as soon as it is added. :param cluster_id: The ID of the cluster. :param name: The name of the step. :param script_uri: The URI where the Python script is stored. :param script_args: Arguments to pass to the Python script. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly added step. """ try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[ { "Name": name, "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", script_uri, *script_args, ], }, } ], ) step_id = response["StepIds"][0] logger.info("Started step with ID %s", step_id) except ClientError: logger.exception("Couldn't start step %s with URI %s.", name, script_uri) raise else: return step_id
Jalankan perintah Amazon EMR File System (EMRFS) sebagai langkah pekerjaan pada cluster. Ini dapat digunakan untuk mengotomatiskan EMRFS perintah pada cluster alih-alih menjalankan perintah secara manual melalui SSH koneksi.
import boto3 from botocore.exceptions import ClientError def add_emrfs_step(command, bucket_url, cluster_id, emr_client): """ Add an EMRFS command as a job flow step to an existing cluster. :param command: The EMRFS command to run. :param bucket_url: The URL of a bucket that contains tracking metadata. :param cluster_id: The ID of the cluster to update. :param emr_client: The Boto3 Amazon EMR client object. :return: The ID of the added job flow step. Status can be tracked by calling the emr_client.describe_step() function. """ job_flow_step = { "Name": "Example EMRFS Command Step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": ["/usr/bin/emrfs", command, bucket_url], }, } try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[job_flow_step] ) step_id = response["StepIds"][0] print(f"Added step {step_id} to cluster {cluster_id}.") except ClientError: print(f"Couldn't add a step to cluster {cluster_id}.") raise else: return step_id def usage_demo(): emr_client = boto3.client("emr") # Assumes the first waiting cluster has EMRFS enabled and has created metadata # with the default name of 'EmrFSMetadata'. cluster = emr_client.list_clusters(ClusterStates=["WAITING"])["Clusters"][0] add_emrfs_step( "sync", "s3://elasticmapreduce/samples/cloudfront", cluster["Id"], emr_client ) if __name__ == "__main__": usage_demo()
-
Untuk API detailnya, lihat AddJobFlowSteps AWSSDKReferensi Python (Boto3). API
-
Contoh kode berikut menunjukkan cara menggunakanDescribeCluster
.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. def describe_cluster(cluster_id, emr_client): """ Gets detailed information about a cluster. :param cluster_id: The ID of the cluster to describe. :param emr_client: The Boto3 EMR client object. :return: The retrieved cluster information. """ try: response = emr_client.describe_cluster(ClusterId=cluster_id) cluster = response["Cluster"] logger.info("Got data for cluster %s.", cluster["Name"]) except ClientError: logger.exception("Couldn't get data for cluster %s.", cluster_id) raise else: return cluster
-
Untuk API detailnya, lihat DescribeCluster AWSSDKReferensi Python (Boto3). API
-
Contoh kode berikut menunjukkan cara menggunakanDescribeStep
.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. def describe_step(cluster_id, step_id, emr_client): """ Gets detailed information about the specified step, including the current state of the step. :param cluster_id: The ID of the cluster. :param step_id: The ID of the step. :param emr_client: The Boto3 EMR client object. :return: The retrieved information about the specified step. """ try: response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id) step = response["Step"] logger.info("Got data for step %s.", step_id) except ClientError: logger.exception("Couldn't get data for step %s.", step_id) raise else: return step
-
Untuk API detailnya, lihat DescribeStep AWSSDKReferensi Python (Boto3). API
-
Contoh kode berikut menunjukkan cara menggunakanListSteps
.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. def list_steps(cluster_id, emr_client): """ Gets a list of steps for the specified cluster. In this example, all steps are returned, including completed and failed steps. :param cluster_id: The ID of the cluster. :param emr_client: The Boto3 EMR client object. :return: The list of steps for the specified cluster. """ try: response = emr_client.list_steps(ClusterId=cluster_id) steps = response["Steps"] logger.info("Got %s steps for cluster %s.", len(steps), cluster_id) except ClientError: logger.exception("Couldn't get steps for cluster %s.", cluster_id) raise else: return steps
-
Untuk API detailnya, lihat ListSteps AWSSDKReferensi Python (Boto3). API
-
Contoh kode berikut menunjukkan cara menggunakanRunJobFlow
.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. def run_job_flow( name, log_uri, keep_alive, applications, job_flow_role, service_role, security_groups, steps, emr_client, ): """ Runs a job flow with the specified steps. A job flow creates a cluster of instances and adds steps to be run on the cluster. Steps added to the cluster are run as soon as the cluster is ready. This example uses the 'emr-5.30.1' release. A list of recent releases can be found here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html. :param name: The name of the cluster. :param log_uri: The URI where logs are stored. This can be an Amazon S3 bucket URL, such as 's3://my-log-bucket'. :param keep_alive: When True, the cluster is put into a Waiting state after all steps are run. When False, the cluster terminates itself when the step queue is empty. :param applications: The applications to install on each instance in the cluster, such as Hive or Spark. :param job_flow_role: The IAM role assumed by the cluster. :param service_role: The IAM role assumed by the service. :param security_groups: The security groups to assign to the cluster instances. Amazon EMR adds all needed rules to these groups, so they can be empty if you require only the default rules. :param steps: The job flow steps to add to the cluster. These are run in order when the cluster is ready. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly created cluster. """ try: response = emr_client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel="emr-5.30.1", Instances={ "MasterInstanceType": "m5.xlarge", "SlaveInstanceType": "m5.xlarge", "InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": keep_alive, "EmrManagedMasterSecurityGroup": security_groups["manager"].id, "EmrManagedSlaveSecurityGroup": security_groups["worker"].id, }, Steps=[ { "Name": step["name"], "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", step["script_uri"], *step["script_args"], ], }, } for step in steps ], Applications=[{"Name": app} for app in applications], JobFlowRole=job_flow_role.name, ServiceRole=service_role.name, EbsRootVolumeSize=10, VisibleToAllUsers=True, ) cluster_id = response["JobFlowId"] logger.info("Created cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't create cluster.") raise else: return cluster_id
-
Untuk API detailnya, lihat RunJobFlow AWSSDKReferensi Python (Boto3). API
-
Contoh kode berikut menunjukkan cara menggunakanTerminateJobFlows
.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. def terminate_cluster(cluster_id, emr_client): """ Terminates a cluster. This terminates all instances in the cluster and cannot be undone. Any data not saved elsewhere, such as in an Amazon S3 bucket, is lost. :param cluster_id: The ID of the cluster to terminate. :param emr_client: The Boto3 EMR client object. """ try: emr_client.terminate_job_flows(JobFlowIds=[cluster_id]) logger.info("Terminated cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't terminate cluster %s.", cluster_id) raise
-
Untuk API detailnya, lihat TerminateJobFlows AWSSDKReferensi Python (Boto3). API
-
Skenario
Contoh kode berikut menunjukkan cara membuat EMR klaster Amazon berumur pendek yang menjalankan langkah dan secara otomatis berakhir setelah langkah selesai.
- SDKuntuk Python (Boto3)
-
Buat EMR cluster Amazon berumur pendek yang memperkirakan nilai pi menggunakan Apache Spark untuk memparalelkan sejumlah besar perhitungan. Pekerjaan tersebut menulis output ke EMR log Amazon dan ke bucket Amazon Simple Storage Service (Amazon S3). Cluster mengakhiri dirinya sendiri setelah menyelesaikan pekerjaan.
Buat bucket Amazon S3 dan unggah skrip pekerjaan.
Buat AWS Identity and Access Management (IAM) peran.
Buat grup keamanan Amazon Elastic Compute Cloud (AmazonEC2).
Buat cluster berumur pendek dan jalankan satu langkah pekerjaan.
Contoh ini paling baik dilihat di GitHub. Untuk kode sumber lengkap dan instruksi tentang cara mengatur dan menjalankan, lihat contoh lengkapnya di GitHub
. Layanan yang digunakan dalam contoh ini
Amazon EMR
Contoh kode berikut menunjukkan AWS Systems Manager cara menggunakan skrip shell pada EMR instance Amazon yang menginstal pustaka tambahan. Dengan cara ini, Anda dapat mengotomatiskan manajemen instance alih-alih menjalankan perintah secara manual melalui SSH koneksi.
- SDKuntuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. import argparse import time import boto3 def install_libraries_on_core_nodes(cluster_id, script_path, emr_client, ssm_client): """ Copies and runs a shell script on the core nodes in the cluster. :param cluster_id: The ID of the cluster. :param script_path: The path to the script, typically an Amazon S3 object URL. :param emr_client: The Boto3 Amazon EMR client. :param ssm_client: The Boto3 AWS Systems Manager client. """ core_nodes = emr_client.list_instances( ClusterId=cluster_id, InstanceGroupTypes=["CORE"] )["Instances"] core_instance_ids = [node["Ec2InstanceId"] for node in core_nodes] print(f"Found core instances: {core_instance_ids}.") commands = [ # Copy the shell script from Amazon S3 to each node instance. f"aws s3 cp {script_path} /home/hadoop", # Run the shell script to install libraries on each node instance. "bash /home/hadoop/install_libraries.sh", ] for command in commands: print(f"Sending '{command}' to core instances...") command_id = ssm_client.send_command( InstanceIds=core_instance_ids, DocumentName="AWS-RunShellScript", Parameters={"commands": [command]}, TimeoutSeconds=3600, )["Command"]["CommandId"] while True: # Verify the previous step succeeded before running the next step. cmd_result = ssm_client.list_commands(CommandId=command_id)["Commands"][0] if cmd_result["StatusDetails"] == "Success": print(f"Command succeeded.") break elif cmd_result["StatusDetails"] in ["Pending", "InProgress"]: print(f"Command status is {cmd_result['StatusDetails']}, waiting...") time.sleep(10) else: print(f"Command status is {cmd_result['StatusDetails']}, quitting.") raise RuntimeError( f"Command {command} failed to run. " f"Details: {cmd_result['StatusDetails']}" ) def main(): parser = argparse.ArgumentParser() parser.add_argument("cluster_id", help="The ID of the cluster.") parser.add_argument("script_path", help="The path to the script in Amazon S3.") args = parser.parse_args() emr_client = boto3.client("emr") ssm_client = boto3.client("ssm") install_libraries_on_core_nodes( args.cluster_id, args.script_path, emr_client, ssm_client ) if __name__ == "__main__": main()
-
Untuk API detailnya, lihat ListInstances AWSSDKReferensi Python (Boto3). API
-