Amazon Redshift examples using SDK for Java 2.x - AWS SDK Code Examples

More AWS SDK examples are available in the AWS Doc SDK Examples GitHub repo.


Amazon Redshift examples using SDK for Java 2.x

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Amazon Redshift.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or combined with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code examples show how to get started using Amazon Redshift.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.redshift.RedshiftClient;
import software.amazon.awssdk.services.redshift.paginators.DescribeClustersIterable;

/**
 * Before running this Java V2 code example, set up your development
 * environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */
public class HelloRedshift {
    public static void main(String[] args) {
        Region region = Region.US_EAST_1;
        RedshiftClient redshiftClient = RedshiftClient.builder()
            .region(region)
            .build();

        listClustersPaginator(redshiftClient);
    }

    // Lists every cluster in the Region, following pagination automatically.
    public static void listClustersPaginator(RedshiftClient redshiftClient) {
        DescribeClustersIterable clustersIterable = redshiftClient.describeClustersPaginator();
        clustersIterable.stream()
            .flatMap(r -> r.clusters().stream())
            .forEach(cluster -> System.out
                .println(" Cluster identifier: " + cluster.clusterIdentifier()
                    + " status = " + cluster.clusterStatus()));
    }
}
  • For API details, see DescribeClusters in AWS SDK for Java 2.x API Reference.
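
The `flatMap` step in `listClustersPaginator` turns a sequence of result pages into one stream of clusters. The following sketch illustrates only that step and does not call AWS: the `Page` and `ClusterInfo` records are hypothetical stand-ins for the SDK's response and cluster types, and the sample data is made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class PaginatorFlattenSketch {

    // Hypothetical stand-ins for the SDK's Cluster and DescribeClustersResponse types.
    record ClusterInfo(String clusterIdentifier, String clusterStatus) {}

    record Page(List<ClusterInfo> clusters) {}

    // Flattens every page into one list of "identifier=status" strings,
    // mirroring stream().flatMap(r -> r.clusters().stream()) in the example above.
    static List<String> describeAll(List<Page> pages) {
        return pages.stream()
                .flatMap(page -> page.clusters().stream())
                .map(c -> c.clusterIdentifier() + "=" + c.clusterStatus())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Three pages, one of them empty, as a paginated API might return.
        List<Page> pages = List.of(
                new Page(List.of(new ClusterInfo("cluster-a", "available"),
                        new ClusterInfo("cluster-b", "creating"))),
                new Page(List.of()),
                new Page(List.of(new ClusterInfo("cluster-c", "available"))));
        describeAll(pages).forEach(System.out::println);
    }
}
```

Empty pages contribute nothing to the flattened stream, so the caller never needs to special-case them.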

Basics

The following code example shows how to learn core operations for Amazon Redshift using an AWS SDK.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Run an interactive scenario demonstrating Amazon Redshift features.

import com.example.redshift.User;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.redshift.model.ClusterAlreadyExistsException;
import software.amazon.awssdk.services.redshift.model.CreateClusterResponse;
import software.amazon.awssdk.services.redshift.model.DeleteClusterResponse;
import software.amazon.awssdk.services.redshift.model.ModifyClusterResponse;
import software.amazon.awssdk.services.redshift.model.RedshiftException;
import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementResponse;
import software.amazon.awssdk.services.redshiftdata.model.RedshiftDataException;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse;

/**
 * Before running this Java V2 code example, set up your development
 * environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 *
 * This example requires an AWS Secrets Manager secret that contains the
 * database credentials. If you do not create a secret that specifies the
 * user name and password, this example will not work. For details, see:
 *
 * https://docs.aws.amazon.com/secretsmanager/latest/userguide/integrating_how-services-use-secrets_RS.html
 *
 * This Java example performs these tasks:
 *
 * 1. Prompts the user for a unique cluster ID or uses the default value.
 * 2. Creates a Redshift cluster with the specified or default cluster ID value.
 * 3. Waits until the Redshift cluster is available for use.
 * 4. Lists all databases using a pagination API call.
 * 5. Creates a table named "Movies" with fields ID, title, and year.
 * 6. Inserts a specified number of records into the "Movies" table by reading the Movies JSON file.
 * 7. Prompts the user for a movie release year.
 * 8. Runs a SQL query to retrieve movies released in the specified year.
 * 9. Modifies the Redshift cluster.
 * 10. Prompts the user for confirmation to delete the Redshift cluster.
 * 11. If confirmed, deletes the specified Redshift cluster.
 */
public class RedshiftScenario {
    public static final String DASHES = new String(new char[80]).replace("\0", "-");
    private static final Logger logger = LoggerFactory.getLogger(RedshiftScenario.class);

    static RedshiftActions redshiftActions = new RedshiftActions();

    public static void main(String[] args) throws Exception {
        final String usage = """

            Usage:
                <jsonFilePath> <secretName>\s

            Where:
                jsonFilePath - The path to the Movies JSON file (you can locate that file in ../../../resources/sample_files/movies.json)
                secretName - The name of the Secrets Manager secret that stores the user name and password used in this scenario.
            """;

        if (args.length != 2) {
            logger.info(usage);
            return;
        }

        String jsonFilePath = args[0];
        String secretName = args[1];
        Scanner scanner = new Scanner(System.in);
        logger.info(DASHES);
        logger.info("Welcome to the Amazon Redshift SDK Basics scenario.");
        logger.info("""
            This Java program demonstrates how to interact with Amazon Redshift by using the AWS SDK for Java (v2).\s
            Amazon Redshift is a fully managed, petabyte-scale data warehouse service hosted in the cloud.

            The program's primary functionalities include cluster creation, verification of cluster readiness,\s
            database listing, table creation, data population within the table, and execution of SQL statements.
            Furthermore, it demonstrates the process of querying data from the Movies table.\s

            Upon completion of the program, all AWS resources are cleaned up.
            """);

        logger.info("Let's get started...");
        logger.info("""
            First, we will retrieve the user name and password from Secrets Manager.

            Using AWS Secrets Manager to store Redshift credentials provides several security benefits.
            It allows you to securely store and manage sensitive information, such as passwords, API keys, and
            database credentials, without embedding them directly in your application code.

            More information can be found here:

            https://docs.aws.amazon.com/secretsmanager/latest/userguide/integrating_how-services-use-secrets_RS.html
            """);

        // The secret is a JSON document that maps onto the User class.
        Gson gson = new Gson();
        User user = gson.fromJson(String.valueOf(getSecretValues(secretName)), User.class);
        waitForInputToContinue(scanner);
        logger.info(DASHES);

        try {
            runScenario(user, scanner, jsonFilePath);
        } catch (RuntimeException e) {
            e.printStackTrace();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    private static void runScenario(User user, Scanner scanner, String jsonFilePath) throws Throwable {
        String databaseName = "dev";
        System.out.println(DASHES);
        logger.info("Create a Redshift Cluster");
        logger.info("A Redshift cluster refers to the collection of computing resources and storage that work together to process and analyze large volumes of data.");
        logger.info("Enter a cluster id value or accept the default by hitting Enter (default is redshift-cluster-movies): ");
        String userClusterId = scanner.nextLine();
        String clusterId = userClusterId.isEmpty() ? "redshift-cluster-movies" : userClusterId;
        try {
            CompletableFuture<CreateClusterResponse> future = redshiftActions.createClusterAsync(clusterId, user.getUserName(), user.getUserPassword());
            CreateClusterResponse response = future.join();
            logger.info("Cluster successfully created. Cluster Identifier {} ", response.cluster().clusterIdentifier());
        } catch (RuntimeException rt) {
            // join() wraps the underlying failure, so inspect the cause.
            Throwable cause = rt.getCause();
            if (cause instanceof ClusterAlreadyExistsException) {
                logger.info("The Cluster {} already exists. Moving on...", clusterId);
            } else {
                logger.info("An unexpected error occurred: " + rt.getMessage());
            }
        }
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("Wait until {} is available.", clusterId);
        waitForInputToContinue(scanner);
        try {
            CompletableFuture<Void> future = redshiftActions.waitForClusterReadyAsync(clusterId);
            future.join();
            logger.info("Cluster is ready!");
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftException redshiftEx) {
                logger.info("Redshift error occurred: Error message: {}, Error code {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: " + rt.getMessage());
            }
            throw cause;
        }
        logger.info(DASHES);

        logger.info(DASHES);
        String databaseInfo = """
            When you created $clusteridD, the dev database was created by default and is used in this scenario.\s

            To create a custom database, you need to have a CREATEDB privilege.\s
            For more information, see the documentation here: https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_DATABASE.html.
            """.replace("$clusteridD", clusterId);

        logger.info(databaseInfo);
        waitForInputToContinue(scanner);
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("List databases in {} ", clusterId);
        waitForInputToContinue(scanner);
        try {
            CompletableFuture<Void> future = redshiftActions.listAllDatabasesAsync(clusterId, user.getUserName(), "dev");
            future.join();
            logger.info("Databases listed successfully.");
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftDataException redshiftEx) {
                logger.error("Redshift Data error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.error("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("Now you will create a table named Movies.");
        waitForInputToContinue(scanner);
        try {
            CompletableFuture<ExecuteStatementResponse> future = redshiftActions.createTableAsync(clusterId, databaseName, user.getUserName());
            future.join();
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftDataException redshiftEx) {
                logger.info("Redshift Data error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("Populate the Movies table using the Movies.json file.");
        logger.info("Specify the number of records you would like to add to the Movies Table.");
        logger.info("Please enter a value between 50 and 200.");
        int numRecords;
        do {
            logger.info("Enter a value: ");
            while (!scanner.hasNextInt()) {
                logger.info("Invalid input. Please enter a value between 50 and 200.");
                logger.info("Enter a value: ");
                scanner.next();
            }
            numRecords = scanner.nextInt();
            // Consume the rest of the line so that the next prompt starts clean.
            scanner.nextLine();
        } while (numRecords < 50 || numRecords > 200);
        try {
            // Wait for the operation to complete.
            redshiftActions.popTableAsync(clusterId, databaseName, user.getUserName(), jsonFilePath, numRecords).join();
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftDataException redshiftEx) {
                logger.info("Redshift Data error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }
        waitForInputToContinue(scanner);
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("Query the Movies table by year. Enter a value between 2012-2014.");
        int movieYear;
        do {
            logger.info("Enter a year: ");
            while (!scanner.hasNextInt()) {
                logger.info("Invalid input. Please enter a valid year between 2012 and 2014.");
                logger.info("Enter a year: ");
                scanner.next();
            }
            movieYear = scanner.nextInt();
            scanner.nextLine();
        } while (movieYear < 2012 || movieYear > 2014);

        String id;
        try {
            CompletableFuture<String> future = redshiftActions.queryMoviesByYearAsync(databaseName, user.getUserName(), movieYear, clusterId);
            id = future.join();
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftDataException redshiftEx) {
                logger.info("Redshift Data error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }

        logger.info("The identifier of the statement is " + id);
        waitForInputToContinue(scanner);
        try {
            CompletableFuture<Void> future = redshiftActions.checkStatementAsync(id);
            future.join();
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftDataException redshiftEx) {
                logger.info("Redshift Data error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }

        waitForInputToContinue(scanner);
        try {
            CompletableFuture<Void> future = redshiftActions.getResultsAsync(id);
            future.join();
        } catch (RuntimeException rt) {
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftDataException redshiftEx) {
                logger.info("Redshift Data error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }
        waitForInputToContinue(scanner);
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("Now you will modify the Redshift cluster.");
        waitForInputToContinue(scanner);
        try {
            CompletableFuture<ModifyClusterResponse> future = redshiftActions.modifyClusterAsync(clusterId);
            future.join();
        } catch (RuntimeException rt) {
            // Cluster operations use the Redshift client, so they fail with RedshiftException.
            Throwable cause = rt.getCause();
            if (cause instanceof RedshiftException redshiftEx) {
                logger.info("Redshift error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
            } else {
                logger.info("An unexpected error occurred: {}", rt.getMessage());
            }
            throw cause;
        }
        waitForInputToContinue(scanner);
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("Would you like to delete the Amazon Redshift cluster? (y/n)");
        String delAns = scanner.nextLine().trim();
        if (delAns.equalsIgnoreCase("y")) {
            logger.info("You selected to delete {} ", clusterId);
            waitForInputToContinue(scanner);
            try {
                CompletableFuture<DeleteClusterResponse> future = redshiftActions.deleteRedshiftClusterAsync(clusterId);
                future.join();
            } catch (RuntimeException rt) {
                Throwable cause = rt.getCause();
                if (cause instanceof RedshiftException redshiftEx) {
                    logger.info("Redshift error occurred: {} Error code: {}", redshiftEx.getMessage(), redshiftEx.awsErrorDetails().errorCode());
                } else {
                    logger.info("An unexpected error occurred: {}", rt.getMessage());
                }
                throw cause;
            }
        } else {
            logger.info("The {} was not deleted", clusterId);
        }
        logger.info(DASHES);

        logger.info(DASHES);
        logger.info("This concludes the Amazon Redshift SDK Basics scenario.");
        logger.info(DASHES);
    }

    private static SecretsManagerClient getSecretClient() {
        Region region = Region.US_EAST_1;
        return SecretsManagerClient.builder()
            .region(region)
            .build();
    }

    private static void waitForInputToContinue(Scanner scanner) {
        while (true) {
            System.out.println("");
            System.out.println("Enter 'c' followed by <ENTER> to continue:");
            String input = scanner.nextLine();

            if (input.trim().equalsIgnoreCase("c")) {
                System.out.println("Continuing with the program...");
                System.out.println("");
                break;
            } else {
                // Handle invalid input.
                System.out.println("Invalid input. Please try again.");
            }
        }
    }

    // Get the Amazon Redshift credentials from AWS Secrets Manager.
    private static String getSecretValues(String secretName) {
        SecretsManagerClient secretClient = getSecretClient();
        GetSecretValueRequest valueRequest = GetSecretValueRequest.builder()
            .secretId(secretName)
            .build();

        GetSecretValueResponse valueResponse = secretClient.getSecretValue(valueRequest);
        return valueResponse.secretString();
    }
}
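
The scenario repeatedly calls `join()` and then inspects `getCause()` on the caught exception. That works because `join()` reports a failed future by throwing a `CompletionException` whose cause is the original failure. The following sketch shows the pattern in isolation, using only the JDK; `AlreadyExistsException` is a hypothetical stand-in for a service exception such as `ClusterAlreadyExistsException`, and `describeOutcome` is an illustrative helper, not part of the example repository.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinCauseSketch {

    // Hypothetical stand-in for a service exception such as ClusterAlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String message) {
            super(message);
        }
    }

    // Joins the future and reports how it ended, inspecting the cause the way the scenario does.
    static String describeOutcome(CompletableFuture<String> future) {
        try {
            return "created: " + future.join();
        } catch (CompletionException e) {
            // join() wraps the original failure, so the useful type is on getCause().
            Throwable cause = e.getCause();
            if (cause instanceof AlreadyExistsException) {
                return "already exists: " + cause.getMessage();
            }
            return "unexpected: " + cause;
        }
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome(CompletableFuture.completedFuture("demo-cluster")));
        System.out.println(describeOutcome(
                CompletableFuture.failedFuture(new AlreadyExistsException("demo-cluster"))));
    }
}
```

Catching `CompletionException` rather than the broader `RuntimeException` makes it explicit that the cause is the interesting part.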

A wrapper class for Amazon Redshift SDK methods.

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.redshift.RedshiftAsyncClient;
import software.amazon.awssdk.services.redshift.model.Cluster;
import software.amazon.awssdk.services.redshift.model.CreateClusterRequest;
import software.amazon.awssdk.services.redshift.model.CreateClusterResponse;
import software.amazon.awssdk.services.redshift.model.DeleteClusterRequest;
import software.amazon.awssdk.services.redshift.model.DeleteClusterResponse;
import software.amazon.awssdk.services.redshift.model.DescribeClustersRequest;
import software.amazon.awssdk.services.redshift.model.ModifyClusterRequest;
import software.amazon.awssdk.services.redshift.model.ModifyClusterResponse;
import software.amazon.awssdk.services.redshift.model.RedshiftException;
import software.amazon.awssdk.services.redshiftdata.RedshiftDataAsyncClient;
import software.amazon.awssdk.services.redshiftdata.model.DescribeStatementRequest;
import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementRequest;
import software.amazon.awssdk.services.redshiftdata.model.ExecuteStatementResponse;
import software.amazon.awssdk.services.redshiftdata.model.Field;
import software.amazon.awssdk.services.redshiftdata.model.GetStatementResultRequest;
import software.amazon.awssdk.services.redshiftdata.model.ListDatabasesRequest;
import software.amazon.awssdk.services.redshiftdata.model.SqlParameter;
import software.amazon.awssdk.services.redshiftdata.model.RedshiftDataException;
import software.amazon.awssdk.services.redshiftdata.paginators.ListDatabasesPublisher;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

public class RedshiftActions {

    private static final Logger logger = LoggerFactory.getLogger(RedshiftActions.class);
    private static RedshiftDataAsyncClient redshiftDataAsyncClient;

    private static RedshiftAsyncClient redshiftAsyncClient;

    // Lazily creates the shared Amazon Redshift client used for cluster operations.
    private static RedshiftAsyncClient getAsyncClient() {
        if (redshiftAsyncClient == null) {
            SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
                .maxConcurrency(100)
                .connectionTimeout(Duration.ofSeconds(60))
                .readTimeout(Duration.ofSeconds(60))
                .writeTimeout(Duration.ofSeconds(60))
                .build();

            ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
                .apiCallTimeout(Duration.ofMinutes(2))
                .apiCallAttemptTimeout(Duration.ofSeconds(90))
                .retryStrategy(RetryMode.STANDARD)
                .build();

            redshiftAsyncClient = RedshiftAsyncClient.builder()
                .httpClient(httpClient)
                .overrideConfiguration(overrideConfig)
                .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
                .build();
        }
        return redshiftAsyncClient;
    }

    // Lazily creates the shared Redshift Data API client used for SQL operations.
    private static RedshiftDataAsyncClient getAsyncDataClient() {
        if (redshiftDataAsyncClient == null) {
            SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
                .maxConcurrency(100)
                .connectionTimeout(Duration.ofSeconds(60))
                .readTimeout(Duration.ofSeconds(60))
                .writeTimeout(Duration.ofSeconds(60))
                .build();

            ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
                .apiCallTimeout(Duration.ofMinutes(2))
                .apiCallAttemptTimeout(Duration.ofSeconds(90))
                .retryStrategy(RetryMode.STANDARD)
                .build();

            redshiftDataAsyncClient = RedshiftDataAsyncClient.builder()
                .httpClient(httpClient)
                .overrideConfiguration(overrideConfig)
                .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
                .build();
        }
        return redshiftDataAsyncClient;
    }

    /**
     * Creates a new Amazon Redshift cluster asynchronously.
     * @param clusterId     the unique identifier for the cluster
     * @param username      the username for the administrative user
     * @param userPassword  the password for the administrative user
     * @return a CompletableFuture that represents the asynchronous operation of creating the cluster
     * @throws RuntimeException if the cluster creation fails
     */
    public CompletableFuture<CreateClusterResponse> createClusterAsync(String clusterId, String username, String userPassword) {
        CreateClusterRequest clusterRequest = CreateClusterRequest.builder()
            .clusterIdentifier(clusterId)
            .masterUsername(username)
            .masterUserPassword(userPassword)
            .nodeType("ra3.4xlarge")
            .publiclyAccessible(true)
            .numberOfNodes(2)
            .build();

        return getAsyncClient().createCluster(clusterRequest)
            .whenComplete((response, exception) -> {
                if (response != null) {
                    logger.info("Created cluster ");
                } else {
                    throw new RuntimeException("Failed to create cluster: " + exception.getMessage(), exception);
                }
            });
    }

    /**
     * Waits asynchronously for the specified cluster to become available.
     * @param clusterId the identifier of the cluster to wait for
     * @return a {@link CompletableFuture} that completes when the cluster is ready
     */
    public CompletableFuture<Void> waitForClusterReadyAsync(String clusterId) {
        DescribeClustersRequest clustersRequest = DescribeClustersRequest.builder()
            .clusterIdentifier(clusterId)
            .build();

        logger.info("Waiting for cluster to become available. This may take a few minutes.");
        long startTime = System.currentTimeMillis();

        // Recursive method to poll the cluster status.
        return checkClusterStatusAsync(clustersRequest, startTime);
    }

    private CompletableFuture<Void> checkClusterStatusAsync(DescribeClustersRequest clustersRequest, long startTime) {
        return getAsyncClient().describeClusters(clustersRequest)
            .thenCompose(clusterResponse -> {
                List<Cluster> clusterList = clusterResponse.clusters();
                boolean clusterReady = false;
                for (Cluster cluster : clusterList) {
                    if ("available".equals(cluster.clusterStatus())) {
                        clusterReady = true;
                        break;
                    }
                }

                if (clusterReady) {
                    logger.info("Cluster is available!");
                    return CompletableFuture.completedFuture(null);
                } else {
                    long elapsedTimeMillis = System.currentTimeMillis() - startTime;
                    long elapsedSeconds = elapsedTimeMillis / 1000;
                    long minutes = elapsedSeconds / 60;
                    long seconds = elapsedSeconds % 60;
                    System.out.printf("\rElapsed Time: %02d:%02d - Waiting for cluster...", minutes, seconds);
                    System.out.flush();

                    // Wait 1 second before the next status check.
                    return CompletableFuture.runAsync(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            throw new RuntimeException("Error during sleep: " + e.getMessage(), e);
                        }
                    }).thenCompose(ignored -> checkClusterStatusAsync(clustersRequest, startTime));
                }
            }).exceptionally(exception -> {
                throw new RuntimeException("Failed to get cluster status: " + exception.getMessage(), exception);
            });
    }

    /**
     * Lists all databases asynchronously for the specified cluster, database user, and database.
     * @param clusterId the identifier of the cluster to list databases for
     * @param dbUser the database user to use for the list databases request
     * @param database the database to list databases for
     * @return a {@link CompletableFuture} that completes when the database listing is complete, or throws a {@link RuntimeException} if there was an error
     */
    public CompletableFuture<Void> listAllDatabasesAsync(String clusterId, String dbUser, String database) {
        ListDatabasesRequest databasesRequest = ListDatabasesRequest.builder()
            .clusterIdentifier(clusterId)
            .dbUser(dbUser)
            .database(database)
            .build();

        // Asynchronous paginator for listing databases.
        ListDatabasesPublisher databasesPaginator = getAsyncDataClient().listDatabasesPaginator(databasesRequest);
        CompletableFuture<Void> future = databasesPaginator.subscribe(response -> {
            response.databases().forEach(db -> {
                logger.info("The database name is {} ", db);
            });
        });

        // Return the future for asynchronous handling.
        return future.exceptionally(exception -> {
            throw new RuntimeException("Failed to list databases: " + exception.getMessage(), exception);
        });
    }

    /**
     * Creates an asynchronous task to execute a SQL statement for creating a new table.
     *
     * @param clusterId    the identifier of the Amazon Redshift cluster
     * @param databaseName the name of the database to create the table in
     * @param userName     the username to use for the database connection
     * @return a {@link CompletableFuture} that completes with the result of the SQL statement execution
     * @throws RuntimeException if there is an error creating the table
     */
    public CompletableFuture<ExecuteStatementResponse> createTableAsync(String clusterId, String databaseName, String userName) {
        ExecuteStatementRequest createTableRequest = ExecuteStatementRequest.builder()
            .clusterIdentifier(clusterId)
            .dbUser(userName)
            .database(databaseName)
            .sql("CREATE TABLE Movies (" +
                "id INT PRIMARY KEY, " +
                "title VARCHAR(100), " +
                "year INT)")
            .build();

        return getAsyncDataClient().executeStatement(createTableRequest)
            .whenComplete((response, exception) -> {
                if (exception != null) {
                    throw new RuntimeException("Error creating table: " + exception.getMessage(), exception);
                } else {
                    logger.info("Table created: Movies");
                }
            });
    }

    /**
     * Asynchronously populates a table from a JSON file.
     *
     * @param clusterId    the ID of the cluster
     * @param databaseName the name of the database
     * @param userName     the username
     * @param fileName     the name of the JSON file
     * @param number       the number of records to process
     * @return a CompletableFuture that completes with the number of records added to the Movies table
     */
    public CompletableFuture<Integer> popTableAsync(String clusterId, String databaseName, String userName, String fileName, int number) {
        return CompletableFuture.supplyAsync(() -> {
                try {
                    JsonParser parser = new JsonFactory().createParser(new File(fileName));
                    JsonNode rootNode = new ObjectMapper().readTree(parser);
                    Iterator<JsonNode> iter = rootNode.iterator();
                    return iter;
                } catch (IOException e) {
                    throw new RuntimeException("Failed to read or parse JSON file: " + e.getMessage(), e);
                }
            }).thenCompose(iter -> processNodesAsync(clusterId, databaseName, userName, iter, number))
            .whenComplete((result, exception) -> {
                if (exception != null) {
                    logger.info("Error {} ", exception.getMessage());
                } else {
                    logger.info("{} records were added to the Movies table.", result);
                }
            });
    }

    private CompletableFuture<Integer> processNodesAsync(String clusterId, String databaseName, String userName, Iterator<JsonNode> iter, int number) {
        return CompletableFuture.supplyAsync(() -> {
            int t = 0;
            try {
                while (iter.hasNext()) {
                    if (t == number)
                        break;
                    JsonNode currentNode = iter.next();
                    int year = currentNode.get("year").asInt();
                    String title = currentNode.get("title").asText();

                    // Use SqlParameter to avoid SQL injection.
                    List<SqlParameter> parameterList = new ArrayList<>();
                    String sqlStatement = "INSERT INTO Movies VALUES( :id , :title, :year);";
                    SqlParameter idParam = SqlParameter.builder()
                        .name("id")
                        .value(String.valueOf(t))
                        .build();

                    SqlParameter titleParam = SqlParameter.builder()
                        .name("title")
                        .value(title)
                        .build();

                    SqlParameter yearParam = SqlParameter.builder()
                        .name("year")
                        .value(String.valueOf(year))
                        .build();
                    parameterList.add(idParam);
                    parameterList.add(titleParam);
                    parameterList.add(yearParam);

                    ExecuteStatementRequest insertStatementRequest = ExecuteStatementRequest.builder()
                        .clusterIdentifier(clusterId)
                        .sql(sqlStatement)
                        .database(databaseName)
                        .dbUser(userName)
                        .parameters(parameterList)
                        .build();

                    // Wait for the request to be accepted so that a failure surfaces here
                    // instead of being silently dropped.
                    getAsyncDataClient().executeStatement(insertStatementRequest).join();
                    logger.info("Inserted: " + title + " (" + year + ")");
                    t++;
                }
            } catch (CompletionException e) {
                // join() wraps service errors such as RedshiftDataException.
                throw new RuntimeException("Error inserting data: " + e.getMessage(), e);
            }
            return t;
        });
    }

    /**
     * Checks the status of an SQL statement asynchronously and handles the completion of the statement.
     *
     * @param sqlId the ID of the SQL statement to check
     * @return a {@link CompletableFuture} that completes when the SQL statement's status is either "FINISHED" or "FAILED"
     */
    public CompletableFuture<Void> checkStatementAsync(String sqlId) {
        DescribeStatementRequest statementRequest = DescribeStatementRequest.builder()
            .id(sqlId)
            .build();

        return getAsyncDataClient().describeStatement(statementRequest)
            .thenCompose(response -> {
                String status = response.statusAsString();
                logger.info("... Status: {} ", status);

                if ("FAILED".equals(status)) {
                    throw new RuntimeException("The Query Failed. Ending program");
                } else if ("FINISHED".equals(status)) {
                    return CompletableFuture.completedFuture(null);
                } else {
                    // Sleep for 1 second and recheck the status.
                    return CompletableFuture.runAsync(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            throw new RuntimeException("Error during sleep: " + e.getMessage(), e);
                        }
                    }).thenCompose(ignore -> checkStatementAsync(sqlId)); // Recursively call until status is FINISHED or FAILED
                }
            }).whenComplete((result, exception) -> {
                if (exception != null) {
                    // Handle exceptions.
                    logger.info("Error: {} ", exception.getMessage());
                } else {
                    logger.info("The statement is finished!");
                }
            });
    }

    /**
     * Asynchronously retrieves the results of a statement execution.
     *
     * @param statementId the ID of the statement for which to retrieve the results
     * @return a {@link CompletableFuture} that completes when the statement result has been processed
     */
    public CompletableFuture<Void> getResultsAsync(String statementId) {
        GetStatementResultRequest resultRequest = GetStatementResultRequest.builder()
            .id(statementId)
            .build();

        return getAsyncDataClient().getStatementResult(resultRequest)
            .handle((response, exception) -> {
                if (exception != null) {
                    logger.info("Error getting statement result {} ", exception.getMessage());
                    throw new RuntimeException("Error getting statement result: " + exception.getMessage(), exception);
                }

                // Extract and print the field values using streams if the response is valid.
                response.records().stream()
                    .flatMap(List::stream)
                    .map(Field::stringValue)
                    .filter(value -> value != null)
                    .forEach(value -> System.out.println("The Movie title field is " + value));

                return response;
            }).thenAccept(response -> {
                // Optionally add more logic here if needed after handling the response.
            });
    }

    /**
     * Asynchronously queries movies by a given year from a Redshift database.
     *
     * @param database    the name of the database to query
     * @param dbUser      the user to connect to the database with
     * @param year        the year to filter the movies by
     * @param clusterId   the identifier of the Redshift cluster to connect to
     * @return a {@link CompletableFuture} containing the response ID of the executed SQL statement
     */
    public CompletableFuture<String> queryMoviesByYearAsync(String database, String dbUser, int year, String clusterId) {
        String sqlStatement = "SELECT * FROM Movies WHERE year = :year";
        SqlParameter yearParam = SqlParameter.builder()
            .name("year")
            .value(String.valueOf(year))
            .build();

        ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder()
            .clusterIdentifier(clusterId)
            .database(database)
            .dbUser(dbUser)
            .parameters(yearParam)
            .sql(sqlStatement)
            .build();

        return CompletableFuture.supplyAsync(() -> {
            try {
                // Use join() to wait for the result.
                ExecuteStatementResponse response = getAsyncDataClient().executeStatement(statementRequest).join();
                return response.id();
            } catch (RedshiftDataException e) {
                throw new RuntimeException("Error executing statement: " + e.getMessage(), e);
            }
        }).exceptionally(exception -> {
            logger.info("Error: {}", exception.getMessage());
            return "";
        });
    }

    /**
     * Modifies an Amazon Redshift cluster asynchronously.
     *
     * @param clusterId the identifier of the cluster to be modified
     * @return a {@link CompletableFuture} that completes when the cluster modification is complete
     */
    public CompletableFuture<ModifyClusterResponse> modifyClusterAsync(String clusterId) {
        ModifyClusterRequest modifyClusterRequest = ModifyClusterRequest.builder()
            .clusterIdentifier(clusterId)
            .preferredMaintenanceWindow("wed:07:30-wed:08:00")
            .build();

        return getAsyncClient().modifyCluster(modifyClusterRequest)
            .whenComplete((clusterResponse, exception) -> {
                if (exception != null) {
                    if (exception.getCause() instanceof RedshiftException) {
                        logger.info("Error: {} ", exception.getMessage());
                    } else {
                        logger.info("Unexpected error: {} ", exception.getMessage());
                    }
                } else {
                    logger.info("The cluster was successfully modified and has "
                        + clusterResponse.cluster().preferredMaintenanceWindow() + " as the maintenance window");
                }
            });
    }

    /**
     * Deletes a Redshift cluster asynchronously.
     *
     * @param clusterId the identifier of the Redshift cluster to be deleted
     * @return a {@link CompletableFuture} that represents the asynchronous operation of deleting the Redshift cluster
     */
    public CompletableFuture<DeleteClusterResponse> deleteRedshiftClusterAsync(String clusterId) {
        DeleteClusterRequest deleteClusterRequest = DeleteClusterRequest.builder()
            .clusterIdentifier(clusterId)
            .skipFinalClusterSnapshot(true)
            .build();

        return getAsyncClient().deleteCluster(deleteClusterRequest)
            .whenComplete((response, exception) -> {
                if (exception != null) {
                    // Handle exceptions.
                    if (exception.getCause() instanceof RedshiftException) {
                        logger.info("Error: {}", exception.getMessage());
                    } else {
                        logger.info("Unexpected error: {}", exception.getMessage());
                    }
                } else {
                    // Handle successful response.
                    logger.info("The status is {}", response.cluster().clusterStatus());
                }
            });
    }
}
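
`checkClusterStatusAsync` and `checkStatementAsync` both poll by composing a future with a delayed call to themselves. The sketch below isolates that recursive polling pattern with JDK classes only. It is a variation on the wrapper class, not a copy of it, and rests on two assumptions: it caps the number of checks (the wrapper class polls without a limit), and it schedules the retry with `CompletableFuture.delayedExecutor` rather than sleeping on a pool thread. `statusSource` is a hypothetical stand-in for a call such as `describeClusters`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PollingSketch {

    // Polls statusSource until it reports "available" or maxAttempts checks have been made.
    // The returned future completes with the number of checks that were needed.
    static CompletableFuture<Integer> pollUntilAvailable(
            Supplier<CompletableFuture<String>> statusSource, int attempt, int maxAttempts, long delayMillis) {
        return statusSource.get().thenCompose(status -> {
            if ("available".equals(status)) {
                return CompletableFuture.completedFuture(attempt);
            }
            if (attempt >= maxAttempts) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Not available after " + maxAttempts + " checks"));
            }
            // Schedule the next check after the delay without blocking a thread in sleep().
            Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
            return CompletableFuture.runAsync(() -> { }, later)
                    .thenCompose(ignored -> pollUntilAvailable(statusSource, attempt + 1, maxAttempts, delayMillis));
        });
    }

    public static void main(String[] args) {
        // Fake status source: reports "creating" twice, then "available".
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<String>> fakeStatus = () ->
                CompletableFuture.completedFuture(calls.incrementAndGet() < 3 ? "creating" : "available");

        int checks = pollUntilAvailable(fakeStatus, 1, 10, 50).join();
        System.out.println("Available after " + checks + " checks");
    }
}
```

Because each retry is started from a completed stage rather than from a nested call, the recursion does not grow the call stack however long the polling lasts.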

Actions

The following code example shows how to use CreateCluster.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Create the cluster.

/**
 * Creates a new Amazon Redshift cluster asynchronously.
 * @param clusterId the unique identifier for the cluster
 * @param username the username for the administrative user
 * @param userPassword the password for the administrative user
 * @return a CompletableFuture that represents the asynchronous operation of creating the cluster
 * @throws RuntimeException if the cluster creation fails
 */
public CompletableFuture<CreateClusterResponse> createClusterAsync(String clusterId, String username, String userPassword) {
    CreateClusterRequest clusterRequest = CreateClusterRequest.builder()
        .clusterIdentifier(clusterId)
        .masterUsername(username)
        .masterUserPassword(userPassword)
        .nodeType("ra3.4xlarge")
        .publiclyAccessible(true)
        .numberOfNodes(2)
        .build();

    return getAsyncClient().createCluster(clusterRequest)
        .whenComplete((response, exception) -> {
            if (response != null) {
                logger.info("Created cluster ");
            } else {
                throw new RuntimeException("Failed to create cluster: " + exception.getMessage(), exception);
            }
        });
}
  • For API details, see CreateCluster in the AWS SDK for Java 2.x API Reference.
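
One detail of the CreateCluster snippet may be worth illustrating. According to the `CompletableFuture` Javadoc, `whenComplete` is not designed to translate outcomes: if the stage has already failed and the callback throws, the returned stage still completes with the original exception. The following JDK-only sketch contrasts this with `handle`, which can replace the failure. The class and method names are hypothetical, and a failed future stands in for the `createCluster` call; no AWS API is invoked.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, JDK-only illustration; a failed future stands in for createCluster().
final class WhenCompleteSketch {

    // Mirrors the pattern above: throw a wrapper exception from inside whenComplete.
    static CompletableFuture<String> wrapInWhenComplete(CompletableFuture<String> source) {
        return source.whenComplete((response, exception) -> {
            if (exception != null) {
                throw new RuntimeException("Failed to create cluster: " + exception.getMessage(), exception);
            }
        });
    }

    // Alternative: handle() may translate the outcome, so callers observe the wrapper.
    static CompletableFuture<String> wrapInHandle(CompletableFuture<String> source) {
        return source.handle((response, exception) -> {
            if (exception != null) {
                throw new RuntimeException("Failed to create cluster: " + exception.getMessage(), exception);
            }
            return response;
        });
    }

    // Joins the future and returns the message of the failure's cause, or null on success.
    static String failureMessage(CompletableFuture<String> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        // The original exception survives whenComplete.
        System.out.println(failureMessage(wrapInWhenComplete(
                CompletableFuture.failedFuture(new IllegalStateException("quota exceeded")))));
        // The wrapper replaces it when handle is used.
        System.out.println(failureMessage(wrapInHandle(
                CompletableFuture.failedFuture(new IllegalStateException("quota exceeded")))));
    }
}
```

If callers are expected to see the wrapping `RuntimeException`, `handle` (or `exceptionally`) is probably the better fit; `whenComplete` suits side effects such as logging.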

The following code example shows how to use DeleteCluster.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Delete the cluster.

/**
 * Deletes a Redshift cluster asynchronously.
 *
 * @param clusterId the identifier of the Redshift cluster to be deleted
 * @return a {@link CompletableFuture} that represents the asynchronous operation of deleting the Redshift cluster
 */
public CompletableFuture<DeleteClusterResponse> deleteRedshiftClusterAsync(String clusterId) {
    DeleteClusterRequest deleteClusterRequest = DeleteClusterRequest.builder()
        .clusterIdentifier(clusterId)
        .skipFinalClusterSnapshot(true)
        .build();

    return getAsyncClient().deleteCluster(deleteClusterRequest)
        .whenComplete((response, exception) -> {
            if (exception != null) {
                // Handle exceptions
                if (exception.getCause() instanceof RedshiftException) {
                    logger.info("Error: {}", exception.getMessage());
                } else {
                    logger.info("Unexpected error: {}", exception.getMessage());
                }
            } else {
                // Handle successful response
                logger.info("The status is {}", response.cluster().clusterStatus());
            }
        });
}
  • For API details, see DeleteCluster in the AWS SDK for Java 2.x API Reference.
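
The DeleteCluster handler classifies failures by inspecting `exception.getCause()`. That works when the failure arrives wrapped in a `CompletionException`, which is what dependent stages receive, but a callback attached directly to the failing future can see the raw exception instead. The sketch below shows one way to handle both cases. It uses only the JDK; `ServiceException` is a hypothetical stand-in for `RedshiftException`, and `unwrap` and `classify` are illustrative helper names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class UnwrapSketch {

    // Hypothetical stand-in for a service exception such as RedshiftException.
    static final class ServiceException extends RuntimeException {
        ServiceException(String message) {
            super(message);
        }
    }

    // Strips CompletionException and ExecutionException wrappers, if present.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Classifies a failure regardless of whether it arrived wrapped.
    static String classify(Throwable t) {
        return unwrap(t) instanceof ServiceException ? "service error" : "unexpected error";
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new ServiceException("cluster not found"));

        // Direct stage: the callback receives the raw exception.
        failed.whenComplete((r, e) -> System.out.println("direct: " + classify(e)));

        // Dependent stage: the callback receives a CompletionException wrapper.
        failed.thenApply(String::trim)
              .whenComplete((r, e) -> System.out.println("dependent: " + classify(e)));
    }
}
```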

The following code example shows how to use DescribeClusters.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Describe the cluster.

/**
 * Waits asynchronously for the specified cluster to become available.
 * @param clusterId the identifier of the cluster to wait for
 * @return a {@link CompletableFuture} that completes when the cluster is ready
 */
public CompletableFuture<Void> waitForClusterReadyAsync(String clusterId) {
    DescribeClustersRequest clustersRequest = DescribeClustersRequest.builder()
        .clusterIdentifier(clusterId)
        .build();

    logger.info("Waiting for cluster to become available. This may take a few minutes.");
    long startTime = System.currentTimeMillis();

    // Recursive method to poll the cluster status.
    return checkClusterStatusAsync(clustersRequest, startTime);
}

private CompletableFuture<Void> checkClusterStatusAsync(DescribeClustersRequest clustersRequest, long startTime) {
    return getAsyncClient().describeClusters(clustersRequest)
        .thenCompose(clusterResponse -> {
            List<Cluster> clusterList = clusterResponse.clusters();
            boolean clusterReady = false;
            for (Cluster cluster : clusterList) {
                if ("available".equals(cluster.clusterStatus())) {
                    clusterReady = true;
                    break;
                }
            }

            if (clusterReady) {
                logger.info(String.format("Cluster is available!"));
                return CompletableFuture.completedFuture(null);
            } else {
                long elapsedTimeMillis = System.currentTimeMillis() - startTime;
                long elapsedSeconds = elapsedTimeMillis / 1000;
                long minutes = elapsedSeconds / 60;
                long seconds = elapsedSeconds % 60;
                System.out.printf("\rElapsed Time: %02d:%02d - Waiting for cluster...", minutes, seconds);
                System.out.flush();

                // Wait 1 second before the next status check
                return CompletableFuture.runAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Error during sleep: " + e.getMessage(), e);
                    }
                }).thenCompose(ignored -> checkClusterStatusAsync(clustersRequest, startTime));
            }
        }).exceptionally(exception -> {
            throw new RuntimeException("Failed to get cluster status: " + exception.getMessage(), exception);
        });
}
  • For API details, see DescribeClusters in the AWS SDK for Java 2.x API Reference.
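
The polling loop above sleeps inside a pool thread between checks and has no upper time limit. As a possible variation, the sketch below schedules each re-check with `CompletableFuture.delayedExecutor` (Java 9 or later) and fails with a `TimeoutException` once a deadline passes. It is a JDK-only illustration: `statusCheck` is a hypothetical stand-in for a `describeClusters` call that yields the cluster status, and the delay and deadline values are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class PollingSketch {

    // Polls statusCheck until it yields "available" or the deadline (in System.nanoTime units) passes.
    // statusCheck is a hypothetical stand-in for an asynchronous describeClusters call.
    static CompletableFuture<Void> waitUntilAvailable(Supplier<CompletableFuture<String>> statusCheck,
                                                      long delayMillis, long deadlineNanos) {
        return statusCheck.get().thenCompose(status -> {
            if ("available".equals(status)) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return CompletableFuture.<Void>failedFuture(new TimeoutException(
                        "Cluster not available before the deadline; last status: " + status));
            }
            // Schedule the next check without parking a pool thread in sleep().
            Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
            return CompletableFuture.runAsync(() -> { }, delayed)
                    .thenCompose(ignored -> waitUntilAvailable(statusCheck, delayMillis, deadlineNanos));
        });
    }

    public static void main(String[] args) {
        // Fake status source: reports "creating" twice, then "available".
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<String>> fakeStatus = () ->
                CompletableFuture.completedFuture(calls.incrementAndGet() < 3 ? "creating" : "available");

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        waitUntilAvailable(fakeStatus, 50, deadline).join();
        System.out.println("Available after " + calls.get() + " checks");
    }
}
```

The AWS SDK for Java 2.x also ships waiters for some services, which may be preferable to hand-written polling where one exists for the operation in question.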

The following code example shows how to use DescribeStatement.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

/**
 * Checks the status of an SQL statement asynchronously and handles the completion of the statement.
 *
 * @param sqlId the ID of the SQL statement to check
 * @return a {@link CompletableFuture} that completes when the SQL statement's status is either "FINISHED" or "FAILED"
 */
public CompletableFuture<Void> checkStatementAsync(String sqlId) {
    DescribeStatementRequest statementRequest = DescribeStatementRequest.builder()
        .id(sqlId)
        .build();

    return getAsyncDataClient().describeStatement(statementRequest)
        .thenCompose(response -> {
            String status = response.statusAsString();
            logger.info("... Status: {} ", status);

            if ("FAILED".equals(status)) {
                throw new RuntimeException("The Query Failed. Ending program");
            } else if ("FINISHED".equals(status)) {
                return CompletableFuture.completedFuture(null);
            } else {
                // Sleep for 1 second and recheck status
                return CompletableFuture.runAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Error during sleep: " + e.getMessage(), e);
                    }
                }).thenCompose(ignore -> checkStatementAsync(sqlId)); // Recursively call until status is FINISHED or FAILED
            }
        }).whenComplete((result, exception) -> {
            if (exception != null) {
                // Handle exceptions
                logger.info("Error: {} ", exception.getMessage());
            } else {
                logger.info("The statement is finished!");
            }
        });
}
  • For API details, see DescribeStatement in the AWS SDK for Java 2.x API Reference.
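
The status check above stops only on `FINISHED` or `FAILED`. To the best of my knowledge, the Redshift Data API can also report `ABORTED` (for example, after a statement is canceled), in which case that loop would keep polling. The sketch below shows one way to classify status strings so that every terminal state ends the loop. It is plain Java with no SDK dependency; the `Outcome` enum and `classify` helper are hypothetical names, and the set of status strings is an assumption to verify against the API reference.

```java
import java.util.Set;

final class StatementStatusSketch {

    enum Outcome { SUCCEEDED, FAILED, STILL_RUNNING }

    // Assumed terminal failure states; verify against the Redshift Data API reference.
    private static final Set<String> FAILURE_STATES = Set.of("FAILED", "ABORTED");

    // Maps a raw status string to what the polling loop should do next.
    static Outcome classify(String status) {
        if ("FINISHED".equals(status)) {
            return Outcome.SUCCEEDED;
        }
        // Set.of(...) rejects null lookups, so guard before calling contains().
        if (status != null && FAILURE_STATES.contains(status)) {
            return Outcome.FAILED;
        }
        // Anything else (for example SUBMITTED, PICKED, STARTED) means: check again later.
        return Outcome.STILL_RUNNING;
    }

    public static void main(String[] args) {
        for (String status : new String[] {"SUBMITTED", "STARTED", "ABORTED", "FINISHED"}) {
            System.out.println(status + " -> " + classify(status));
        }
    }
}
```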

The following code example shows how to use ExecuteStatement.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Execute a SQL statement to create a database table.

/**
 * Creates an asynchronous task to execute a SQL statement for creating a new table.
 *
 * @param clusterId the identifier of the Amazon Redshift cluster
 * @param databaseName the name of the database to create the table in
 * @param userName the username to use for the database connection
 * @return a {@link CompletableFuture} that completes with the result of the SQL statement execution
 * @throws RuntimeException if there is an error creating the table
 */
public CompletableFuture<ExecuteStatementResponse> createTableAsync(String clusterId, String databaseName, String userName) {
    ExecuteStatementRequest createTableRequest = ExecuteStatementRequest.builder()
        .clusterIdentifier(clusterId)
        .dbUser(userName)
        .database(databaseName)
        .sql("CREATE TABLE Movies (" +
            "id INT PRIMARY KEY, " +
            "title VARCHAR(100), " +
            "year INT)")
        .build();

    return getAsyncDataClient().executeStatement(createTableRequest)
        .whenComplete((response, exception) -> {
            if (exception != null) {
                throw new RuntimeException("Error creating table: " + exception.getMessage(), exception);
            } else {
                logger.info("Table created: Movies");
            }
        });
}

Execute a SQL statement to insert data into a database table.

/**
 * Asynchronously populates a table from a JSON file.
 *
 * @param clusterId the ID of the cluster
 * @param databaseName the name of the database
 * @param userName the username
 * @param fileName the name of the JSON file
 * @param number the number of records to process
 * @return a CompletableFuture that completes with the number of records added to the Movies table
 */
public CompletableFuture<Integer> popTableAsync(String clusterId, String databaseName, String userName, String fileName, int number) {
    return CompletableFuture.supplyAsync(() -> {
            try {
                JsonParser parser = new JsonFactory().createParser(new File(fileName));
                JsonNode rootNode = new ObjectMapper().readTree(parser);
                Iterator<JsonNode> iter = rootNode.iterator();
                return iter;
            } catch (IOException e) {
                throw new RuntimeException("Failed to read or parse JSON file: " + e.getMessage(), e);
            }
        }).thenCompose(iter -> processNodesAsync(clusterId, databaseName, userName, iter, number))
        .whenComplete((result, exception) -> {
            if (exception != null) {
                logger.info("Error {} ", exception.getMessage());
            } else {
                logger.info("{} records were added to the Movies table." , result);
            }
        });
}

private CompletableFuture<Integer> processNodesAsync(String clusterId, String databaseName, String userName, Iterator<JsonNode> iter, int number) {
    return CompletableFuture.supplyAsync(() -> {
        int t = 0;
        try {
            while (iter.hasNext()) {
                if (t == number)
                    break;
                JsonNode currentNode = iter.next();
                int year = currentNode.get("year").asInt();
                String title = currentNode.get("title").asText();

                // Use SqlParameter to avoid SQL injection.
                List<SqlParameter> parameterList = new ArrayList<>();
                String sqlStatement = "INSERT INTO Movies VALUES( :id , :title, :year);";
                SqlParameter idParam = SqlParameter.builder()
                    .name("id")
                    .value(String.valueOf(t))
                    .build();

                SqlParameter titleParam = SqlParameter.builder()
                    .name("title")
                    .value(title)
                    .build();

                SqlParameter yearParam = SqlParameter.builder()
                    .name("year")
                    .value(String.valueOf(year))
                    .build();
                parameterList.add(idParam);
                parameterList.add(titleParam);
                parameterList.add(yearParam);

                ExecuteStatementRequest insertStatementRequest = ExecuteStatementRequest.builder()
                    .clusterIdentifier(clusterId)
                    .sql(sqlStatement)
                    .database(databaseName)
                    .dbUser(userName)
                    .parameters(parameterList)
                    .build();

                getAsyncDataClient().executeStatement(insertStatementRequest);
                logger.info("Inserted: " + title + " (" + year + ")");
                t++;
            }
        } catch (RedshiftDataException e) {
            throw new RuntimeException("Error inserting data: " + e.getMessage(), e);
        }
        return t;
    });
}
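
In the insert loop above, each `executeStatement` call returns a future that is not awaited, so the returned count reflects statements submitted rather than confirmed, and the surrounding `catch` block cannot observe asynchronous failures. One possible adjustment is to collect the futures and combine them with `CompletableFuture.allOf`, as in the JDK-only sketch below. The `submit` function is a hypothetical stand-in for an `executeStatement` call that yields a statement ID; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class BatchInsertSketch {

    // Submits every title and completes with the number of submissions that succeeded.
    // 'submit' is a hypothetical stand-in for an asynchronous executeStatement call.
    static CompletableFuture<Integer> insertAll(List<String> titles,
                                                Function<String, CompletableFuture<String>> submit) {
        List<CompletableFuture<Boolean>> outcomes = new ArrayList<>();
        for (String title : titles) {
            // handle() turns each result into true (accepted) or false (failed), so one
            // failure does not short-circuit the whole batch.
            outcomes.add(submit.apply(title).handle((statementId, error) -> error == null));
        }
        return CompletableFuture.allOf(outcomes.toArray(new CompletableFuture[0]))
                // All futures are complete here, so join() returns immediately.
                .thenApply(done -> (int) outcomes.stream().filter(CompletableFuture::join).count());
    }

    public static void main(String[] args) {
        // Fake submitter: rejects one title to show that failures are counted separately.
        Function<String, CompletableFuture<String>> fakeSubmit = title ->
                title.isEmpty()
                        ? CompletableFuture.failedFuture(new IllegalArgumentException("empty title"))
                        : CompletableFuture.completedFuture("id-" + title);

        int accepted = insertAll(List.of("Alien", "", "Heat"), fakeSubmit).join();
        System.out.println(accepted + " of 3 statements were accepted");
    }
}
```

Note that an accepted submission only means the statement was queued; confirming that each insert actually finished would still require checking its status, for example with DescribeStatement.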

Execute a SQL statement to query a database table.

/**
 * Asynchronously queries movies by a given year from a Redshift database.
 *
 * @param database the name of the database to query
 * @param dbUser the user to connect to the database with
 * @param year the year to filter the movies by
 * @param clusterId the identifier of the Redshift cluster to connect to
 * @return a {@link CompletableFuture} containing the response ID of the executed SQL statement
 */
public CompletableFuture<String> queryMoviesByYearAsync(String database, String dbUser, int year, String clusterId) {
    String sqlStatement = "SELECT * FROM Movies WHERE year = :year";
    SqlParameter yearParam = SqlParameter.builder()
        .name("year")
        .value(String.valueOf(year))
        .build();

    ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder()
        .clusterIdentifier(clusterId)
        .database(database)
        .dbUser(dbUser)
        .parameters(yearParam)
        .sql(sqlStatement)
        .build();

    return CompletableFuture.supplyAsync(() -> {
        try {
            // Use join() to wait for the result
            ExecuteStatementResponse response = getAsyncDataClient().executeStatement(statementRequest).join();
            return response.id();
        } catch (RedshiftDataException e) {
            throw new RuntimeException("Error executing statement: " + e.getMessage(), e);
        }
    }).exceptionally(exception -> {
        logger.info("Error: {}", exception.getMessage());
        return "";
    });
}

The following code example shows how to use GetStatementResult.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Check the statement result.

/**
 * Asynchronously retrieves the results of a statement execution.
 *
 * @param statementId the ID of the statement for which to retrieve the results
 * @return a {@link CompletableFuture} that completes when the statement result has been processed
 */
public CompletableFuture<Void> getResultsAsync(String statementId) {
    GetStatementResultRequest resultRequest = GetStatementResultRequest.builder()
        .id(statementId)
        .build();

    return getAsyncDataClient().getStatementResult(resultRequest)
        .handle((response, exception) -> {
            if (exception != null) {
                logger.info("Error getting statement result {} ", exception.getMessage());
                throw new RuntimeException("Error getting statement result: " + exception.getMessage(), exception);
            }

            // Extract and print the field values using streams if the response is valid.
            response.records().stream()
                .flatMap(List::stream)
                .map(Field::stringValue)
                .filter(value -> value != null)
                .forEach(value -> System.out.println("The Movie title field is " + value));

            return response;
        }).thenAccept(response -> {
            // Optionally add more logic here if needed after handling the response
        });
}

The following code example shows how to use ListDatabases.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

/**
 * Lists all databases asynchronously for the specified cluster, database user, and database.
 * @param clusterId the identifier of the cluster to list databases for
 * @param dbUser the database user to use for the list databases request
 * @param database the database to list databases for
 * @return a {@link CompletableFuture} that completes when the database listing is complete, or throws a {@link RuntimeException} if there was an error
 */
public CompletableFuture<Void> listAllDatabasesAsync(String clusterId, String dbUser, String database) {
    ListDatabasesRequest databasesRequest = ListDatabasesRequest.builder()
        .clusterIdentifier(clusterId)
        .dbUser(dbUser)
        .database(database)
        .build();

    // Asynchronous paginator for listing databases.
    ListDatabasesPublisher databasesPaginator = getAsyncDataClient().listDatabasesPaginator(databasesRequest);
    CompletableFuture<Void> future = databasesPaginator.subscribe(response -> {
        response.databases().forEach(db -> {
            logger.info("The database name is {} ", db);
        });
    });

    // Return the future for asynchronous handling.
    return future.exceptionally(exception -> {
        throw new RuntimeException("Failed to list databases: " + exception.getMessage(), exception);
    });
}
  • For API details, see ListDatabases in the AWS SDK for Java 2.x API Reference.

The following code example shows how to use ModifyCluster.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Modify a cluster.

/**
 * Modifies an Amazon Redshift cluster asynchronously.
 *
 * @param clusterId the identifier of the cluster to be modified
 * @return a {@link CompletableFuture} that completes when the cluster modification is complete
 */
public CompletableFuture<ModifyClusterResponse> modifyClusterAsync(String clusterId) {
    ModifyClusterRequest modifyClusterRequest = ModifyClusterRequest.builder()
        .clusterIdentifier(clusterId)
        .preferredMaintenanceWindow("wed:07:30-wed:08:00")
        .build();

    return getAsyncClient().modifyCluster(modifyClusterRequest)
        .whenComplete((clusterResponse, exception) -> {
            if (exception != null) {
                if (exception.getCause() instanceof RedshiftException) {
                    logger.info("Error: {} ", exception.getMessage());
                } else {
                    logger.info("Unexpected error: {} ", exception.getMessage());
                }
            } else {
                logger.info("The modified cluster was successfully modified and has "
                    + clusterResponse.cluster().preferredMaintenanceWindow() + " as the maintenance window");
            }
        });
}
  • For API details, see ModifyCluster in the AWS SDK for Java 2.x API Reference.

Scenarios

The following code example shows how to create a web application that tracks and reports on work items using an Amazon Redshift database.

SDK for Java 2.x

Shows how to create a web application that tracks and reports on work items stored in an Amazon Redshift database.

For the complete source code and instructions on how to set up a Spring REST API that queries Amazon Redshift data and can be used by a React application, see the full example on GitHub.

Services used in this example
  • Amazon Redshift

  • Amazon SES