SDK适用于 Java 2.x 的 Amazon S3 控制示例 - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

SDK适用于 Java 2.x 的 Amazon S3 控制示例

以下代码示例向您展示了如何使用 AWS SDK for Java 2.x 与 Amazon S3 Control 配合使用来执行操作和实现常见场景。

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。

场景是向您展示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。

每个示例都包含一个指向完整源代码的链接,您可以在其中找到有关如何在上下文中设置和运行代码的说明。

开始使用

以下代码示例显示了如何开始使用 “Amazon S3 控件”

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3control.S3ControlAsyncClient; import software.amazon.awssdk.services.s3control.model.JobListDescriptor; import software.amazon.awssdk.services.s3control.model.JobStatus; import software.amazon.awssdk.services.s3control.model.ListJobsRequest; import software.amazon.awssdk.services.s3control.paginators.ListJobsPublisher; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; public class HelloS3Batch { private static S3ControlAsyncClient asyncClient; public static void main(String []args ) { S3BatchActions actions = new S3BatchActions(); String accountId= actions.getAccountId(); try { listBatchJobsAsync(accountId) .exceptionally(ex -> { System.err.println("List batch jobs failed: " + ex.getMessage()); return null; }) .join(); // Wait for completion } catch (CompletionException ex) { System.err.println("Failed to list batch jobs: " + ex.getMessage()); } } /** * Retrieves the asynchronous S3 Control client instance. * <p> * This method creates and returns a singleton instance of the {@link S3ControlAsyncClient}. If the instance * has not been created yet, it will be initialized with the following configuration: * <ul> * <li>Maximum concurrency: 100</li> * <li>Connection timeout: 60 seconds</li> * <li>Read timeout: 60 seconds</li> * <li>Write timeout: 60 seconds</li> * <li>API call timeout: 2 minutes</li> * <li>API call attempt timeout: 90 seconds</li> * <li>Retry policy: 3 retries</li> * <li>Region: US_EAST_1</li> * <li>Credentials provider: {@link EnvironmentVariableCredentialsProvider}</li> * </ul> * * @return the asynchronous S3 Control client instance */ private static S3ControlAsyncClient getAsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); asyncClient = S3ControlAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); } return asyncClient; } /** * Asynchronously lists batch jobs that have completed for the specified account. * * @param accountId the ID of the account to list jobs for * @return a CompletableFuture that completes when the job listing operation is finished */ public static CompletableFuture<Void> listBatchJobsAsync(String accountId) { ListJobsRequest jobsRequest = ListJobsRequest.builder() .jobStatuses(JobStatus.COMPLETE) .accountId(accountId) .maxResults(10) .build(); ListJobsPublisher publisher = getAsyncClient().listJobsPaginator(jobsRequest); return publisher.subscribe(response -> { List<JobListDescriptor> jobs = response.jobs(); for (JobListDescriptor job : jobs) { System.out.println("The job id is " + job.jobId()); System.out.println("The job priority is " + job.priority()); } }).thenAccept(response -> { System.out.println("Listing batch jobs completed"); }).exceptionally(ex -> { System.err.println("Failed to list batch jobs: " + ex.getMessage()); throw new RuntimeException(ex); }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 ListJobsPaginator” 中的。

操作

以下代码示例显示了如何使用CreateJob

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Creates an asynchronous S3 job using the AWS Java SDK. * * @param accountId the AWS account ID associated with the job * @param iamRoleArn the ARN of the IAM role to be used for the job * @param manifestLocation the location of the job manifest file in S3 * @param reportBucketName the name of the S3 bucket to store the job report * @param uuid a unique identifier for the job * @return a CompletableFuture that represents the asynchronous creation of the S3 job. * The CompletableFuture will return the job ID if the job is created successfully, * or throw an exception if there is an error. */ public CompletableFuture<String> createS3JobAsync(String accountId, String iamRoleArn, String manifestLocation, String reportBucketName, String uuid) { String[] bucketName = new String[]{""}; String[] parts = reportBucketName.split(":::"); if (parts.length > 1) { bucketName[0] = parts[1]; } else { System.out.println("The input string does not contain the expected format."); } return CompletableFuture.supplyAsync(() -> getETag(bucketName[0], "job-manifest.csv")) .thenCompose(eTag -> { ArrayList<S3Tag> tagSet = new ArrayList<>(); S3Tag s3Tag = S3Tag.builder() .key("keyOne") .value("ValueOne") .build(); S3Tag s3Tag2 = S3Tag.builder() .key("keyTwo") .value("ValueTwo") .build(); tagSet.add(s3Tag); tagSet.add(s3Tag2); S3SetObjectTaggingOperation objectTaggingOperation = S3SetObjectTaggingOperation.builder() .tagSet(tagSet) .build(); JobOperation jobOperation = JobOperation.builder() .s3PutObjectTagging(objectTaggingOperation) .build(); JobManifestLocation jobManifestLocation = JobManifestLocation.builder() .objectArn(manifestLocation) .eTag(eTag) .build(); JobManifestSpec manifestSpec = JobManifestSpec.builder() .fieldsWithStrings("Bucket", "Key") .format("S3BatchOperations_CSV_20180820") .build(); JobManifest jobManifest = JobManifest.builder() .spec(manifestSpec) .location(jobManifestLocation) .build(); JobReport jobReport = JobReport.builder() .bucket(reportBucketName) .prefix("reports") .format("Report_CSV_20180820") .enabled(true) .reportScope("AllTasks") .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .accountId(accountId) .description("Job created using the AWS Java SDK") .manifest(jobManifest) .operation(jobOperation) .report(jobReport) .priority(42) .roleArn(iamRoleArn) .clientRequestToken(uuid) .confirmationRequired(false) .build(); // Create the job asynchronously. return getAsyncClient().createJob(jobRequest) .thenApply(CreateJobResponse::jobId); }) .handle((jobId, ex) -> { if (ex != null) { Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex; if (cause instanceof S3ControlException) { throw new CompletionException(cause); } else { throw new RuntimeException(cause); } } return jobId; }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 CreateJob” 中的。

以下代码示例显示了如何使用DeleteJobTagging

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Asynchronously deletes the tags associated with a specific batch job. * * @param jobId The ID of the batch job whose tags should be deleted. * @param accountId The ID of the account associated with the batch job. * @return A CompletableFuture that completes when the job tags have been successfully deleted, or an exception is thrown if the deletion fails. */ public CompletableFuture<Void> deleteBatchJobTagsAsync(String jobId, String accountId) { DeleteJobTaggingRequest jobTaggingRequest = DeleteJobTaggingRequest.builder() .accountId(accountId) .jobId(jobId) .build(); return asyncClient.deleteJobTagging(jobTaggingRequest) .thenAccept(response -> { System.out.println("You have successfully deleted " + jobId + " tagging."); }) .exceptionally(ex -> { System.err.println("Failed to delete job tags: " + ex.getMessage()); throw new RuntimeException(ex); }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 DeleteJobTagging” 中的。

以下代码示例显示了如何使用DescribeJob

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Asynchronously describes the specified job. * * @param jobId the ID of the job to describe * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job description is available * @throws RuntimeException if an error occurs while describing the job */ public CompletableFuture<Void> describeJobAsync(String jobId, String accountId) { DescribeJobRequest jobRequest = DescribeJobRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return getAsyncClient().describeJob(jobRequest) .thenAccept(response -> { System.out.println("Job ID: " + response.job().jobId()); System.out.println("Description: " + response.job().description()); System.out.println("Status: " + response.job().statusAsString()); System.out.println("Role ARN: " + response.job().roleArn()); System.out.println("Priority: " + response.job().priority()); System.out.println("Progress Summary: " + response.job().progressSummary()); // Print out details about the job manifest. JobManifest manifest = response.job().manifest(); System.out.println("Manifest Location: " + manifest.location().objectArn()); System.out.println("Manifest ETag: " + manifest.location().eTag()); // Print out details about the job operation. JobOperation operation = response.job().operation(); if (operation.s3PutObjectTagging() != null) { System.out.println("Operation: S3 Put Object Tagging"); System.out.println("Tag Set: " + operation.s3PutObjectTagging().tagSet()); } // Print out details about the job report. JobReport report = response.job().report(); System.out.println("Report Bucket: " + report.bucket()); System.out.println("Report Prefix: " + report.prefix()); System.out.println("Report Format: " + report.format()); System.out.println("Report Enabled: " + report.enabled()); System.out.println("Report Scope: " + report.reportScopeAsString()); }) .exceptionally(ex -> { System.err.println("Failed to describe job: " + ex.getMessage()); throw new RuntimeException(ex); }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 DescribeJob” 中的。

以下代码示例显示了如何使用GetJobTagging

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Asynchronously retrieves the tags associated with a specific job in an AWS account. * * @param jobId the ID of the job for which to retrieve the tags * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job tags have been retrieved, or with an exception if the operation fails * @throws RuntimeException if an error occurs while retrieving the job tags */ public CompletableFuture<Void> getJobTagsAsync(String jobId, String accountId) { GetJobTaggingRequest request = GetJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return asyncClient.getJobTagging(request) .thenAccept(response -> { List<S3Tag> tags = response.tags(); if (tags.isEmpty()) { System.out.println("No tags found for job ID: " + jobId); } else { for (S3Tag tag : tags) { System.out.println("Tag key is: " + tag.key()); System.out.println("Tag value is: " + tag.value()); } } }) .exceptionally(ex -> { System.err.println("Failed to get job tags: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 GetJobTagging” 中的。

以下代码示例显示了如何使用PutJobTagging

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Asynchronously adds tags to a job in the system. * * @param jobId the ID of the job to add tags to * @param accountId the account ID associated with the job * @return a CompletableFuture that completes when the tagging operation is finished */ public CompletableFuture<Void> putJobTaggingAsync(String jobId, String accountId) { S3Tag departmentTag = S3Tag.builder() .key("department") .value("Marketing") .build(); S3Tag fiscalYearTag = S3Tag.builder() .key("FiscalYear") .value("2020") .build(); PutJobTaggingRequest putJobTaggingRequest = PutJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .tags(departmentTag, fiscalYearTag) .build(); return asyncClient.putJobTagging(putJobTaggingRequest) .thenRun(() -> { System.out.println("Additional Tags were added to job " + jobId); }) .exceptionally(ex -> { System.err.println("Failed to add tags to job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 PutJobTagging” 中的。

以下代码示例显示了如何使用UpdateJobPriority

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Updates the priority of a job asynchronously. * * @param jobId the ID of the job to update * @param accountId the ID of the account associated with the job * @return a {@link CompletableFuture} that represents the asynchronous operation, which completes when the job priority has been updated or an error has occurred */ public CompletableFuture<Void> updateJobPriorityAsync(String jobId, String accountId) { UpdateJobPriorityRequest priorityRequest = UpdateJobPriorityRequest.builder() .accountId(accountId) .jobId(jobId) .priority(60) .build(); CompletableFuture<Void> future = new CompletableFuture<>(); getAsyncClient().updateJobPriority(priorityRequest) .thenAccept(response -> { System.out.println("The job priority was updated"); future.complete(null); // Complete the CompletableFuture on successful execution }) .exceptionally(ex -> { System.err.println("Failed to update job priority: " + ex.getMessage()); future.completeExceptionally(ex); // Complete the CompletableFuture exceptionally on error return null; // Return null to handle the exception }); return future; }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 UpdateJobPriority” 中的。

以下代码示例显示了如何使用UpdateJobStatus

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * Cancels a job asynchronously. * * @param jobId The ID of the job to be canceled. * @param accountId The ID of the account associated with the job. * @return A {@link CompletableFuture} that completes when the job status has been updated to "CANCELLED". * If an error occurs during the update, the returned future will complete exceptionally. */ public CompletableFuture<Void> cancelJobAsync(String jobId, String accountId) { UpdateJobStatusRequest updateJobStatusRequest = UpdateJobStatusRequest.builder() .accountId(accountId) .jobId(jobId) .requestedJobStatus(String.valueOf(JobStatus.CANCELLED)) .build(); return asyncClient.updateJobStatus(updateJobStatusRequest) .thenAccept(updateJobStatusResponse -> { System.out.println("Job status updated to: " + updateJobStatusResponse.status()); }) .exceptionally(ex -> { System.err.println("Failed to cancel job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 UpdateJobStatus” 中的。

场景

以下代码示例展示了如何学习 “Amazon S3 控制” 的核心操作。

SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

学习核心操作。

package com.example.s3.batch; import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.IOException; import java.util.Map; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.CompletionException; public class S3BatchScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); private static final String STACK_NAME = "MyS3Stack"; public static void main(String[] args) throws IOException { S3BatchActions actions = new S3BatchActions(); String accountId = actions.getAccountId(); String uuid = java.util.UUID.randomUUID().toString(); Scanner scanner = new Scanner(System.in); System.out.println(DASHES); System.out.println("Welcome to the Amazon S3 Batch basics scenario."); System.out.println(""" S3 Batch operations enables efficient and cost-effective processing of large-scale data stored in Amazon S3. It automatically scales resources to handle varying workloads without the need for manual intervention. One of the key features of S3 Batch is its ability to perform tagging operations on objects stored in S3 buckets. Users can leverage S3 Batch to apply, update, or remove tags on thousands or millions of objects in a single operation, streamlining the management and organization of their data. This can be particularly useful for tasks such as cost allocation, lifecycle management, or metadata-driven workflows, where consistent and accurate tagging is essential. S3 Batch's scalability and serverless nature make it an ideal solution for organizations with growing data volumes and complex data management requirements. This Java program walks you through Amazon S3 Batch operations. Let's get started... """); waitForInputToContinue(scanner); // Use CloudFormation to stand up the resource required for this scenario. System.out.println("Use CloudFormation to stand up the resource required for this scenario."); CloudFormationHelper.deployCloudFormationStack(STACK_NAME); Map<String, String> stackOutputs = CloudFormationHelper.getStackOutputs(STACK_NAME); String iamRoleArn = stackOutputs.get("S3BatchRoleArn"); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Setup the required bucket for this scenario."); waitForInputToContinue(scanner); String bucketName = "x-" + UUID.randomUUID(); actions.createBucket(bucketName); String reportBucketName = "arn:aws:s3:::"+bucketName; String manifestLocation = "arn:aws:s3:::"+bucketName+"/job-manifest.csv"; System.out.println("Populate the bucket with the required files."); String[] fileNames = {"job-manifest.csv", "object-key-1.txt", "object-key-2.txt", "object-key-3.txt", "object-key-4.txt"}; actions.uploadFilesToBucket(bucketName, fileNames, actions); waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("1. Create a S3 Batch Job"); System.out.println("This job tags all objects listed in the manifest file with tags"); waitForInputToContinue(scanner); String jobId ; try { jobId = actions.createS3JobAsync(accountId, iamRoleArn, manifestLocation, reportBucketName, uuid).join(); System.out.println("The Job id is " + jobId); } catch (S3Exception e) { System.err.println("SSM error: " + e.getMessage()); return; } catch (RuntimeException e) { System.err.println("Unexpected error: " + e.getMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. Update an existing S3 Batch Operations job's priority"); System.out.println(""" In this step, we modify the job priority value. The higher the number, the higher the priority. So, a job with a priority of `30` would have a higher priority than a job with a priority of `20`. This is a common way to represent the priority of a task or job, with higher numbers indicating a higher priority. Ensure that the job status allows for priority updates. Jobs in certain states (e.g., Cancelled, Failed, or Completed) cannot have their priorities updated. Only jobs in the Active or Suspended state typically allow priority updates. """); try { actions.updateJobPriorityAsync(jobId, accountId) .exceptionally(ex -> { System.err.println("Update job priority failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { System.err.println("Failed to update job priority: " + ex.getMessage()); } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Cancel the S3 Batch job"); System.out.print("Do you want to cancel the Batch job? (y/n): "); String cancelAns = scanner.nextLine(); if (cancelAns != null && cancelAns.trim().equalsIgnoreCase("y")) { try { actions.cancelJobAsync(jobId, accountId) .exceptionally(ex -> { System.err.println("Cancel job failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { System.err.println("Failed to cancel job: " + ex.getMessage()); } } else { System.out.println("Job " +jobId +" was not canceled."); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Describe the job that was just created"); waitForInputToContinue(scanner); try { actions.describeJobAsync(jobId, accountId) .exceptionally(ex -> { System.err.println("Describe job failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { System.err.println("Failed to describe job: " + ex.getMessage()); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Describe the tags associated with the job"); waitForInputToContinue(scanner); try { actions.getJobTagsAsync(jobId, accountId) .exceptionally(ex -> { System.err.println("Get job tags failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { System.err.println("Failed to get job tags: " + ex.getMessage()); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("6. Update Batch Job Tags"); waitForInputToContinue(scanner); try { actions.putJobTaggingAsync(jobId, accountId) .exceptionally(ex -> { System.err.println("Put job tagging failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { System.err.println("Failed to put job tagging: " + ex.getMessage()); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Delete the Amazon S3 Batch job tagging."); System.out.print("Do you want to delete Batch job tagging? (y/n)"); String delAns = scanner.nextLine(); if (delAns != null && delAns.trim().equalsIgnoreCase("y")) { try { actions.deleteBatchJobTagsAsync(jobId, accountId) .exceptionally(ex -> { System.err.println("Delete batch job tags failed: " + ex.getMessage()); return null; }) .join(); } catch (CompletionException ex) { System.err.println("Failed to delete batch job tags: " + ex.getMessage()); } } else { System.out.println("Tagging was not deleted."); } System.out.println(DASHES); System.out.println(DASHES); System.out.print("Do you want to delete the AWS resources used in this scenario? (y/n)"); String delResAns = scanner.nextLine(); if (delResAns != null && delResAns.trim().equalsIgnoreCase("y")) { actions.deleteFilesFromBucket(bucketName, fileNames, actions); actions.deleteBucketFolderAsync(bucketName); actions.deleteBucket(bucketName) .thenRun(() -> System.out.println("Bucket deletion completed")) .exceptionally(ex -> { System.err.println("Error occurred: " + ex.getMessage()); return null; }); CloudFormationHelper.destroyCloudFormationStack(STACK_NAME); } else { System.out.println("The AWS resources were not deleted."); } System.out.println("The Amazon S3 Batch scenario has successfully completed."); System.out.println(DASHES); } private static void waitForInputToContinue(Scanner scanner) { while (true) { System.out.println(); System.out.println("Enter 'c' followed by <ENTER> to continue:"); String input = scanner.nextLine(); if (input.trim().equalsIgnoreCase("c")) { System.out.println("Continuing with the program..."); System.out.println(); break; } else { // Handle invalid input. System.out.println("Invalid input. Please try again."); } } } }

封装操作的操作类。

public class S3BatchActions { private static S3ControlAsyncClient asyncClient; private static S3AsyncClient s3AsyncClient ; /** * Retrieves the asynchronous S3 Control client instance. * <p> * This method creates and returns a singleton instance of the {@link S3ControlAsyncClient}. If the instance * has not been created yet, it will be initialized with the following configuration: * <ul> * <li>Maximum concurrency: 100</li> * <li>Connection timeout: 60 seconds</li> * <li>Read timeout: 60 seconds</li> * <li>Write timeout: 60 seconds</li> * <li>API call timeout: 2 minutes</li> * <li>API call attempt timeout: 90 seconds</li> * <li>Retry policy: 3 retries</li> * <li>Region: US_EAST_1</li> * <li>Credentials provider: {@link EnvironmentVariableCredentialsProvider}</li> * </ul> * * @return the asynchronous S3 Control client instance */ private static S3ControlAsyncClient getAsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); asyncClient = S3ControlAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); } return asyncClient; } private static S3AsyncClient getS3AsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); s3AsyncClient = S3AsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); } return s3AsyncClient; } /** * Cancels a job asynchronously. * * @param jobId The ID of the job to be canceled. * @param accountId The ID of the account associated with the job. * @return A {@link CompletableFuture} that completes when the job status has been updated to "CANCELLED". * If an error occurs during the update, the returned future will complete exceptionally. */ public CompletableFuture<Void> cancelJobAsync(String jobId, String accountId) { UpdateJobStatusRequest updateJobStatusRequest = UpdateJobStatusRequest.builder() .accountId(accountId) .jobId(jobId) .requestedJobStatus(String.valueOf(JobStatus.CANCELLED)) .build(); return asyncClient.updateJobStatus(updateJobStatusRequest) .thenAccept(updateJobStatusResponse -> { System.out.println("Job status updated to: " + updateJobStatusResponse.status()); }) .exceptionally(ex -> { System.err.println("Failed to cancel job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } /** * Updates the priority of a job asynchronously. * * @param jobId the ID of the job to update * @param accountId the ID of the account associated with the job * @return a {@link CompletableFuture} that represents the asynchronous operation, which completes when the job priority has been updated or an error has occurred */ public CompletableFuture<Void> updateJobPriorityAsync(String jobId, String accountId) { UpdateJobPriorityRequest priorityRequest = UpdateJobPriorityRequest.builder() .accountId(accountId) .jobId(jobId) .priority(60) .build(); CompletableFuture<Void> future = new CompletableFuture<>(); getAsyncClient().updateJobPriority(priorityRequest) .thenAccept(response -> { System.out.println("The job priority was updated"); future.complete(null); // Complete the CompletableFuture on successful execution }) .exceptionally(ex -> { System.err.println("Failed to update job priority: " + ex.getMessage()); future.completeExceptionally(ex); // Complete the CompletableFuture exceptionally on error return null; // Return null to handle the exception }); return future; } /** * Asynchronously retrieves the tags associated with a specific job in an AWS account. * * @param jobId the ID of the job for which to retrieve the tags * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job tags have been retrieved, or with an exception if the operation fails * @throws RuntimeException if an error occurs while retrieving the job tags */ public CompletableFuture<Void> getJobTagsAsync(String jobId, String accountId) { GetJobTaggingRequest request = GetJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return asyncClient.getJobTagging(request) .thenAccept(response -> { List<S3Tag> tags = response.tags(); if (tags.isEmpty()) { System.out.println("No tags found for job ID: " + jobId); } else { for (S3Tag tag : tags) { System.out.println("Tag key is: " + tag.key()); System.out.println("Tag value is: " + tag.value()); } } }) .exceptionally(ex -> { System.err.println("Failed to get job tags: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } /** * Asynchronously deletes the tags associated with a specific batch job. * * @param jobId The ID of the batch job whose tags should be deleted. * @param accountId The ID of the account associated with the batch job. * @return A CompletableFuture that completes when the job tags have been successfully deleted, or an exception is thrown if the deletion fails. */ public CompletableFuture<Void> deleteBatchJobTagsAsync(String jobId, String accountId) { DeleteJobTaggingRequest jobTaggingRequest = DeleteJobTaggingRequest.builder() .accountId(accountId) .jobId(jobId) .build(); return asyncClient.deleteJobTagging(jobTaggingRequest) .thenAccept(response -> { System.out.println("You have successfully deleted " + jobId + " tagging."); }) .exceptionally(ex -> { System.err.println("Failed to delete job tags: " + ex.getMessage()); throw new RuntimeException(ex); }); } /** * Asynchronously describes the specified job. * * @param jobId the ID of the job to describe * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job description is available * @throws RuntimeException if an error occurs while describing the job */ public CompletableFuture<Void> describeJobAsync(String jobId, String accountId) { DescribeJobRequest jobRequest = DescribeJobRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return getAsyncClient().describeJob(jobRequest) .thenAccept(response -> { System.out.println("Job ID: " + response.job().jobId()); System.out.println("Description: " + response.job().description()); System.out.println("Status: " + response.job().statusAsString()); System.out.println("Role ARN: " + response.job().roleArn()); System.out.println("Priority: " + response.job().priority()); System.out.println("Progress Summary: " + response.job().progressSummary()); // Print out details about the job manifest. JobManifest manifest = response.job().manifest(); System.out.println("Manifest Location: " + manifest.location().objectArn()); System.out.println("Manifest ETag: " + manifest.location().eTag()); // Print out details about the job operation. JobOperation operation = response.job().operation(); if (operation.s3PutObjectTagging() != null) { System.out.println("Operation: S3 Put Object Tagging"); System.out.println("Tag Set: " + operation.s3PutObjectTagging().tagSet()); } // Print out details about the job report. JobReport report = response.job().report(); System.out.println("Report Bucket: " + report.bucket()); System.out.println("Report Prefix: " + report.prefix()); System.out.println("Report Format: " + report.format()); System.out.println("Report Enabled: " + report.enabled()); System.out.println("Report Scope: " + report.reportScopeAsString()); }) .exceptionally(ex -> { System.err.println("Failed to describe job: " + ex.getMessage()); throw new RuntimeException(ex); }); } /** * Creates an asynchronous S3 job using the AWS Java SDK. * * @param accountId the AWS account ID associated with the job * @param iamRoleArn the ARN of the IAM role to be used for the job * @param manifestLocation the location of the job manifest file in S3 * @param reportBucketName the name of the S3 bucket to store the job report * @param uuid a unique identifier for the job * @return a CompletableFuture that represents the asynchronous creation of the S3 job. * The CompletableFuture will return the job ID if the job is created successfully, * or throw an exception if there is an error. */ public CompletableFuture<String> createS3JobAsync(String accountId, String iamRoleArn, String manifestLocation, String reportBucketName, String uuid) { String[] bucketName = new String[]{""}; String[] parts = reportBucketName.split(":::"); if (parts.length > 1) { bucketName[0] = parts[1]; } else { System.out.println("The input string does not contain the expected format."); } return CompletableFuture.supplyAsync(() -> getETag(bucketName[0], "job-manifest.csv")) .thenCompose(eTag -> { ArrayList<S3Tag> tagSet = new ArrayList<>(); S3Tag s3Tag = S3Tag.builder() .key("keyOne") .value("ValueOne") .build(); S3Tag s3Tag2 = S3Tag.builder() .key("keyTwo") .value("ValueTwo") .build(); tagSet.add(s3Tag); tagSet.add(s3Tag2); S3SetObjectTaggingOperation objectTaggingOperation = S3SetObjectTaggingOperation.builder() .tagSet(tagSet) .build(); JobOperation jobOperation = JobOperation.builder() .s3PutObjectTagging(objectTaggingOperation) .build(); JobManifestLocation jobManifestLocation = JobManifestLocation.builder() .objectArn(manifestLocation) .eTag(eTag) .build(); JobManifestSpec manifestSpec = JobManifestSpec.builder() .fieldsWithStrings("Bucket", "Key") .format("S3BatchOperations_CSV_20180820") .build(); JobManifest jobManifest = JobManifest.builder() .spec(manifestSpec) .location(jobManifestLocation) .build(); JobReport jobReport = JobReport.builder() .bucket(reportBucketName) .prefix("reports") .format("Report_CSV_20180820") .enabled(true) .reportScope("AllTasks") .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .accountId(accountId) .description("Job created using the AWS Java SDK") .manifest(jobManifest) .operation(jobOperation) .report(jobReport) .priority(42) .roleArn(iamRoleArn) .clientRequestToken(uuid) .confirmationRequired(false) .build(); // Create the job asynchronously. return getAsyncClient().createJob(jobRequest) .thenApply(CreateJobResponse::jobId); }) .handle((jobId, ex) -> { if (ex != null) { Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex; if (cause instanceof S3ControlException) { throw new CompletionException(cause); } else { throw new RuntimeException(cause); } } return jobId; }); } /** * Retrieves the ETag (Entity Tag) for an object stored in an Amazon S3 bucket. * * @param bucketName the name of the Amazon S3 bucket where the object is stored * @param key the key (file name) of the object in the Amazon S3 bucket * @return the ETag of the object */ public String getETag(String bucketName, String key) { S3Client s3Client = S3Client.builder() .region(Region.US_EAST_1) .build(); HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() .bucket(bucketName) .key(key) .build(); HeadObjectResponse headObjectResponse = s3Client.headObject(headObjectRequest); return headObjectResponse.eTag(); } /** * Asynchronously adds tags to a job in the system. * * @param jobId the ID of the job to add tags to * @param accountId the account ID associated with the job * @return a CompletableFuture that completes when the tagging operation is finished */ public CompletableFuture<Void> putJobTaggingAsync(String jobId, String accountId) { S3Tag departmentTag = S3Tag.builder() .key("department") .value("Marketing") .build(); S3Tag fiscalYearTag = S3Tag.builder() .key("FiscalYear") .value("2020") .build(); PutJobTaggingRequest putJobTaggingRequest = PutJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .tags(departmentTag, fiscalYearTag) .build(); return asyncClient.putJobTagging(putJobTaggingRequest) .thenRun(() -> { System.out.println("Additional Tags were added to job " + jobId); }) .exceptionally(ex -> { System.err.println("Failed to add tags to job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } // Setup the S3 bucket required for this scenario. /** * Creates an Amazon S3 bucket with the specified name. * * @param bucketName the name of the S3 bucket to create * @throws S3Exception if there is an error creating the bucket */ public void createBucket(String bucketName) { try { S3Client s3Client = S3Client.builder() .region(Region.US_EAST_1) .build(); S3Waiter s3Waiter = s3Client.waiter(); CreateBucketRequest bucketRequest = CreateBucketRequest.builder() .bucket(bucketName) .build(); s3Client.createBucket(bucketRequest); HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder() .bucket(bucketName) .build(); // Wait until the bucket is created and print out the response. WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait); waiterResponse.matched().response().ifPresent(System.out::println); System.out.println(bucketName + " is ready"); } catch (S3Exception e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } /** * Uploads a file to an Amazon S3 bucket asynchronously. * * @param bucketName the name of the S3 bucket to upload the file to * @param fileName the name of the file to be uploaded * @throws RuntimeException if an error occurs during the file upload */ public void populateBucket(String bucketName, String fileName) { // Define the path to the directory. Path filePath = Paths.get("src/main/resources/batch/", fileName).toAbsolutePath(); PutObjectRequest putOb = PutObjectRequest.builder() .bucket(bucketName) .key(fileName) .build(); CompletableFuture<PutObjectResponse> future = getS3AsyncClient().putObject(putOb, AsyncRequestBody.fromFile(filePath)); future.whenComplete((result, ex) -> { if (ex != null) { System.err.println("Error uploading file: " + ex.getMessage()); } else { System.out.println("Successfully placed " + fileName + " into bucket " + bucketName); } }).join(); } // Update the bucketName in CSV. public void updateCSV(String newValue) { Path csvFilePath = Paths.get("src/main/resources/batch/job-manifest.csv").toAbsolutePath(); try { // Read all lines from the CSV file. List<String> lines = Files.readAllLines(csvFilePath); // Update the first value in each line. List<String> updatedLines = lines.stream() .map(line -> { String[] parts = line.split(","); parts[0] = newValue; return String.join(",", parts); }) .collect(Collectors.toList()); // Write the updated lines back to the CSV file Files.write(csvFilePath, updatedLines); System.out.println("CSV file updated successfully."); } catch (Exception e) { e.printStackTrace(); } } /** * Deletes an object from an Amazon S3 bucket asynchronously. * * @param bucketName The name of the S3 bucket where the object is stored. * @param objectName The name of the object to be deleted. * @return A {@link CompletableFuture} that completes when the object has been deleted, * or throws a {@link RuntimeException} if an error occurs during the deletion. */ public CompletableFuture<Void> deleteBucketObjects(String bucketName, String objectName) { ArrayList<ObjectIdentifier> toDelete = new ArrayList<>(); toDelete.add(ObjectIdentifier.builder() .key(objectName) .build()); DeleteObjectsRequest dor = DeleteObjectsRequest.builder() .bucket(bucketName) .delete(Delete.builder() .objects(toDelete).build()) .build(); return getS3AsyncClient().deleteObjects(dor) .thenAccept(result -> { System.out.println("The object was deleted!"); }) .exceptionally(ex -> { throw new RuntimeException("Error deleting object: " + ex.getMessage(), ex); }); } /** * Deletes a folder and all its contents asynchronously from an Amazon S3 bucket. * * @param bucketName the name of the S3 bucket containing the folder to be deleted * @return a {@link CompletableFuture} that completes when the folder and its contents have been deleted * @throws RuntimeException if any error occurs during the deletion process */ public void deleteBucketFolderAsync(String bucketName) { String folderName = "reports/"; ListObjectsV2Request request = ListObjectsV2Request.builder() .bucket(bucketName) .prefix(folderName) .build(); CompletableFuture<ListObjectsV2Response> listObjectsFuture = getS3AsyncClient().listObjectsV2(request); listObjectsFuture.thenCompose(response -> { List<CompletableFuture<DeleteObjectResponse>> deleteFutures = response.contents().stream() .map(obj -> { DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder() .bucket(bucketName) .key(obj.key()) .build(); return getS3AsyncClient().deleteObject(deleteRequest) .thenApply(deleteResponse -> { System.out.println("Deleted object: " + obj.key()); return deleteResponse; }); }) .collect(Collectors.toList()); return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) .thenCompose(v -> { // Delete the folder. DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder() .bucket(bucketName) .key(folderName) .build(); return getS3AsyncClient().deleteObject(deleteRequest) .thenApply(deleteResponse -> { System.out.println("Deleted folder: " + folderName); return deleteResponse; }); }); }).join(); } /** * Deletes an Amazon S3 bucket. * * @param bucketName the name of the bucket to delete * @return a {@link CompletableFuture} that completes when the bucket has been deleted, or exceptionally if there is an error * @throws RuntimeException if there is an error deleting the bucket */ public CompletableFuture<Void> deleteBucket(String bucketName) { S3AsyncClient s3Client = getS3AsyncClient(); return s3Client.deleteBucket(DeleteBucketRequest.builder() .bucket(bucketName) .build()) .thenAccept(deleteBucketResponse -> { System.out.println(bucketName + " was deleted"); }) .exceptionally(ex -> { // Handle the exception or rethrow it. throw new RuntimeException("Failed to delete bucket: " + bucketName, ex); }); } /** * Uploads a set of files to an Amazon S3 bucket. * * @param bucketName the name of the S3 bucket to upload the files to * @param fileNames an array of file names to be uploaded * @param actions an instance of {@link S3BatchActions} that provides the implementation for the necessary S3 operations * @throws IOException if there's an error creating the text files or uploading the files to the S3 bucket */ public static void uploadFilesToBucket(String bucketName, String[] fileNames, S3BatchActions actions) throws IOException { actions.updateCSV(bucketName); createTextFiles(fileNames); for (String fileName : fileNames) { actions.populateBucket(bucketName, fileName); } System.out.println("All files are placed in the S3 bucket " + bucketName); } /** * Deletes the specified files from the given S3 bucket. * * @param bucketName the name of the S3 bucket * @param fileNames an array of file names to be deleted from the bucket * @param actions the S3BatchActions instance to be used for the file deletion * @throws IOException if an I/O error occurs during the file deletion */ public void deleteFilesFromBucket(String bucketName, String[] fileNames, S3BatchActions actions) throws IOException { for (String fileName : fileNames) { actions.deleteBucketObjects(bucketName, fileName) .thenRun(() -> System.out.println("Object deletion completed")) .exceptionally(ex -> { System.err.println("Error occurred: " + ex.getMessage()); return null; }); } System.out.println("All files have been deleted from the bucket " + bucketName); } public static void createTextFiles(String[] fileNames) { String currentDirectory = System.getProperty("user.dir"); String directoryPath = currentDirectory + "\\src\\main\\resources\\batch"; Path path = Paths.get(directoryPath); try { // Create the directory if it doesn't exist. if (Files.notExists(path)) { Files.createDirectories(path); System.out.println("Created directory: " + path.toString()); } else { System.out.println("Directory already exists: " + path.toString()); } for (String fileName : fileNames) { // Check if the file is a .txt file. if (fileName.endsWith(".txt")) { // Define the path for the new file. Path filePath = path.resolve(fileName); System.out.println("Attempting to create file: " + filePath.toString()); // Create and write content to the new file. Files.write(filePath, "This is a test".getBytes()); // Verify the file was created. if (Files.exists(filePath)) { System.out.println("Successfully created file: " + filePath.toString()); } else { System.out.println("Failed to create file: " + filePath.toString()); } } } } catch (IOException e) { System.err.println("An error occurred: " + e.getMessage()); e.printStackTrace(); } } public String getAccountId() { StsClient stsClient = StsClient.builder() .region(Region.US_EAST_1) .build(); GetCallerIdentityResponse callerIdentityResponse = stsClient.getCallerIdentity(); return callerIdentityResponse.account(); } }