JDBC 連線 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

JDBC 連線

某些 (通常是關聯式) 資料庫類型透過 JDBC 標準支援連線。如需有關 JDBC 的詳細資訊,請參閱 Java JDBC API 文件。AWSGlue 以原生方式支援透過其 JDBC 連接器連線到某些資料庫,而 JDBC 程式庫在 AWS Glue Spark 任務中提供。使用 AWS Glue 程式庫連線到這些資料庫類型時,您可以存取一組標準選項。

JDBC connectionType 的值包括以下項目:

  • "connectionType": "sqlserver":指定 Microsoft SQL Server 資料庫的連線。

  • "connectionType": "mysql":指定 MySQL 資料庫的連線。

  • "connectionType": "oracle":指定 Oracle 資料庫的連線。

  • "connectionType": "postgresql":指定 PostgreSQL 資料庫的連線。

  • "connectionType": "redshift":指定 Amazon Redshift 資料庫的連線。如需更多詳細資訊,請參閱 Redshift 連線

下表列出 AWS Glue 支援的 JDBC 驅動程式版本。

產品 適用於 Glue 4.0 的 JDBC 驅動程式版本 適用於 Glue 3.0 的 JDBC 驅動程式版本 適用於 Glue 0.9、1.0、2.0 的 JDBC 驅動程式版本
Microsoft SQL Server 9.4.0 7.x 6.x
MySQL 8.0.23 8.0.23 5.1
Oracle Database 21.7 21.1 11.2
PostgreSQL 42.3.6 42.2.18 42.1.x
MongoDB 4.7.2 4.0.0 2.0.0
Amazon Redshift * redshift-jdbc42-2.1.0.16 redshift-jdbc41-1.2.12.1017 redshift-jdbc41-1.2.12.1017

* 對於 Amazon Redshift 連線類型,JDBC 連線的連線選項中包含的所有其他選項名稱/值對 (包含格式化選項),都會直接傳遞至基礎 SparkSQL DataSource。在 AWS Glue 4.0 和更新版本的 AWS Glue with Spark 任務中,Amazon Redshift 的 AWS Glue 原生連接器會使用 Apache Spark 的 Amazon Redshift 整合。如需詳細資訊,請參閱 Apache Spark 的 Amazon Redshift 整合。在舊版中,請參閱 Spark 的 Amazon Redshift 資料來源

若要將 Amazon VPC 設定為使用 JDBC 連線到 Amazon RDS 資料存放區,請參閱 為 Amazon RDS 資料存放區的 JDBC 連線設定 Amazon VPC AWS Glue

注意

AWS Glue 任務只會在執行期間與一個子網路關聯。這可能會影響您透過相同的任務連線至多個資料來源的能力。此行為不限於 JDBC 來源。

JDBC 連線選項參考

如果您已經定義 JDBC AWS Glue 連線,則可重複使用其中定義的組態屬性,例如:URL、使用者和密碼;因此您不必在程式碼中將其指定為連線選項。這項功能僅能在 AWS Glue 3.0 及更新版本中使用。若要這樣做,請使用下列連線屬性:

  • "useConnectionProperties":將其設定為 "true",以指示您想要使用連線中的組態。

  • "connectionName":輸入要從中擷取組態的連線名稱,連線必須在與任務相同的區域中定義。

