AWS Glue for Spark 中 ETL 的連線類型和選項 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

AWS Glue for Spark 中 ETL 的連線類型和選項

在 AWS Glue for Spark 中,可採用多種 PySpark 和 Scala 方法及轉換使用 connectionType 參數指定連線類型。這些使用 connectionOptionsoptions 參數指定連線選項。

connectionType 參數可以接受如下表所示的值。以下各節將說明每種類型的相關 connectionOptions (或 options) 參數值。除非另有說明,否則當連線做為來源或接收器使用時,參數即會套用。

如需有關示範設定和使用連線選項的範例程式碼,請參閱每個連線類型的首頁。

connectionType 連線到
dynamodb Amazon DynamoDB 資料庫
kinesis Amazon Kinesis Data Streams
s3 Amazon Simple Storage Service (Amazon S3)
documentdb Amazon DocumentDB (with MongoDB compatibility) 資料庫
opensearch Amazon OpenSearch Service
redshift Amazon Redshift 資料庫
kafka KafkaAmazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos for NoSQL。
azuresql Azure SQL。
bigquery Google BigQuery。
mongodb MongoDB 資料庫 (包含 MongoDB Atlas)。
sqlserver Microsoft SQL 伺服器資料庫 (請參閱 JDBC 連線)
mysql MySQL 資料庫 (請參閱 JDBC 連線)
oracle Oracle 資料庫 (請參閱 JDBC 連線)
postgresql PostgreSQL 資料庫 (請參閱 JDBC 連線)
saphana SAP HANA。
Snowflake Snowflake 資料湖
teradata Teradata Vantage。
vertica Vertica。
custom.* Spark、Athena 或 JDBC 資料存放區 (請參閱 自訂和 AWS Marketplace connectionType 值
marketplace.* Spark、Athena 或 JDBC 資料存放區 (請參閱 自訂和 AWS Marketplace connectionType 值)

自訂和 AWS Marketplace connectionType 值

所需資訊包括下列項目:

  • "connectionType": "marketplace.athena":指定連至 Amazon Athena 資料存放區的連線。連線會使用來自 AWS Marketplace 的連接器。

  • "connectionType": "marketplace.spark":指定連至 Apache Spark 資料存放區的連線。連線會使用來自 AWS Marketplace 的連接器。

  • "connectionType": "marketplace.jdbc":指定連至 JDBC 資料存放區的連線。連線會使用來自 AWS Marketplace 的連接器。

  • "connectionType": "custom.athena":指定連至 Amazon Athena 資料存放區的連線。連線會使用您上傳至 AWS Glue Studio 的自訂連接器。

  • "connectionType": "custom.spark":指定連至 Apache Spark 資料存放區的連線。連線會使用您上傳至 AWS Glue Studio 的自訂連接器。

  • "connectionType": "custom.jdbc":指定連至 JDBC 資料存放區的連線。連線會使用您上傳至 AWS Glue Studio 的自訂連接器。

類型 custom.jdbc 或 marketplace.jdbc 的連線選項

  • className – 字串、必要,驅動程式類別名稱。

  • connectionName – 字串、必要,與連接器相關聯之連線的名稱。

  • url – 字串、必要,具有預留位置 (${}) 的 JDBC URL,用來建立與資料來源的連線。預留位置 ${secretKey} 會替換為 AWS Secrets Manager 中相同名稱的秘密。如需有關建構 URL 的更多資訊,請參閱資料存放區文件。

  • secretIduser/password – 字串、必要,用於擷取 URL 憑證。

  • dbTablequery – 字串、必要,要從中取得資料的資料表或 SQL 查詢。您可以指定 dbTablequery,但不能同時指定兩者。

  • partitionColumn - 字串、選用,用於分割的整數欄名稱。此選項僅適用於包含在 lowerBoundupperBound 以及 numPartitions 中。此選項的運作方式與 Spark SQL JDBC 讀取器相同。如需詳細資訊,請參閱 Apache Spark SQL、DataFrames 和資料集指南中的 JDBC 至其他資料庫

    lowerBoundupperBound 值用於決定分割區步幅,而不是用於篩選資料表中的列。資料表中的所有列都會進行分割並傳回。

    注意

    使用查詢而不是資料表名稱時,您應該驗證查詢是否適用於指定的分割條件。例如:

    • 如果您的查詢格式為 "SELECT col1 FROM table1",則透過在使用分割區欄的查詢結尾附加 WHERE 子句來測試查詢。

    • 如果您的查詢格式為 "SELECT col1 FROM table1 WHERE col2=val",則透過使用 AND 和使用分割區欄的表達式擴展 WHERE 子句來測試查詢。

  • lowerBound - 整數、選用,用來決定分割區步幅的 partitionColumn 最小值。

  • upperBound - 整數、選用,用來決定分割區步幅的 partitionColumn 最大值。

  • numPartitions - 整數、選用,分割區數目。這個值,搭配 lowerBound (包含) 及 upperBound (不含),形成用於分割 partitionColumn 而產生之 WHERE 子句表達式的分割區步幅。

    重要

    請小心分割區的數目,因為太多的分割區可能會造成外部資料庫系統的問題。

  • filterPredicate - 字串、選用,額外條件子句,用於篩選來源的資料。例如:

    BillingCity='Mountain View'

    當您使用查詢,而不是資料表名稱,您應該驗證查詢是否適用於指定的 filterPredicate。例如:

    • 如果您的查詢格式為 "SELECT col1 FROM table1",則透過在使用篩選述詞的查詢結尾附加 WHERE 子句來測試查詢。

    • 如果您的查詢格式為 "SELECT col1 FROM table1 WHERE col2=val",則透過使用 AND 和使用篩選述詞的表達式擴展 WHERE 子句來測試查詢。

  • dataTypeMapping - 字典、選用、自訂資料類型映射,用於建構從 JDBC 資料類型到 Glue 資料類型的映射。例如,選項 "dataTypeMapping":{"FLOAT":"STRING"} 會將 JDBC 類型 FLOAT 的資料欄位映射至 Java String 類型中,方法是呼叫驅動程式的 ResultSet.getString() 方法,並使用它來建構 AWS Glue 記錄。ResultSet 物件是由每個驅動程式實作,因此行為是特定於您使用的驅動程式。請參閱 JDBC 驅動程式的文件,瞭解驅動程式如何執行轉換。

  • 目前支援的 AWS Glue 資料類型為:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    支援的 JDBC 資料類型為 Java8 java.sql.types

    預設資料類型映射 (從 JDBC 到 AWS Glue) 是:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    如果您使用自訂資料類型映射搭配選項 dataTypeMapping,那麼您可以覆寫預設的資料類型映射。只有 dataTypeMapping 選項中的 JDBC 資料類型會受到影響;預設映射會用於所有其他 JDBC 資料類型。如果需要,您可以為其他 JDBC 資料類型新增映射。如果 JDBC 資料類型未包含在預設映射或自訂映射中,則資料類型預設會轉換為 AWS Glue STRING 資料類型。

下面的 Python 程式碼範例示範如何使用 AWS Marketplace JDBC 驅動程式從 JDBC 資料庫讀取。它演示了從資料庫讀取和寫入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

類型 custom.athena 或 marketplace.athena 的連線選項

  • className – 字串、必要,驅動程式類別名稱。當您使用 Athena-CloudWatch 連接器時,此參數值就是類別名稱的字首 (例如 "com.amazonaws.athena.connectors")。Athena-CloudWatch 連接器由兩個類別組成:中繼資料處理常式和記錄處理常式。如果您在此處提供通用字首,則 API 會根據該字首載入正確的類別。

  • tableName – 字串、必要,要讀取的 CloudWatch 日誌串流名稱。此程式碼片段使用特殊檢視名稱 all_log_streams,這表示傳回的動態資料框架將包含來自日誌群組中所有日誌資料串流的資料。

  • schemaName – 字串、必要,要讀取的 CloudWatch 日誌群組。例如:/aws-glue/jobs/output

  • connectionName – 字串、必要,與連接器相關聯之連線的名稱。

如需此連接器的其他選項,請參閱 Amazon Athena CloudWatch 連接器讀我檔案。

下列 Python 程式碼範例示範如何使用 AWS Marketplace 連接器從 Athena 資料存放區讀取。它展示了從 Athena 讀取和寫入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

類型 custom.spark 或 marketplace.spark 的連接選項

  • className – 字串、必要,連接器類別名稱。

  • secretId – 字串、選用,用來擷取連接器連線的憑證。

  • connectionName – 字串、必要,與連接器相關聯之連線的名稱。

  • 其他選項視資料存放區而定。例如,OpenSearch 組態選項以字首 es 開頭,如 Elasticsearch for Apache Hadoop 文件中所述。Spark 連接到 Snowflake 使用選項,例如 sfUsersfPassword,如連接到 Snowflake 指南中的使用 Spark Connector 所述。

下列 Python 程式碼範例示範如何使用 marketplace.spark 連線從 OpenSearch 資料存放區讀取。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

一般選項

本節中的選項作為 connection_options 提供,但並不特別套用於一個連接器。

設定書籤時,通常會使用下列參數。它們可能適用於 Amazon S3 或 JDBC 工作流程。如需更多詳細資訊,請參閱 使用任務書籤

  • jobBookmarkKeys:資料欄名稱陣列。

  • jobBookmarkKeysSortOrder:定義如何根據排序順序比較值的字串。有效值:"asc""desc"

  • useS3ListImplementation:用於在列出 Amazon S3 儲存貯體內容時管理記憶體效能。如需詳細資訊,請參閱 Optimize memory management in AWS Glue