Work with paginated results: scans and queries
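The scan, query, and batch methods of the DynamoDB Enhanced Client API return responses with one or more pages. A page contains one or more items. Your code can process the response page by page, or it can process individual items.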
The synchronous DynamoDbEnhancedClient returns paginated responses as a PageIterable object. The asynchronous DynamoDbEnhancedAsyncClient returns them as a PagePublisher object.
This section explains how to process paginated results and provides examples that use the scan and query APIs.
Scan a table
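The SDK's scan method corresponds to the DynamoDB Scan operation of the same name. We start with the PageIterable interface and look at the scan method of DynamoDbTable, which is the synchronous mapping class.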
Use the synchronous API
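The following example shows the scan method, which uses an expression to filter the items that are returned. ProductCatalog is the model object that was shown earlier.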
The filter expression after comment line 2 limits the returned ProductCatalog items to those with a price between 8.00 and 80.00, inclusive.
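This example also excludes the isbn values. The attributesToProject method after comment line 1 requests only the id, title, authors, and price attributes.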
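After comment line 3, the scan method returns pagedResults, which is a PageIterable object. The stream method of PageIterable returns a java.util.Stream object that you can use to process the pages.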
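Starting at comment line 4, the example shows two ways to access the ProductCatalog items. The version after comment line 4a streams through each page, then sorts and logs the items on that page. The version after comment line 4b skips page iteration and accesses the items directly.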
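The PageIterable interface offers several ways to process results because it inherits from two parent interfaces, java.lang.Iterable and SdkIterable. Iterable contributes the forEach, iterator, and spliterator methods. SdkIterable contributes the stream method.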
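The following sketch shows how an interface that extends `java.lang.Iterable` can add a `stream()` method on top of the inherited `spliterator()`. It uses a hypothetical `StreamableIterable` interface, not the SDK's source. It is only meant to illustrate why a `PageIterable` offers both `forEach`/`iterator` and `stream`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical interface modelled on the idea behind SdkIterable: an Iterable that also offers stream().
interface StreamableIterable<T> extends Iterable<T> {
    // Builds a sequential Stream on top of the spliterator inherited from Iterable.
    default Stream<T> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
}

final class StreamableIterableSketch {

    // Wraps a list; the only abstract method is Iterable.iterator(), so a method reference suffices.
    static <T> StreamableIterable<T> of(List<T> values) {
        return values::iterator;
    }

    public static void main(String[] args) {
        StreamableIterable<String> pages = of(List.of("page-1", "page-2"));
        pages.forEach(System.out::println);          // forEach comes from Iterable
        Iterator<String> it = pages.iterator();      // iterator comes from Iterable
        System.out.println(it.next());               // page-1
        System.out.println(pages.stream().count());  // stream comes from the sub-interface: 2
    }
}
```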
public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) {
    // numberValue(...) is a static helper (import not shown) that builds a DynamoDB number AttributeValue.
    Map<String, AttributeValue> expressionValues = Map.of(
            ":min_value", numberValue(8.00),
            ":max_value", numberValue(80.00));

    ScanEnhancedRequest request = ScanEnhancedRequest.builder()
            .consistentRead(true)
            // 1. The 'attributesToProject()' method allows you to specify which values you want returned.
            .attributesToProject("id", "title", "authors", "price")
            // 2. The filter expression limits the items returned to those that match the provided criteria.
            .filterExpression(Expression.builder()
                    .expression("price >= :min_value AND price <= :max_value")
                    .expressionValues(expressionValues)
                    .build())
            .build();

    // 3. A PageIterable object is returned by the scan method.
    PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request);
    // Each stream() or items() call iterates from the first page again, which sends new Scan requests.
    logger.info("page count: {}", pagedResults.stream().count());

    // 4. Log the returned ProductCatalog items using two variations.
    // 4a. This version sorts and logs the items of each page.
    pagedResults.stream().forEach(p -> p.items().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(item -> logger.info(item.toString())));

    // 4b. This version sorts and logs all items for all pages.
    pagedResults.items().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(item -> logger.info(item.toString()));
}
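Variations 4a and 4b can log items in different orders. The following minimal sketch uses hypothetical in-memory pages instead of a real scan, with a `Product` record that has only an `id` and a `price`. Sorting within each page orders items only inside that page. Sorting all items orders them across pages.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-ins; the real code gets its pages from a PageIterable.
final class SortVariantsSketch {

    record Product(String id, double price) { }

    // Like variation 4a: sort the items of each page separately, keeping pages in order.
    static List<Double> sortPerPage(List<List<Product>> pages) {
        return pages.stream()
                .flatMap(page -> page.stream().sorted(Comparator.comparingDouble(Product::price)))
                .map(Product::price)
                .collect(Collectors.toList());
    }

    // Like variation 4b: ignore page boundaries and sort all items together.
    static List<Double> sortAllItems(List<List<Product>> pages) {
        return pages.stream()
                .flatMap(List::stream)
                .sorted(Comparator.comparingDouble(Product::price))
                .map(Product::price)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Product>> pages = List.of(
                List.of(new Product("p1", 50.0), new Product("p2", 10.0)),
                List.of(new Product("p3", 20.0)));
        System.out.println(sortPerPage(pages));  // [10.0, 50.0, 20.0]
        System.out.println(sortAllItems(pages)); // [10.0, 20.0, 50.0]
    }
}
```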
Use the asynchronous API
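The asynchronous scan method returns results as a PagePublisher object. The PagePublisher interface has two subscribe methods that you can use to process response pages. The first subscribe method comes from the org.reactivestreams.Publisher parent interface. To process pages with this option, pass a Subscriber instance to the subscribe method. The first example that follows shows how to use this subscribe method.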
The second subscribe method comes from the SdkPublisher interface. This version of subscribe accepts a Consumer rather than a Subscriber. The second example that follows shows this variation.
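The following example shows the asynchronous version of the scan method. It uses the same filter expression as the previous example, except that :max_value is bound to 400_000.00 instead of 80.00.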
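After comment line 3, DynamoDbAsyncTable.scan returns a PagePublisher object. On the next line, the code creates ProductCatalogSubscriber, an implementation of the org.reactivestreams.Subscriber interface. After comment line 4, the code subscribes it to the PagePublisher.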
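In the ProductCatalogSubscriber class, the onNext method after comment line 8 collects the ProductCatalog items from each page. The items are stored in a private List variable. The calling code accesses them through the ProductCatalogSubscriber.getSubscribedItems() method, which is called after comment line 5.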
After it retrieves the list, the code sorts all ProductCatalog items by price and logs each item.
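The CountDownLatch in the ProductCatalogSubscriber class blocks the calling thread until all items have been added to the list. As a result, the code after comment line 5 runs only after the subscriber has finished collecting.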
public static void scanAsync(DynamoDbAsyncTable<ProductCatalog> productCatalog) {
    ScanEnhancedRequest request = ScanEnhancedRequest.builder()
            .consistentRead(true)
            .attributesToProject("id", "title", "authors", "price")
            .filterExpression(Expression.builder()
                    // 1. :min_value and :max_value are placeholders for the values provided by the map.
                    .expression("price >= :min_value AND price <= :max_value")
                    // 2. Two values are needed for the expression and each is supplied as a map entry.
                    .expressionValues(
                            Map.of(":min_value", numberValue(8.00),
                                   ":max_value", numberValue(400_000.00)))
                    .build())
            .build();

    // 3. A PagePublisher object is returned by the scan method.
    PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request);
    ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber();
    // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher.
    pagePublisher.subscribe(subscriber);
    // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber.
    subscriber.getSubscribedItems().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(item -> logger.info(item.toString()));

    // 6. Use a Consumer to work through each page.
    pagePublisher.subscribe(page -> page
                    .items().stream()
                    .sorted(Comparator.comparing(ProductCatalog::price))
                    .forEach(item -> logger.info(item.toString())))
            .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

    // 7. Use a Consumer to work through each ProductCatalog item.
    pagePublisher.items()
            .subscribe(product -> logger.info(product.toString()))
            .exceptionally(failure -> {
                logger.error("ERROR - ", failure);
                return null;
            })
            .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
}
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private Subscription subscription;
    private final List<ProductCatalog> itemsFromAllPages = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription sub) {
        subscription = sub;
        subscription.request(1L);
        try {
            latch.await(); // Called by main thread blocking it until latch is released.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status before rethrowing.
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onNext(Page<ProductCatalog> productCatalogPage) {
        // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page.
        itemsFromAllPages.addAll(productCatalogPage.items());
        subscription.request(1L);
    }

    @Override
    public void onError(Throwable throwable) {
        // Release the latch on failure too; otherwise the thread blocked in onSubscribe never resumes.
        latch.countDown();
    }

    @Override
    public void onComplete() {
        latch.countDown(); // Called by subscription thread; latch releases.
    }

    List<ProductCatalog> getSubscribedItems() {
        return this.itemsFromAllPages;
    }
}
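The following sketch shows the same request-one-page-at-a-time pattern using only the JDK. It uses `java.util.concurrent.Flow` as a stand-in for the Reactive Streams interfaces, because both define the same four `Subscriber` callbacks. It uses `SubmissionPublisher` as a stand-in for `PagePublisher`, and each published page is a plain `List<String>`. Unlike `ProductCatalogSubscriber`, this sketch waits on the latch from the calling thread rather than inside `onSubscribe`. It also releases the latch from `onError` as well as `onComplete`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Sketch only: Flow stands in for org.reactivestreams and SubmissionPublisher stands in for PagePublisher.
final class PageSubscriberSketch {

    static final class CollectingSubscriber implements Flow.Subscriber<List<String>> {
        private final List<String> items = Collections.synchronizedList(new ArrayList<>());
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            subscription.request(1); // ask for the first page only; do not block in this callback
        }

        @Override
        public void onNext(List<String> page) {
            items.addAll(page);
            subscription.request(1); // then ask for exactly one more page
        }

        @Override
        public void onError(Throwable t) {
            done.countDown(); // release the waiting caller on failure as well
        }

        @Override
        public void onComplete() {
            done.countDown(); // all pages have been delivered
        }

        // Called by the subscribing thread, outside the Subscriber callbacks.
        List<String> awaitItems() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("publisher did not finish in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for pages", e);
            }
            return List.copyOf(items);
        }
    }

    static List<String> collect(List<List<String>> pages) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        pages.forEach(publisher::submit); // buffered until the subscriber requests them
        publisher.close();                // onComplete follows the pages already submitted
        return subscriber.awaitItems();
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of(List.of("a", "b"), List.of("c")))); // [a, b, c]
    }
}
```

Keeping the wait outside the callbacks avoids tying up whichever thread delivers `onSubscribe`.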
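The following snippet, taken from after comment line 6, uses the version of the PagePublisher.subscribe method that accepts a Consumer. The Java lambda receives each page and processes its items. In this example, the code sorts the items on each page and then logs them.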
// 6. Use a Consumer to work through each page.
pagePublisher.subscribe(page -> page
                .items().stream()
                .sorted(Comparator.comparing(ProductCatalog::price))
                .forEach(item -> logger.info(item.toString())))
        .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
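In the JDK, `SubmissionPublisher.consume(Consumer)` has a similar shape to the `Consumer`-based `subscribe`. It takes a `Consumer` and returns a `CompletableFuture<Void>` that completes after the last item. The following sketch uses it as a stand-in for `PagePublisher`, with each page modelled as a `List<String>`. It counts items across pages and then calls `join()` on the future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: SubmissionPublisher.consume(Consumer) stands in for PagePublisher.subscribe(Consumer).
final class ConsumerSubscribeSketch {

    // Counts items across all pages using a Consumer instead of a full Subscriber.
    static int countItems(List<List<String>> pages) {
        AtomicInteger count = new AtomicInteger();
        SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>();
        // The Consumer sees one page at a time; the future completes after the last page.
        CompletableFuture<Void> done = publisher.consume(page -> count.addAndGet(page.size()));
        pages.forEach(publisher::submit);
        publisher.close(); // signals completion once the submitted pages are consumed
        done.join();       // block the calling thread until every page has been processed
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(countItems(List.of(List.of("a", "b"), List.of("c")))); // 3
    }
}
```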
The items method of PagePublisher unwraps the model instances so that your code can process the items directly. The following snippet shows this approach.
// 7. Use a Consumer to work through each ProductCatalog item.
pagePublisher.items()
        .subscribe(product -> logger.info(product.toString()))
        .exceptionally(failure -> {
            logger.error("ERROR - ", failure);
            return null;
        })
        .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
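In the snippet above, `exceptionally` logs a failed subscription and turns it into a normal completion, so the `join()` that follows returns instead of throwing. The following sketch reproduces that chain with a plain `CompletableFuture`, which stands in for the future returned by `subscribe`. A hypothetical `AtomicReference` takes the place of the logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: a plain CompletableFuture stands in for the future returned by subscribe(...).
final class ExceptionallyJoinSketch {

    // Returns the result, or null if the work failed; a failure is recorded in 'logged'.
    static String runAndRecover(boolean fail, AtomicReference<Throwable> logged) {
        CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("simulated failure while processing a page");
            }
            return "done";
        });
        return work
                .exceptionally(failure -> { // runs only when 'work' completed exceptionally
                    logged.set(failure);    // stand-in for logger.error("ERROR - ", failure)
                    return null;            // recover, so the returned future completes normally
                })
                .join();                    // waits for completion; does not throw after recovery
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> logged = new AtomicReference<>();
        System.out.println(runAndRecover(false, logged)); // done
        System.out.println(runAndRecover(true, logged));  // null
        System.out.println(logged.get() != null);         // true
    }
}
```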
Query a table
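The query() method of the DynamoDbTable class finds items based on primary key values. The @DynamoDbPartitionKey annotation and the optional @DynamoDbSortKey annotation define the primary key of your data class.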
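The query() method requires a partition key value and finds the items that match that value. If your table also defines a sort key, you can add a sort key value to the query as an additional comparison condition to narrow the results.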
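The following sketch makes the key condition semantics concrete by evaluating them over in-memory rows shaped like the MovieActor items listed later in this topic. The helpers `keyEqualTo` and `sortGreaterThanOrEqualTo` only mirror the names of the `QueryConditional` factory methods. They are local functions, not the SDK API. The sketch assumes plain ASCII string keys, for which `String.compareTo` agrees with DynamoDB's byte-wise ordering of string sort keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch only: local helpers named after the QueryConditional factory methods; not the SDK API.
final class KeyConditionSketch {

    record Row(String movie, String actor) { }

    // Sample data shaped like the MovieActor items: movie01..movie03 x actor0..actor4.
    static List<Row> sampleTable() {
        List<Row> rows = new ArrayList<>();
        for (int m = 1; m <= 3; m++) {
            for (int a = 0; a <= 4; a++) {
                rows.add(new Row("movie0" + m, "actor" + a));
            }
        }
        return rows;
    }

    // Partition key condition only: equality on the partition key.
    static List<String> keyEqualTo(List<Row> table, String movie) {
        return select(table, r -> r.movie().equals(movie));
    }

    // Partition key equality plus a sort key comparison (actor >= value).
    static List<String> sortGreaterThanOrEqualTo(List<Row> table, String movie, String actor) {
        return select(table, r -> r.movie().equals(movie) && r.actor().compareTo(actor) >= 0);
    }

    private static List<String> select(List<Row> table, Predicate<Row> keyCondition) {
        return table.stream()
                .filter(keyCondition)
                .sorted(Comparator.comparing(Row::actor)) // items of one partition come back in sort key order
                .map(Row::actor)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> table = sampleTable();
        System.out.println(keyEqualTo(table, "movie01"));                         // [actor0, actor1, actor2, actor3, actor4]
        System.out.println(sortGreaterThanOrEqualTo(table, "movie01", "actor2")); // [actor2, actor3, actor4]
    }
}
```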
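Apart from how results are processed, the synchronous and asynchronous versions of query() work the same way. As with the scan API, the query API returns a PageIterable for a synchronous call and a PagePublisher for an asynchronous call. The use of PageIterable and PagePublisher is covered earlier in the scan section.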
Query method examples
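The query() method code example that follows uses the MovieActor class. The data class defines a composite primary key with the movie attribute as the partition key and the actor attribute as the sort key.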
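The class also indicates that it uses a global secondary index named acting_award_year. The index's composite primary key has the actingaward attribute as the partition key and the actingyear attribute as the sort key. Later in this topic, we refer to the acting_award_year index when we show how to create and use indexes.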
package org.example.tests.model;

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey;

import java.util.Objects;

@DynamoDbBean
public class MovieActor implements Comparable<MovieActor> {

    private String movieName;
    private String actorName;
    private String actingAward;
    private Integer actingYear;
    private String actingSchoolName;

    @DynamoDbPartitionKey
    @DynamoDbAttribute("movie")
    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    @DynamoDbSortKey
    @DynamoDbAttribute("actor")
    public String getActorName() {
        return actorName;
    }

    public void setActorName(String actorName) {
        this.actorName = actorName;
    }

    @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year")
    @DynamoDbAttribute("actingaward")
    public String getActingAward() {
        return actingAward;
    }

    public void setActingAward(String actingAward) {
        this.actingAward = actingAward;
    }

    @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"})
    @DynamoDbAttribute("actingyear")
    public Integer getActingYear() {
        return actingYear;
    }

    public void setActingYear(Integer actingYear) {
        this.actingYear = actingYear;
    }

    @DynamoDbAttribute("actingschoolname")
    public String getActingSchoolName() {
        return actingSchoolName;
    }

    public void setActingSchoolName(String actingSchoolName) {
        this.actingSchoolName = actingSchoolName;
    }

    @Override
    public String toString() {
        final StringBuffer sb = new StringBuffer("MovieActor{");
        sb.append("movieName='").append(movieName).append('\'');
        sb.append(", actorName='").append(actorName).append('\'');
        sb.append(", actingAward='").append(actingAward).append('\'');
        sb.append(", actingYear=").append(actingYear);
        sb.append(", actingSchoolName='").append(actingSchoolName).append('\'');
        sb.append('}');
        return sb.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MovieActor that = (MovieActor) o;
        return Objects.equals(movieName, that.movieName)
                && Objects.equals(actorName, that.actorName)
                && Objects.equals(actingAward, that.actingAward)
                && Objects.equals(actingYear, that.actingYear)
                && Objects.equals(actingSchoolName, that.actingSchoolName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName);
    }

    @Override
    public int compareTo(MovieActor o) {
        if (this.movieName.compareTo(o.movieName) != 0) {
            return this.movieName.compareTo(o.movieName);
        } else {
            return this.actorName.compareTo(o.actorName);
        }
    }
}
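The compareTo method above orders items by movieName and breaks ties with actorName. The following sketch expresses the same ordering with `Comparator.comparing(...).thenComparing(...)`. It uses a hypothetical trimmed-down `Key` record instead of the full MovieActor class.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MovieActorOrderSketch {

    // Hypothetical stand-in for MovieActor that keeps only the two key fields.
    record Key(String movieName, String actorName) { }

    // Same ordering as MovieActor.compareTo: movieName first, then actorName as the tie-breaker.
    static final Comparator<Key> BY_MOVIE_THEN_ACTOR =
            Comparator.comparing(Key::movieName).thenComparing(Key::actorName);

    // Returns a sorted copy, leaving the input list untouched.
    static List<Key> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(BY_MOVIE_THEN_ACTOR);
        return copy;
    }

    public static void main(String[] args) {
        List<Key> keys = List.of(
                new Key("movie02", "actor0"),
                new Key("movie01", "actor3"),
                new Key("movie01", "actor1"));
        sorted(keys).forEach(System.out::println);
    }
}
```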
The code examples that follow query the following items.
MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'}
MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}
MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'}
MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'}
MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'}
MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'}
MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'}
MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'}
MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'}
MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'}
MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'}
MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'}
MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}
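The following code defines two QueryConditional instances. A QueryConditional works with key values, either the partition key alone or the partition key combined with the sort key. It corresponds to a key condition expression in the DynamoDB service API. After comment line 1, the example defines the keyEqual instance, which matches items with a partition value of movie01.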
After comment line 2, the example also defines a filter expression that filters out any item that has no actingschoolname attribute.
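After comment line 3, the example builds the QueryEnhancedRequest object that the code passes to the DynamoDbTable.query() method. This object combines the key condition and the filter expression.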
public static void query(DynamoDbTable<MovieActor> movieActorTable) {
    // 1. Define a QueryConditional instance to return items matching a partition value.
    QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01"));
    // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria.
    QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(
            b -> b.partitionValue("movie01").sortValue("actor2"));
    // 2. Define a filter expression that filters out items that have no actingschoolname attribute.
    final Expression filterOutNoActingschoolname = Expression.builder()
            .expression("attribute_exists(actingschoolname)")
            .build();
    // 3. Build the query request.
    QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder()
            .queryConditional(keyEqual)
            .filterExpression(filterOutNoActingschoolname)
            .build();
    // 4. Perform the query.
    PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery);
    logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages.

    pagedResults.items().stream()
            .sorted()
            .forEach(item -> logger.info(item.toString())); // Log the sorted list of items.
}
The following output is from running the method. It shows the items whose movieName value is movie01 and omits the items whose actingSchoolName is null.
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
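The filter in this example keeps only items that have an actingschoolname attribute. The following sketch reproduces that step locally over the five movie01 items. Each item is modelled as a `Map`, and `actor0` and `actor3` simply have no `actingschoolname` key. This is an illustration of `attribute_exists` only, not how the SDK evaluates expressions. DynamoDB applies the filter on the service side, after the key condition has selected the items.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: a local stand-in for attribute_exists(actingschoolname); not the SDK's expression engine.
final class FilterExpressionSketch {

    // The movie01 items from the sample data; actor0 and actor3 have no actingschoolname attribute.
    static List<Map<String, String>> movie01Items() {
        List<Map<String, String>> items = new ArrayList<>();
        for (int a = 0; a <= 4; a++) {
            Map<String, String> item = new HashMap<>();
            item.put("movie", "movie01");
            item.put("actor", "actor" + a);
            if (a != 0 && a != 3) {
                item.put("actingschoolname", "actingschool" + a);
            }
            items.add(item);
        }
        return items;
    }

    // Applied after the key condition: keep only items where the attribute exists.
    static List<String> withActingSchool(List<Map<String, String>> keyMatches) {
        return keyMatches.stream()
                .filter(item -> item.containsKey("actingschoolname"))
                .map(item -> item.get("actor"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withActingSchool(movie01Items())); // [actor1, actor2, actor4]
    }
}
```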
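The following code is a variation of the query request after comment line 3. It replaces the keyEqual QueryConditional with the sortGreaterThanOrEqualTo QueryConditional defined after comment line 1a, and it removes the filter expression.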
QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder()
        .queryConditional(sortGreaterThanOrEqualTo)
        .build();
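Because this table has a composite primary key, all QueryConditional instances require a partition key value. QueryConditional methods that begin with sort... also require a sort key value. DynamoDB returns query results in sort key order (ascending by default), and the example code additionally sorts the items with sorted() before logging them.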
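The following output shows the results of the query. The query returns only items whose movieName value equals movie01 and whose actorName value is greater than or equal to actor2. Because the filter expression was removed, the query also returns items that have no value for the actingSchoolName attribute.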
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'}
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}