DynamoDB 連線 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

DynamoDB 連線

可以使用 AWS Glue for Spark 在 AWS Glue 的 DynamoDB 中讀取和寫入資料表。可以使用連接到 AWS Glue 工作的 IAM 許可連線到 DynamoDB。AWS Glue 支援將資料寫入到另一個 AWS 帳戶的 DynamoDB 資料表中。如需更多詳細資訊,請參閱 跨帳戶跨區域存取 DynamoDB 資料表

除了 AWS Glue DynamoDB ETL 連接器之外,還可使用 DynamoDB 匯出連接器從 DynamoDB 中讀取,該連接器可調用 DynamoDB ExportTableToPointInTime 請求,並將其存放在您提供的 Amazon S3 位置中,格式為 DynamoDB JSON。AWS Glue 則會透過讀取 Amazon S3 匯出位置中的資料來建立 DynamicFrame 物件。

AWS Glue 1.0 或更新版本中會提供 DynamoDB 寫入器。AWS Glue 2.0 或更新版本中會提供 AWS Glue DynamoDB 匯出連接器。

如需 DynamoDB 的詳細資訊,請參閱 Amazon DynamoDB 文件。

注意

DynamoDB ETL 讀取器不支援篩選條件或下推述詞。

設定 DynamoDB 連線

若要從 AWS Glue 連線至 DynamoDB,請將許可授予給與 AWS Glue 工作相關聯的 IAM 角色,以便與 DynamoDB 互動。如需有關在 DynamoDB 中進行讀取或寫入所需許可的詳細資訊,請參閱 IAM 文件中的 DynamoDB 的動作、資源和條件金鑰

在下列情況中,可能需要其他組態:

  • 使用 DynamoDB 匯出連接器時,將需要設定 IAM,以便工作可以請求 DynamoDB 資料表匯出。此外,需要確定用於匯出的 Amazon S3 儲存貯體,並在 IAM 中提供適當的許可,以便 DynamoDB 可寫入到該儲存貯體,並且 AWS Glue 工作可從中讀取。如需詳細資訊,請參閱請求在 DynamoDB 中匯出資料表

  • 如果 AWS Glue 工作具有特定的 Amazon VPC 連線需求,請使用 NETWORK AWS Glue 連線類型來提供網路選項。由於 DynamoDB 的存取權由 IAM 授權,因此不需要使用 AWS Glue DynamoDB 連線類型。

對 DynamoDB 進行讀取和寫入

下列程式碼範例示範如何讀取(透過 ETL 連接器)及寫入 DynamoDB 資料表。其展示從一個資料表讀取以及寫入另一個資料表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": test_source, "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": test_sink, "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> test_source, "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> test_sink, "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

使用 DynamoDB 匯出連接器

在 DynamoDB 資料表大小大於 80 GB 時,匯出連接器的效能會比 ETL 連接器更佳。此外,由於匯出請求是在 AWS Glue 任務中 (在 Spark 程序之外) 執行,您可以啟用 AWS Glue 任務的自動擴展,以在匯出請求期間儲存 DPU 使用量。若使用匯出連接器,您也無需為 Spark 執行器平行處理原則或 DynamoDB 輸送量讀取百分比設定分割數。

注意

DynamoDB 對呼叫 ExportTableToPointInTime 請求具有特定要求。如需詳細資訊,請參閱請求在 DynamoDB 中匯出資料表。例如,需要在資料表上啟用時間點還原 (PITR) 才能使用此連接器。DynamoDB 連接器也支援 AWS KMS 加密,以便將 DynamoDB 匯出至 Amazon S3。在 AWS Glue 任務設定中提供安全組態,可為 DynamoDB 的匯出進行 AWS KMS 加密。KMS 金鑰必須位於與 Amazon S3 儲存貯體相同的區域中。

請注意,DynamoDB 匯出和 Amazon S3 儲存皆會產生額外費用和成本。在任務運作完成後,Amazon S3 中匯出的資料仍會存在,因此您可以重新使用該資料,無需進行其他 DynamoDB 匯出。使用此連接器的要求為資料表啟用時間點復原 (PITR)。

DynamoDB ETL 連接器或匯出連接器皆不支援要在 DynamoDB 來源中套用的篩選條件或 pushdown 述詞。

以下程式碼範例演示如何讀取 (透過匯出連接器) 及列印分割區數。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": test_source, "dynamodb.s3.bucket": bucket_name, "dynamodb.s3.prefix": bucket_prefix, "dynamodb.s3.bucketOwner": account_id_of_bucket, } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> test_source, "dynamodb.s3.bucket" -> bucket_name, "dynamodb.s3.prefix" -> bucket_prefix, "dynamodb.s3.bucketOwner" -> account_id_of_bucket, )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

這些範例會演示如何讀取 (透過匯出連接器) 及列印具有 dynamodb 分類的 AWS Glue Data Catalog 資料表:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database=catalog_database, table_name=catalog_table_name, additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": s3_bucket, "dynamodb.s3.prefix": s3_bucket_prefix } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = catalog_database, tableName = catalog_table_name, additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> s3_bucket, "dynamodb.s3.prefix" -> s3_bucket_prefix )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

簡化 DynamoDB 匯出 JSON 的使用

DynamoDB 會使用 AWS Glue DynamoDB 匯出連接器進行匯出,這會產生具有特定巢套結構的 JSON 檔案。如需詳細資訊,請參閱資料物件。AWS Glue 會提供 DynamicFrame 轉換,其可將這些結構解除巢狀,使其成為下游應用程式更易於使用的形式。

您可以使用兩種方式中的一種來調用此轉換。呼叫要從 DynamoDB 進行讀取的方法時,可以使用值 "true" 來設定連線選項 "dynamodb.simplifyDDBJson"。也可以將轉換作為 AWS Glue 程式庫中獨立提供的方法進行呼叫。

請考慮由 DynamoDB 匯出產生的下列結構描述:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplifyDDBJson 轉換將此簡化為:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
注意

AWS Glue 3.0 及更新版本中會提供 simplifyDDBJsonunnestDDBJson 轉換也可用於簡化 DynamoDB 匯出 JSON。我們鼓勵使用者從 unnestDDBJson 轉換為 simplifyDDBJson

在 DynamoDB 操作中設定平行處理

為了提高效能,可以調整 DynamoDB 連接器的某些參數。調整平行處理參數時,您的目標是最大限度地利用佈建的 AWS Glue 工作者。然後,如果需要更高的效能,建議透過增加 DPU 數量來橫向擴展您的工作。

可以在使用 ETL 連接器時,使用 dynamodb.splits 參數來變更 DynamoDB 讀取操作中的平行處理。使用匯出連接器進行讀取時,無需為 Spark 執行器平行處理設定分割數。可以使用 dynamodb.output.numParallelTasks 來變更 DynamoDB 寫入操作中的平行處理。

使用 DynamoDB ETL 連接器進行讀取

建議您根據工作組態中設定的最大工作者數目和以下 numSlots 計算來計算 dynamodb.splits。如果進行自動擴展,則實際可用的工作者數量可能會在該上限下變更。如需有關設定工作者數目上限的詳細資訊,請參閱 設定 Spark 工作的工作屬性 AWS Glue 中的工作者數目 (NumberOfWorkers)。

  • numExecutors = NumberOfWorkers - 1

    針對內容,為 Spark 驅動程式保留一個執行程式;其他執行程式則用於處理資料。

  • numSlotsPerExecutor =

    AWS Glue 3.0 and later versions
    • 4,若 WorkerTypeG.1X

    • 8,若 WorkerTypeG.2X

    • 16,若 WorkerTypeG.4X

    • 32,若 WorkerTypeG.8X

    AWS Glue 2.0 and legacy versions
    • 8,若 WorkerTypeG.1X

    • 16,若 WorkerTypeG.2X

  • numSlots = numSlotsPerExecutor * numExecutors

建議您將 dynamodb.splits 設定為可用插槽的數量 numSlots

寫入到 DynamoDB

dynamodb.output.numParallelTasks 參數使用以下計算來確定每個 Spark 任務的 WCU:

permittedWcuPerTask = ( TableWCU * dynamodb.throughput.write.percent ) / dynamodb.output.numParallelTasks

如果組態準確地表示寫入到 DynamoDB 的 Spark 任務數目,則 DynamoDB 寫入器的運作效果最佳。在某些情況下,可能需要覆寫預設計算來提高寫入效能。如果不指定此參數,將透過下列公式自動計算每個 Spark 任務允許的 WCU:

    • numPartitions = dynamicframe.getNumPartitions()

    • numSlots (如本節先前所定義)

    • numParallelTasks = min(numPartitions, numSlots)

  • 範例 1。DPU=10, WorkerType=Standard. 輸入 DynamicFrame 有 100 個 RDD 分割區。

    • numPartitions = 100

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(100, 68) = 68

  • 範例 2。DPU=10, WorkerType=Standard. 輸入 DynamicFrame 有 20 個 RDD 分割區。

    • numPartitions = 20

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(20, 68) = 20

注意

舊版 AWS Glue 和使用標準工作者的工作需要不同的方法來計算插槽數量。如果需要調整這些工作的效能,建議轉換為支援的 AWS Glue 版本。

DynamoDB 連線選項參考

指定 Amazon DynamoDB 的連線。

來源連線和接收器連線的連線選項不同。

"connectionType": "dynamodb" 搭配 ETL 連接器作為來源

在使用 AWS Glue DynamoDB ETL 連接器時,使用以下連線選項搭配 "connectionType": "dynamodb" 作為來源:

  • "dynamodb.input.tableName":(必要) 要讀取的 DynamoDB。

  • "dynamodb.throughput.read.percent":(選用) 要使用的讀取容量單位 (RCU) 百分比。預設值設定為「0.5」。可接受的值從「0.1」(含) 到「1.5」(含)。

    • 0.5 代表預設讀取率,這表示 AWS Glue 會嘗試使用資料表一半的讀取容量。如果將值增加到 0.5 以上,AWS Glue 會增加請求率;如果將值減少到 0.5 以下,則會降低讀取請求率。(根據不同因素,例如在 DynamoDB 資料表中是否有統一金鑰分佈等,實際讀取率可能會有所不同。)

    • 當 DynamoDB 資料表處於隨需模式時,AWS Glue 處理資料表的讀取容量為 40000。若要匯出大型資料表,建議您將 DynamoDB 資料表切換為隨需模式。

  • "dynamodb.splits":(選用) 定義讀取此 DynamoDB 資料表的同時,要將資料表分割成多少個區塊。預設值已設為「1」。可接受的值從「1」(含) 到「1,000,000」(含)。

    1 表示無平行處理。我們強烈建議您使用以下公式,指定較大的值以獲得更好的效能。如需有關適當設定某個值的詳細資訊,請參閱 在 DynamoDB 操作中設定平行處理

  • "dynamodb.sts.roleArn":(選用) 跨帳戶存取所要擔任的 IAM 角色 ARN。此參數適用於 AWS Glue 1.0 或以上版本。

  • "dynamodb.sts.roleSessionName":(選用) STS 工作階段名稱。預設設定為 "glue-dynamodb-read-sts-session"。此參數適用於 AWS Glue 1.0 或以上版本。

"connectionType": "dynamodb" 搭配作為來源的 AWS Glue DynamoDB 匯出連接器

在使用 AWS Glue DynamoDB 匯出連接器時 (該連接器僅可用於 AWS Glue 版本 2.0 和更新版本),請搭配使用以下連接選項與 "connectionType": "dynamodb" 作為來源:

  • "dynamodb.export":(必要) 字串值:

    • 如果設為 ddb,則會啟用 AWS Glue DynamoDB 匯出連接器,並將會在 AWS Glue 任務期間呼叫新的 ExportTableToPointInTimeRequest。系統將使用從 dynamodb.s3.bucketdynamodb.s3.prefix 傳遞的位置產生新的匯出。

    • 如果設為 s3,則會啟用 AWS Glue DynamoDB 匯出連接器,但會略過建立新的 DynamoDB 匯出,而是使用 dynamodb.s3.bucketdynamodb.s3.prefix 作為該資料表之前匯出的 Amazon S3 位置。

  • "dynamodb.tableArn":(必要) 要讀取的 DynamoDB。

  • "dynamodb.unnestDDBJson":(選用) 預設值:false。有效值:布林值。如果設為 true,系統會對出現在匯出中的 DynamoDB JSON 結構執行解巢狀轉換。將 "dynamodb.unnestDDBJson""dynamodb.simplifyDDBJson" 同時設定為 true 是錯誤做法。在 AWS Glue 3.0 及更新版本中,建議您在簡化 DynamoDB Map 類型時使用 "dynamodb.simplifyDDBJson" 以獲得更好的行為。如需更多詳細資訊,請參閱 簡化 DynamoDB 匯出 JSON 的使用

  • "dynamodb.simplifyDDBJson":(選用) 預設值:false。有效值:布林值。如果設為 true,系統會執行轉換,以簡化出現在匯出中的 DynamoDB JSON 結構的結構描述。這與 "dynamodb.unnestDDBJson" 選項具有相同的用途,但是對 DynamoDB Map 類型或甚至是 DynamoDB 資料表中的巢狀 Map 類型提供了更好的支援。AWS Glue 3.0 及更新版本中具有此選項。將 "dynamodb.unnestDDBJson""dynamodb.simplifyDDBJson" 同時設定為 true 是錯誤做法。如需更多詳細資訊,請參閱 簡化 DynamoDB 匯出 JSON 的使用

  • "dynamodb.s3.bucket":(選用)指定要在其中執行 DynamoDB ExportTableToPointInTime 程序的 Amazon S3 儲存貯體位置。匯出的檔案格式為 DynamoDB JSON。

    • "dynamodb.s3.prefix":(選用)指定 Amazon S3 儲存貯體內的 Amazon S3 前綴位置,DynamoDB ExportTableToPointInTime 負載將儲存於其中。如果 dynamodb.s3.prefixdynamodb.s3.bucket 均未指定,則這些值將預設為 AWS Glue 任務組態中指定的暫時目錄位置。如需詳細資訊,請參閱 AWS Glue 使用的特殊參數

    • "dynamodb.s3.bucketOwner":指定跨帳戶存取 Amazon S3 所需的儲存貯體擁有者。

  • "dynamodb.sts.roleArn":(選用) DynamoDB 資料表的跨帳戶存取和/或跨區域存取所要擔任的 IAM 角色 ARN。注意:相同的 IAM 角色 ARN 將用於存取為 ExportTableToPointInTime 要求中指定的 Amazon S3 位置。

  • "dynamodb.sts.roleSessionName":(選用) STS 工作階段名稱。預設設定為 "glue-dynamodb-read-sts-session"。

  • "dynamodb.exportTime" (選用) 有效值:表示 ISO-8601 瞬間的字串。應進行匯出的時間點。

  • "dynamodb.sts.region":(如果使用區域端點進行跨區域呼叫,則為必填項) 託管您要讀取的 DynamoDB 資料表的區域。

"connectionType": "dynamodb" 搭配 ETL 連接器做為接收器

使用下列有 "connectionType": "dynamodb" 的連線選項作為接收器:

  • "dynamodb.output.tableName":(必要) 要寫入的 DynamoDB 資料表。

  • "dynamodb.throughput.write.percent":(選用) 要使用的寫入容量單位 (WCU) 百分比。預設值設定為「0.5」。可接受的值從「0.1」(含) 到「1.5」(含)。

    • 0.5 代表預設寫入率,這代表 AWS Glue 會嘗試使用資料表一半的寫入容量。如果將值增加到 0.5 以上,AWS Glue 會增加請求率;如果將值減少到 0.5 以下,則會降低寫入請求率。(根據不同因素,例如在 DynamoDB 資料表中是否有統一金鑰分佈等,實際寫入率可能會有所不同。)

    • 當 DynamoDB 資料表處於隨需模式時,AWS Glue 會以 40000 處理資料表的寫入容量。若要匯入大型資料表,建議您將 DynamoDB 資料表切換為隨需模式。

  • "dynamodb.output.numParallelTasks":(選用) 定義同時寫入 DynamoDB 的平行任務數量。用於計算每個 Spark 任務的寬鬆 WCU。在大多數情況下,AWS Glue 會計算此值的合理預設值。如需更多詳細資訊,請參閱 在 DynamoDB 操作中設定平行處理

  • "dynamodb.output.retry":(選用) 定義有來自 DynamoDB 的 ProvisionedThroughputExceededException 時,要執行多少次重試。預設設定為 "10"。

  • "dynamodb.sts.roleArn":(選用) 跨帳戶存取所要擔任的 IAM 角色 ARN。

  • "dynamodb.sts.roleSessionName":(選用) STS 工作階段名稱。預設設定為 "glue-dynamodb-write-sts-session"。