JDBC 連線 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

JDBC 連線

某些資料庫類型通常具有關聯性,支援透過 JDBC 標準進行連線。如需 的詳細資訊JDBC,請參閱 Java JDBC API 文件。 AWS Glue 原生支援透過其JDBC連接器連線至特定資料庫 - 程式JDBC庫在 Glue Spark AWS 任務中提供。使用 AWS Glue 程式庫連線到這些資料庫類型時,您可以存取一組標準選項。

這些JDBC connectionType 值包括下列項目:

  • "connectionType": "sqlserver":指定與 Microsoft SQL Server 資料庫的連線。

  • "connectionType": "mysql":指定與 MySQL 資料庫的連線。

  • "connectionType": "oracle":指定 Oracle 資料庫的連線。

  • "connectionType": "postgresql":指定 PostgreSQL 資料庫的連線。

  • "connectionType": "redshift":指定 Amazon Redshift 資料庫的連線。如需詳細資訊,請參閱Redshift 連線

下表列出 Glue AWS 支援的JDBC驅動程式版本。

產品 JDBC Glue 5.0 的驅動程式版本 JDBC Glue 4.0 的驅動程式版本 JDBC Glue 3.0 的驅動程式版本 JDBC Glue 0.9、1.0、2.0 的驅動程式版本
Microsoft SQL 伺服器 10.2.0 9.4.0 7.x 6.x
我的SQL 8.0.33 8.0.23 8.0.23 5.1
Oracle Database 23.3.0.23.09 21.7 21.1 11.2
PostgreSQL 42.7.3 42.3.6 42.2.18 42.1.x
Amazon Redshift * redshift-jdbc42-2.1.0.29 redshift-jdbc42-2.1.0.16 redshift-jdbc41-1.2.12.1017 redshift-jdbc41-1.2.12.1017

* 對於 Amazon Redshift 連線類型,包含在JDBC連線連線選項中的所有其他選項名稱/值對,包括格式化選項,都會直接傳遞至基礎 SparkSQL DataSource。在 AWS Glue 4.0 及更新版本的 Glue 搭配 Spark AWS 任務中,Amazon Redshift 的 AWS Glue 原生連接器會使用 Apache Spark 的 Amazon Redshift 整合。如需詳細資訊,請參閱 Apache Spark 的 Amazon Redshift 整合。在舊版中,請參閱 Spark 的 Amazon Redshift 資料來源

若要設定 Amazon VPC以使用 連線至 Amazon RDS資料存放區JDBC,請參閱 為 Amazon RDS 資料存放區的 JDBC 連線設定 Amazon VPC AWS Glue

注意

AWS 在執行期間,Glue 任務只會與一個子網路相關聯。這可能會影響您透過相同的任務連線至多個資料來源的能力。此行為不限於JDBC來源。

JDBC 連線選項參考

如果您已經定義 JDBC AWS Glue 連線,則可以重複使用其中定義的組態屬性,例如:url、使用者和密碼;因此您不需要在程式碼中將其指定為連線選項。此功能適用於 AWS Glue 3.0 及更新版本。若要這樣做,請使用下列連線屬性:

  • "useConnectionProperties":將其設定為 "true",以指示您想要使用連線中的組態。

  • "connectionName":輸入要從中擷取組態的連線名稱,連線必須在與任務相同的區域中定義。

將下列連線選項與JDBC連線搭配使用:

  • "url":(必要) 資料庫JDBCURL的 。

  • "dbtable":(必要) 要讀取的資料庫資料表。對於在資料庫中支援結構描述JDBC的資料存放區,請指定 schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。

  • "user":(必要) 連線時所要使用的使用者名稱。

  • "password":(必要) 連線時所要使用的密碼。

  • (選用) 下列選項可讓您提供自訂JDBC驅動程式。如果您必須使用 AWS Glue 原生不支援的驅動程式,請使用這些選項。

    ETL 任務可以針對資料來源和目標使用不同的JDBC驅動程式版本,即使來源和目標是相同的資料庫產品。這可讓您在不同版本的來源和目標資料庫之間移轉資料。若要使用這些選項,您必須先將JDBC驅動程式JAR的檔案上傳至 Amazon S3。

    • "customJdbcDriverS3Path":自訂JDBC驅動程式的 Amazon S3 路徑。

    • "customJdbcDriverClassName":JDBC驅動程式的類別名稱。

  • "bulkSize":(選用) 用於設定平行插入,以加速大量載入JDBC目標。指定寫入或插入資料時要使用之平行處理程度的整數值。此選項有助於改善寫入資料庫的效能,例如 Arch User Repository (AUR)。

  • "hashfield" (選用) 字串,用於指定JDBC資料表中資料欄的名稱,用於在平行讀取JDBC資料表時將資料分割為分割區。提供 "hashfield" 或 "hashexpression"。如需詳細資訊,請參閱從 JDBC 資料表中平行讀取

  • "hashexpression" (選用) 傳回整數的SQL選取子句。用於在平行讀取JDBC資料表時,將JDBC資料表中的資料分割為分割區。提供 "hashfield" 或 "hashexpression"。如需詳細資訊,請參閱從 JDBC 資料表中平行讀取

  • "hashpartitions" (選用) 正整數。用於指定從JDBC資料表平行讀取時JDBC,資料表的平行讀取次數。預設:7。如需詳細資訊,請參閱從 JDBC 資料表中平行讀取

  • "sampleQuery":(選用) 自訂SQL查詢陳述式。用於指定資料表中的資訊子集,以擷取資料表內容範例。設定時,無論資料為何,其效率可能低於 DynamicFrame 方法,導致逾時或記憶體不足錯誤。如需詳細資訊,請參閱使用 sampleQuery

  • "enablePartitioningForSampleQuery":(選用) 布林值。預設:false。用於在指定 時啟用平行讀取JDBC資料表sampleQuery如果設定為 true, sampleQueryGlue AWS 必須以「where」或「and」結尾,才能附加分割條件。如需詳細資訊,請參閱使用 sampleQuery

  • "sampleSize":(選用) 正整數。限制範例查詢傳回的資料列數。只有當 enablePartitioningForSampleQuery 為 true 時才適用。如果未啟用分割,則應直接在 sampleQuery 中新增 "limit x" 以限制大小。如需詳細資訊,請參閱使用 sampleQuery

使用 sampleQuery

本節說明如何使用 sampleQuerysampleSizeenablePartitioningForSampleQuery

可以使用 sampleQuery 高效地對資料集的幾個資料列進行抽樣。依預設,由單一執行器執行查詢。設定時,無論資料為何,其效率可能低於 DynamicFrame 方法,導致逾時或記憶體不足錯誤。在基礎資料庫SQL上執行 通常ETL只需要用於效能用途。如果您嘗試預覽資料集的幾個資料列,請考慮使用 顯示。如果您嘗試使用 轉換資料集SQL,請考慮使用 toDF SQL來定義 Spark 轉換,以 DataFrame 形式比對您的資料。

雖然您的查詢可能會操作各種資料表,但 dbtable 仍然是必需的。

使用 sampleQuery 擷取資料表的範例

使用預設 sampleQuery 行為擷取資料範例時, AWS Glue 不會預期大量輸送量,因此會在單一執行器上執行您的查詢。為了限制您提供的資料,而不會造成效能問題,我們建議您SQL提供 LIMIT子句。

範例 使用 sampleQuery 而不分割

以下程式碼範例顯示如何使用 sampleQuery 而不進行分割。

//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

針對 sampleQuery 較大的資料集使用

如果您正在讀取大型資料集,您可能需要啟用JDBC分割以平行查詢資料表。如需詳細資訊,請參閱從 JDBC 資料表中平行讀取。若要sampleQuery搭配JDBC分割使用 ,請將 enablePartitioningForSampleQuery 設為 true。啟用此功能需要您對 sampleQuery 進行一些更改。

搭配 使用JDBC分割時sampleQuery,您的查詢必須以「where」或「and」結尾,Glue AWS 才能附加分割條件。

如果您想要在平行讀取JDBC資料表 sampleQuery 時限制 的結果,請設定 "sampleSize" 參數,而不是指定LIMIT子句。

範例 sampleQuery 搭配JDBC分割使用

下列程式碼範例示範如何使用 sampleQuery與JDBC分割。

//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

備註和限制:

範例查詢不可與任務書籤一起使用。在提供兩者的組態時,系統將忽略書籤狀態。

使用自訂JDBC驅動程式

下列程式碼範例示範如何使用自訂JDBC驅動程式從JDBC資料庫讀取和寫入資料庫。這些範例示範讀取一個版本的資料庫產品並寫入相同產品的更高版本。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(url: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(url: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }