本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用非同步程式設計
這些 AWS SDK for Java 2.x 功能具有非封鎖 I/O 支援的非同步用戶端,可在幾個執行緒中實作高並行。不過,無法保證非封鎖 I/O 總數。非同步用戶端可能會在某些情況下執行封鎖呼叫,例如憑證擷取、使用 AWS Signature 第 4 版 (SigV4) 請求簽署,或端點探索。
同步方法會封鎖您的執行緒執行,直到用戶端收到服務的回應。非同步方法會立即傳回,將控制權回歸給呼叫端執行緒,無需等待回應。
由於非同步方法會在有可用回應之前傳回,您需要一個方法在回應準備好時取得回應。 AWS SDK for Java 非同步用戶端在 2.x 傳回CompletableFuture 物件中的方法,可讓您在回應準備就緒時存取回應。
使用非同步用戶端 APIs
非同步用戶端方法的簽章與其同步方法相同,但非同步方法會傳回包含未來非同步操作結果的CompletableFutureCompletionException
。
您可以使用 取得結果的一個方法是將whenComplete()
方法鏈結到SDK方法呼叫CompletableFuture
傳回的 。該whenComplete()
方法CompletionException
會根據非同步呼叫的完成方式,接收結果或類型 的可推展物件。您向 提供動作whenComplete()
,以在結果傳回至呼叫程式碼之前處理或檢查結果。
如果您想要傳回SDK方法傳回之物件以外的其他物件,請改用 handle()
方法。此handle()
方法使用與 相同的參數whenComplete()
,但您可以處理結果並傳回物件。
若要等待非同步鏈完成並擷取完成結果,您可以呼叫 join()
方法。如果未在鏈中處理Throwable
物件,則 join()
方法會捨棄未核取的 CompletionException
,以包裝原始例外狀況。您可以使用 存取原始例外狀況CompletionException#getCause()
。您也可以呼叫 CompletableFuture#get()
方法來取得完成結果。不過, get()
方法可能會擲回核取的例外狀況。
下列範例顯示如何使用 DynamoDB 非同步用戶端listTables()
方法的兩種變化。傳遞給 的動作whenComplete()
只會記錄成功的回應,而handle()
版本會擷取資料表名稱清單並傳回清單。在這兩種情況下,如果在非同步鏈中產生錯誤,則錯誤會遭到撤銷,因此用戶端程式碼有機會處理。
匯入
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;
Code
以非同步方法處理串流
對於具有串流內容的非同步方法,您必須提供 AsyncRequestBody
下列範例會使用 Amazon S3 非同步形式的PutObject
操作,以非同步方式將檔案上傳至非同步。
匯入
import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
Code
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }
下列範例 Amazon S3 使用非同步形式的 GetObject
操作,從 取得檔案。
匯入
import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
Code
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }
設定進階非同步選項
AWS SDK for Java 2.x 使用非同步事件驅動的網路應用程式架構 Netty ExecutorService
後面的 ,以完成從HTTP用戶端請求到 Netty 用戶端傳回的期貨。如果開發人員選擇停止或睡眠執行緒,則此抽象概念可降低應用程式中斷非同步程序的風險。根據預設,每個非同步用戶端都會根據處理器數目建立執行緒集區,並管理 內佇列中的任務ExecutorService
。
您可以在建置非同步用戶端ExecutorService
時指定 的特定JDK實作。下列程式碼片段會建立ExecutorService
具有固定數目執行緒的 。
Code
S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();
若要最佳化效能,您可以管理自己的執行緒集區執行器,並在設定用戶端時包含它。
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(
<custom_value>
), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();