

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用AWS SDK for Java 2.x 异步编程
<a name="asynchronous"></a>

AWS SDK for Java 2.x 提供了异步客户端，这些客户端支持非阻塞 I/O，可以在几个线程中实现高并发。然而，并不能保证完全实现非阻塞 I/O。异步客户端在某些情况下仍可能执行阻塞调用，这些情况包括凭证检索、使用[AWS签名版本 4（SigV4）](https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html)请求签名或端点发现。

同步方法会阻止执行您的线程，直到客户端接收到服务的响应。异步方法会立即返回，并控制调用的线程，而不必等待响应。

由于异步方法在收到响应之前返回，所以需要通过某种方法在响应准备就绪时接收响应。适用于 Java 的 AWS SDK 2.x 中的异步客户端方法将返回 *CompletableFuture 对象*，这些对象可让您在响应准备就绪时访问响应。

## 使用异步客户端 API
<a name="basics-async-non-streaming"></a>

异步客户端方法的签名与其对应同步方法的签名相同，但是异步方法返回 [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/index.html?java/util/concurrent/CompletableFuture.html) 对象，该对象*在未来*会包含异步操作的结果。如果在 SDK 的异步方法执行时发生错误，则错误会作为 `CompletionException` 引发。

您可以用来获取结果的一种方法是，将一个 `whenComplete()` 方法链接到 SDK 方法调用所返回的 `CompletableFuture`。`whenComplete()` 方法接收结果或 Throwable 类型的对象 `CompletionException`，具体取决于完成异步调用的方式。您向 `whenComplete()` 提供一个操作，用于在结果返回给调用代码之前，对结果进行处理或检查。

如果要返回的不是 SDK 方法原本返回的对象，而是其他内容，请改用 `handle()` 方法。`handle()` 方法采用的参数与 `whenComplete()` 相同，但您可以处理结果并返回对象。

要等待异步链完成并检索完成结果，可以调用 `join()` 方法。如果未在链中处理 `Throwable` 对象，则 `join()` 方法会抛出封装了原始异常的 `CompletionException`。您可以使用 `CompletionException#getCause()` 访问原始异常。您也可以调用 `CompletableFuture#get()` 方法来获取完成结果。但 `get()` 方法会抛出受检异常。

以下示例显示了两种变体，说明如何使用 DynamoDB 异步客户端的 `listTables()` 方法。传递给 `whenComplete()` 的操作仅记录成功的响应，而 `handle()` 版本则提取表名列表并返回列表。在这两种情况下，如果异步链中生成错误，则会重新抛出错误，以便客户端代码有机会对其进行处理。

 **导入**。

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;

import java.util.List;
import java.util.concurrent.CompletableFuture;
```

 **代码** 

------
#### [ whenComplete() variation ]

```
public class DynamoDbAsyncListTables {

    public static void main(String[] args) {
        Region region = Region.US_EAST_1;
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        try {
            ListTablesResponse listTablesResponse = listTablesWhenComplete(dynamoDbAsyncClient).join();  // The join() method may throw a CompletionException.
            if (listTablesResponse.hasTableNames()){
                System.out.println("Table exist in this region: " + region.id());
            }
        } catch (RuntimeException e) {
            // Handle as needed. Here we simply print out the class names.
            System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'.
            System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'.
        }
    }

    public static CompletableFuture<ListTablesResponse> listTablesWhenComplete(DynamoDbAsyncClient client) {
        return client.listTables(ListTablesRequest.builder().build())
            .whenComplete((listTablesResponse, throwable) -> {
                if (listTablesResponse != null) {  // Consume the response.
                    System.out.println("The SDK's listTables method completed successfully.");
                } else {
                    RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException.
                                                                                      // The SDK throws only RuntimeExceptions, so this is a safe cast.
                    System.out.println(cause.getMessage());  // Log error here, but rethrow so the calling code can handle as needed.
                    throw cause;
                }
            });
    }
```

------
#### [ handle() variation ]

```
public class DynamoDbAsyncListTables {

    public static void main(String[] args) {
        Region region = Region.US_EAST_1;
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        try {
            List<String> tableNames = listTablesHandle(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException.
            tableNames.forEach(System.out::println);
        } catch (RuntimeException e) {
            // Handle as needed. Here we simply print out the class names.
            System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'.
            System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'.
        }
    }

    public static CompletableFuture<List<String>> listTablesHandle(DynamoDbAsyncClient client) {
        return client.listTables(ListTablesRequest.builder().build())
            .handle((listTablesResponse, throwable) -> {
                if (listTablesResponse != null) {
                    return listTablesResponse.tableNames(); // Return the list of table names.
                } else {
                    RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException.
                                                                                      // The SDK throws only RuntimeExceptions, so this is a safe cast.
                    System.out.println(cause.getMessage());  // Log error here, but rethrow so the calling code can handle as needed.
                    throw cause;
                }
            });
    }
}
```

------

## 使用异步方法处理流
<a name="basics-async-streaming"></a>

对于包含流式内容的异步方法，必须提供 [AsyncRequestBody](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/AsyncRequestBody.html)，以便以递增方式提供内容，或者提供 [AsyncResponseTransformer](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/async/AsyncResponseTransformer.html) 以接收和处理响应。

以下示例使用 `PutObject` 操作的异步形式，以异步方式将文件上传到 Amazon S3。

 **导入**。

```
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
```

 **代码** 

```
/**
 * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials.
 *
 * For information, see this documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */

public class S3AsyncOps {