將下列連線選項與 JDBC 連線搭配使用:

  • "url":(必要) 資料庫的 JDBC URL。

  • "dbtable":(必要) 要讀取的資料庫資料表。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定 schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。

  • "user":(必要) 連線時所要使用的使用者名稱。

  • "password":(必要) 連線時所要使用的密碼。

  • (選用) 下列選項可讓您提供自訂 JDBC 驅動程式。如果您必須使用 AWS Glue 未原生支援的驅動程式,請使用這些選項。

    ETL 任務可以對資料來源和目標使用不同的 JDBC 驅動程式版本,即使來源和目標是相同的資料庫產品。這可讓您在不同版本的來源和目標資料庫之間移轉資料。若要使用這些選項,您必須先將 JDBC 驅動程式的 JAR 檔案上傳到 Amazon S3。

    • "customJdbcDriverS3Path":自訂 JDBC 驅動程式的 Amazon S3 路徑。

    • "customJdbcDriverClassName":JDBC 驅動程式的類別名稱。

  • "bulkSize":(選用) 用於設定平行插入,以加速 JDBC 目標的大量載入。指定寫入或插入資料時要使用之平行處理程度的整數值。此選項有助於改善寫入資料庫 (例如 Arch 使用者儲存庫 (AUR)) 的效能。

  • "hashfield" (選用) 一個字串,用於指定 JDBC 資料表中的資料欄名稱,以便在從 JDBC 資料表平行讀取時將資料劃分為分割區。提供 "hashfield" 或 "hashexpression"。如需更多詳細資訊,請參閱 從 JDBC 資料表中平行讀取

  • "hashexpression" (選用) 傳回整數的 SQL 選取子句。用於在從 JDBC 資料表平行讀取時,將 JDBC 資料表中的資料劃分為分割區。提供 "hashfield" 或 "hashexpression"。如需更多詳細資訊,請參閱 從 JDBC 資料表中平行讀取

  • "hashpartitions" (選用) 正整數。在從 JDBC 資料表平行讀取時,用於指定 JDBC 資料表的平行讀取次數。預設:7。如需更多詳細資訊,請參閱 從 JDBC 資料表中平行讀取

  • "sampleQuery":(選用) 自訂 SQL 查詢陳述式。用於指定資料表中的資訊子集,以擷取資料表內容範例。在不考慮資料的情況下進行設定時,效率可能比 DynamicFrame 方法低,從而會導致逾時或記憶體不足的錯誤。如需更多詳細資訊,請參閱 使用 sampleQuery

  • "enablePartitioningForSampleQuery":(選用) 布林值。預設:false。用於指定 sampleQuery 時啟用從 JDBC 資料表平行讀取。如果設定為 true,則 AWS Glue 的 sampleQuery 必須以 "where" 或 "and" 結尾,以附加分割條件。如需更多詳細資訊,請參閱 使用 sampleQuery

  • "sampleSize":(選用) 正整數。限制範例查詢傳回的資料列數。只有當 enablePartitioningForSampleQuery 為 true 時才適用。如果未啟用分割,則應直接在 sampleQuery 中新增 "limit x" 以限制大小。如需更多詳細資訊,請參閱 使用 sampleQuery

使用 sampleQuery

本節說明如何使用 sampleQuerysampleSizeenablePartitioningForSampleQuery

可以使用 sampleQuery 高效地對資料集的幾個資料列進行抽樣。依預設,由單一執行器執行查詢。在不考慮資料的情況下進行設定時,效率可能比 DynamicFrame 方法低,從而會導致逾時或記憶體不足的錯誤。作為 ETL 管道的一部分,在基礎資料庫上執行 SQL 通常只需用於效能目的。如果您嘗試預覽資料集的幾個資料列,請考慮使用 顯示。如果您嘗試使用 SQL 轉換資料集,請考慮使用 toDF 針對 DataFrame 表單中的資料定義 SparkSQL 轉換。

雖然您的查詢可能會操作各種資料表,但 dbtable 仍然是必需的。

使用 sampleQuery 擷取資料表範例

使用預設 sampleQuery 行為擷取資料範例時,AWS Glue 預期不會有大量的輸送量,因此它會在單一執行器上執行查詢。為了限制您提供的資料而不會導致效能問題,我們建議您向 SQL 提供一個 LIMIT 子句。

範例 使用 sampleQuery 而不進行分割

以下程式碼範例顯示如何使用 sampleQuery 而不進行分割。

//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

針對較大的資料集使用 sampleQuery

如果您要讀取大型資料集,則可能需要啟用 JDBC 分割以並行查詢資料表。如需更多詳細資訊,請參閱 從 JDBC 資料表中平行讀取。若要搭配使用 sampleQuery 與 JDBC 分割,請將 enablePartitioningForSampleQuery 設定為 true。啟用此功能需要您對 sampleQuery 進行一些更改。

搭配使用 JDBC 分割區與 sampleQuery 時,您的查詢必須以 "where" 或 "and" 結尾,AWS Glue 才能附加分割條件。

如果您想要在從 JDBC 資料表平行讀取時限制 SampleQuery 的結果數量,請設定 "sampleSize" 參數,而不是指定 LIMIT 子句。

範例 搭配使用 sampleQuery 與 JDBC 分區

以下程式碼範例顯示如何搭配使用 sampleQuery 與 JDBC 分區。

//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

備註和限制:

範例查詢不可與任務書籤一起使用。在提供兩者的組態時,系統將忽略書籤狀態。

使用自訂 JDBC 驅動程式

下面的程式碼範例示範如何讀取和寫入使用自訂 JDBC 驅動程式的 JDBC 資料庫。這些範例示範讀取一個版本的資料庫產品並寫入相同產品的更高版本。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(url: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(url: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }