教學課程:DynamoDB 使用批次作業 AWS AppSync - AWS AppSync

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:DynamoDB 使用批次作業 AWS AppSync

注意

我們現在主要支援 APPSYNC _JS 執行階段及其文件。請考慮在此處使用 APPSYNC _JS 運行時及其指南

AWS AppSync 支援在單一區域中的一個或多個表格中使用 Amazon DynamoDB 批次操作。支援的操作包括 BatchGetItemBatchPutItemBatchDeleteItem。透過在中使用這些功能 AWS AppSync,您可以執行下列工作:

  • 在單次查詢中傳遞索引鍵清單,並從資料表傳回結果

  • 在單次查詢中讀取一個或多個資料表的記錄

  • 將大量的記錄寫入一個或多個資料表

  • 在可能具有關聯的多個資料表中,有條件地寫入或刪除記錄

在 DynamoDB 中使用批次作業 AWS AppSync 是一項進階技術,需要對後端作業和資料表結構的額外思考和知識。此外,中的批次處理作業與非批次處理作業 AWS AppSync 有兩個主要差異:

  • 對於解析程式將會存取的所有資料表,資料來源角色都必須具有權限。

  • 解析程式資料表規格包含於映射範本中。

許可

與其他解析器一樣,您需要在中建立資料來源, AWS AppSync 然後建立角色或使用現有角色。由於批次操作需要 DynamoDB 表上的不同權限,因此您需要授與已設定的角色權限以進行讀取或寫入動作:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/TABLENAME", "arn:aws:dynamodb:region:account:table/TABLENAME/*" ] } ] }

附註:角色與中的資料來源相關聯 AWS AppSync,並且會針對資料來源叫用欄位的解析器。設定為針對 DynamoDB 擷取的資料來源只會指定一個表格,以保持組態簡單。因此,在單一解析程式中針對多個資料表執行批次作業時 (這是一項更為進階的工作),您必須授予權限給該資料來源上的角色,以存取解析程式將會與其互動的所有資料表。這將在上述IAM策略的「資源」字段中完成。針對批次操作所要呼叫的資料表進行設定,這項動作是在解析程式範本中完成,我們將會在下列內容中說明此範本。

資料來源

為簡單起見,我們將針對本教學課程中使用的所有解析程式,使用相同的資料來源。在資料來源索引標籤上,建立新 DynamoDB 資料來源並為其命名。BatchTutorial資料表可以使用任何名稱,因為資料表名稱在批次作業中是指定為請求映射範本的一部分。我們會將資料表命名為 empty

在本教學課程中,具備下列內嵌政策的任何角色皆可使用:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/Posts", "arn:aws:dynamodb:region:account:table/Posts/*", "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }

單一資料表批次

警告

BatchPutItemBatchDeleteItem與衝突偵測和解決方案搭配使用時不受支援。必須停用這些設定,以避免可能發生的錯誤。

例如,假設有一個名為 Posts 的資料表,您想要利用批次操作來新增和移除此資料表的項目。使用下面的模式,注意到對於查詢,我們將傳遞一個列表IDs:

type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] } schema { query: Query mutation: Mutation }

使用下列「請求對應範本」將解析器附加至batchAdd()欄位。這會自動擷取 GraphQL input PostInput 類型的每個項目,並建置 BatchPutItem 作業所需的對應圖:

#set($postsdata = []) #foreach($item in ${ctx.args.posts}) $util.qr($postsdata.add($util.dynamodb.toMapValues($item))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "Posts": $utils.toJson($postsdata) } }

在此情況中的回應映射範本只是簡單的直接傳遞,但是該資料表名稱會以 ..data.Posts 的形式附加至內容物件,如下所示:

$util.toJson($ctx.result.data.Posts)

現在導航到 AWS AppSync控制台的查詢頁面並運行以下batchAdd突變:

mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }

您應該會看到列印在畫面上的結果,並且可以透過 DynamoDB 主控台獨立驗證這兩個值都寫入貼文表。

接下來,使用下列「請求對應範本」將解析器附加至batchGet()欄位。這會自動擷取 GraphQL ids:[] 類型的每個項目,並建置 BatchGetItem 作業所需的對應圖:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "Posts": { "keys": $util.toJson($ids), "consistentRead": true, "projection" : { "expression" : "#id, title", "expressionNames" : { "#id" : "id"} } } } }

回應映射範本仍然只是簡單的直接傳遞,該資料表名稱也會再次地以 ..data.Posts 的形式附加至內容物件:

$util.toJson($ctx.result.data.Posts)

現在返回 AWS AppSync主控台的 [查] 頁面,並執行下列batchGet 查詢

query get { batchGet(ids:[1,2,3]){ id title } }

這應該會針對您先前所新增的兩個 id 值,傳回其結果。請注意,值為 nullid 傳回了 3 值。這是因為在 Posts 資料表中,尚未有任何記錄具有該值。另請注意,傳 AWS AppSync回結果的順序與傳入查詢的金鑰相同,這是代表您 AWS AppSync 執行的額外功能。因此,如果切換到 batchGet(ids:[1,3,2),順序將會變更。您也會知道哪個 id 傳回 null 值。

最後,使用以下「請求映射模板」將解析器附加到batchDelete()字段中。這會自動擷取 GraphQL ids:[] 類型的每個項目,並建置 BatchGetItem 作業所需的對應圖:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "Posts": $util.toJson($ids) } }

回應映射範本仍然只是簡單的直接傳遞,該資料表名稱也會再次地以 ..data.Posts 的形式附加至內容物件:

$util.toJson($ctx.result.data.Posts)

現在返回 AWS AppSync控制台的「查詢」頁面,並運行以下batchDelete突變:

mutation delete { batchDelete(ids:[1,2]){ id } }

id 12 的記錄現在應已刪除。如果重新執行先前的 batchGet() 查詢,這些應該會傳回 null

多重資料表批次

警告

BatchPutItemBatchDeleteItem與衝突偵測和解決方案搭配使用時不受支援。必須停用這些設定,以避免可能發生的錯誤。

AWS AppSync 還可讓您跨表執行批次作業。來試試建置更複雜的應用程式。試想建置一個寵物健康狀態 (Pet Health) 應用程式,其中會有感應器呈報寵物的位置和體溫。感應器是由電池供電,而且每隔幾分鐘就會試著連線到網路。當傳感器建立連接時,它將其讀數發送給我們的 AWS AppSync API。觸發條件接著就會分析資料,然後將儀表板呈現給寵物的主人。讓我們著重介紹感應器與後端資料存放區之間的互動。

先決條件是,讓我們先建立兩個 DynamoDB 表;locationReadings將儲存感測器位置讀數,並temperatureReadings儲存感測器溫度讀數。這兩個資料表碰巧擁有相同的主索引鍵結構:sensorId (String) 為分割區索引鍵、timestamp (String) 為排序索引鍵。

讓我們使用下列 GraphQL 結構描述:

type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }

BatchPutItem -記錄傳感器讀數

感應器必須能夠在連線到網際網路之後傳送其讀數。GraphQL 欄位Mutation.recordReadings是API他們將用來執行此操作的欄位。讓我們附上一個解析器,使我們API的生活。

選取欄位旁邊的「附加Mutation.recordReadings」。在下一個畫面中,選取在教學課程開始時所建立的同一個 BatchTutorial 資料來源。

讓我們來新增下列請求映射範本:

請求映射範本

## Convert tempReadings arguments to DynamoDB objects #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) $util.qr($tempReadings.add($util.dynamodb.toMapValues($reading))) #end ## Convert locReadings arguments to DynamoDB objects #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) $util.qr($locReadings.add($util.dynamodb.toMapValues($reading))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

如您所見,BatchPutItem 操作可讓我們指定多個資料表。

讓我們使用下列回應映射範本。

回應映射範本

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also returns data for the field in the GraphQL response $utils.toJson($ctx.result.data)

進行批次操作時,呼叫可能會同時傳回錯誤和結果。在這種情況中,我們可以隨意進行一些額外的錯誤處理。

注意$utils.appendError() 的使用類似於 $util.error(),主要差別在於前者不會中斷映射範本的評估,而是在欄位出現錯誤時發出訊號,但允許範本的評估,進而將資料傳回給呼叫程式。如果應用程式需要傳回部分結果,建議您使用 $utils.appendError()

儲存解析程式並瀏覽至主控台的 [查詢] 頁面 AWS AppSync 。讓我們開始傳送一些感應器讀數吧!

執行下列的變動:

mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"} {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"} {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"} {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

我們會在一次的變動中傳送 10 個感應器讀數,讀數的資料會分置於兩個資料表。使用 DynamoDB 主控台來驗證資料是否同時顯示在locationReadingstemperatureReadings表中。

BatchDeleteItem -刪除傳感器讀數

同樣地,我們也需要刪除分批的感應器讀數。讓我們使用 Mutation.deleteReadings GraphQL 欄位來進行這項動作。選取欄位旁邊的「附加Mutation.recordReadings」。在下一個畫面中,選取在教學課程開始時所建立的同一個 BatchTutorial 資料來源。

讓我們使用下列請求映射範本。

請求映射範本

## Convert tempReadings arguments to DynamoDB primary keys #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($tempReadings.add($util.dynamodb.toMapValues($pkey))) #end ## Convert locReadings arguments to DynamoDB primary keys #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($locReadings.add($util.dynamodb.toMapValues($pkey))) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

這個回應映射範本和我們用於 Mutation.recordReadings 的相同。

回應映射範本

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also return data for the field in the GraphQL response $utils.toJson($ctx.result.data)

儲存解析程式並瀏覽至主控台的 [查詢] 頁面 AWS AppSync 。現在,讓我們刪除一些感應器讀數吧!

執行下列的變動:

mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

透過 DynamoDB 主控台驗證這兩個讀數是否已從locationReadingstemperatureReadings表中刪除。

BatchGetItem -檢索讀數

我們寵物健康狀態 (Pet Health) 應用程式另一個常用的操作,就是在特定的時間點擷取感應器的讀數。試試將解析程式連接到結構描述中的 Query.getReadings GraphQL 欄位。選取 Attach (附加),然後在下一個畫面上,選取在教學課程開始時所建立的同一個 BatchTutorial 資料來源。

讓我們來新增下列請求映射範本。

請求映射範本

## Build a single DynamoDB primary key, ## as both locationReadings and tempReadings tables ## share the same primary key structure #set($pkey = {}) $util.qr($pkey.put("sensorId", $ctx.args.sensorId)) $util.qr($pkey.put("timestamp", $ctx.args.timestamp)) { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "locationReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true }, "temperatureReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true } } }

請注意,我們現在正在使用該BatchGetItem操作。

我們的回應映射範本將會有些微的不同,因為我們選擇傳回 SensorReading 清單。讓我們將叫用結果映射到所需的結構。

回應映射範本

## Merge locationReadings and temperatureReadings ## into a single list ## __typename needed as schema uses an interface #set($sensorReadings = []) #foreach($locReading in $ctx.result.data.locationReadings) $util.qr($locReading.put("__typename", "LocationReading")) $util.qr($sensorReadings.add($locReading)) #end #foreach($tempReading in $ctx.result.data.temperatureReadings) $util.qr($tempReading.put("__typename", "TemperatureReading")) $util.qr($sensorReadings.add($tempReading)) #end $util.toJson($sensorReadings)

儲存解析程式並瀏覽至主控台的 [查詢] 頁面 AWS AppSync 。現在,讓我們擷取感應器讀數吧!

執行下列的查詢:

query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }

我們已經成功示範了使用. AWS AppSync

錯誤處理

在中 AWS AppSync,資料來源作業有時會傳回部分結果。我們將會使用部分結果這個詞彙,來表示操作的輸出包含一些資料和錯誤的情況。由於錯誤處理本質上是應用程式特定的,因此可 AWS AppSync 讓您有機會處理回應對應範本中的錯誤。如果發生解析程式呼叫錯誤,在文字內容中會出現 $ctx.error。呼叫錯誤一律包含訊息和類型,可做為 $ctx.error.message$ctx.error.type 屬性存取。在回應映射範本呼叫期間,您可以用三種方式來處理部分結果:

  1. 藉由只傳回資料來抑制呼叫錯誤

  2. 藉由停止進行回應映射範本的評估 (這不會傳回任何資料),來產生錯誤 (使用 $util.error(...))。

  3. 附加錯誤 (使用 $util.appendError(...)),而且也傳回資料

讓我們透過 DynamoDB 的批次操作來示範上述三點!

DynamoDB 批次操作

使用 DynamoDB 批次操作時,批次作業就有可能部分完成。也就是說,請求的項目或索引鍵可以有一些尚未處理完成。如果 AWS AppSync 無法完成批次,則會在前後關聯上設定未處理的項目和呼叫錯誤。

我們將會使用 Query.getReadings 欄位組態 (取自於本教學課程先前區段所說明的 BatchGetItem 操作) 來建置錯誤處理功能。這次,讓我們假設在執行 Query.getReadings 欄位時,temperatureReadings DynamoDB 資料表用盡了佈建的輸送量。DynamoDB 在第二次嘗試處理批次中的 AWS AppSync 剩餘元素ProvisionedThroughputExceededException時提出了一個。

以下JSON代表 DynamoDB 批次叫用之後,但在評估回應對應範本之前的序列化內容。

{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }

對於內容的幾個注意事項:

  • 呼叫錯誤已在$ctx.error上下文中設定 AWS AppSync,且錯誤類型已設定為 DynamoDB:。ProvisionedThroughputExceededException

  • 即使出現錯誤,也會針對每個資料表對應結果 ($ctx.result.data)

  • 未處理的索引鍵可透過 $ctx.result.data.unprocessedKeys 取得。在這裡, AWS AppSync 由於表吞吐量不足,因此無法使用密鑰:1sensorId,時間戳:17:21:05.000 + 08:00)檢索該項目。

注意:對於 BatchPutItem,則為 $ctx.result.data.unprocessedItems。如果是 BatchDeleteItem,則為 $ctx.result.data.unprocessedKeys

讓我們用三種不同的方式來處理這項錯誤。

1. 抑制呼叫錯誤

傳回資料而不處理呼叫錯誤,可有效地抑制錯誤,讓指定 GraphQL 欄位的結果一律成功。

我們所編寫的回應映射範本,只熟悉和著重於結果的資料。

回應映射範本:

$util.toJson($ctx.result.data)

GraphQL 回應:

{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }

錯誤回應中不會加入任何錯誤,因為只處理了資料。

2. 丟出錯誤來中止範本的執行

如果從用戶端的角度,部分故障應視為完全故障,則您可以中止範本的執行,來防止傳回資料。$util.error(...) 公用程式方法可確實完成這項動作。

回應映射範本:

## there was an error let's mark the entire field ## as failed and do not return any data back in the response #if ($ctx.error) $util.error($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

GraphQL 回應:

{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }

即使 DynamoDB 批次作業可能已經傳回一些結果,我們還是選擇丟出錯誤,如此 getReadings GraphQL 欄位為 null,而錯誤也會加入到 GraphQL 回應的 errors (錯誤) 區塊。

3. 附加錯誤,以同時傳回資料和錯誤

在某些情況中,為提供更好的使用者體驗,應用程式可以傳回部分結果,並通知其用戶端有未處理的項目。用戶端可以決定是否要重試,或是轉譯錯誤後傳回給最終使用者。$util.appendError(...) 是一項公用程式方法,可讓應用程式的設計人員將錯誤附加於文字內容,但不會干擾到範本的評估,進而完成前述的動作。評估範本之後, AWS AppSync 會將任何內容錯誤附加至 GraphQL 回應的錯誤區塊來處理這些錯誤。

回應映射範本:

#if ($ctx.error) ## pass the unprocessed keys back to the caller via the `errorInfo` field $util.appendError($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

我們轉發了 GraphQL 響應的錯誤塊內的調用錯誤和 unprocessedKeys 元素。該getReadings字段還從locationReadings表中返回部分數據,您可以在下面的響應中看到。

GraphQL 回應:

{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }