本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在中使用 DynamoDB 批量操作 AWS AppSync
AWS AppSync 支持对单个区域中的一个或多个表使用 Amazon DynamoDB 批量操作。支持的操作为 BatchGetItem
、BatchPutItem
和 BatchDeleteItem
。通过在中使用这些功能 AWS AppSync,您可以执行以下任务:
-
在单个查询中传递键列表,并从表中返回结果
-
在单个查询中从一个或多个表中读取记录
-
将记录批量写入到一个或多个表
-
在可能存在关系的多个表中有条件地写入或删除记录
中的批处理操作与非批处理操作 AWS AppSync 有两个主要区别:
-
数据来源角色必须具有解析器访问的所有表的权限。
-
解析器的表规范是请求对象的一部分。
单个表批处理
警告
BatchPutItem
与BatchDeleteItem
冲突检测和解决一起使用时不支持和。必须禁用这些设置以防止可能出现的错误。
首先,让我们创建一个新的 GraphQL API。在 AWS AppSync 控制台中,选择 “创建” API、“GraphQL APIs” 和 “从头开始设计”。命名您的 APIBatchTutorial API
,选择下一步,然后在指定 GraphQL 资源步骤中,选择稍后创建 GraphQL 资源,然后单击下一步。查看您的详细信息并创建API. 转到 “架构” 页面并粘贴以下架构,请注意,对于查询,我们将传入一个列表IDs:
type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] }
保存您的架构,并选择页面顶部的创建资源。选择使用现有的类型,并选择 Post
类型。将您的表命名为 Posts
。确保主键设置为 id
,取消选择自动生成 GraphQL(您将提供自己的代码),然后选择创建。首先, AWS AppSync
创建一个新的 DynamoDB 表以及一个使用相应角色连接到该表的数据来源。不过,您仍然需要为该角色添加一些权限。转到数据来源页面,并选择新的数据来源。在选择现有角色下面,您会注意到已为该表自动创建了一个角色。记下这个角色(应该看起来像appsync-ds-ddb-aaabbbcccddd-Posts
),然后进入IAM控制台(https://console.aws.amazon.com/iam/+
”(名称应与角色名称相似)。在显示该策略时,选择折叠菜单顶部的编辑。您需要为您的策略添加批处理权限,具体来说是 dynamodb:BatchGetItem
和 dynamodb:BatchWriteItem
。代码片段如下所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:DeleteItem", "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:Scan", "dynamodb:UpdateItem", "dynamodb:BatchWriteItem", "dynamodb:BatchGetItem" ], "Resource": [ "arn:aws:dynamodb:…", "arn:aws:dynamodb:…" ] } ] }
选择下一步,然后选择保存更改。您的策略现在应该允许进行批处理。
返回 AWS AppSync 控制台,转到 “架构” 页面,然后选择Mutation.batchAdd
字段旁边的 “附加”。将 Posts
表作为数据来源以创建解析器。在代码编辑器中,将处理程序替换为下面的代码片段。该代码片段自动获取 GraphQL input PostInput
类型中的每个项目,并构建 BatchPutItem
操作所需的映射:
import { util } from "@aws-appsync/utils"; export function request(ctx) { return { operation: "BatchPutItem", tables: { Posts: ctx.args.posts.map((post) => util.dynamodb.toMapValues(post)), }, }; } export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.data.Posts; }
导航到 AWS AppSync 控制台的 “查询” 页面并运行以下batchAdd
变更:
mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }
您应该会看到在屏幕上输出的结果;可以在 DynamoDB 控制台中扫描写入到 Posts
表的值以验证这一点。
接下来,重复附加解析器的过程,但对于 Query.batchGet
字段,将 Posts
表作为数据来源。将处理程序替换为以下代码。这会自动接受 GraphQL ids:[]
类型的每个项目并构建 BatchGetItem
操作所需的一个映射:
import { util } from "@aws-appsync/utils"; export function request(ctx) { return { operation: "BatchGetItem", tables: { Posts: { keys: ctx.args.ids.map((id) => util.dynamodb.toMapValues({ id })), consistentRead: true, }, }, }; } export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.data.Posts; }
现在,返回 AWS AppSync 控制台的 “查询” 页面并运行以下batchGet
查询:
query get { batchGet(ids:[1,2,3]){ id title } }
这应返回您早前添加的两个 id
值的结果。请注意,对于值为 3
的 id
,将返回 null
值。这是因为您的 Posts
表中还没有具有该值的记录。另请注意, AWS AppSync 返回结果的顺序与传递给查询的键的顺序相同,这是一项代表您 AWS AppSync 执行的附加功能。因此,如果切换到 batchGet(ids:[1,3,2])
,您会看到顺序发生了变化。您还将了解哪个 id
返回了 null
值。
最后,将另一个解析器附加到 Mutation.batchDelete
字段,并将 Posts
表作为数据来源。将处理程序替换为以下代码。这会自动接受 GraphQL ids:[]
类型的每个项目并构建 BatchGetItem
操作所需的一个映射:
import { util } from "@aws-appsync/utils"; export function request(ctx) { return { operation: "BatchDeleteItem", tables: { Posts: ctx.args.ids.map((id) => util.dynamodb.toMapValues({ id })), }, }; } export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.data.Posts; }
现在,返回 AWS AppSync 控制台的 “查询” 页面并运行以下batchDelete
变更:
mutation delete { batchDelete(ids:[1,2]){ id } }
现在应删除 id
为 1
和 2
的记录。如果您更早重新运行 batchGet()
查询,则应返回 null
。
多个表批处理
警告
BatchPutItem
与BatchDeleteItem
冲突检测和解决一起使用时不支持和。必须禁用这些设置以防止可能出现的错误。
AWS AppSync 还允许您对表执行批量操作。我们来构建更复杂的应用程序。想象一下,我们构建一个宠物健康应用程序,其中传感器报告宠物的位置和体温。传感器由电池供电,并每隔几分钟尝试连接到网络。当传感器建立连接时,它会将其读数发送给我们 AWS AppSync API。然后,触发器分析数据,这样,就可以向宠物主人显示控制面板。我们重点关注如何表示传感器与后端数据存储之间的交互。
在 AWS AppSync 控制台中,选择 “创建” API、“GraphQL APIs” 和 “从头开始设计”。命名您的 APIMultiBatchTutorial API
,选择下一步,然后在指定 GraphQL 资源步骤中,选择稍后创建 GraphQL 资源,然后单击下一步。查看您的详细信息并创建API. 转到架构页面,并粘贴和保存以下架构:
type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }
我们需要创建两个 DynamoDB 表:
-
locationReadings
存储传感器位置读数。 -
temperatureReadings
存储传感器温度读数。
这两个表具有相同的主键结构:将 sensorId (String)
作为分区键,并将 timestamp (String)
作为排序键。
选择页面顶部的创建资源。选择使用现有的类型,并选择 locationReadings
类型。将您的表命名为 locationReadings
。确保将主键设置为 sensorId
,并将排序键设置为 timestamp
。取消选择自动生成 GraphQL(您将提供自己的代码),然后选择创建。为 temperatureReadings
重复该过程,并将 temperatureReadings
作为类型和表名称。使用与上面相同的键。
您的新表将包含自动生成的角色。您仍然需要为这些角色添加一些权限。转到数据来源页面并选择 locationReadings
。在选择现有角色下面,您可以看到该角色。记下这个角色(应该看起来像appsync-ds-ddb-aaabbbcccddd-locationReadings
),然后进入IAM控制台(https://console.aws.amazon.com/iam/+
”(名称应与角色名称相似)。在显示该策略时,选择折叠菜单顶部的编辑。您需要为该策略添加权限。代码片段如下所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:DeleteItem", "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:Scan", "dynamodb:UpdateItem", "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Resource": [ "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }
选择下一步,然后选择保存更改。使用上面相同的策略片段,为 temperatureReadings
数据来源重复该过程。
BatchPutItem -记录传感器读数
我们的传感器需要能够在连接到 Internet 后立即发送其读数。API他们将使用 GraphQL 字段Mutation.recordReadings
来执行此操作。我们需要在该字段中添加一个解析器。
在 AWS AppSync 控制台的 “架构” 页面中,选择Mutation.recordReadings
字段旁边的 “附加”。在下一个屏幕上,将 locationReadings
表作为数据来源以创建解析器。
在创建解析器后,在编辑器中将处理程序替换为以下代码。我们可以通过 BatchPutItem
操作指定多个表:
import { util } from '@aws-appsync/utils' export function request(ctx) { const { locReadings, tempReadings } = ctx.args const locationReadings = locReadings.map((loc) => util.dynamodb.toMapValues(loc)) const temperatureReadings = tempReadings.map((tmp) => util.dynamodb.toMapValues(tmp)) return { operation: 'BatchPutItem', tables: { locationReadings, temperatureReadings, }, } } export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type) } return ctx.result.data }
使用批处理操作,可能会从调用中同时返回错误和结果。在这种情况下,我们可以自主执行一些额外的错误处理。
注意
使用 utils.appendError()
与 util.error()
类似,主要区别在于,它不会中断请求或响应处理程序评估。相反,它指示字段存在错误,但允许评估处理程序,从而将数据返回到调用方。在您的应用程序需要返回部分结果时,我们建议您使用 utils.appendError()
。
保存解析器并导航到 AWS AppSync 控制台中的 “查询” 页面。我们现在可以发送一些传感器读数。
执行以下变更:
mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }
我们在一个变更中发送了 10 个传感器读数,这些读数拆分到两个表中。使用 DynamoDB 控制台验证是否在 locationReadings
和 temperatureReadings
表中显示数据。
BatchDeleteItem -删除传感器读数
同样,我们还需要能够批量删除传感器读数。我们使用 Mutation.deleteReadings
GraphQL 字段来实现此目的。在 AWS AppSync 控制台的 “架构” 页面中,选择Mutation.deleteReadings
字段旁边的 “附加”。在下一个屏幕上,将 locationReadings
表作为数据来源以创建解析器。
在创建解析器后,在代码编辑器中将处理程序替换为下面的代码片段。在该解析器中,我们使用帮助程序函数映射器,该映射器从提供的输入中提取 sensorId
和 timestamp
。
import { util } from '@aws-appsync/utils' export function request(ctx) { const { locReadings, tempReadings } = ctx.args const mapper = ({ sensorId, timestamp }) => util.dynamodb.toMapValues({ sensorId, timestamp }) return { operation: 'BatchDeleteItem', tables: { locationReadings: locReadings.map(mapper), temperatureReadings: tempReadings.map(mapper), }, } } export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type) } return ctx.result.data }
保存解析器并导航到 AWS AppSync 控制台中的 “查询” 页面。现在,让我们删除几个传感器读数。
执行以下变更:
mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }
注意
与 DeleteItem
操作相反,响应中不会返回完全删除的项目。只返回传递的键。要了解更多信息,请参阅 Dynamo D JavaScript B BatchDeleteItem 的解析器内函数参考。
通过 DynamoDB 控制台验证是否从 locationReadings
和 temperatureReadings
表中删除了这两个读数。
BatchGetItem -检索读数
我们的应用程序的另一个常见操作是,检索传感器在特定时间点的读数。我们将解析器附加到架构上的 Query.getReadings
GraphQL 字段。在 AWS AppSync 控制台的 “架构” 页面中,选择Query.getReadings
字段旁边的 “附加”。在下一个屏幕上,将 locationReadings
表作为数据来源以创建解析器。
让我们使用以下代码:
import { util } from '@aws-appsync/utils' export function request(ctx) { const keys = [util.dynamodb.toMapValues(ctx.args)] const consistentRead = true return { operation: 'BatchGetItem', tables: { locationReadings: { keys, consistentRead }, temperatureReadings: { keys, consistentRead }, }, } } export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type) } const { locationReadings: locs, temperatureReadings: temps } = ctx.result.data return [ ...locs.map((l) => ({ ...l, __typename: 'LocationReading' })), ...temps.map((t) => ({ ...t, __typename: 'TemperatureReading' })), ] }
保存解析器并导航到 AWS AppSync 控制台中的 “查询” 页面。现在,让我们检索传感器读数。
执行以下查询:
query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }
我们已经成功演示了使用的 DynamoDB 批处理操作的用法。 AWS AppSync
错误处理
在中 AWS AppSync,数据源操作有时会返回部分结果。部分结果是一个术语,我们用它来表示操作的输出中包含某些数据和一个错误。由于错误处理本质上是特定于应用程序的, AWS AppSync 因此您可以有机会在响应处理程序中处理错误。上下文中的解析器调用错误(如果有)为 ctx.error
。调用错误始终包含一条消息和一个类型,可作为属性 ctx.error.message
和 ctx.error.type
进行访问。在响应处理程序中,您可以使用三种方法处理部分结果:
-
仅返回数据以忽略调用错误。
-
停止处理程序评估以引发错误(使用
util.error(...)
),这不会返回任何数据。 -
附加一个错误(使用
util.appendError(...)
)并且也返回数据。
让我们通过 DynamoDB 批处理操作分别说明上述三点。
DynamoDB 批处理操作
借助 DynamoDB 批处理操作,批处理可能会部分完成。也就是说,某些请求的项目或键未得到处理。如果无法完成批处理, AWS AppSync 则将在上下文中设置未处理的项目和调用错误。
我们将使用本教程前一部分中 Query.getReadings
操作的 BatchGetItem
字段配置来实施错误处理。这一次,我们假定在执行 Query.getReadings
字段时,temperatureReadings
DynamoDB 表耗尽了预置的吞吐量。DynamoDB 在第二次尝试 AWS AppSync 时引发了 ProvisionedThroughputExceededException
a,用于处理批次中的剩余元素。
以下内容JSON表示在 DynamoDB 批处理调用之后但在调用响应处理程序之前的序列化上下文:
{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }
关于上下文需要注意的几点:
-
调用错误已在
ctx.error
上下文中设置 AWS AppSync,错误类型已设置为DynamoDB:ProvisionedThroughputExceededException
。 -
即使存在错误,也会在
ctx.result.data
中为每个表映射结果。 -
在
ctx.result.data.unprocessedKeys
中提供了未处理的键。在这里, AWS AppSync 由于表吞吐量不足,无法使用密钥:1sensorId,timestamp: 2018-02-01T 17:21:05.000 + 08:00) 检索项目。
注意
对于 BatchPutItem
,它是 ctx.result.data.unprocessedItems
。对于 BatchDeleteItem
,它是 ctx.result.data.unprocessedKeys
。
我们通过三种不同方式处理此错误。
1. 承受调用错误
返回数据而不处理调用错误:这会有效地承受此错误,同时使给定 GraphQL 字段的结果始终成功。
我们编写的代码是熟悉的,并且仅关注结果数据。
响应处理程序
export function response(ctx) { return ctx.result.data }
GraphQL 响应
{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }
将不向错误响应中添加任何错误,因为只对数据执行了操作。
2. 引发错误以中止执行响应处理程序
从客户端角度看,在应将部分失败视为完全失败时,您可以中止执行响应处理程序以防止返回数据。util.error(...)
实用程序方法实现完全此行为。
响应处理程序代码
export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type, null, ctx.result.data.unprocessedKeys); } return ctx.result.data; }
GraphQL 响应
{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }
即使可能已从 DynamoDB 批处理操作返回了一些结果,我们也选择引发错误,这样,getReadings
GraphQL 字段为 Null,并且此错误已添加到 GraphQL 响应的错误数据块中。
3. 追加错误以返回数据和错误
在某些情况下,为了提供更好的用户体验,应用程序可以返回部分结果并向其客户端通知未处理的项目。客户端可以决定是实施重试,还是将错误翻译出来并返回给最终用户。util.appendError(...)
是一种实现该行为的实用程序方法,它让应用程序设计者在上下文中附加错误,而不干扰响应处理程序评估。评估响应处理程序后, AWS AppSync 将通过将任何上下文错误附加到 GraphQL 响应的错误块来处理它们。
响应处理程序代码
export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type, null, ctx.result.data.unprocessedKeys); } return ctx.result.data; }
我们在 GraphQL 响应的错误块中转发了调用错误和 unprocessedKeys
元素。getReadings
字段也从 locationReadings
表中返回部分数据,如下面的响应中所示。
GraphQL 响应
{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }