本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Python 中呼叫 AWS Glue API
請注意,AWS Glue 尚未提供 Boto 3 資源 API。目前,只能使用 Boto 3 用戶端 API。
Python 中的 AWS Glue API 名稱
在 Java 及其他程式設計語言中,AWS Glue API 的名稱通常是 CamelCased。但是,從 Python 呼叫時,這些一般名稱會變更為小寫字母,並以底線字元區隔名稱的部分,使其更為「Pythonic」。在 AWS Glue API 參考文件中,這些 Pythonic 名稱會列在一般 CamelCased 名稱後面的刮號中。
但是,雖然 AWS Glue API 名稱本身會轉換為小寫字母,但其參數名稱仍維持大寫字母。請務必記住這一點,因為如下所述,在呼叫 AWS Glue API 時,應以名稱傳遞參數。
在 AWS Glue 中傳遞和存取 Python 參數
在 Python 中呼叫 AWS Glue API,最好以名稱明確傳遞參數。例如:
job = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
這可協助您了解 Python 建立字典的名稱/值元組,讓您在 Job 結構 或 JobRun 結構 中將其指定為 ETL 指令碼的引數。Boto 3 會透過 REST API 呼叫,以 JSON 格式將它們傳送到 AWS Glue。這表示您在指令碼中存取這些引數時,無法倚賴引數的排序。
例如,假設您在 Python Lambda 處理常式函式中起始 JobRun
,而且要指定幾個參數。您的程式碼看起來類似如下:
from datetime import datetime, timedelta client = boto3.client('glue') def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName = 'my_test_Job', Arguments = { '--day_partition_key': 'partition_0', '--hour_partition_key': 'partition_1', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )
若要在您的 ETL 指令碼中可靠地存取這些參數,請使用 AWS Glue 的 getResolvedOptions
指定其名稱,然後從產生的字典存取這些參數:
import sys from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ['JOB_NAME', 'day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value']) print "The day partition key is: ", args['day_partition_key'] print "and the day partition value is: ", args['day_partition_value']
如果想傳遞一個巢狀 JSON 字串的引數,以在將參數值傳遞給 AWS Glue ETL 任務時保留參數值,則您必須在開始任務執行之前對參數字串進行編碼,然後在任務指令碼參考其之前對參數字串進行解碼。例如,請試想有下列引數字串:
glue_client.start_job_run(JobName = "gluejobname", Arguments={ "--my_curly_braces_string": '{"a": {"b": {"c": [{"d": {"e": 42}}]}}}' })
若要正確傳遞此參數,您應該將引數編碼為 Base64 編碼的字串。
import base64 ... sample_string='{"a": {"b": {"c": [{"d": {"e": 42}}]}}}' sample_string_bytes = sample_string.encode("ascii") base64_bytes = base64.b64encode(sample_string_bytes) base64_string = base64_bytes.decode("ascii") ... glue_client.start_job_run(JobName = "gluejobname", Arguments={ "--my_curly_braces_string": base64_bytes}) ... sample_string_bytes = base64.b64decode(base64_bytes) sample_string = sample_string_bytes.decode("ascii") print(f"Decoded string: {sample_string}") ...
範例:建立和執行任務
以下範例顯示如何使用 Python 呼叫 AWS Glue API,以建立和執行 ETL 任務。
建立和執行任務
-
建立 AWS Glue 用戶端執行個體:
import boto3 glue = boto3.client(service_name='glue', region_name='us-east-1', endpoint_url='https://glue.us-east-1.amazonaws.com')
-
建立任務。您必須使用
glueetl
做為 ETL 命令的名稱,如以下程式碼所示:myJob = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
-
為您在上個步驟建立的任務啟動新的執行:
myNewJobRun = glue.start_job_run(JobName=myJob['Name'])
-
取得任務狀態:
status = glue.get_job_run(JobName=myJob['Name'], RunId=myNewJobRun['JobRunId'])
-
列印任務執行的目前狀態:
print(status['JobRun']['JobRunState'])