本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 适用于 Java 的 AWS SDK
2.x 处理分页结果
当响应对象太大而无法在单个响应中返回时,许多 AWS 操作都会返回分页结果。在 适用于 Java 的 AWS SDK 1.0 中,响应包含一个用于检索下一页结果的标记。相比之下, 适用于 Java 的 AWS SDK 2.x 具有自动分页方法,可以进行多次服务调用,自动为您获取下一页的结果。您只需编写处理结果的代码。自动分页功能适用于同步和异步客户端。
注意
这些代码段假设您了解使用 SDK 的基础知识,并且已为环境配置了单点登录访问权限。
同步分页
以下示例演示列出 Amazon S3 桶中对象的同步分页方法。
迭代页面
第一个示例演示如何使用分listRes
页器对象(一个ListObjectsV2Iterable
stream
代码在响应页面上进行流式传输,将响应流转换为S3Object
内容流,然后处理 Amazon S3 对象的内容。
以下导入适用于此同步分页部分中的所有示例。
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.waiters.S3Waiter;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
ListObjectsV2Request listReq = ListObjectsV2Request.builder()
.bucket(bucketName)
.maxKeys(1)
.build();
ListObjectsV2Iterable listRes = s3.listObjectsV2Paginator(listReq);
// Process response pages
listRes.stream()
.flatMap(r -> r.contents().stream())
.forEach(content -> System.out
.println(" Key: " + content.key() + " size = " + content.size()));
请参阅上的完整示例
迭代对象
以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。ListObjectsV2Iterable
类的 contents
方法返回一个 SdkIterable
使用流
以下代码段在响应内容上使用 stream
方法来迭代分页项目集合。
// Helper method to work with paginated collection of items directly.
listRes.contents().stream()
.forEach(content -> System.out
.println(" Key: " + content.key() + " size = " + content.size()));
请参阅上的完整示例
使用 for-each 循环
由于 SdkIterable
扩展了 Iterable
接口,因此您可以像处理任何 Iterable
一样处理内容。以下代码段使用标准 for-each
循环迭代响应的内容。
for (S3Object content : listRes.contents()) {
System.out.println(" Key: " + content.key() + " size = " + content.size());
}
请参阅上的完整示例
手动分页
如果您的使用案例需要手动分页,则手动分页仍然可用。对后续请求使用响应对象中的下一个令牌。以下示例使用 while
循环。
ListObjectsV2Request listObjectsReqManual = ListObjectsV2Request.builder()
.bucket(bucketName)
.maxKeys(1)
.build();
boolean done = false;
while (!done) {
ListObjectsV2Response listObjResponse = s3.listObjectsV2(listObjectsReqManual);
for (S3Object content : listObjResponse.contents()) {
System.out.println(content.key());
}
if (listObjResponse.nextContinuationToken() == null) {
done = true;
}
listObjectsReqManual = listObjectsReqManual.toBuilder()
.continuationToken(listObjResponse.nextContinuationToken())
.build();
}
请参阅上的完整示例
异步分页
以下示例演示了列出 DynamoDB 表格的异步分页方法。
迭代表名称页面
以下两个示例使用异步 DynamoDB 客户端,该客户端调用listTablesPaginator
该方法并请求获取。ListTablesPublisher
ListTablesPublisher
实现了两个接口,这为处理响应提供了许多选项。我们将研究每个接口的方法。
使用 Subscriber
以下代码示例演示如何使用 ListTablesPublisher
实现的 org.reactivestreams.Publisher
接口处理分页结果。要了解有关响应式流模型的更多信息,请参阅 React ive St GitHub reams 存储库
以下导入适用于此异步分页部分中的所有示例。
import io.reactivex.rxjava3.core.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.paginators.ListTablesPublisher;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
以下代码获取一个 ListTablesPublisher
实例。
// Creates a default client with credentials and region loaded from the
// environment.
final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create();
ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build();
ListTablesPublisher publisher = asyncClient.listTablesPaginator(listTablesRequest);
以下代码使用 org.reactivestreams.Subscriber
的匿名实现来处理每个页面的结果。
onSubscribe
方法将调用 Subscription.request
方法来对来自发布者的数据启动请求。必须调用此方法以开始从发布者获取数据。
订阅者的 onNext
方法将处理响应页面,它会访问所有表名称并打印出每个表名称。处理完该页面后,会向发布者请求另一个页面。将重复调用该方法,直到检索了所有页面。
如果检索数据时出现错误,将触发 onError
方法。最后,在 onComplete
方法在请求所有页面后调用。
// A Subscription represents a one-to-one life-cycle of a Subscriber subscribing
// to a Publisher.
publisher.subscribe(new Subscriber<ListTablesResponse>() {
// Maintain a reference to the subscription object, which is required to request
// data from the publisher.
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
// Request method should be called to demand data. Here we request a single
// page.
subscription.request(1);
}
@Override
public void onNext(ListTablesResponse response) {
response.tableNames().forEach(System.out::println);
// After you process the current page, call the request method to signal that
// you are ready for next page.
subscription.request(1);
}
@Override
public void onError(Throwable t) {
// Called when an error has occurred while processing the requests.
}
@Override
public void onComplete() {
// This indicates all the results are delivered and there are no more pages
// left.
}
});
请参阅上的完整示例
使用 Consumer
ListTablesPublisher
实现的 SdkPublisher
接口有一个 subscribe
方法,该方法接受 Consumer
并返回 CompletableFuture<Void>
。
此接口中的 subscribe
方法可用于 org.reactivestreams.Subscriber
开销可能过大的简单用例。当下面的代码使用每个页面时,它会在每个页面上调用 tableNames
方法。该 tableNames
方法返回使用 forEach
方法处理的 DynamoDB 表名的 java.util.List
。
// Use a Consumer for simple use cases.
CompletableFuture<Void> future = publisher.subscribe(
response -> response.tableNames()
.forEach(System.out::println));
请参阅上的完整示例
迭代表名称
以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。与之前用 contents
方法演示的同步 Amazon S3 示例类似,DynamoDB 异步结果类 ListTablesPublisher
具有与底层项目集合交互的 tableNames
便捷方法。此 tableNames
方法的返回类型是一个 SdkPublisher
使用 Subscriber
以下代码获取表名底层集合的 SdkPublisher
。
// Create a default client with credentials and region loaded from the
// environment.
final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create();
ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build();
ListTablesPublisher listTablesPublisher = asyncClient.listTablesPaginator(listTablesRequest);
SdkPublisher<String> publisher = listTablesPublisher.tableNames();
以下代码使用 org.reactivestreams.Subscriber
的匿名实现来处理每个页面的结果。
订阅者的 onNext
方法将处理集合中的单个元素。在本例中,它是一个表名称。处理完该表名称后,会向发布者请求另一个表名称。将重复调用该方法,直到检索了所有表名称。
// Use a Subscriber.
publisher.subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(String tableName) {
System.out.println(tableName);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
请参阅上的完整示例
使用 Consumer
以下示例使用 SdkPublisher
的 subscribe
方法(采用 Consumer
)来处理每个项目。
// Use a Consumer.
CompletableFuture<Void> future = publisher.subscribe(System.out::println);
future.get();
请参阅上的完整示例
使用第三方库
您可以使用其他第三方库,而不是实现自定义订阅者。此示例演示了的用法 RxJava,但是可以使用任何实现响应式流接口的库。有关该库的更多信息, GitHub请参阅上的 RxJava wiki 页面
要使用该库,请将其作为依赖项添加。如果使用了 Maven,示例将显示要使用的 POM 代码段。
POM 条目
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.6</version>
</dependency>
代码
DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create();
ListTablesPublisher publisher = asyncClient.listTablesPaginator(ListTablesRequest.builder()
.build());
// The Flowable class has many helper methods that work with
// an implementation of an org.reactivestreams.Publisher.
List<String> tables = Flowable.fromPublisher(publisher)
.flatMapIterable(ListTablesResponse::tableNames)
.toList()
.blockingGet();
System.out.println(tables);
请参阅上的完整示例