本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
ETL 中 的連線類型和選項 AWS Glue 適用於 Spark
In (入) AWS Glue 對於 Spark,各種 PySpark 和 Scala 方法和轉換會使用 connectionType
參數指定連線類型。這些使用 connectionOptions
或 options
參數指定連線選項。
connectionType
參數可以接受如下表所示的值。以下各節將說明每種類型的相關 connectionOptions
(或 options
) 參數值。除非另有說明,否則當連線做為來源或接收器使用時,參數即會套用。
如需有關示範設定和使用連線選項的範例程式碼,請參閱每個連線類型的首頁。
connectionType |
連線到 |
---|---|
dynamodb | Amazon DynamoDB 資料庫 |
kinesis | Amazon Kinesis Data Streams |
s3 | Amazon Simple Storage Service (Amazon S3) |
documentdb | Amazon DocumentDB (with MongoDB compatibility) 資料庫 |
opensearch | Amazon OpenSearch Service。 |
redshift | Amazon Redshift |
kafka | Kafka |
azurecosmos | Azure Cosmos for NoSQL. |
azuresql | Azure SQL。 |
bigquery | Google BigQuery。 |
mongodb | MongoDB |
sqlserver | Microsoft SQL Server 資料庫 (請參閱 JDBC 連線) |
mysql | 我的SQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA. |
Snowflake | Snowflake |
teradata | Teradata Vantage。 |
vertica | Vertica。 |
custom.* | Spark、Athena 或JDBC資料存放區 (請參閱 自訂 和 AWS Marketplace connectionType 值 |
marketplace.* | Spark、Athena 或JDBC資料存放區 (請參閱 自訂 和 AWS Marketplace connectionType 值) |
DataFrame Spark 的 AWS Glue 5.0 ETL中的 選項
DataFrame 是資料集,組織成類似於資料表的具名資料欄,並支援功能樣式 (map/reduce/filter/etc.) 操作和SQL操作 (選取、專案、彙總)。
若要 DataFrame 為 Glue 支援的資料來源建立 ,需要下列項目:
資料來源連接器
ClassName
資料來源連線
Options
同樣地,若要將 寫入 DataFrame Glue 支援的資料接收器,也需要相同的 :
資料來源連接器
ClassName
資料來源連線
Options
請注意,connectionName
不支援 Glue AWS 功能,例如任務書籤和 DynamicFrame 選項,例如 DataFrame。如需 DataFrame 和支援操作的詳細資訊,請參閱 的 Spark 文件DataFrame
指定連接器 ClassName
若要指定資料來源/接收器ClassName
的 ,請使用 .format
選項提供ClassName
定義資料來源/接收器的對應連接器。
JDBC 連接器
對於JDBC連接器,指定 jdbc
選項的值,.format
並在 driver
選項ClassName
中提供JDBC驅動程式。
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
下表列出 Glue AWS for 中支援資料來源ClassName
的JDBC驅動程式 DataFrames。
資料來源 | 驅動程式 ClassName |
---|---|
PostgreSQL | org.postgresql.Driver |
Oracle | oracle.jdbc.driver。OracleDriver |
SQLServer | com.microsoft.sqlserver.jdbc。SQLServerDriver |
我的SQL | com.mysql.jdbc.Driver |
SAPHana | com.sap.db.jdbc.Driver |
Teradata | com.teradata.jdbc。TeraDriver |
Spark 連接器
對於火花連接器,將連接器ClassName
的 指定為 .format
選項的值。
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
下表列出 Glue for AWS 中支援資料來源ClassName
的 Spark 連接器 DataFrames。
資料來源 | ClassName |
---|---|
MongoDB/DocumentDB | glue.spark.mongodb |
Redshift | io.github.spark_redshift_community.spark.redshift |
AzureCosmos | cosmos.oltp |
AzureSQL | com.microsoft.sqlserver.jdbc.spark |
BigQuery | com.google.cloud.spark.bigquery |
OpenSearch | org.opensearch.spark.sql |
Snowflake | net.snowflake.spark.snowflake |
Vertica | com.vertica.spark.datasource。VerticaSource |
指定連線選項
若要指定與資料來源/接收器連線Options
的 ,請使用 .option(<KEY>, <VALUE>)
提供個別選項.options(<MAP>)
,或提供多個選項做為索引鍵/值映射。
每個資料來源/接收器都支援自己的一組連線 Options
。如需可用 的詳細資訊Options
,請參閱下表所列特定資料來源/接收器 Spark 連接器的公有文件。
範例
下列範例是從 PostgreSQL 讀取並寫入 SnowFlake:
Python
範例:
from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala
範例:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
自訂 和 AWS Marketplace connectionType 值
這些索引標籤包括以下項目:
-
"connectionType": "marketplace.athena"
:指定連至 Amazon Athena 資料存放區的連線。連線使用來自 的連接器 AWS Marketplace。 -
"connectionType": "marketplace.spark"
:指定連至 Apache Spark 資料存放區的連線。連線使用來自 的連接器 AWS Marketplace。 -
"connectionType": "marketplace.jdbc"
:指定資料JDBC存放區的連線。連線使用來自 的連接器 AWS Marketplace。 -
"connectionType": "custom.athena"
:指定連至 Amazon Athena 資料存放區的連線。連線使用您上傳到的自訂連接器 AWS Glue Studio. -
"connectionType": "custom.spark"
:指定連至 Apache Spark 資料存放區的連線。連線使用您上傳到的自訂連接器 AWS Glue Studio. -
"connectionType": "custom.jdbc"
:指定與JDBC資料存放區的連線。連線使用您上傳到的自訂連接器 AWS Glue Studio.
類型 custom.jdbc 或 marketplace.jdbc 的連線選項
-
className
– 字串、必要,驅動程式類別名稱。 -
connectionName
– 字串、必要,與連接器相關聯之連線的名稱。 -
url
– 字串為必要,JDBCURL具有預留位置 (${}
),用於建置與資料來源的連線。預留位置${secretKey}
會替換為 AWS Secrets Manager中相同名稱的秘密。如需建構 的詳細資訊,請參閱資料存放區文件URL。 -
secretId
或user/password
– 用於擷取 登入資料的字串,必要URL。 -
dbTable
或query
– 要從中取得資料的字串、必要、資料表或SQL查詢。您可以指定dbTable
或query
,但不能同時指定兩者。 -
partitionColumn
- 字串、選用,用於分割的整數欄名稱。此選項僅適用於包含在lowerBound
、upperBound
以及numPartitions
中。此選項的運作方式與 Spark SQLJDBC讀取器相同。如需詳細資訊,請參閱《Apache Spark DataFrames SQL和資料集指南》中的JDBC前往其他資料庫。 lowerBound
和upperBound
值用於決定分割區步幅,而不是用於篩選資料表中的列。資料表中的所有列都會進行分割並傳回。注意
使用查詢而不是資料表名稱時,您應該驗證查詢是否適用於指定的分割條件。例如:
-
如果您的查詢格式為
"SELECT col1 FROM table1"
,則透過在使用分割區欄的查詢結尾附加WHERE
子句來測試查詢。 -
如果您的查詢格式為 "
SELECT col1 FROM table1 WHERE col2=val"
,則透過使用AND
和使用分割區欄的表達式擴展WHERE
子句來測試查詢。
-
-
lowerBound
- 整數、選用,用來決定分割區步幅的partitionColumn
最小值。 -
upperBound
- 整數、選用,用來決定分割區步幅的partitionColumn
最大值。 -
numPartitions
- 整數、選用,分割區數目。這個值,搭配lowerBound
(包含) 及upperBound
(不含),形成用於分割partitionColumn
而產生之WHERE
子句表達式的分割區步幅。重要
請小心分割區的數目,因為太多的分割區可能會造成外部資料庫系統的問題。
-
filterPredicate
- 字串、選用,額外條件子句,用於篩選來源的資料。例如:BillingCity='Mountain View'
當您使用查詢,而不是資料表名稱,您應該驗證查詢是否適用於指定的
filterPredicate
。例如:-
如果您的查詢格式為
"SELECT col1 FROM table1"
,則透過在使用篩選述詞的查詢結尾附加WHERE
子句來測試查詢。 -
如果您的查詢格式為
"SELECT col1 FROM table1 WHERE col2=val"
,則透過使用AND
和使用篩選述詞的表達式擴展WHERE
子句來測試查詢。
-
-
dataTypeMapping
– 字典、選用、自訂的資料類型映射,可建置從JDBC資料類型到 Glue 資料類型的映射。例如, 選項會透過呼叫驅動程式的ResultSet.getString()
方法,將 JDBC 類型的資料欄位"dataTypeMapping":{"FLOAT":"STRING"}
映射FLOAT
至 JavaString
類型,並使用它來建置 AWS Glue 記錄。ResultSet
物件是由每個驅動程式實作,因此行為是特定於您使用的驅動程式。請參閱JDBC驅動程式的文件,以了解驅動程式如何執行轉換。 -
所以此 AWS Glue 目前支援的資料類型為:
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
支援的JDBC資料類型為 Java8 java.sql.types
。 預設資料類型映射 (從 JDBC到 AWS Glue) 為:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
如果您使用自訂資料類型映射搭配選項
dataTypeMapping
,那麼您可以覆寫預設的資料類型映射。只有dataTypeMapping
選項中列出的JDBC資料類型受到影響;預設映射用於所有其他JDBC資料類型。如有需要,您可以新增其他JDBC資料類型的映射。如果JDBC預設映射或自訂映射中未包含資料類型,則資料類型會轉換為 AWS GlueSTRING
根據預設的資料類型。 -
下列 Python 程式碼範例示範如何使用驅動程式從JDBC資料庫 AWS Marketplace JDBC讀取 。它演示了從資料庫讀取和寫入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
類型 custom.athena 或 marketplace.athena 的連線選項
-
className
– 字串、必要,驅動程式類別名稱。當您使用 Athena-CloudWatch connector 時,此參數值是類別名稱的字首 (例如"com.amazonaws.athena.connectors"
)。Athena-CloudWatch connector 由兩個類別組成:中繼資料處理常式和記錄處理常式。如果您在此處提供通用字首,則 會根據該字首API載入正確的類別。 -
tableName
– 要讀取的 CloudWatch 日誌串流名稱為必要字串。此程式碼片段使用特殊檢視名稱all_log_streams
,這表示傳回的動態資料框架將包含來自日誌群組中所有日誌資料串流的資料。 -
schemaName
– 字串,需要用來讀取的 CloudWatch 日誌群組名稱。例如:/aws-glue/jobs/output
。 -
connectionName
– 字串、必要,與連接器相關聯之連線的名稱。
如需此連接器的其他選項,請參閱 上的 Amazon Athena CloudWatch Connector README
下列 Python 程式碼範例示範如何使用 AWS Marketplace 連接器從 Athena 資料存放區讀取。它展示了從 Athena 讀取和寫入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
類型 custom.spark 或 marketplace.spark 的連接選項
-
className
– 字串、必要,連接器類別名稱。 -
secretId
– 字串、選用,用來擷取連接器連線的憑證。 -
connectionName
– 字串、必要,與連接器相關聯之連線的名稱。 -
其他選項視資料存放區而定。例如, OpenSearch 組態選項以字首 開頭
es
,如 Elasticsearch for Apache Hadoop文件中所述。Spark 連接到 Snowflake 使用選項,例如 sfUser
和sfPassword
,如連接到 Snowflake 指南中的使用 Spark Connector所述。
下列 Python 程式碼範例示範如何使用 marketplace.spark
連線從 OpenSearch 資料存放區讀取 。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
一般選項
本節中的選項作為 connection_options
提供,但並不特別套用於一個連接器。
設定書籤時,通常會使用下列參數。它們可能適用於 Amazon S3 或JDBC工作流程。如需詳細資訊,請參閱使用任務書籤。
jobBookmarkKeys
:資料欄名稱陣列。jobBookmarkKeysSortOrder
:定義如何根據排序順序比較值的字串。有效值:"asc"
、"desc"
。useS3ListImplementation
:用於在列出 Amazon S3 儲存貯體內容時管理記憶體效能。如需詳細資訊,請參閱最佳化 Glue AWS 中的記憶體管理。