AWS Batch 使用適用於 Java 的 SDK 2.x 的範例 - AWS SDK for Java 2.x

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

AWS Batch 使用適用於 Java 的 SDK 2.x 的範例

下列程式碼範例示範如何使用 AWS SDK for Java 2.x 搭配 來執行動作和實作常見案例 AWS Batch。

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會告訴您如何呼叫個別服務函數,但您可以在其相關情境中查看內容中的動作。

案例是向您展示如何呼叫服務中的多個函數或與其他 AWS 服務組合來完成特定任務的程式碼範例。

每個範例都包含完整原始程式碼的連結,您可以在其中找到如何在內容中設定和執行程式碼的指示。

開始使用

下列程式碼範例說明如何開始使用 AWS Batch。

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.batch.BatchAsyncClient; import software.amazon.awssdk.services.batch.model.JobStatus; import software.amazon.awssdk.services.batch.model.JobSummary; import software.amazon.awssdk.services.batch.model.ListJobsRequest; import software.amazon.awssdk.services.batch.paginators.ListJobsPublisher; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; public class HelloBatch { private static BatchAsyncClient batchClient; public static void main(String[] args) { List<JobSummary> jobs = listJobs("my-job-queue"); jobs.forEach(job -> System.out.printf("Job ID: %s, Job Name: %s, Job Status: %s%n", job.jobId(), job.jobName(), job.status()) ); } public static List<JobSummary> listJobs(String jobQueue) { if (jobQueue == null || jobQueue.isEmpty()) { throw new IllegalArgumentException("Job queue cannot be null or empty"); } ListJobsRequest listJobsRequest = ListJobsRequest.builder() .jobQueue(jobQueue) .jobStatus(JobStatus.SUCCEEDED) .build(); List<JobSummary> jobSummaries = new ArrayList<>(); ListJobsPublisher listJobsPaginator = getAsyncClient().listJobsPaginator(listJobsRequest); CompletableFuture<Void> future = listJobsPaginator.subscribe(response -> { jobSummaries.addAll(response.jobSummaryList()); }); future.join(); return jobSummaries; } private static BatchAsyncClient getAsyncClient() { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) // Increase max concurrency to handle more simultaneous connections. .connectionTimeout(Duration.ofSeconds(60)) // Set the connection timeout. .readTimeout(Duration.ofSeconds(60)) // Set the read timeout. .writeTimeout(Duration.ofSeconds(60)) // Set the write timeout. .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) // Set the overall API call timeout. .apiCallAttemptTimeout(Duration.ofSeconds(90)) // Set the individual call attempt timeout. .retryPolicy(RetryPolicy.builder() // Add a retry policy to handle transient errors. .numRetries(3) // Number of retry attempts. .build()) .build(); if (batchClient == null) { batchClient = BatchAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .build(); } return batchClient; } }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 listJobsPaginator

動作

下列程式碼範例示範如何使用 CreateComputeEnvironment

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Asynchronously creates a new compute environment in AWS Batch. * * @param computeEnvironmentName the name of the compute environment to create * @param batchIAMRole the IAM role to be used by the compute environment * @param subnet the subnet ID to be used for the compute environment * @param secGroup the security group ID to be used for the compute environment * @return a {@link CompletableFuture} representing the asynchronous operation, which will complete with the * {@link CreateComputeEnvironmentResponse} when the compute environment has been created * @throws BatchException if there is an error creating the compute environment * @throws RuntimeException if there is an unexpected error during the operation */ public CompletableFuture<CreateComputeEnvironmentResponse> createComputeEnvironmentAsync( String computeEnvironmentName, String batchIAMRole, String subnet, String secGroup) { CreateComputeEnvironmentRequest environmentRequest = CreateComputeEnvironmentRequest.builder() .computeEnvironmentName(computeEnvironmentName) .type(CEType.MANAGED) .state(CEState.ENABLED) .computeResources(ComputeResource.builder() .type(CRType.FARGATE) .maxvCpus(256) .subnets(Collections.singletonList(subnet)) .securityGroupIds(Collections.singletonList(secGroup)) .build()) .serviceRole(batchIAMRole) .build(); CompletableFuture<CreateComputeEnvironmentResponse> response = getAsyncClient().createComputeEnvironment(environmentRequest); response.whenComplete((resp, ex) -> { if (ex != null) { String errorMessage = "Unexpected error occurred: " + ex.getMessage(); throw new RuntimeException(errorMessage, ex); } }); return response; }

下列程式碼範例示範如何使用 CreateJobQueue

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Creates a job queue asynchronously. * * @param jobQueueName the name of the job queue to create * @param computeEnvironmentName the name of the compute environment to associate with the job queue * @return a CompletableFuture that completes with the Amazon Resource Name (ARN) of the job queue */ public CompletableFuture<String> createJobQueueAsync(String jobQueueName, String computeEnvironmentName) { if (jobQueueName == null || jobQueueName.isEmpty()) { throw new IllegalArgumentException("Job queue name cannot be null or empty"); } if (computeEnvironmentName == null || computeEnvironmentName.isEmpty()) { throw new IllegalArgumentException("Compute environment name cannot be null or empty"); } CreateJobQueueRequest request = CreateJobQueueRequest.builder() .jobQueueName(jobQueueName) .priority(1) .computeEnvironmentOrder(ComputeEnvironmentOrder.builder() .computeEnvironment(computeEnvironmentName) .order(1) .build()) .build(); CompletableFuture<CreateJobQueueResponse> response = getAsyncClient().createJobQueue(request); response.whenComplete((resp, ex) -> { if (ex != null) { String errorMessage = "Unexpected error occurred: " + ex.getMessage(); throw new RuntimeException(errorMessage, ex); } }); return response.thenApply(CreateJobQueueResponse::jobQueueArn); }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 CreateJobQueue

下列程式碼範例示範如何使用 DeleteComputeEnvironment

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

public CompletableFuture<DeleteComputeEnvironmentResponse> deleteComputeEnvironmentAsync(String computeEnvironmentName) { DeleteComputeEnvironmentRequest deleteComputeEnvironment = DeleteComputeEnvironmentRequest.builder() .computeEnvironment(computeEnvironmentName) .build(); return getAsyncClient().deleteComputeEnvironment(deleteComputeEnvironment) .whenComplete((response, ex) -> { if (ex != null) { Throwable cause = ex.getCause(); if (cause instanceof BatchException) { throw new RuntimeException(cause); } else { throw new RuntimeException("Unexpected error: " + cause.getMessage(), cause); } } }); }

下列程式碼範例示範如何使用 DeleteJobQueue

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Deletes a Batch job queue asynchronously. * * @param jobQueueArn The Amazon Resource Name (ARN) of the job queue to delete. * @return A CompletableFuture that represents the asynchronous deletion of the job queue. * The future completes when the job queue has been successfully deleted or if an error occurs. * If successful, the future will be completed with a {@code Void} value. * If an error occurs, the future will be completed exceptionally with the thrown exception. */ public CompletableFuture<Void> deleteJobQueueAsync(String jobQueueArn) { DeleteJobQueueRequest deleteRequest = DeleteJobQueueRequest.builder() .jobQueue(jobQueueArn) .build(); CompletableFuture<DeleteJobQueueResponse> responseFuture = getAsyncClient().deleteJobQueue(deleteRequest); return responseFuture.whenComplete((deleteResponse, ex) -> { if (ex != null) { throw new RuntimeException("Failed to delete job queue: " + ex.getMessage(), ex); } }).thenApply(deleteResponse -> null); }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 DeleteJobQueue

下列程式碼範例示範如何使用 DeregisterJobDefinition

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Deregisters a job definition asynchronously. * * @param jobDefinition the name of the job definition to be deregistered * @return a CompletableFuture that completes when the job definition has been deregistered * or an exception has occurred */ public CompletableFuture<DeregisterJobDefinitionResponse> deregisterJobDefinitionAsync(String jobDefinition) { DeregisterJobDefinitionRequest jobDefinitionRequest = DeregisterJobDefinitionRequest.builder() .jobDefinition(jobDefinition) .build(); CompletableFuture<DeregisterJobDefinitionResponse> responseFuture = getAsyncClient().deregisterJobDefinition(jobDefinitionRequest); responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Unexpected error occurred: " + ex.getMessage(), ex); } }); return responseFuture; }

下列程式碼範例示範如何使用 DescribeComputeEnvironments

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Checks the status of the specified compute environment. * * @param computeEnvironmentName the name of the compute environment to check * @return a CompletableFuture containing the status of the compute environment, or "ERROR" if an exception occurs */ public CompletableFuture<String> checkComputeEnvironmentsStatus(String computeEnvironmentName) { if (computeEnvironmentName == null || computeEnvironmentName.isEmpty()) { throw new IllegalArgumentException("Compute environment name cannot be null or empty"); } DescribeComputeEnvironmentsRequest environmentsRequest = DescribeComputeEnvironmentsRequest.builder() .computeEnvironments(computeEnvironmentName) .build(); CompletableFuture<DescribeComputeEnvironmentsResponse> response = getAsyncClient().describeComputeEnvironments(environmentsRequest); response.whenComplete((resp, ex) -> { if (ex != null) { String errorMessage = "Unexpected error occurred: " + ex.getMessage(); throw new RuntimeException(errorMessage, ex); } }); return response.thenApply(resp -> resp.computeEnvironments().stream() .map(env -> env.statusAsString()) .findFirst() .orElse("UNKNOWN")); }

下列程式碼範例示範如何使用 DescribeJobQueues

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Asynchronously describes the job queue associated with the specified compute environment. * * @param computeEnvironmentName the name of the compute environment to find the associated job queue for * @return a {@link CompletableFuture} that, when completed, contains the job queue ARN associated with the specified compute environment * @throws RuntimeException if the job queue description fails */ public CompletableFuture<String> describeJobQueueAsync(String computeEnvironmentName) { DescribeJobQueuesRequest describeJobQueuesRequest = DescribeJobQueuesRequest.builder() .build(); CompletableFuture<DescribeJobQueuesResponse> responseFuture = getAsyncClient().describeJobQueues(describeJobQueuesRequest); return responseFuture.whenComplete((describeJobQueuesResponse, ex) -> { if (describeJobQueuesResponse != null) { String jobQueueARN; for (JobQueueDetail jobQueueDetail : describeJobQueuesResponse.jobQueues()) { for (ComputeEnvironmentOrder computeEnvironmentOrder : jobQueueDetail.computeEnvironmentOrder()) { String computeEnvironment = computeEnvironmentOrder.computeEnvironment(); String name = getComputeEnvironmentName(computeEnvironment); if (name.equals(computeEnvironmentName)) { jobQueueARN = jobQueueDetail.jobQueueArn(); logger.info("Job queue ARN associated with the compute environment: " + jobQueueARN); } } } } else { throw new RuntimeException("Failed to describe job queue: " + ex.getMessage(), ex); } }).thenApply(describeJobQueuesResponse -> { String jobQueueARN = ""; for (JobQueueDetail jobQueueDetail : describeJobQueuesResponse.jobQueues()) { for (ComputeEnvironmentOrder computeEnvironmentOrder : jobQueueDetail.computeEnvironmentOrder()) { String computeEnvironment = computeEnvironmentOrder.computeEnvironment(); String name = getComputeEnvironmentName(computeEnvironment); if (name.equals(computeEnvironmentName)) { jobQueueARN = jobQueueDetail.jobQueueArn(); } } } return jobQueueARN; }); }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 DescribeJobQueues

下列程式碼範例示範如何使用 DescribeJobs

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Asynchronously retrieves the status of a specific job. * * @param jobId the ID of the job to retrieve the status for * @return a CompletableFuture that completes with the job status */ public CompletableFuture<String> describeJobAsync(String jobId) { DescribeJobsRequest describeJobsRequest = DescribeJobsRequest.builder() .jobs(jobId) .build(); CompletableFuture<DescribeJobsResponse> responseFuture = getAsyncClient().describeJobs(describeJobsRequest); return responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Unexpected error occurred: " + ex.getMessage(), ex); } }).thenApply(response -> response.jobs().get(0).status().toString()); }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 DescribeJobs

下列程式碼範例示範如何使用 ListJobsPaginator

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Asynchronously lists the jobs in the specified job queue with the given job status. * * @param jobQueue the name of the job queue to list jobs from * @return a List<JobSummary> that contains the jobs that succeeded */ public List<JobSummary> listJobsAsync(String jobQueue) { if (jobQueue == null || jobQueue.isEmpty()) { throw new IllegalArgumentException("Job queue cannot be null or empty"); } ListJobsRequest listJobsRequest = ListJobsRequest.builder() .jobQueue(jobQueue) .jobStatus(JobStatus.SUCCEEDED) // Filter jobs by status. .build(); List<JobSummary> jobSummaries = new ArrayList<>(); ListJobsPublisher listJobsPaginator = getAsyncClient().listJobsPaginator(listJobsRequest); CompletableFuture<Void> future = listJobsPaginator.subscribe(response -> { jobSummaries.addAll(response.jobSummaryList()); }); future.join(); return jobSummaries; }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 ListJobsPaginator

下列程式碼範例示範如何使用 RegisterJobDefinition

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Registers a new job definition asynchronously in AWS Batch. * <p> * When using Fargate as the compute environment, it is crucial to set the * {@link NetworkConfiguration} with {@link AssignPublicIp#ENABLED} to * ensure proper networking configuration for the Fargate tasks. This * allows the tasks to communicate with external services, access the * internet, or communicate within a VPC. * * @param jobDefinitionName the name of the job definition to be registered * @param executionRoleARN the ARN (Amazon Resource Name) of the execution role * that provides permissions for the containers in the job * @param cpuArch a value of either X86_64 or ARM64 required for the service call * @return a CompletableFuture that completes with the ARN of the registered * job definition upon successful execution, or completes exceptionally with * an error if the registration fails */ public CompletableFuture<String> registerJobDefinitionAsync(String jobDefinitionName, String executionRoleARN, String image, String cpuArch) { NetworkConfiguration networkConfiguration = NetworkConfiguration.builder() .assignPublicIp(AssignPublicIp.ENABLED) .build(); ContainerProperties containerProperties = ContainerProperties.builder() .image(image) .executionRoleArn(executionRoleARN) .resourceRequirements( Arrays.asList( ResourceRequirement.builder() .type(ResourceType.VCPU) .value("1") .build(), ResourceRequirement.builder() .type(ResourceType.MEMORY) .value("2048") .build() ) ) .networkConfiguration(networkConfiguration) .runtimePlatform(b -> b .cpuArchitecture(cpuArch) .operatingSystemFamily("LINUX")) .build(); RegisterJobDefinitionRequest request = RegisterJobDefinitionRequest.builder() .jobDefinitionName(jobDefinitionName) .type(JobDefinitionType.CONTAINER) .containerProperties(containerProperties) .platformCapabilities(PlatformCapability.FARGATE) .build(); CompletableFuture<String> future = new CompletableFuture<>(); getAsyncClient().registerJobDefinition(request) .thenApply(RegisterJobDefinitionResponse::jobDefinitionArn) .whenComplete((result, ex) -> { if (ex != null) { future.completeExceptionally(ex); } else { future.complete(result); } }); return future; }

下列程式碼範例示範如何使用 SubmitJob

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Submits a job asynchronously to the AWS Batch service. * * @param jobDefinitionName the name of the job definition to use * @param jobQueueName the name of the job queue to submit the job to * @param jobARN the Amazon Resource Name (ARN) of the job definition * @return a CompletableFuture that, when completed, contains the job ID of the submitted job */ public CompletableFuture<String> submitJobAsync(String jobDefinitionName, String jobQueueName, String jobARN) { SubmitJobRequest jobRequest = SubmitJobRequest.builder() .jobDefinition(jobARN) .jobName(jobDefinitionName) .jobQueue(jobQueueName) .build(); CompletableFuture<SubmitJobResponse> responseFuture = getAsyncClient().submitJob(jobRequest); responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Unexpected error occurred: " + ex.getMessage(), ex); } }); return responseFuture.thenApply(SubmitJobResponse::jobId); }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 SubmitJob

下列程式碼範例示範如何使用 UpdateComputeEnvironment

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Disables the specified compute environment asynchronously. * * @param computeEnvironmentName the name of the compute environment to disable * @return a CompletableFuture that completes when the compute environment is disabled */ public CompletableFuture<UpdateComputeEnvironmentResponse> disableComputeEnvironmentAsync(String computeEnvironmentName) { UpdateComputeEnvironmentRequest updateRequest = UpdateComputeEnvironmentRequest.builder() .computeEnvironment(computeEnvironmentName) .state(CEState.DISABLED) .build(); CompletableFuture<UpdateComputeEnvironmentResponse> responseFuture = getAsyncClient().updateComputeEnvironment(updateRequest); responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Failed to disable compute environment: " + ex.getMessage(), ex); } }); return responseFuture; }

下列程式碼範例示範如何使用 UpdateJobQueue

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** * Disables the specified job queue asynchronously. * * @param jobQueueArn the Amazon Resource Name (ARN) of the job queue to be disabled * @return a {@link CompletableFuture} that completes when the job queue update operation is complete, * or completes exceptionally if an error occurs during the operation */ public CompletableFuture<Void> disableJobQueueAsync(String jobQueueArn) { UpdateJobQueueRequest updateRequest = UpdateJobQueueRequest.builder() .jobQueue(jobQueueArn) .state(JQState.DISABLED) .build(); CompletableFuture<UpdateJobQueueResponse> responseFuture = getAsyncClient().updateJobQueue(updateRequest); return responseFuture.whenComplete((updateResponse, ex) -> { if (ex != null) { throw new RuntimeException("Failed to update job queue: " + ex.getMessage(), ex); } }).thenApply(updateResponse -> null); }
  • 如需 API 詳細資訊,請參閱 AWS SDK for Java 2.x API 參考中的 UpdateJobQueue

案例

以下程式碼範例顯示做法:

  • 建立 AWS Batch 運算環境。

  • 檢查運算環境的狀態。

  • 設定 AWS Batch 任務佇列和任務定義。

  • 註冊任務定義。

  • 提交 AWS Batch 任務。

  • 取得適用於任務佇列的任務清單。

  • 檢查任務的狀態。

  • 刪除 AWS Batch 資源。

SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

執行示範 AWS Batch 功能的互動式案例。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.batch.model.BatchException; import software.amazon.awssdk.services.batch.model.ClientException; import software.amazon.awssdk.services.batch.model.CreateComputeEnvironmentResponse; import software.amazon.awssdk.services.batch.model.JobSummary; import software.amazon.awssdk.services.ec2.Ec2AsyncClient; import software.amazon.awssdk.services.ec2.model.DescribeSecurityGroupsRequest; import software.amazon.awssdk.services.ec2.model.DescribeSecurityGroupsResponse; import software.amazon.awssdk.services.ec2.model.DescribeSubnetsRequest; import software.amazon.awssdk.services.ec2.model.DescribeSubnetsResponse; import software.amazon.awssdk.services.ec2.model.DescribeVpcsRequest; import software.amazon.awssdk.services.ec2.model.Filter; import software.amazon.awssdk.services.ec2.model.SecurityGroup; import software.amazon.awssdk.services.ec2.model.Subnet; import software.amazon.awssdk.services.ec2.model.Vpc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * NOTE * This scenario submits a job that pulls a Docker image named echo-text from Amazon ECR to Amazon Fargate. * * To place this Docker image on Amazon ECR, run the following Basics scenario. * * https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/ecr * */ public class BatchScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); // Define two stacks used in this Basics Scenario. private static final String ROLES_STACK = "RolesStack"; private static String defaultSubnet; private static String defaultSecurityGroup; private static final Logger logger = LoggerFactory.getLogger(BatchScenario.class); public static void main(String[] args) throws InterruptedException { BatchActions batchActions = new BatchActions(); Scanner scanner = new Scanner(System.in); String computeEnvironmentName = "my-compute-environment"; String jobQueueName = "my-job-queue"; String jobDefinitionName = "my-job-definition"; // See the NOTE in this Java code example (at start). String dockerImage = "dkr.ecr.us-east-1.amazonaws.com/echo-text:echo-text"; logger.info(""" AWS Batch is a fully managed batch processing service that dynamically provisions the required compute resources for batch computing workloads. The Java V2 `BatchAsyncClient` allows developers to automate the submission, monitoring, and management of batch jobs. This scenario provides an example of setting up a compute environment, job queue and job definition, and then submitting a job. This scenario submits a job that pulls a Docker image named echo-text from Amazon ECR to Amazon Fargate. To place this Docker image on Amazon ECR, run the following Basics scenario. https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/ecr Let's get started... You have two choices: 1 - Run the entire program. 2 - Delete an existing Compute Environment (created from a previous execution of this program that did not complete). """); while (true) { String input = scanner.nextLine(); if (input.trim().equalsIgnoreCase("1")) { logger.info("Continuing with the program..."); // logger.info(""); break; } else if (input.trim().equalsIgnoreCase("2")) { String jobQueueARN = String.valueOf(batchActions. describeJobQueueAsync(computeEnvironmentName)); if (!jobQueueARN.isEmpty()) { batchActions.disableJobQueueAsync(jobQueueARN); countdown(1); batchActions.deleteJobQueueAsync(jobQueueARN); } try { batchActions.disableComputeEnvironmentAsync(computeEnvironmentName) .exceptionally(ex -> { logger.info("Disable compute environment failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { logger.info("Failed to disable compute environment: " + ex.getMessage()); } countdown(2); batchActions.deleteComputeEnvironmentAsync(computeEnvironmentName).join(); return; } else { // Handle invalid input. logger.info("Invalid input. Please try again."); } } System.out.println(DASHES); waitForInputToContinue(scanner); // Get an AWS Account id used to retrieve the docker image from Amazon ECR. // Create a single-element array to store the `accountId` value. String[] accId = new String[1]; CompletableFuture<String> accountIdFuture = batchActions.getAccountId(); accountIdFuture.thenAccept(accountId -> { logger.info("Account ID: " + accountId); accId[0] = accountId; }).join(); dockerImage = accId[0]+"."+dockerImage; // Get a default subnet and default security associated with the default VPC. getSubnetSecurityGroup(); logger.info("Use AWS CloudFormation to create two IAM roles that are required for this scenario."); CloudFormationHelper.deployCloudFormationStack(ROLES_STACK); Map<String, String> stackOutputs = CloudFormationHelper.getStackOutputs(ROLES_STACK); String batchIAMRole = stackOutputs.get("BatchRoleArn"); String executionRoleARN = stackOutputs.get("EcsRoleArn"); logger.info("The IAM role needed to interact with AWS Batch is "+batchIAMRole); waitForInputToContinue(scanner); logger.info(DASHES); logger.info("1. Create a Batch compute environment"); logger.info(""" A compute environment is a resource where you can run your batch jobs. After creating a compute environment, you can define job queues and job definitions to submit jobs for execution. The benefit of creating a compute environment is it allows you to easily configure and manage the compute resources that will be used to run your Batch jobs. By separating the compute environment from the job definitions, you can easily scale your compute resources up or down as needed, without having to modify your job definitions. This makes it easier to manage your Batch workloads and ensures that your jobs have the necessary compute resources to run efficiently. """); waitForInputToContinue(scanner); try { CompletableFuture<CreateComputeEnvironmentResponse> future = batchActions.createComputeEnvironmentAsync(computeEnvironmentName, batchIAMRole, defaultSubnet, defaultSecurityGroup); CreateComputeEnvironmentResponse response = future.join(); logger.info("Compute Environment ARN: " + response.computeEnvironmentArn()); } catch (RuntimeException rte) { Throwable cause = rte.getCause(); if (cause instanceof ClientException batchExceptionEx) { String myErrorCode = batchExceptionEx.awsErrorDetails().errorMessage(); if ("Object already exists".contains(myErrorCode)) { logger.info("The compute environment '" + computeEnvironmentName + "' already exists. Moving on..."); } else { logger.info("Batch error occurred: {} (Code: {})", batchExceptionEx.getMessage(), batchExceptionEx.awsErrorDetails().errorCode()); return; } } else { logger.info("An unexpected error occurred: {}", (cause != null ? cause.getMessage() : rte.getMessage())); } } waitForInputToContinue(scanner); logger.info(DASHES); logger.info(DASHES); logger.info("2. Check the status of the "+computeEnvironmentName +" Compute Environment."); waitForInputToContinue(scanner); try { CompletableFuture<String> future = batchActions.checkComputeEnvironmentsStatus(computeEnvironmentName); String status = future.join(); logger.info("Compute Environment Status: " + status); } catch (RuntimeException rte) { Throwable cause = rte.getCause(); if (cause instanceof ClientException batchExceptionEx) { logger.info("Batch error occurred: {} (Code: {})", batchExceptionEx.getMessage(), batchExceptionEx.awsErrorDetails().errorCode()); return; } else { logger.info("An unexpected error occurred: " + (cause != null ? cause.getMessage() : rte.getMessage())); return; } } waitForInputToContinue(scanner); logger.info(DASHES); logger.info(DASHES); logger.info("3. Create a job queue"); logger.info(""" A job queue is an essential component that helps manage the execution of your batch jobs. It acts as a buffer, where jobs are placed and then scheduled for execution based on their priority and the available resources in the compute environment. """); waitForInputToContinue(scanner); String jobQueueArn = null; try { CompletableFuture<String> jobQueueFuture = batchActions.createJobQueueAsync(jobQueueName, computeEnvironmentName); jobQueueArn = jobQueueFuture.join(); logger.info("Job Queue ARN: " + jobQueueArn); } catch (RuntimeException rte) { Throwable cause = rte.getCause(); if (cause instanceof BatchException batchExceptionEx) { String myErrorCode = batchExceptionEx.awsErrorDetails().errorMessage(); if ("Object already exists".contains(myErrorCode)) { logger.info("The job queue '" + jobQueueName + "' already exists. Moving on..."); // Retrieve the ARN of the job queue. CompletableFuture<String> jobQueueArnFuture = batchActions.getJobQueueARN(jobQueueName); jobQueueArn = jobQueueArnFuture.join(); logger.info("Job Queue ARN: " + jobQueueArn); } else { logger.info("Batch error occurred: {} (Code: {})", batchExceptionEx.getMessage(), batchExceptionEx.awsErrorDetails().errorCode()); return; } } else { logger.info("An unexpected error occurred: " + (cause != null ? cause.getMessage() : rte.getMessage())); return; // End the execution } } waitForInputToContinue(scanner); logger.info(DASHES); logger.info("4. Register a Job Definition."); logger.info(""" Registering a job in AWS Batch using the Fargate launch type ensures that all necessary parameters, such as the execution role, command to run, and so on are specified and reused across multiple job submissions. The job definition pulls a Docker image from Amazon ECR and executes the Docker image. """); waitForInputToContinue(scanner); String jobARN; try { String platform = ""; while (true) { logger.info(""" On which platform/CPU architecture combination did you build the Docker image?: 1. Windows X86_64 2. Mac or Linux ARM64 3. Mac or Linux X86_64 Please select 1, 2, or 3. """); String platAns = scanner.nextLine().trim(); if (platAns.equals("1")) { platform = "X86_64"; break; // Exit loop since a valid option is selected } else if (platAns.equals("2")) { platform = "ARM64"; break; // Exit loop since a valid option is selected } else if (platAns.equals("3")) { platform = "X86_64"; break; // Exit loop since a valid option is selected } else { System.out.println("Invalid input. Please select either 1 or 2."); } } jobARN = batchActions.registerJobDefinitionAsync(jobDefinitionName, executionRoleARN, dockerImage, platform) .exceptionally(ex -> { System.err.println("Register job definition failed: " + ex.getMessage()); return null; }) .join(); if (jobARN != null) { logger.info("Job ARN: " + jobARN); } } catch (RuntimeException rte) { logger.error("A Batch exception occurred while registering the job: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } logger.info(DASHES); logger.info(DASHES); logger.info("5. Submit an AWS Batch job from a job definition."); waitForInputToContinue(scanner); String jobId; try { jobId = batchActions.submitJobAsync(jobDefinitionName, jobQueueName, jobARN) .exceptionally(ex -> { System.err.println("Submit job failed: " + ex.getMessage()); return null; }) .join(); logger.info("The job id is "+jobId); logger.info("Let's wait 2 minutes for the job to complete"); countdown(2); } catch (RuntimeException rte) { logger.error("A Batch exception occurred while submitting the job: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); logger.info(DASHES); logger.info("6. Get a list of jobs applicable to the job queue."); waitForInputToContinue(scanner); try { List<JobSummary> jobs = batchActions.listJobsAsync(jobQueueName); jobs.forEach(job -> logger.info("Job ID: {}, Job Name: {}, Job Status: {}", job.jobId(), job.jobName(), job.status())); } catch (RuntimeException rte) { logger.info("A Batch exception occurred while submitting the job: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } waitForInputToContinue(scanner); logger.info(DASHES); logger.info(DASHES); logger.info("7. Check the status of job "+jobId); waitForInputToContinue(scanner); try { CompletableFuture<String> future = batchActions.describeJobAsync(jobId); String jobStatus = future.join(); logger.info("Job Status: " + jobStatus); } catch (RuntimeException rte) { logger.info("A Batch exception occurred while submitting the job: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); logger.info("8. Delete Batch resources"); logger.info( """ When deleting an AWS Batch compute environment, it does not happen instantaneously. There is typically a delay, similar to some other AWS resources. AWS Batch starts the deletion process. """); logger.info("Would you like to delete the AWS Batch resources such as the compute environment? (y/n)"); String delAns = scanner.nextLine().trim(); if (delAns.equalsIgnoreCase("y")) { logger.info("You selected to delete the AWS ECR resources."); logger.info("First, we will deregister the Job Definition."); waitForInputToContinue(scanner); try { batchActions.deregisterJobDefinitionAsync(jobARN) .exceptionally(ex -> { logger.info("Deregister job definition failed: " + ex.getMessage()); return null; }) .join(); logger.info(jobARN + " was deregistered"); } catch (RuntimeException rte) { logger.error("A Batch exception occurred: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } logger.info("Second, we will disable and then delete the Job Queue."); waitForInputToContinue(scanner); try { batchActions.disableJobQueueAsync(jobQueueArn) .exceptionally(ex -> { logger.info("Disable job queue failed: " + ex.getMessage()); return null; }) .join(); logger.info(jobQueueArn + " was disabled"); } catch (RuntimeException rte) { logger.info("A Batch exception occurred: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } batchActions.waitForJobQueueToBeDisabledAsync(jobQueueArn); try { CompletableFuture<Void> future = batchActions.waitForJobQueueToBeDisabledAsync(jobQueueArn); future.join(); logger.info("Job queue is now disabled."); } catch (RuntimeException rte) { logger.info("A Batch exception occurred: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } waitForInputToContinue(scanner); try { batchActions.deleteJobQueueAsync(jobQueueArn); logger.info(jobQueueArn +" was deleted"); } catch (RuntimeException rte) { logger.info("A Batch exception occurred: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } logger.info("Let's wait 2 minutes for the job queue to be deleted"); countdown(2); waitForInputToContinue(scanner); logger.info("Third, we will delete the Compute Environment."); waitForInputToContinue(scanner); try { batchActions.disableComputeEnvironmentAsync(computeEnvironmentName) .exceptionally(ex -> { System.err.println("Disable compute environment failed: " + ex.getMessage()); return null; }) .join(); logger.info("Compute environment disabled") ; } catch (RuntimeException rte) { logger.info("A Batch exception occurred: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } batchActions.checkComputeEnvironmentsStatus(computeEnvironmentName).thenAccept(state -> { logger.info("Current State: " + state); }).join(); logger.info("Lets wait 1 min for the compute environment to be deleted"); countdown(1); try { batchActions.deleteComputeEnvironmentAsync(computeEnvironmentName).join(); logger.info(computeEnvironmentName +" was deleted."); } catch (RuntimeException rte) { logger.info("A Batch exception occurred: {}", rte.getCause() != null ? rte.getCause().getMessage() : rte.getMessage()); return; } waitForInputToContinue(scanner); CloudFormationHelper.destroyCloudFormationStack(ROLES_STACK); } logger.info(DASHES); logger.info("This concludes the AWS Batch SDK scenario"); logger.info(DASHES); } private static void waitForInputToContinue(Scanner scanner) { while (true) { logger.info(""); logger.info("Enter 'c' followed by <ENTER> to continue:"); String input = scanner.nextLine(); if (input.trim().equalsIgnoreCase("c")) { logger.info("Continuing with the program..."); logger.info(""); break; } else { // Handle invalid input. logger.info("Invalid input. Please try again."); } } } public static void countdown(int minutes) throws InterruptedException { int seconds = 0; for (int i = minutes * 60 + seconds; i >= 0; i--) { int displayMinutes = i / 60; int displaySeconds = i % 60; System.out.print(String.format("\r%02d:%02d", displayMinutes, displaySeconds)); Thread.sleep(1000); // Wait for 1 second } logger.info("Countdown complete!"); } private static void getSubnetSecurityGroup() { try (Ec2AsyncClient ec2Client = Ec2AsyncClient.create()) { CompletableFuture<Vpc> defaultVpcFuture = ec2Client.describeVpcs(DescribeVpcsRequest.builder() .filters(Filter.builder() .name("is-default") .values("true") .build()) .build()) .thenApply(response -> response.vpcs().stream() .findFirst() .orElseThrow(() -> new RuntimeException("Default VPC not found"))); CompletableFuture<String> defaultSubnetFuture = defaultVpcFuture .thenCompose(vpc -> ec2Client.describeSubnets(DescribeSubnetsRequest.builder() .filters(Filter.builder() .name("vpc-id") .values(vpc.vpcId()) .build(), Filter.builder() .name("default-for-az") .values("true") .build()) .build()) .thenApply(DescribeSubnetsResponse::subnets) .thenApply(subnets -> subnets.stream() .findFirst() .map(Subnet::subnetId) .orElseThrow(() -> new RuntimeException("No default subnet found")))); CompletableFuture<String> defaultSecurityGroupFuture = defaultVpcFuture .thenCompose(vpc -> ec2Client.describeSecurityGroups(DescribeSecurityGroupsRequest.builder() .filters(Filter.builder() .name("group-name") .values("default") .build(), Filter.builder() .name("vpc-id") .values(vpc.vpcId()) .build()) .build()) .thenApply(DescribeSecurityGroupsResponse::securityGroups) .thenApply(securityGroups -> securityGroups.stream() .findFirst() .map(SecurityGroup::groupId) .orElseThrow(() -> new RuntimeException("No default security group found")))); defaultSubnet = defaultSubnetFuture.join(); defaultSecurityGroup = defaultSecurityGroupFuture.join(); } } }

SDK AWS Batch 方法的包裝函式類別。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.batch.BatchAsyncClient; import software.amazon.awssdk.services.batch.BatchClient; import software.amazon.awssdk.services.batch.model.AssignPublicIp; import software.amazon.awssdk.services.batch.model.BatchException; import software.amazon.awssdk.services.batch.model.CEState; import software.amazon.awssdk.services.batch.model.CEType; import software.amazon.awssdk.services.batch.model.CRType; import software.amazon.awssdk.services.batch.model.ComputeEnvironmentOrder; import software.amazon.awssdk.services.batch.model.ComputeResource; import software.amazon.awssdk.services.batch.model.ContainerProperties; import software.amazon.awssdk.services.batch.model.CreateComputeEnvironmentRequest; import software.amazon.awssdk.services.batch.model.CreateComputeEnvironmentResponse; import software.amazon.awssdk.services.batch.model.CreateJobQueueRequest; import software.amazon.awssdk.services.batch.model.DeleteComputeEnvironmentRequest; import software.amazon.awssdk.services.batch.model.DeleteComputeEnvironmentResponse; import software.amazon.awssdk.services.batch.model.DeleteJobQueueRequest; import software.amazon.awssdk.services.batch.model.DeleteJobQueueResponse; import software.amazon.awssdk.services.batch.model.DeregisterJobDefinitionRequest; import software.amazon.awssdk.services.batch.model.DeregisterJobDefinitionResponse; import software.amazon.awssdk.services.batch.model.DescribeComputeEnvironmentsRequest; import software.amazon.awssdk.services.batch.model.DescribeComputeEnvironmentsResponse; import software.amazon.awssdk.services.batch.model.DescribeJobQueuesRequest; import software.amazon.awssdk.services.batch.model.DescribeJobQueuesResponse; import software.amazon.awssdk.services.batch.model.DescribeJobsRequest; import software.amazon.awssdk.services.batch.model.DescribeJobsResponse; import software.amazon.awssdk.services.batch.model.JQState; import software.amazon.awssdk.services.batch.model.JobDefinitionType; import software.amazon.awssdk.services.batch.model.JobDetail; import software.amazon.awssdk.services.batch.model.JobQueueDetail; import software.amazon.awssdk.services.batch.model.JobStatus; import software.amazon.awssdk.services.batch.model.JobSummary; import software.amazon.awssdk.services.batch.model.ListJobsRequest; import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionResponse; import software.amazon.awssdk.services.batch.model.NetworkConfiguration; import software.amazon.awssdk.services.batch.model.PlatformCapability; import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest; import software.amazon.awssdk.services.batch.model.ResourceRequirement; import software.amazon.awssdk.services.batch.model.ResourceType; import software.amazon.awssdk.services.batch.model.RuntimePlatform; import software.amazon.awssdk.services.batch.model.SubmitJobRequest; import software.amazon.awssdk.services.batch.model.CreateJobQueueResponse; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; import software.amazon.awssdk.services.batch.model.SubmitJobResponse; import software.amazon.awssdk.services.batch.model.UpdateComputeEnvironmentRequest; import software.amazon.awssdk.services.batch.model.UpdateComputeEnvironmentResponse; import software.amazon.awssdk.services.batch.model.UpdateJobQueueRequest; import software.amazon.awssdk.services.batch.model.UpdateJobQueueResponse; import software.amazon.awssdk.services.batch.paginators.ListJobsPublisher; import software.amazon.awssdk.services.sts.StsAsyncClient; import software.amazon.awssdk.services.sts.model.GetCallerIdentityResponse; public class BatchActions { private static BatchAsyncClient batchClient; private static final Logger logger = LoggerFactory.getLogger(BatchActions.class); private static BatchAsyncClient getAsyncClient() { if (batchClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); batchClient = BatchAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .build(); } return batchClient; } /** * Asynchronously creates a new compute environment in AWS Batch. * * @param computeEnvironmentName the name of the compute environment to create * @param batchIAMRole the IAM role to be used by the compute environment * @param subnet the subnet ID to be used for the compute environment * @param secGroup the security group ID to be used for the compute environment * @return a {@link CompletableFuture} representing the asynchronous operation, which will complete with the * {@link CreateComputeEnvironmentResponse} when the compute environment has been created * @throws BatchException if there is an error creating the compute environment * @throws RuntimeException if there is an unexpected error during the operation */ public CompletableFuture<CreateComputeEnvironmentResponse> createComputeEnvironmentAsync( String computeEnvironmentName, String batchIAMRole, String subnet, String secGroup) { CreateComputeEnvironmentRequest environmentRequest = CreateComputeEnvironmentRequest.builder() .computeEnvironmentName(computeEnvironmentName) .type(CEType.MANAGED) .state(CEState.ENABLED) .computeResources(ComputeResource.builder() .type(CRType.FARGATE) .maxvCpus(256) .subnets(Collections.singletonList(subnet)) .securityGroupIds(Collections.singletonList(secGroup)) .build()) .serviceRole(batchIAMRole) .build(); CompletableFuture<CreateComputeEnvironmentResponse> response = getAsyncClient().createComputeEnvironment(environmentRequest); response.whenComplete((resp, ex) -> { if (ex != null) { String errorMessage = "Unexpected error occurred: " + ex.getMessage(); throw new RuntimeException(errorMessage, ex); } }); return response; } public CompletableFuture<DeleteComputeEnvironmentResponse> deleteComputeEnvironmentAsync(String computeEnvironmentName) { DeleteComputeEnvironmentRequest deleteComputeEnvironment = DeleteComputeEnvironmentRequest.builder() .computeEnvironment(computeEnvironmentName) .build(); return getAsyncClient().deleteComputeEnvironment(deleteComputeEnvironment) .whenComplete((response, ex) -> { if (ex != null) { Throwable cause = ex.getCause(); if (cause instanceof BatchException) { throw new RuntimeException(cause); } else { throw new RuntimeException("Unexpected error: " + cause.getMessage(), cause); } } }); } /** * Checks the status of the specified compute environment. * * @param computeEnvironmentName the name of the compute environment to check * @return a CompletableFuture containing the status of the compute environment, or "ERROR" if an exception occurs */ public CompletableFuture<String> checkComputeEnvironmentsStatus(String computeEnvironmentName) { if (computeEnvironmentName == null || computeEnvironmentName.isEmpty()) { throw new IllegalArgumentException("Compute environment name cannot be null or empty"); } DescribeComputeEnvironmentsRequest environmentsRequest = DescribeComputeEnvironmentsRequest.builder() .computeEnvironments(computeEnvironmentName) .build(); CompletableFuture<DescribeComputeEnvironmentsResponse> response = getAsyncClient().describeComputeEnvironments(environmentsRequest); response.whenComplete((resp, ex) -> { if (ex != null) { String errorMessage = "Unexpected error occurred: " + ex.getMessage(); throw new RuntimeException(errorMessage, ex); } }); return response.thenApply(resp -> resp.computeEnvironments().stream() .map(env -> env.statusAsString()) .findFirst() .orElse("UNKNOWN")); } /** * Creates a job queue asynchronously. * * @param jobQueueName the name of the job queue to create * @param computeEnvironmentName the name of the compute environment to associate with the job queue * @return a CompletableFuture that completes with the Amazon Resource Name (ARN) of the job queue */ public CompletableFuture<String> createJobQueueAsync(String jobQueueName, String computeEnvironmentName) { if (jobQueueName == null || jobQueueName.isEmpty()) { throw new IllegalArgumentException("Job queue name cannot be null or empty"); } if (computeEnvironmentName == null || computeEnvironmentName.isEmpty()) { throw new IllegalArgumentException("Compute environment name cannot be null or empty"); } CreateJobQueueRequest request = CreateJobQueueRequest.builder() .jobQueueName(jobQueueName) .priority(1) .computeEnvironmentOrder(ComputeEnvironmentOrder.builder() .computeEnvironment(computeEnvironmentName) .order(1) .build()) .build(); CompletableFuture<CreateJobQueueResponse> response = getAsyncClient().createJobQueue(request); response.whenComplete((resp, ex) -> { if (ex != null) { String errorMessage = "Unexpected error occurred: " + ex.getMessage(); throw new RuntimeException(errorMessage, ex); } }); return response.thenApply(CreateJobQueueResponse::jobQueueArn); } /** * Asynchronously lists the jobs in the specified job queue with the given job status. * * @param jobQueue the name of the job queue to list jobs from * @return a List<JobSummary> that contains the jobs that succeeded */ public List<JobSummary> listJobsAsync(String jobQueue) { if (jobQueue == null || jobQueue.isEmpty()) { throw new IllegalArgumentException("Job queue cannot be null or empty"); } ListJobsRequest listJobsRequest = ListJobsRequest.builder() .jobQueue(jobQueue) .jobStatus(JobStatus.SUCCEEDED) // Filter jobs by status. .build(); List<JobSummary> jobSummaries = new ArrayList<>(); ListJobsPublisher listJobsPaginator = getAsyncClient().listJobsPaginator(listJobsRequest); CompletableFuture<Void> future = listJobsPaginator.subscribe(response -> { jobSummaries.addAll(response.jobSummaryList()); }); future.join(); return jobSummaries; } /** * Registers a new job definition asynchronously in AWS Batch. * <p> * When using Fargate as the compute environment, it is crucial to set the * {@link NetworkConfiguration} with {@link AssignPublicIp#ENABLED} to * ensure proper networking configuration for the Fargate tasks. This * allows the tasks to communicate with external services, access the * internet, or communicate within a VPC. * * @param jobDefinitionName the name of the job definition to be registered * @param executionRoleARN the ARN (Amazon Resource Name) of the execution role * that provides permissions for the containers in the job * @param cpuArch a value of either X86_64 or ARM64 required for the service call * @return a CompletableFuture that completes with the ARN of the registered * job definition upon successful execution, or completes exceptionally with * an error if the registration fails */ public CompletableFuture<String> registerJobDefinitionAsync(String jobDefinitionName, String executionRoleARN, String image, String cpuArch) { NetworkConfiguration networkConfiguration = NetworkConfiguration.builder() .assignPublicIp(AssignPublicIp.ENABLED) .build(); ContainerProperties containerProperties = ContainerProperties.builder() .image(image) .executionRoleArn(executionRoleARN) .resourceRequirements( Arrays.asList( ResourceRequirement.builder() .type(ResourceType.VCPU) .value("1") .build(), ResourceRequirement.builder() .type(ResourceType.MEMORY) .value("2048") .build() ) ) .networkConfiguration(networkConfiguration) .runtimePlatform(b -> b .cpuArchitecture(cpuArch) .operatingSystemFamily("LINUX")) .build(); RegisterJobDefinitionRequest request = RegisterJobDefinitionRequest.builder() .jobDefinitionName(jobDefinitionName) .type(JobDefinitionType.CONTAINER) .containerProperties(containerProperties) .platformCapabilities(PlatformCapability.FARGATE) .build(); CompletableFuture<String> future = new CompletableFuture<>(); getAsyncClient().registerJobDefinition(request) .thenApply(RegisterJobDefinitionResponse::jobDefinitionArn) .whenComplete((result, ex) -> { if (ex != null) { future.completeExceptionally(ex); } else { future.complete(result); } }); return future; } /** * Deregisters a job definition asynchronously. * * @param jobDefinition the name of the job definition to be deregistered * @return a CompletableFuture that completes when the job definition has been deregistered * or an exception has occurred */ public CompletableFuture<DeregisterJobDefinitionResponse> deregisterJobDefinitionAsync(String jobDefinition) { DeregisterJobDefinitionRequest jobDefinitionRequest = DeregisterJobDefinitionRequest.builder() .jobDefinition(jobDefinition) .build(); CompletableFuture<DeregisterJobDefinitionResponse> responseFuture = getAsyncClient().deregisterJobDefinition(jobDefinitionRequest); responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Unexpected error occurred: " + ex.getMessage(), ex); } }); return responseFuture; } /** * Disables the specified job queue asynchronously. * * @param jobQueueArn the Amazon Resource Name (ARN) of the job queue to be disabled * @return a {@link CompletableFuture} that completes when the job queue update operation is complete, * or completes exceptionally if an error occurs during the operation */ public CompletableFuture<Void> disableJobQueueAsync(String jobQueueArn) { UpdateJobQueueRequest updateRequest = UpdateJobQueueRequest.builder() .jobQueue(jobQueueArn) .state(JQState.DISABLED) .build(); CompletableFuture<UpdateJobQueueResponse> responseFuture = getAsyncClient().updateJobQueue(updateRequest); return responseFuture.whenComplete((updateResponse, ex) -> { if (ex != null) { throw new RuntimeException("Failed to update job queue: " + ex.getMessage(), ex); } }).thenApply(updateResponse -> null); } /** * Deletes a Batch job queue asynchronously. * * @param jobQueueArn The Amazon Resource Name (ARN) of the job queue to delete. * @return A CompletableFuture that represents the asynchronous deletion of the job queue. * The future completes when the job queue has been successfully deleted or if an error occurs. * If successful, the future will be completed with a {@code Void} value. * If an error occurs, the future will be completed exceptionally with the thrown exception. */ public CompletableFuture<Void> deleteJobQueueAsync(String jobQueueArn) { DeleteJobQueueRequest deleteRequest = DeleteJobQueueRequest.builder() .jobQueue(jobQueueArn) .build(); CompletableFuture<DeleteJobQueueResponse> responseFuture = getAsyncClient().deleteJobQueue(deleteRequest); return responseFuture.whenComplete((deleteResponse, ex) -> { if (ex != null) { throw new RuntimeException("Failed to delete job queue: " + ex.getMessage(), ex); } }).thenApply(deleteResponse -> null); } /** * Asynchronously describes the job queue associated with the specified compute environment. * * @param computeEnvironmentName the name of the compute environment to find the associated job queue for * @return a {@link CompletableFuture} that, when completed, contains the job queue ARN associated with the specified compute environment * @throws RuntimeException if the job queue description fails */ public CompletableFuture<String> describeJobQueueAsync(String computeEnvironmentName) { DescribeJobQueuesRequest describeJobQueuesRequest = DescribeJobQueuesRequest.builder() .build(); CompletableFuture<DescribeJobQueuesResponse> responseFuture = getAsyncClient().describeJobQueues(describeJobQueuesRequest); return responseFuture.whenComplete((describeJobQueuesResponse, ex) -> { if (describeJobQueuesResponse != null) { String jobQueueARN; for (JobQueueDetail jobQueueDetail : describeJobQueuesResponse.jobQueues()) { for (ComputeEnvironmentOrder computeEnvironmentOrder : jobQueueDetail.computeEnvironmentOrder()) { String computeEnvironment = computeEnvironmentOrder.computeEnvironment(); String name = getComputeEnvironmentName(computeEnvironment); if (name.equals(computeEnvironmentName)) { jobQueueARN = jobQueueDetail.jobQueueArn(); logger.info("Job queue ARN associated with the compute environment: " + jobQueueARN); } } } } else { throw new RuntimeException("Failed to describe job queue: " + ex.getMessage(), ex); } }).thenApply(describeJobQueuesResponse -> { String jobQueueARN = ""; for (JobQueueDetail jobQueueDetail : describeJobQueuesResponse.jobQueues()) { for (ComputeEnvironmentOrder computeEnvironmentOrder : jobQueueDetail.computeEnvironmentOrder()) { String computeEnvironment = computeEnvironmentOrder.computeEnvironment(); String name = getComputeEnvironmentName(computeEnvironment); if (name.equals(computeEnvironmentName)) { jobQueueARN = jobQueueDetail.jobQueueArn(); } } } return jobQueueARN; }); } /** * Disables the specified compute environment asynchronously. * * @param computeEnvironmentName the name of the compute environment to disable * @return a CompletableFuture that completes when the compute environment is disabled */ public CompletableFuture<UpdateComputeEnvironmentResponse> disableComputeEnvironmentAsync(String computeEnvironmentName) { UpdateComputeEnvironmentRequest updateRequest = UpdateComputeEnvironmentRequest.builder() .computeEnvironment(computeEnvironmentName) .state(CEState.DISABLED) .build(); CompletableFuture<UpdateComputeEnvironmentResponse> responseFuture = getAsyncClient().updateComputeEnvironment(updateRequest); responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Failed to disable compute environment: " + ex.getMessage(), ex); } }); return responseFuture; } /** * Submits a job asynchronously to the AWS Batch service. * * @param jobDefinitionName the name of the job definition to use * @param jobQueueName the name of the job queue to submit the job to * @param jobARN the Amazon Resource Name (ARN) of the job definition * @return a CompletableFuture that, when completed, contains the job ID of the submitted job */ public CompletableFuture<String> submitJobAsync(String jobDefinitionName, String jobQueueName, String jobARN) { SubmitJobRequest jobRequest = SubmitJobRequest.builder() .jobDefinition(jobARN) .jobName(jobDefinitionName) .jobQueue(jobQueueName) .build(); CompletableFuture<SubmitJobResponse> responseFuture = getAsyncClient().submitJob(jobRequest); responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Unexpected error occurred: " + ex.getMessage(), ex); } }); return responseFuture.thenApply(SubmitJobResponse::jobId); } /** * Asynchronously retrieves the status of a specific job. * * @param jobId the ID of the job to retrieve the status for * @return a CompletableFuture that completes with the job status */ public CompletableFuture<String> describeJobAsync(String jobId) { DescribeJobsRequest describeJobsRequest = DescribeJobsRequest.builder() .jobs(jobId) .build(); CompletableFuture<DescribeJobsResponse> responseFuture = getAsyncClient().describeJobs(describeJobsRequest); return responseFuture.whenComplete((response, ex) -> { if (ex != null) { throw new RuntimeException("Unexpected error occurred: " + ex.getMessage(), ex); } }).thenApply(response -> response.jobs().get(0).status().toString()); } /** * Disables the specific job queue using the asynchronous Java client. * * @param jobQueueArn the Amazon Resource Name (ARN) of the job queue to wait for * @return a {@link CompletableFuture} that completes when the job queue is disabled */ public CompletableFuture<Void> waitForJobQueueToBeDisabledAsync(String jobQueueArn) { AtomicBoolean isDisabled = new AtomicBoolean(false); return CompletableFuture.runAsync(() -> { while (!isDisabled.get()) { DescribeJobQueuesRequest describeRequest = DescribeJobQueuesRequest.builder() .jobQueues(jobQueueArn) .build(); CompletableFuture<DescribeJobQueuesResponse> responseFuture = getAsyncClient().describeJobQueues(describeRequest); responseFuture.whenComplete((describeResponse, ex) -> { if (describeResponse != null) { for (JobQueueDetail jobQueue : describeResponse.jobQueues()) { if (jobQueue.jobQueueArn().equals(jobQueueArn) && jobQueue.state() == JQState.DISABLED) { isDisabled.set(true); break; } } } else { throw new RuntimeException("Error describing job queues", ex); } }).join(); if (!isDisabled.get()) { try { logger.info("Waiting for job queue to be disabled..."); Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Thread interrupted while waiting for job queue to be disabled", e); } } } }).whenComplete((result, throwable) -> { if (throwable != null) { throw new RuntimeException("Error while waiting for job queue to be disabled", throwable); } }); } public CompletableFuture<String> getJobQueueARN(String jobQueueName) { // Describe the job queue asynchronously CompletableFuture<DescribeJobQueuesResponse> describeJobQueuesFuture = batchClient.describeJobQueues( DescribeJobQueuesRequest.builder() .jobQueues(jobQueueName) .build() ); // Handle the asynchronous response and return the Job Queue ARN in the CompletableFuture<String> CompletableFuture<String> jobQueueArnFuture = new CompletableFuture<>(); describeJobQueuesFuture.whenComplete((response, error) -> { if (error != null) { if (error instanceof BatchException) { logger.info("Batch error: " + ((BatchException) error).awsErrorDetails().errorMessage()); } else { logger.info("Error describing job queue: " + error.getMessage()); } jobQueueArnFuture.completeExceptionally(new RuntimeException("Failed to retrieve Job Queue ARN", error)); } else { if (response.jobQueues().isEmpty()) { jobQueueArnFuture.completeExceptionally(new RuntimeException("Job queue not found: " + jobQueueName)); } else { // Assuming only one job queue is returned for the given name String jobQueueArn = response.jobQueues().get(0).jobQueueArn(); jobQueueArnFuture.complete(jobQueueArn); } } }); return jobQueueArnFuture; } private static String getComputeEnvironmentName(String computeEnvironment) { String[] parts = computeEnvironment.split("/"); if (parts.length == 2) { return parts[1]; } return null; } public CompletableFuture<String> getAccountId() { StsAsyncClient stsAsyncClient = StsAsyncClient.builder() .region(Region.US_EAST_1) .build(); return stsAsyncClient.getCallerIdentity() .thenApply(GetCallerIdentityResponse::account); } }