There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.
AWS Glue examples using SDK for Python (Boto3)
The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with AWS Glue.
Basics are code examples that show you how to perform the essential operations within a service.
Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.
Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.
Get started
The following code example shows how to get started using AWS Glue.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

import boto3
from botocore.exceptions import ClientError


def hello_glue():
    """
    Lists the job definitions in your AWS Glue account, using the AWS SDK for
    Python (Boto3).
    """
    try:
        # Create the Glue client
        glue = boto3.client("glue")

        # List the jobs, limiting the results to 10 per page
        paginator = glue.get_paginator("get_jobs")
        response_iterator = paginator.paginate(
            PaginationConfig={"MaxItems": 10, "PageSize": 10}
        )

        # Print the job names
        print("Here are the jobs in your account:")
        for page in response_iterator:
            for job in page["Jobs"]:
                print(f"\t{job['Name']}")
    except ClientError as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    hello_glue()
For API details, see ListJobs in the AWS SDK for Python (Boto3) API Reference.
Basics
The following code example shows how to:
Create a crawler that crawls a public Amazon S3 bucket and generates a database of CSV-formatted metadata.
List information about databases and tables in your AWS Glue Data Catalog.
Create a job to extract CSV data from the S3 bucket, transform the data, and load JSON-formatted output into another S3 bucket.
List information about job runs, view transformed data, and clean up resources.
For more information, see Tutorial: Getting started with AWS Glue Studio. A condensed, non-interactive sketch of these steps appears after the ETL script below.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a class that wraps AWS Glue functions used in the scenario.
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def get_crawler(self, name):
        """
        Gets information about a crawler.

        :param name: The name of the crawler to look up.
        :return: Data about the crawler.
        """
        crawler = None
        try:
            response = self.glue_client.get_crawler(Name=name)
            crawler = response["Crawler"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "EntityNotFoundException":
                logger.info("Crawler %s doesn't exist.", name)
            else:
                logger.error(
                    "Couldn't get crawler %s. Here's why: %s: %s",
                    name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        return crawler

    def create_crawler(self, name, role_arn, db_name, db_prefix, s3_target):
        """
        Creates a crawler that can crawl the specified target and populate a
        database in your AWS Glue Data Catalog with metadata that describes the
        data in the target.

        :param name: The name of the crawler.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and
                         Access Management (IAM) role that grants permission to
                         let AWS Glue access the resources it needs.
        :param db_name: The name to give the database that is created by the
                        crawler.
        :param db_prefix: The prefix to give any database tables that are
                          created by the crawler.
        :param s3_target: The URL to an S3 bucket that contains data that is
                          the target of the crawler.
        """
        try:
            self.glue_client.create_crawler(
                Name=name,
                Role=role_arn,
                DatabaseName=db_name,
                TablePrefix=db_prefix,
                Targets={"S3Targets": [{"Path": s3_target}]},
            )
        except ClientError as err:
            logger.error(
                "Couldn't create crawler. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def start_crawler(self, name):
        """
        Starts a crawler. The crawler crawls its configured target and creates
        metadata that describes the data it finds in the target data source.

        :param name: The name of the crawler to start.
        """
        try:
            self.glue_client.start_crawler(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't start crawler %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def get_database(self, name):
        """
        Gets information about a database in your Data Catalog.

        :param name: The name of the database to look up.
        :return: Information about the database.
        """
        try:
            response = self.glue_client.get_database(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't get database %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Database"]

    def get_tables(self, db_name):
        """
        Gets a list of tables in a Data Catalog database.

        :param db_name: The name of the database to query.
        :return: The list of tables in the database.
        """
        try:
            response = self.glue_client.get_tables(DatabaseName=db_name)
        except ClientError as err:
            logger.error(
                "Couldn't get tables %s. Here's why: %s: %s",
                db_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["TableList"]

    def create_job(self, name, description, role_arn, script_location):
        """
        Creates a job definition for an extract, transform, and load (ETL) job
        that can be run by AWS Glue.

        :param name: The name of the job definition.
        :param description: The description of the job definition.
        :param role_arn: The ARN of an IAM role that grants AWS Glue the
                         permissions it requires to run the job.
        :param script_location: The Amazon S3 URL of a Python ETL script that
                                is run as part of the job. The script defines
                                how the data is transformed.
        """
        try:
            self.glue_client.create_job(
                Name=name,
                Description=description,
                Role=role_arn,
                Command={
                    "Name": "glueetl",
                    "ScriptLocation": script_location,
                    "PythonVersion": "3",
                },
                GlueVersion="3.0",
            )
        except ClientError as err:
            logger.error(
                "Couldn't create job %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def start_job_run(self, name, input_database, input_table, output_bucket_name):
        """
        Starts a job run. A job run extracts data from the source, transforms
        it, and loads it to the output bucket.

        :param name: The name of the job definition.
        :param input_database: The name of the metadata database that contains
                               tables that describe the source data. This is
                               typically created by a crawler.
        :param input_table: The name of the table in the metadata database that
                            describes the source data.
        :param output_bucket_name: The S3 bucket where the output is written.
        :return: The ID of the job run.
        """
        try:
            # The custom Arguments that are passed to this function are used by the
            # Python ETL script to determine the location of input and output data.
            response = self.glue_client.start_job_run(
                JobName=name,
                Arguments={
                    "--input_database": input_database,
                    "--input_table": input_table,
                    "--output_bucket_url": f"s3://{output_bucket_name}/",
                },
            )
        except ClientError as err:
            logger.error(
                "Couldn't start job run %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRunId"]

    def list_jobs(self):
        """
        Lists the names of job definitions in your account.

        :return: The list of job definition names.
        """
        try:
            response = self.glue_client.list_jobs()
        except ClientError as err:
            logger.error(
                "Couldn't list jobs. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobNames"]

    def get_job_runs(self, job_name):
        """
        Gets information about runs that have been performed for a specific
        job definition.

        :param job_name: The name of the job definition to look up.
        :return: The list of job runs.
        """
        try:
            response = self.glue_client.get_job_runs(JobName=job_name)
        except ClientError as err:
            logger.error(
                "Couldn't get job runs for %s. Here's why: %s: %s",
                job_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRuns"]

    def get_job_run(self, name, run_id):
        """
        Gets information about a single job run.

        :param name: The name of the job definition for the run.
        :param run_id: The ID of the run.
        :return: Information about the run.
        """
        try:
            response = self.glue_client.get_job_run(JobName=name, RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get job run %s/%s. Here's why: %s: %s",
                name,
                run_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRun"]

    def delete_job(self, job_name):
        """
        Deletes a job definition. This also deletes data about all runs that
        are associated with this job definition.

        :param job_name: The name of the job definition to delete.
        """
        try:
            self.glue_client.delete_job(JobName=job_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete job %s. Here's why: %s: %s",
                job_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_table(self, db_name, table_name):
        """
        Deletes a table from a metadata database.

        :param db_name: The name of the database that contains the table.
        :param table_name: The name of the table to delete.
        """
        try:
            self.glue_client.delete_table(DatabaseName=db_name, Name=table_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete table %s. Here's why: %s: %s",
                table_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_database(self, name):
        """
        Deletes a metadata database from your Data Catalog.

        :param name: The name of the database to delete.
        """
        try:
            self.glue_client.delete_database(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't delete database %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_crawler(self, name):
        """
        Deletes a crawler.

        :param name: The name of the crawler to delete.
        """
        try:
            self.glue_client.delete_crawler(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't delete crawler %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
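The wrapper takes any Boto3 Glue client, so wiring it up is a one-liner. The following construction sketch is an addition for illustration, not part of the original example; it assumes default credentials and a Region are configured in your environment.

import logging

import boto3

logging.basicConfig(level=logging.INFO)

# Assumes AWS credentials and a default Region are already configured.
wrapper = GlueWrapper(boto3.client("glue"))
print(wrapper.list_jobs())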
Create a class that runs the scenario.
class GlueCrawlerJobScenario:
    """
    Encapsulates a scenario that shows how to create an AWS Glue crawler and job
    and use them to transform data from CSV to JSON format.
    """

    def __init__(self, glue_client, glue_service_role, glue_bucket):
        """
        :param glue_client: A Boto3 AWS Glue client.
        :param glue_service_role: An AWS Identity and Access Management (IAM)
                                  role that AWS Glue can assume to gain access
                                  to the resources it requires.
        :param glue_bucket: An S3 bucket that can hold a job script and output
                            data from AWS Glue job runs.
        """
        self.glue_client = glue_client
        self.glue_service_role = glue_service_role
        self.glue_bucket = glue_bucket

    @staticmethod
    def wait(seconds, tick=12):
        """
        Waits for a specified number of seconds, while also displaying an
        animated spinner.

        :param seconds: The number of seconds to wait.
        :param tick: The number of frames per second used to animate the spinner.
        """
        progress = "|/-\\"
        waited = 0
        while waited < seconds:
            for frame in range(tick):
                sys.stdout.write(f"\r{progress[frame % len(progress)]}")
                sys.stdout.flush()
                time.sleep(1 / tick)
            waited += 1

    def upload_job_script(self, job_script):
        """
        Uploads a Python ETL script to an S3 bucket. The script is used by the
        AWS Glue job to transform data.

        :param job_script: The relative path to the job script.
        """
        try:
            self.glue_bucket.upload_file(Filename=job_script, Key=job_script)
            print(f"Uploaded job script '{job_script}' to the example bucket.")
        except S3UploadFailedError as err:
            logger.error("Couldn't upload job script. Here's why: %s", err)
            raise

    def run(self, crawler_name, db_name, db_prefix, data_source, job_script, job_name):
        """
        Runs the scenario. This is an interactive experience that runs at a
        command prompt and asks you for input throughout.

        :param crawler_name: The name of the crawler used in the scenario. If the
                             crawler does not exist, it is created.
        :param db_name: The name to give the metadata database created by the crawler.
        :param db_prefix: The prefix to give tables added to the database by the
                          crawler.
        :param data_source: The location of the data source that is targeted by
                            the crawler and extracted during job runs.
        :param job_script: The job script that is used to transform data during
                           job runs.
        :param job_name: The name to give the job definition that is created during
                         the scenario.
        """
        wrapper = GlueWrapper(self.glue_client)
        print(f"Checking for crawler {crawler_name}.")
        crawler = wrapper.get_crawler(crawler_name)
        if crawler is None:
            print(f"Creating crawler {crawler_name}.")
            wrapper.create_crawler(
                crawler_name,
                self.glue_service_role.arn,
                db_name,
                db_prefix,
                data_source,
            )
            print(f"Created crawler {crawler_name}.")
            crawler = wrapper.get_crawler(crawler_name)
        pprint(crawler)
        print("-" * 88)

        print(
            f"When you run the crawler, it crawls data stored in {data_source} and "
            f"creates a metadata database in the AWS Glue Data Catalog that describes "
            f"the data in the data source."
        )
        print("In this example, the source data is in CSV format.")
        ready = False
        while not ready:
            ready = Question.ask_question(
                "Ready to start the crawler? (y/n) ", Question.is_yesno
            )
        wrapper.start_crawler(crawler_name)
        print("Let's wait for the crawler to run. This typically takes a few minutes.")
        crawler_state = None
        while crawler_state != "READY":
            self.wait(10)
            crawler = wrapper.get_crawler(crawler_name)
            crawler_state = crawler["State"]
            print(f"Crawler is {crawler['State']}.")
        print("-" * 88)

        database = wrapper.get_database(db_name)
        print(f"The crawler created database {db_name}:")
        pprint(database)
        print(f"The database contains these tables:")
        tables = wrapper.get_tables(db_name)
        for index, table in enumerate(tables):
            print(f"\t{index + 1}. {table['Name']}")
        table_index = Question.ask_question(
            f"Enter the number of a table to see more detail: ",
            Question.is_int,
            Question.in_range(1, len(tables)),
        )
        pprint(tables[table_index - 1])
        print("-" * 88)

        print(f"Creating job definition {job_name}.")
        wrapper.create_job(
            job_name,
            "Getting started example job.",
            self.glue_service_role.arn,
            f"s3://{self.glue_bucket.name}/{job_script}",
        )
        print("Created job definition.")
        print(
            f"When you run the job, it extracts data from {data_source}, transforms it "
            f"by using the {job_script} script, and loads the output into "
            f"S3 bucket {self.glue_bucket.name}."
        )
        print(
            "In this example, the data is transformed from CSV to JSON, and only a few "
            "fields are included in the output."
        )
        job_run_status = None
        if Question.ask_question(f"Ready to run? (y/n) ", Question.is_yesno):
            job_run_id = wrapper.start_job_run(
                job_name, db_name, tables[0]["Name"], self.glue_bucket.name
            )
            print(f"Job {job_name} started. Let's wait for it to run.")
            while job_run_status not in ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"]:
                self.wait(10)
                job_run = wrapper.get_job_run(job_name, job_run_id)
                job_run_status = job_run["JobRunState"]
                print(f"Job {job_name}/{job_run_id} is {job_run_status}.")
        print("-" * 88)

        if job_run_status == "SUCCEEDED":
            print(
                f"Data from your job run is stored in your S3 bucket '{self.glue_bucket.name}':"
            )
            try:
                keys = [
                    obj.key for obj in self.glue_bucket.objects.filter(Prefix="run-")
                ]
                for index, key in enumerate(keys):
                    print(f"\t{index + 1}: {key}")
                lines = 4
                key_index = Question.ask_question(
                    f"Enter the number of a block to download it and see the first {lines} "
                    f"lines of JSON output in the block: ",
                    Question.is_int,
                    Question.in_range(1, len(keys)),
                )
                job_data = io.BytesIO()
                self.glue_bucket.download_fileobj(keys[key_index - 1], job_data)
                job_data.seek(0)
                for _ in range(lines):
                    print(job_data.readline().decode("utf-8"))
            except ClientError as err:
                logger.error(
                    "Couldn't get job run data. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
            print("-" * 88)

        job_names = wrapper.list_jobs()
        if job_names:
            print(f"Your account has {len(job_names)} jobs defined:")
            for index, job_name in enumerate(job_names):
                print(f"\t{index + 1}. {job_name}")
            job_index = Question.ask_question(
                f"Enter a number between 1 and {len(job_names)} to see the list of runs for "
                f"a job: ",
                Question.is_int,
                Question.in_range(1, len(job_names)),
            )
            job_runs = wrapper.get_job_runs(job_names[job_index - 1])
            if job_runs:
                print(f"Found {len(job_runs)} runs for job {job_names[job_index - 1]}:")
                for index, job_run in enumerate(job_runs):
                    print(
                        f"\t{index + 1}. {job_run['JobRunState']} on "
                        f"{job_run['CompletedOn']:%Y-%m-%d %H:%M:%S}"
                    )
                run_index = Question.ask_question(
                    f"Enter a number between 1 and {len(job_runs)} to see details for a run: ",
                    Question.is_int,
                    Question.in_range(1, len(job_runs)),
                )
                pprint(job_runs[run_index - 1])
            else:
                print(f"No runs found for job {job_names[job_index - 1]}")
        else:
            print("Your account doesn't have any jobs defined.")
        print("-" * 88)

        print(
            f"Let's clean up. During this example we created job definition '{job_name}'."
        )
        if Question.ask_question(
            "Do you want to delete the definition and all runs? (y/n) ",
            Question.is_yesno,
        ):
            wrapper.delete_job(job_name)
            print(f"Job definition '{job_name}' deleted.")
        tables = wrapper.get_tables(db_name)
        print(f"We also created database '{db_name}' that contains these tables:")
        for table in tables:
            print(f"\t{table['Name']}")
        if Question.ask_question(
            "Do you want to delete the tables and the database? (y/n) ",
            Question.is_yesno,
        ):
            for table in tables:
                wrapper.delete_table(db_name, table["Name"])
                print(f"Deleted table {table['Name']}.")
            wrapper.delete_database(db_name)
            print(f"Deleted database {db_name}.")
        print(f"We also created crawler '{crawler_name}'.")
        if Question.ask_question(
            "Do you want to delete the crawler? (y/n) ", Question.is_yesno
        ):
            wrapper.delete_crawler(crawler_name)
            print(f"Deleted crawler {crawler_name}.")
        print("-" * 88)


def parse_args(args):
    """
    Parse command line arguments.

    :param args: The command line arguments.
    :return: The parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Runs the AWS Glue getting started with crawlers and jobs scenario. "
        "Before you run this scenario, set up scaffold resources by running "
        "'python scaffold.py deploy'."
    )
    parser.add_argument(
        "role_name",
        help="The name of an IAM role that AWS Glue can assume. This role must grant access "
        "to Amazon S3 and to the permissions granted by the AWSGlueServiceRole "
        "managed policy.",
    )
    parser.add_argument(
        "bucket_name",
        help="The name of an S3 bucket that AWS Glue can access to get the job script and "
        "put job results.",
    )
    parser.add_argument(
        "--job_script",
        default="flight_etl_job_script.py",
        help="The name of the job script file that is used in the scenario.",
    )
    return parser.parse_args(args)


def main():
    args = parse_args(sys.argv[1:])
    try:
        print("-" * 88)
        print(
            "Welcome to the AWS Glue getting started with crawlers and jobs scenario."
        )
        print("-" * 88)
        scenario = GlueCrawlerJobScenario(
            boto3.client("glue"),
            boto3.resource("iam").Role(args.role_name),
            boto3.resource("s3").Bucket(args.bucket_name),
        )
        scenario.upload_job_script(args.job_script)
        scenario.run(
            "doc-example-crawler",
            "doc-example-database",
            "doc-example-",
            "s3://crawler-public-us-east-1/flight/2016/csv",
            args.job_script,
            "doc-example-job",
        )
        print("-" * 88)
        print(
            "To destroy scaffold resources, including the IAM role and S3 bucket "
            "used in this scenario, run 'python scaffold.py destroy'."
        )
        print("\nThanks for watching!")
        print("-" * 88)
    except Exception:
        logging.exception("Something went wrong with the example.")
Create an ETL script that AWS Glue uses to extract, transform, and load data during job runs.
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

"""
These custom arguments must be passed as Arguments to the StartJobRun request.

    --input_database    The name of a metadata database that is contained in
                        your AWS Glue Data Catalog and that contains tables
                        that describe the data to be processed.
    --input_table       The name of a table in the database that describes the
                        data to be processed.
    --output_bucket_url An S3 bucket that receives the transformed output data.
"""
args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 Flight Data.
S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog(
    database=args["input_database"],
    table_name=args["input_table"],
    transformation_ctx="S3FlightData_node1",
)

# This mapping performs two main functions:
# 1. It simplifies the output by removing most of the fields from the data.
# 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`.
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3FlightData_node1,
    mappings=[
        ("year", "long", "year", "long"),
        ("month", "long", "month", "tinyint"),
        ("day_of_month", "long", "day", "tinyint"),
        ("fl_date", "string", "flight_date", "string"),
        ("carrier", "string", "carrier", "string"),
        ("fl_num", "long", "flight_num", "long"),
        ("origin_city_name", "string", "origin_city_name", "string"),
        ("origin_state_abr", "string", "origin_state_abr", "string"),
        ("dest_city_name", "string", "dest_city_name", "string"),
        ("dest_state_abr", "string", "dest_state_abr", "string"),
        ("dep_time", "long", "departure_time", "long"),
        ("wheels_off", "long", "wheels_off", "long"),
        ("wheels_on", "long", "wheels_on", "long"),
        ("arr_time", "long", "arrival_time", "long"),
        ("mon", "string", "mon", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node Revised Flight Data.
RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="json",
    connection_options={"path": args["output_bucket_url"], "partitionKeys": []},
    transformation_ctx="RevisedFlightData_node3",
)
job.commit()
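Taken together, the scenario boils down to a short sequence of wrapper calls. The following condensed, non-interactive sketch is an addition, not part of the original example; the role ARN and bucket name are placeholders, and the full scenario's polling loops and prompts are reduced to comments.

import boto3

glue_wrapper = GlueWrapper(boto3.client("glue"))

# Placeholder scaffold resources; substitute your own.
role_arn = "arn:aws:iam::111122223333:role/AWSGlueServiceRole-docs"
bucket_name = "amzn-s3-demo-bucket"
data_source = "s3://crawler-public-us-east-1/flight/2016/csv"

# 1. Crawl the public flight data into a metadata database.
glue_wrapper.create_crawler(
    "doc-example-crawler", role_arn, "doc-example-database", "doc-example-", data_source
)
glue_wrapper.start_crawler("doc-example-crawler")
# ... poll get_crawler until the crawler's State is "READY" ...

# 2. List what the crawler cataloged.
tables = glue_wrapper.get_tables("doc-example-database")

# 3. Create the CSV-to-JSON job and run it against the first crawled table.
glue_wrapper.create_job(
    "doc-example-job",
    "Getting started example job.",
    role_arn,
    f"s3://{bucket_name}/flight_etl_job_script.py",
)
run_id = glue_wrapper.start_job_run(
    "doc-example-job", "doc-example-database", tables[0]["Name"], bucket_name
)
# ... poll get_job_run until JobRunState is terminal, then inspect the output bucket ...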
For API details, see the following topics in the AWS SDK for Python (Boto3) API Reference.
Actions
The following code example shows how to use CreateCrawler.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def create_crawler(self, name, role_arn, db_name, db_prefix, s3_target):
        """
        Creates a crawler that can crawl the specified target and populate a
        database in your AWS Glue Data Catalog with metadata that describes the
        data in the target.

        :param name: The name of the crawler.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and
                         Access Management (IAM) role that grants permission to
                         let AWS Glue access the resources it needs.
        :param db_name: The name to give the database that is created by the
                        crawler.
        :param db_prefix: The prefix to give any database tables that are
                          created by the crawler.
        :param s3_target: The URL to an S3 bucket that contains data that is
                          the target of the crawler.
        """
        try:
            self.glue_client.create_crawler(
                Name=name,
                Role=role_arn,
                DatabaseName=db_name,
                TablePrefix=db_prefix,
                Targets={"S3Targets": [{"Path": s3_target}]},
            )
        except ClientError as err:
            logger.error(
                "Couldn't create crawler. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
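A possible invocation of this excerpt, reusing the public flight-data source from the Basics scenario. This sketch is an addition: the role ARN is a placeholder, and `wrapper` is assumed to be a GlueWrapper built around a Boto3 Glue client.

wrapper.create_crawler(
    name="doc-example-crawler",
    role_arn="arn:aws:iam::111122223333:role/AWSGlueServiceRole-docs",  # placeholder
    db_name="doc-example-database",
    db_prefix="doc-example-",
    s3_target="s3://crawler-public-us-east-1/flight/2016/csv",
)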
For API details, see CreateCrawler in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use CreateJob.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def create_job(self, name, description, role_arn, script_location):
        """
        Creates a job definition for an extract, transform, and load (ETL) job
        that can be run by AWS Glue.

        :param name: The name of the job definition.
        :param description: The description of the job definition.
        :param role_arn: The ARN of an IAM role that grants AWS Glue the
                         permissions it requires to run the job.
        :param script_location: The Amazon S3 URL of a Python ETL script that
                                is run as part of the job. The script defines
                                how the data is transformed.
        """
        try:
            self.glue_client.create_job(
                Name=name,
                Description=description,
                Role=role_arn,
                Command={
                    "Name": "glueetl",
                    "ScriptLocation": script_location,
                    "PythonVersion": "3",
                },
                GlueVersion="3.0",
            )
        except ClientError as err:
            logger.error(
                "Couldn't create job %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
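As a hedged usage sketch (an addition; the role ARN and bucket are placeholders), the script location is the S3 URL of the ETL script that the Basics scenario uploads:

wrapper.create_job(
    name="doc-example-job",
    description="Getting started example job.",
    role_arn="arn:aws:iam::111122223333:role/AWSGlueServiceRole-docs",  # placeholder
    script_location="s3://amzn-s3-demo-bucket/flight_etl_job_script.py",  # placeholder bucket
)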
For API details, see CreateJob in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DeleteCrawler.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def delete_crawler(self, name):
        """
        Deletes a crawler.

        :param name: The name of the crawler to delete.
        """
        try:
            self.glue_client.delete_crawler(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't delete crawler %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
For API details, see DeleteCrawler in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DeleteDatabase.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def delete_database(self, name):
        """
        Deletes a metadata database from your Data Catalog.

        :param name: The name of the database to delete.
        """
        try:
            self.glue_client.delete_database(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't delete database %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
For API details, see DeleteDatabase in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DeleteJob.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def delete_job(self, job_name):
        """
        Deletes a job definition. This also deletes data about all runs that
        are associated with this job definition.

        :param job_name: The name of the job definition to delete.
        """
        try:
            self.glue_client.delete_job(JobName=job_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete job %s. Here's why: %s: %s",
                job_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
For API details, see DeleteJob in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use DeleteTable.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def delete_table(self, db_name, table_name):
        """
        Deletes a table from a metadata database.

        :param db_name: The name of the database that contains the table.
        :param table_name: The name of the table to delete.
        """
        try:
            self.glue_client.delete_table(DatabaseName=db_name, Name=table_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete table %s. Here's why: %s: %s",
                table_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
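The delete actions shown in this list are used together during cleanup in the Basics scenario, which removes the tables before their database. A condensed sketch with the scenario's names (an addition; assumes a `wrapper` as in the other excerpts):

for table in wrapper.get_tables("doc-example-database"):
    wrapper.delete_table("doc-example-database", table["Name"])
wrapper.delete_database("doc-example-database")
wrapper.delete_crawler("doc-example-crawler")
wrapper.delete_job("doc-example-job")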
For API details, see DeleteTable in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use GetCrawler.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def get_crawler(self, name):
        """
        Gets information about a crawler.

        :param name: The name of the crawler to look up.
        :return: Data about the crawler.
        """
        crawler = None
        try:
            response = self.glue_client.get_crawler(Name=name)
            crawler = response["Crawler"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "EntityNotFoundException":
                logger.info("Crawler %s doesn't exist.", name)
            else:
                logger.error(
                    "Couldn't get crawler %s. Here's why: %s: %s",
                    name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        return crawler
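Because the excerpt returns None when the crawler is missing, it can double as an existence check, which is how the Basics scenario uses it. A trimmed sketch (an addition; `wrapper` assumed as above):

crawler = wrapper.get_crawler("doc-example-crawler")
if crawler is None:
    print("Crawler doesn't exist yet; create it before starting a crawl.")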
For API details, see GetCrawler in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use GetDatabase.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def get_database(self, name):
        """
        Gets information about a database in your Data Catalog.

        :param name: The name of the database to look up.
        :return: Information about the database.
        """
        try:
            response = self.glue_client.get_database(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't get database %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Database"]
For API details, see GetDatabase in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use GetJobRun.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def get_job_run(self, name, run_id):
        """
        Gets information about a single job run.

        :param name: The name of the job definition for the run.
        :param run_id: The ID of the run.
        :return: Information about the run.
        """
        try:
            response = self.glue_client.get_job_run(JobName=name, RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get job run %s/%s. Here's why: %s: %s",
                name,
                run_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRun"]
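Job runs are asynchronous, so the Basics scenario calls this action in a polling loop until the run reaches a terminal state. A trimmed sketch (an addition; the job name is a placeholder and `run_id` is assumed to come from an earlier StartJobRun call):

import time

job_run_status = None
while job_run_status not in ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"]:
    time.sleep(10)
    # run_id comes from an earlier start_job_run call.
    job_run_status = wrapper.get_job_run("doc-example-job", run_id)["JobRunState"]
    print(f"Run is {job_run_status}.")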
For API details, see GetJobRun in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use GetJobRuns.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def get_job_runs(self, job_name):
        """
        Gets information about runs that have been performed for a specific
        job definition.

        :param job_name: The name of the job definition to look up.
        :return: The list of job runs.
        """
        try:
            response = self.glue_client.get_job_runs(JobName=job_name)
        except ClientError as err:
            logger.error(
                "Couldn't get job runs for %s. Here's why: %s: %s",
                job_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRuns"]
For API details, see GetJobRuns in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use GetTables.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def get_tables(self, db_name):
        """
        Gets a list of tables in a Data Catalog database.

        :param db_name: The name of the database to query.
        :return: The list of tables in the database.
        """
        try:
            response = self.glue_client.get_tables(DatabaseName=db_name)
        except ClientError as err:
            logger.error(
                "Couldn't get tables %s. Here's why: %s: %s",
                db_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["TableList"]
For API details, see GetTables in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use ListJobs.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def list_jobs(self):
        """
        Lists the names of job definitions in your account.

        :return: The list of job definition names.
        """
        try:
            response = self.glue_client.list_jobs()
        except ClientError as err:
            logger.error(
                "Couldn't list jobs. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobNames"]
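Note that this excerpt returns only the first page of names. If your account has more job definitions than fit in one page, one possible approach (an addition, not part of the original excerpt) is a NextToken loop against the client directly:

import boto3

glue = boto3.client("glue")
response = glue.list_jobs()
job_names = response["JobNames"]
# ListJobs returns a NextToken when more results are available.
while "NextToken" in response:
    response = glue.list_jobs(NextToken=response["NextToken"])
    job_names.extend(response["JobNames"])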
For API details, see ListJobs in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use StartCrawler.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def start_crawler(self, name):
        """
        Starts a crawler. The crawler crawls its configured target and creates
        metadata that describes the data it finds in the target data source.

        :param name: The name of the crawler to start.
        """
        try:
            self.glue_client.start_crawler(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't start crawler %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
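StartCrawler returns immediately while the crawl continues in the background; the Basics scenario pairs it with GetCrawler polling. A trimmed sketch (an addition; `wrapper` assumed as above):

import time

wrapper.start_crawler("doc-example-crawler")
crawler_state = None
while crawler_state != "READY":
    time.sleep(10)
    crawler_state = wrapper.get_crawler("doc-example-crawler")["State"]
    print(f"Crawler is {crawler_state}.")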
For API details, see StartCrawler in the AWS SDK for Python (Boto3) API Reference.
The following code example shows how to use StartJobRun.
- SDK for Python (Boto3)

Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    def start_job_run(self, name, input_database, input_table, output_bucket_name):
        """
        Starts a job run. A job run extracts data from the source, transforms
        it, and loads it to the output bucket.

        :param name: The name of the job definition.
        :param input_database: The name of the metadata database that contains
                               tables that describe the source data. This is
                               typically created by a crawler.
        :param input_table: The name of the table in the metadata database that
                            describes the source data.
        :param output_bucket_name: The S3 bucket where the output is written.
        :return: The ID of the job run.
        """
        try:
            # The custom Arguments that are passed to this function are used by the
            # Python ETL script to determine the location of input and output data.
            response = self.glue_client.start_job_run(
                JobName=name,
                Arguments={
                    "--input_database": input_database,
                    "--input_table": input_table,
                    "--output_bucket_url": f"s3://{output_bucket_name}/",
                },
            )
        except ClientError as err:
            logger.error(
                "Couldn't start job run %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRunId"]
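The keys in the Arguments dictionary (minus the leading --) must match the names that the job script resolves with getResolvedOptions, as shown in the ETL script in the Basics section. A hedged call sketch (an addition; the table and bucket names are placeholders):

run_id = wrapper.start_job_run(
    name="doc-example-job",
    input_database="doc-example-database",
    input_table="doc-example-csv",  # placeholder table name
    output_bucket_name="amzn-s3-demo-bucket",  # placeholder bucket
)
print(f"Started job run {run_id}.")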
For API details, see StartJobRun in the AWS SDK for Python (Boto3) API Reference.