在 Python 中调用 AWS Glue API
请注意,Boto 3 资源 API 尚不可用于 AWS Glue。目前,只有 Boto 3 客户端 API 可用。
Python 中的 AWS Glue API 名称
Java 和其他编程语言中的 AWS Glue API 名称通常是 CamelCased。但是,当从 Python 调用时,这些通用名称将更改为小写,部分的某些名称用下划线字符隔开,使它们更“Pythonic”。在 AWS Glue API 参考文档中,这些 Pythonic 名称在通用 CamelCased 名称之后列在括号中。
但是,尽管 AWS Glue API 名称自身转换为小写,其参数名称仍保持大写。请务必记住这一点,因为在调用 AWS Glue API 时会按名称传递参数,如下一节中所述。
在 AWS Glue 中传递和访问 Python 参数
在 Python 对 AWS Glue API 的调用中,最好按名称显式传递参数。例如:
job = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
了解 Python 创建您可以在 Job 结构 或 JobRun 结构 中指定为 ETL 脚本参数的名称/值元组的字典是很有帮助的。然后,Boto 3 通过 REST API 调用,以 JSON 格式将其传递给 AWS Glue。这意味着,当您在脚本中访问这些参数时,不能依赖它们的顺序。
例如,假设您在 Python Lambda 处理程序函数中启动 JobRun
,并且您希望指定多个参数。您的代码看起来可能类似于:
from datetime import datetime, timedelta client = boto3.client('glue') def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName = 'my_test_Job', Arguments = { '--day_partition_key': 'partition_0', '--hour_partition_key': 'partition_1', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )
要在 ETL 脚本中可靠地访问这些参数,请使用 AWS Glue 的 getResolvedOptions
函数,然后从生成的字典访问它们:
import sys from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ['JOB_NAME', 'day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value']) print "The day partition key is: ", args['day_partition_key'] print "and the day partition value is: ", args['day_partition_value']
如果您想传递属于嵌套 JSON 字符串的参数,以便在它传递给您的 AWS Glue ETL 任务时保存参数值,您必须在开始任务运行之前对参数字符串进行编码,然后在引用任务脚本之前对参数字符串进行解码。例如,考虑以下参数字符串:
glue_client.start_job_run(JobName = "gluejobname", Arguments={ "--my_curly_braces_string": '{"a": {"b": {"c": [{"d": {"e": 42}}]}}}' })
要正确传递此参数,您应将参数编码为 Base64 编码的字符串。
import base64 ... sample_string='{"a": {"b": {"c": [{"d": {"e": 42}}]}}}' sample_string_bytes = sample_string.encode("ascii") base64_bytes = base64.b64encode(sample_string_bytes) base64_string = base64_bytes.decode("ascii") ... glue_client.start_job_run(JobName = "gluejobname", Arguments={ "--my_curly_braces_string": base64_bytes}) ... sample_string_bytes = base64.b64decode(base64_bytes) sample_string = sample_string_bytes.decode("ascii") print(f"Decoded string: {sample_string}") ...
示例:创建并运行作业
下面的示例显示如何使用 Python 调用 AWS Glue API 来创建和运行 ETL 作业。
创建并运行作业
-
创建 AWS Glue 客户端的实例。
import boto3 glue = boto3.client(service_name='glue', region_name='us-east-1', endpoint_url='https://glue.us-east-1.amazonaws.com')
-
创建作业。您必须使用
glueetl
作为名称的 ETL 命令,如以下代码所示:myJob = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
-
启动您在上一步中创建的作业的新运行:
myNewJobRun = glue.start_job_run(JobName=myJob['Name'])
-
获取作业状态:
status = glue.get_job_run(JobName=myJob['Name'], RunId=myNewJobRun['JobRunId'])
-
打印作业运行的当前状态:
print(status['JobRun']['JobRunState'])