在中使用 DynamoDB 批量操作 AWS AppSync - AWS AppSync

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在中使用 DynamoDB 批量操作 AWS AppSync

注意

我们现在主要支持 APPSYNC _JS 运行时及其文档。请考虑在此处使用 APPSYNC _JS 运行时及其指南。

AWS AppSync 支持对单个区域中的一个或多个表使用 Amazon DynamoDB 批量操作。支持的操作为 BatchGetItemBatchPutItemBatchDeleteItem。通过在中使用这些功能 AWS AppSync,您可以执行以下任务:

  • 在单个查询中传递键列表,并从表中返回结果

  • 在单个查询中从一个或多个表读取记录

  • 将记录批量写入一个或多个表

  • 有条件写入或删除多个可能有关系的表中的记录

在 DynamoDB AWS AppSync 中使用批处理操作是一种高级技术,需要对后端操作和表结构多加思考和了解。此外,中的批处理操作与非批处理操作 AWS AppSync 有两个主要区别:

  • 数据来源角色必须对解析器将访问的所有表具有权限。

  • 解析器的表规范是映射模板的一部分。

权限

与其他解析器一样,您需要在中创建数据源, AWS AppSync 然后创建角色或使用现有角色。由于批处理操作需要具有 DynamoDB 表的不同权限,因此,您需要为配置的角色授予读取或写入操作权限:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/TABLENAME", "arn:aws:dynamodb:region:account:table/TABLENAME/*" ] } ] }

注意:角色与中的数据源相关联 AWS AppSync,字段上的解析器是针对数据源调用的。配置为针对 DynamoDB 获取的数据来源仅指定一个表,以使配置保持简单。因此,当在单个解析器中针对多个表执行批处理操作(这是一项更高级的任务)时,您必须向该数据来源的角色授予对将与解析器进行交互的任何表的访问权限。这将在上述IAM政策的 “资源” 字段中完成。对表进行配置以便对它们进行批处理调用是在解析器模板中实现的,下面将介绍相关内容。

数据来源

为简便起见,我们将为本教程中使用的所有解析器使用同一个数据来源。在数据源选项卡上,创建一个新的 DynamoDB 数据源并将其命名。BatchTutorial表名称可以是任何内容,因为表名称被指定为批处理操作的请求映射模板的一部分。我们将提供表名称 empty

对于本教程,具有以下内联策略的任何角色都有效:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/Posts", "arn:aws:dynamodb:region:account:table/Posts/*", "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }

单个表批处理

警告

BatchPutItemBatchDeleteItem冲突检测和解决一起使用时不支持和。必须禁用这些设置以防止可能出现的错误。

在此示例中,假设您有一个名为 Posts 的表,您要通过批处理操作在其中添加和删除项目。使用以下架构,请注意,对于查询,我们将传入一个列表IDs:

type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] } schema { query: Query mutation: Mutation }

使用以下请求映射模板将一个解析器附加到 batchAdd() 字段。这会自动接受 GraphQL input PostInput 类型的每个项目并构建 BatchPutItem 操作所需的一个映射:

#set($postsdata = []) #foreach($item in ${ctx.args.posts}) $util.qr($postsdata.add($util.dynamodb.toMapValues($item))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "Posts": $utils.toJson($postsdata) } }

在这种情况下,响应映射模板是一个简单的传递,但表名称作为 ..data.Posts 追加到上下文对象,如下所示:

$util.toJson($ctx.result.data.Posts)

现在导航到 AWS AppSync控制台的 “查询” 页面并运行以下batchAdd变更:

mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }

您应该会看到输出到屏幕的结果,并且可以通过 DynamoDB 控制台单独验证这两个值是否写入到 Posts 表。

接下来,使用以下请求映射模板将一个解析器附加到 batchGet() 字段。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "Posts": { "keys": $util.toJson($ids), "consistentRead": true, "projection" : { "expression" : "#id, title", "expressionNames" : { "#id" : "id"} } } } }

响应映射模板还是一个简单的传递,且再一次,表名称作为 ..data.Posts 追加到上下文对象:

$util.toJson($ctx.result.data.Posts)

现在返回 AWS AppSync控制台的 “查询” 页面,然后运行以下batchGet 查询

query get { batchGet(ids:[1,2,3]){ id title } }

这应返回您早前添加的两个 id 值的结果。请注意,对于值为 nullid,将返回 3 值。这是因为您的 Posts 表中还没有具有该值的记录。另请注意, AWS AppSync返回结果的顺序与传递给查询的密钥的顺序相同,这是一项代表您 AWS AppSync 执行的附加功能。因此,如果您切换到 batchGet(ids:[1,3,2),将看到订单已更改。您还将了解哪个 id 返回了 null 值。

最后,使用以下请求映射模板将一个解析器附加到 batchDelete() 字段。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "Posts": $util.toJson($ids) } }

响应映射模板还是一个简单的传递,且再一次,表名称作为 ..data.Posts 追加到上下文对象:

$util.toJson($ctx.result.data.Posts)

现在返回 AWS AppSync控制台的 Q ueries 页面,然后运行以下batchDelete变更:

mutation delete { batchDelete(ids:[1,2]){ id } }

现在应删除 id12 的记录。如果您更早重新运行 batchGet() 查询,则应返回 null

多个表批处理

警告

BatchPutItemBatchDeleteItem冲突检测和解决一起使用时不支持和。必须禁用这些设置以防止可能出现的错误。

AWS AppSync 还允许您对表执行批量操作。我们来构建更复杂的应用程序。想象一下,我们正在构建一个 Pet Health 应用程序,其中的传感器报告宠物位置和身体温度。传感器由电池供电,并每隔几分钟尝试连接到网络。当传感器建立连接时,它会将其读数发送给我们 AWS AppSync API。然后,触发器分析数据,这样,就可以向宠物主人显示控制面板。我们重点关注如何表示传感器与后端数据存储之间的交互。

作为先决条件,让我们先创建两个 DynamoDB 表 locationReadings;用于存储传感器位置读数temperatureReadings并存储传感器温度读数。这两个表正好共享相同的主键结构:sensorId (String) 是分区键,而 timestamp (String) 是排序键。

我们使用以下 GraphQL 架构:

type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }

BatchPutItem -记录传感器读数

我们的传感器需要能够在连接到 Internet 后立即发送其读数。API他们将使用 GraphQL 字段Mutation.recordReadings来执行此操作。让我们安装一个解析器,让我们的生活栩栩API如生。

选择 Mutation.recordReadings 字段旁边的附加。在下一个屏幕上,选择本教程开始时创建的同一个 BatchTutorial 数据来源。

我们添加以下请求映射模板。

请求映射模板

## Convert tempReadings arguments to DynamoDB objects #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) $util.qr($tempReadings.add($util.dynamodb.toMapValues($reading))) #end ## Convert locReadings arguments to DynamoDB objects #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) $util.qr($locReadings.add($util.dynamodb.toMapValues($reading))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

如您所见,BatchPutItem 操作使我们能够指定多个表。

我们使用以下响应映射模板。

响应映射模板

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also returns data for the field in the GraphQL response $utils.toJson($ctx.result.data)

使用批处理操作,可能会从调用中同时返回错误和结果。在这种情况下,我们可以自主执行一些额外的错误处理。

注意$utils.appendError() 的用法与 $util.error() 相似,主要区别是前者不中断对映射模板的评估。相反,它指示字段发生了错误,但允许评估模板,因此会将数据返回给调用方。我们建议,当您的应用程序需要返回部分结果时使用 $utils.appendError()

保存解析器并导航到 AWS AppSync 控制台的 “查询” 页面。现在发送一些传感器读数!

执行以下变更:

mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"} {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"} {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"} {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

我们在一个变更中发送了 10 个传感器读数,并在两个表之间拆分读数。使用 DynamoDB 控制台验证数据是否同时locationReadings显示在和表中。temperatureReadings

BatchDeleteItem -删除传感器读数

同样,我们也需要删除批量传感器读数。我们使用 Mutation.deleteReadings GraphQL 字段来实现此目的。选择 Mutation.recordReadings 字段旁边的附加。在下一个屏幕上,选择本教程开始时创建的同一个 BatchTutorial 数据来源。

我们使用以下请求映射模板。

请求映射模板

## Convert tempReadings arguments to DynamoDB primary keys #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($tempReadings.add($util.dynamodb.toMapValues($pkey))) #end ## Convert locReadings arguments to DynamoDB primary keys #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($locReadings.add($util.dynamodb.toMapValues($pkey))) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

该响应映射模板与我们用于 Mutation.recordReadings 的模板相同。

响应映射模板

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also return data for the field in the GraphQL response $utils.toJson($ctx.result.data)

保存解析器并导航到 AWS AppSync 控制台的 “查询” 页面。现在,让我们删除几个传感器读数!

执行以下变更:

mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

通过 DynamoDB 控制台验证这两个读数是否已从和表中删除。locationReadingstemperatureReadings

BatchGetItem -检索读数

Pet Health 应用程序的另一个常见操作是检索特定时间点的传感器读数。我们将解析器附加到架构上的 Query.getReadings GraphQL 字段。选择 Attach (附加),然后,在下一个屏幕上选择本教程开始时创建的同一个 BatchTutorial 数据来源。

我们添加以下请求映射模板。

请求映射模板

## Build a single DynamoDB primary key, ## as both locationReadings and tempReadings tables ## share the same primary key structure #set($pkey = {}) $util.qr($pkey.put("sensorId", $ctx.args.sensorId)) $util.qr($pkey.put("timestamp", $ctx.args.timestamp)) { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "locationReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true }, "temperatureReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true } } }

请注意,我们现在正在使用该BatchGetItem操作。

我们的响应映射模板会有点不同,因为我们选择返回 SensorReading 列表。我们将调用结果映射到所需的形状。

响应映射模板

## Merge locationReadings and temperatureReadings ## into a single list ## __typename needed as schema uses an interface #set($sensorReadings = []) #foreach($locReading in $ctx.result.data.locationReadings) $util.qr($locReading.put("__typename", "LocationReading")) $util.qr($sensorReadings.add($locReading)) #end #foreach($tempReading in $ctx.result.data.temperatureReadings) $util.qr($tempReading.put("__typename", "TemperatureReading")) $util.qr($sensorReadings.add($tempReading)) #end $util.toJson($sensorReadings)

保存解析器并导航到 AWS AppSync 控制台的 “查询” 页面。现在,我们来检索传感器读数!

执行以下查询:

query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }

我们已经成功演示了使用的 DynamoDB 批处理操作的用法。 AWS AppSync

错误处理

在中 AWS AppSync,数据源操作有时会返回部分结果。部分结果是一个术语,我们用它来表示操作的输出中包含某些数据和一个错误。由于错误处理本质上是特定于应用程序的, AWS AppSync 因此您有机会处理响应映射模板中的错误。上下文中的解析器调用错误(如果有)为 $ctx.error。调用错误始终包含一条消息和一个类型,可作为属性 $ctx.error.message$ctx.error.type 进行访问。在响应映射模板调用期间,您可以通过三种方式处理部分结果:

  1. 仅返回数据以忽略调用错误

  2. 通过停止响应映射模板评估(这不会返回任何数据)来引发错误(使用 $util.error(...))。

  3. 附加一个错误(使用 $util.appendError(...))并且也返回数据

让我们通过 DynamoDB 批处理操作分别说明上述三点!

DynamoDB 批处理操作

借助 DynamoDB 批处理操作,批处理可能会部分完成。也就是说,某些请求的项目或键未得到处理。如果无法完成批处理, AWS AppSync 则将在上下文中设置未处理的项目和调用错误。

我们将使用本教程前一部分中 Query.getReadings 操作的 BatchGetItem 字段配置来实施错误处理。这一次,我们假定在执行 Query.getReadings 字段时,temperatureReadings DynamoDB 表耗尽了预置的吞吐量。DynamoDB 在第二次尝试 AWS AppSync 时引发了 ProvisionedThroughputExceededExceptiona,用于处理批次中的剩余元素。

以下内容JSON表示在 DynamoDB 批量调用之后但在评估响应映射模板之前的序列化上下文。

{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }

关于上下文需要注意的几点:

  • 已在$ctx.error上下文中设置了调用错误 AWS AppSync,错误类型已设置为 Dynam oDB:。ProvisionedThroughputExceededException

  • 即使存在错误,也会在 $ctx.result.data 中为每个表映射结果

  • $ctx.result.data.unprocessedKeys 中提供了未处理的键。在这里, AWS AppSync 由于表吞吐量不足,无法使用密钥:1sensorId,timestamp: 2018-02-01T 17:21:05.000 + 08:00) 检索项目。

注意:对于 BatchPutItem,它是 $ctx.result.data.unprocessedItems。对于 BatchDeleteItem,它是 $ctx.result.data.unprocessedKeys

我们通过三种不同方式处理此错误。

1. 承受调用错误

返回数据而不处理调用错误:这会有效地承受此错误,同时使给定 GraphQL 字段的结果始终成功。

我们编写的响应映射模板是熟悉的,仅侧重于结果数据。

响应映射模板:

$util.toJson($ctx.result.data)

GraphQL 响应:

{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }

将不向错误响应中添加任何错误,因为只对数据执行了操作。

2. 引发错误以中止模板执行

从客户端角度看,在应将部分失败视为完全失败时,您可以中止执行模板以防止返回数据。$util.error(...) 实用程序方法实现完全此行为。

响应映射模板:

## there was an error let's mark the entire field ## as failed and do not return any data back in the response #if ($ctx.error) $util.error($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

GraphQL 响应:

{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }

即使可能已从 DynamoDB 批处理操作返回了一些结果,我们也选择引发错误,这样,getReadings GraphQL 字段为 Null,并且此错误已添加到 GraphQL 响应的错误数据块中。

3. 追加错误以返回数据和错误

在某些情况下,为了提供更好的用户体验,应用程序可以返回部分结果并向其客户端通知未处理的项目。客户端可以决定是实施重试,还是将错误翻译出来并返回给最终用户。$util.appendError(...) 是一个可以启用此行为的实用程序方法,具体方式为:让应用程序设计人员在上下文中追加错误,而不干扰对模板的评估。评估模板后, AWS AppSync 将通过将任何上下文错误附加到 GraphQL 响应的错误块来处理这些错误。

响应映射模板:

#if ($ctx.error) ## pass the unprocessed keys back to the caller via the `errorInfo` field $util.appendError($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

我们同时转发了 GraphQL 响应的错误块中的调用错误和 unprocessedKeys 元素。该getReadings字段还会返回locationReadings表中的部分数据,如下面的响应所示。

GraphQL 响应:

{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }