非同期プログラミングを使用する - AWS SDK for Java 2.x

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

非同期プログラミングを使用する

AWS SDK for Java 2.x は、非ブロッキング I/O サポートを備えた非同期クライアントを特徴とし、いくつかのスレッドで高い同時実行性を実装します。ただし、ノンブロッキング I/O の合計は保証されません。非同期クライアントは、認証情報の取得、AWS 署名バージョン 4 (SigV4) を使用した署名のリクエスト、エンドポイントの検出などの場合、ブロックコールを実行することがあります。

同期メソッドは、クライアントがサービスからのレスポンスを受信するまでスレッドの実行をブロックします。非同期メソッドはすぐに応答を返し、レスポンスを待機せずに呼び出しスレッドに制御を戻します。

非同期メソッドはレスポンスが可能になる前に応答を返すため、準備ができたらレスポンスを得るための手段が必要になります。2.x の AWS SDK for Java 戻り CompletableFuture オブジェクトの非同期クライアントのメソッドで、準備ができたらレスポンスにアクセスできます。

非同期クライアントAPIsを使用する

非同期クライアントメソッドの署名は同期のそれと同じですが、非同期メソッドは、将来の非同期オペレーションの結果を含む CompletableFuture オブジェクトを返します。SDK の非同期メソッドの実行中にエラーがスローされた場合、エラーは としてスローされますCompletionException

結果を得るために使用できる 1 つの方法は、SDK whenComplete()メソッド呼び出しによってCompletableFuture返された にメソッドを連鎖させることです。whenComplete() メソッドは、非同期呼び出しの完了方法CompletionExceptionに応じて、結果またはスローアブルオブジェクトを受け取ります。呼び出し元のコードwhenComplete()に返される前に、結果を処理またはチェックするアクションを に提供します。

SDK メソッドによって返されたオブジェクト以外の何かを返す場合は、代わりに handle() メソッドを使用します。handle() メソッドは と同じパラメータを使用しますがwhenComplete()、結果を処理してオブジェクトを返すことができます。

非同期チェーンが完了するのを待って完了結果を取得するには、 join()メソッドを呼び出します。Throwable オブジェクトがチェーンで処理されなかった場合、join()メソッドは元の例外をラップCompletionExceptionするチェックされていない をスローします。を使用して元の例外にアクセスしますCompletionException#getCause()CompletableFuture#get() メソッドを呼び出して、完了結果を取得することもできます。ただし、 get()メソッドはチェックされた例外をスローできます。

次の例は、DynamoDB 非同期クライアントの listTables()メソッドを操作する方法の 2 つのバリエーションを示しています。に渡されたアクションは、成功したレスポンスwhenComplete()を単にログに記録します。バージョンはテーブル名のリストをhandle()抽出し、リストを返します。どちらの場合も、非同期チェーンでエラーが生成されると、エラーが再スローされるため、クライアントコードが処理できます。

インポート

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;

コード

whenComplete() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { ListTablesResponse listTablesResponse = listTablesWhenComplete(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. if (listTablesResponse.hasTableNames()){ System.out.println("Table exist in this region: " + region.id()); } } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<ListTablesResponse> listTablesWhenComplete(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .whenComplete((listTablesResponse, throwable) -> { if (listTablesResponse != null) { // Consume the response. System.out.println("The SDK's listTables method completed successfully."); } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); }
handle() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { List<String> tableNames = listTablesHandle(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. tableNames.forEach(System.out::println); } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<List<String>> listTablesHandle(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .handle((listTablesResponse, throwable) -> { if (listTablesResponse != null) { return listTablesResponse.tableNames(); // Return the list of table names. } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); } }

非同期メソッドでストリーミングを処理する

ストリーミングコンテンツを使用する非同期メソッドの場合、コンテンツを段階的に提供する AsyncRequestBody を指定するか、レスポンスを受信して処理する AsyncResponseTransformer を指定する必要があります。

次の例では、 PutObjectオペレーションの Amazon S3 非同期形式を使用して、ファイルを非同期にアップロードします。

インポート

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

コード

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }

次の例では、 GetObjectオペレーションの非同期形式 Amazon S3 を使用して からファイルを取得します。

インポート

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

コード

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

高度な非同期オプションを設定する

AWS SDK for Java 2.x は、非同期イベント駆動型ネットワークアプリケーションフレームワークである Netty を使用して I/O スレッドを処理します。 AWS SDK for Java 2.x は、Netty のExecutorService背後にある を作成し、HTTP クライアントリクエストから Netty クライアントに返される先物を完了します。この抽象化により、開発者がスレッドを停止またはスリープすることを選択した場合、アプリケーションが非同期プロセスをブレークするリスクが軽減されます。デフォルトでは、各非同期クライアントはプロセッサ数に基づいてスレッドプールを作成し、ExecutorService 内のキュー内のタスクを管理します。

非同期クライアントを構築するExecutorServiceときに、 の特定の JDK 実装を指定できます。次のスニペットは、固定数のスレッドExecutorServiceを持つ を作成します。

コード

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

パフォーマンスを最適化するには、独自のスレッドプールエグゼキューターを管理し、クライアントを設定するときに含めることができます。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();