使用非同步程式設計 - AWS SDK for Java 2.x

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用非同步程式設計

這些 AWS SDK for Java 2.x 功能具有非封鎖 I/O 支援的非同步用戶端,可在幾個執行緒中實作高並行。不過,無法保證非封鎖 I/O 總數。非同步用戶端可能會在某些情況下執行封鎖呼叫,例如憑證擷取、使用 AWS Signature 第 4 版 (SigV4) 請求簽署,或端點探索。

同步方法會封鎖您的執行緒執行,直到用戶端收到服務的回應。非同步方法會立即傳回,將控制權回歸給呼叫端執行緒,無需等待回應。

由於非同步方法會在有可用回應之前傳回,您需要一個方法在回應準備好時取得回應。 AWS SDK for Java 非同步用戶端在 2.x 傳回CompletableFuture 物件中的方法,可讓您在回應準備就緒時存取回應。

使用非同步用戶端 APIs

非同步用戶端方法的簽章與其同步方法相同,但非同步方法會傳回包含未來非同步操作結果的CompletableFuture物件。如果在 SDK的非同步方法執行時擲出錯誤,則錯誤會擲出為 CompletionException

您可以使用 取得結果的一個方法是將whenComplete()方法鏈結到SDK方法呼叫CompletableFuture傳回的 。該whenComplete()方法CompletionException會根據非同步呼叫的完成方式,接收結果或類型 的可推展物件。您向 提供動作whenComplete(),以在結果傳回至呼叫程式碼之前處理或檢查結果。

如果您想要傳回SDK方法傳回之物件以外的其他物件,請改用 handle()方法。此handle()方法使用與 相同的參數whenComplete(),但您可以處理結果並傳回物件。

若要等待非同步鏈完成並擷取完成結果,您可以呼叫 join()方法。如果未在鏈中處理Throwable物件,則 join()方法會捨棄未核取的 CompletionException,以包裝原始例外狀況。您可以使用 存取原始例外狀況CompletionException#getCause()。您也可以呼叫 CompletableFuture#get()方法來取得完成結果。不過, get()方法可能會擲回核取的例外狀況。

下列範例顯示如何使用 DynamoDB 非同步用戶端listTables()方法的兩種變化。傳遞給 的動作whenComplete()只會記錄成功的回應,而handle()版本會擷取資料表名稱清單並傳回清單。在這兩種情況下,如果在非同步鏈中產生錯誤,則錯誤會遭到撤銷,因此用戶端程式碼有機會處理。

匯入

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;

Code

whenComplete() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { ListTablesResponse listTablesResponse = listTablesWhenComplete(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. if (listTablesResponse.hasTableNames()){ System.out.println("Table exist in this region: " + region.id()); } } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<ListTablesResponse> listTablesWhenComplete(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .whenComplete((listTablesResponse, throwable) -> { if (listTablesResponse != null) { // Consume the response. System.out.println("The SDK's listTables method completed successfully."); } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); }
handle() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { List<String> tableNames = listTablesHandle(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. tableNames.forEach(System.out::println); } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<List<String>> listTablesHandle(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .handle((listTablesResponse, throwable) -> { if (listTablesResponse != null) { return listTablesResponse.tableNames(); // Return the list of table names. } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); } }

以非同步方法處理串流

對於具有串流內容的非同步方法,您必須提供 AsyncRequestBody 以逐步提供內容,或提供 AsyncResponseTransformer以接收和處理回應。

下列範例會使用 Amazon S3 非同步形式的PutObject操作,以非同步方式將檔案上傳至非同步。

匯入

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

Code

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }

下列範例 Amazon S3 使用非同步形式的 GetObject 操作,從 取得檔案。

匯入

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

Code

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

設定進階非同步選項

AWS SDK for Java 2.x 使用非同步事件驅動的網路應用程式架構 Netty 來處理 I/O 執行緒。 AWS SDK for Java 2.x 會建立 Netty ExecutorService後面的 ,以完成從HTTP用戶端請求到 Netty 用戶端傳回的期貨。如果開發人員選擇停止或睡眠執行緒,則此抽象概念可降低應用程式中斷非同步程序的風險。根據預設,每個非同步用戶端都會根據處理器數目建立執行緒集區,並管理 內佇列中的任務ExecutorService

您可以在建置非同步用戶端ExecutorService時指定 的特定JDK實作。下列程式碼片段會建立ExecutorService具有固定數目執行緒的 。

Code

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

若要最佳化效能,您可以管理自己的執行緒集區執行器,並在設定用戶端時包含它。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();