     public static void main(String[] args) {

         final String USAGE = "\n" +
                 "Usage:\n" +
                 "    S3AsyncOps <bucketName> <key> <path>\n\n" +
                 "Where:\n" +
                 "    bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" +
                 "    key - the name of the object (for example, book.pdf). \n" +
                 "    path - the local path to the file (for example, C:/AWS/book.pdf). \n" ;

        if (args.length != 3) {
            System.out.println(USAGE);
             System.exit(1);
        }

        String bucketName = args[0];
        String key = args[1];
        String path = args[2];

        Region region = Region.US_WEST_2;
        S3AsyncClient client = S3AsyncClient.builder()
                .region(region)
                .build();

        PutObjectRequest objectRequest = PutObjectRequest.builder()
                .bucket(bucketName)
                .key(key)
                .build();

        // Put the object into the bucket
        CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest,
                AsyncRequestBody.fromFile(Paths.get(path))
        );
        future.whenComplete((resp, err) -> {
            try {
                if (resp != null) {
                    System.out.println("Object uploaded. Details: " + resp);
                } else {
                    // Handle error
                    err.printStackTrace();
                }
            } finally {
                // Only close the client when you are completely done with it
                client.close();
            }
        });

        future.join();
    }
}
```

以下示例使用 `GetObject` 操作的异步形式，从 Amazon S3 获取文件。

 **导入**。

```
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
```

 **代码** 

```
/**
 * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials.
 *
 * For information, see this documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */

public class S3AsyncStreamOps {

    public static void main(String[] args) {

        final String USAGE = "\n" +
                "Usage:\n" +
                "    S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" +
                "Where:\n" +
                "    bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" +
                "    objectKey - the name of the object (for example, book.pdf). \n" +
                "    path - the local path to the file (for example, C:/AWS/book.pdf). \n" ;

        if (args.length != 3) {
            System.out.println(USAGE);
            System.exit(1);
         }

        String bucketName = args[0];
        String objectKey = args[1];
        String path = args[2];

        Region region = Region.US_WEST_2;
        S3AsyncClient client = S3AsyncClient.builder()
                .region(region)
                .build();

        GetObjectRequest objectRequest = GetObjectRequest.builder()
                .bucket(bucketName)
                .key(objectKey)
                .build();

        CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest,
                AsyncResponseTransformer.toFile(Paths.get(path)));

        futureGet.whenComplete((resp, err) -> {
            try {
                if (resp != null) {
                    System.out.println("Object downloaded. Details: "+resp);
                } else {
                    err.printStackTrace();
                }
            } finally {
               // Only close the client when you are completely done with it
                client.close();
            }
        });
        futureGet.join();
    }
}
```

## 配置高级异步选项
<a name="advanced-operations"></a>

适用于 Java 的 AWS SDK 2.x 使用 [Netty](https://netty.io) 处理 I/O 线程，这是一种异步的事件驱动网络应用程序框架。适用于 Java 的 AWS SDK 2.x 在 Netty 之后创建 `ExecutorService`，以完成从 HTTP 客户端请求返回到 Netty 客户端的 future。这种抽象化可以减少在客服人员选择停止或休眠线程时，应用程序中断异步处理的风险。默认情况下，每个异步客户端都会根据处理器数量创建一个线程池，并管理 `ExecutorService` 队列中的任务。

在构建异步客户端时，可以指定特定的 JDK 实现 `ExecutorService`。以下代码段创建了一个具有固定线程数的 `ExecutorService`。

 **代码** 

```
S3AsyncClient clientThread = S3AsyncClient.builder()
  .asyncConfiguration(
    b -> b.advancedOption(SdkAdvancedAsyncClientOption
      .FUTURE_COMPLETION_EXECUTOR,
      Executors.newFixedThreadPool(10)
    )
  )
  .build();
```

要优化性能，您可以管理自己的线程池执行程序，并在配置客户端时包括该执行程序。

```
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50,
    10, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(<custom_value>),
    new ThreadFactoryBuilder()
      .threadNamePrefix("sdk-async-response").build());

// Allow idle core threads to time out
executor.allowCoreThreadTimeOut(true);

S3AsyncClient clientThread = S3AsyncClient.builder()
  .asyncConfiguration(
    b -> b.advancedOption(SdkAdvancedAsyncClientOption
      .FUTURE_COMPLETION_EXECUTOR,
      executor
    )
  )
  .build();
```