AWS SDK와 ExecuteStatement 함께 사용 - AWS SDK 코드 예제

Doc AWS SDK 예제 GitHub 리포지토리에서 더 많은 SDK 예제를 사용할 수 있습니다. AWS

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS SDK와 ExecuteStatement 함께 사용

다음 코드 예시에서는 ExecuteStatement을 사용하는 방법을 보여 줍니다.

작업 예제는 대규모 프로그램에서 발췌한 코드이며 컨텍스트에 맞춰 실행해야 합니다. 다음 코드 예제에서 컨텍스트 내 이 작업을 확인할 수 있습니다.

Java
SDK for Java 2.x
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

SQL 스테이트먼트를 실행하여 데이터베이스 테이블을 만듭니다.

/** * Creates an asynchronous task to execute a SQL statement for creating a new table. * * @param clusterId the identifier of the Amazon Redshift cluster * @param databaseName the name of the database to create the table in * @param userName the username to use for the database connection * @return a {@link CompletableFuture} that completes with the result of the SQL statement execution * @throws RuntimeException if there is an error creating the table */ public CompletableFuture<ExecuteStatementResponse> createTableAsync(String clusterId, String databaseName, String userName) { ExecuteStatementRequest createTableRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .dbUser(userName) .database(databaseName) .sql("CREATE TABLE Movies (" + "id INT PRIMARY KEY, " + "title VARCHAR(100), " + "year INT)") .build(); return getAsyncDataClient().executeStatement(createTableRequest) .whenComplete((response, exception) -> { if (exception != null) { throw new RuntimeException("Error creating table: " + exception.getMessage(), exception); } else { logger.info("Table created: Movies"); } }); }

SQL 스테이트먼트를 실행하여 데이터베이스 테이블에 데이터를 삽입합니다.

/** * Asynchronously pops a table from a JSON file. * * @param clusterId the ID of the cluster * @param databaseName the name of the database * @param userName the username * @param fileName the name of the JSON file * @param number the number of records to process * @return a CompletableFuture that completes with the number of records added to the Movies table */ public CompletableFuture<Integer> popTableAsync(String clusterId, String databaseName, String userName, String fileName, int number) { return CompletableFuture.supplyAsync(() -> { try { JsonParser parser = new JsonFactory().createParser(new File(fileName)); JsonNode rootNode = new ObjectMapper().readTree(parser); Iterator<JsonNode> iter = rootNode.iterator(); return iter; } catch (IOException e) { throw new RuntimeException("Failed to read or parse JSON file: " + e.getMessage(), e); } }).thenCompose(iter -> processNodesAsync(clusterId, databaseName, userName, iter, number)) .whenComplete((result, exception) -> { if (exception != null) { logger.info("Error {} ", exception.getMessage()); } else { logger.info("{} records were added to the Movies table." , result); } }); } private CompletableFuture<Integer> processNodesAsync(String clusterId, String databaseName, String userName, Iterator<JsonNode> iter, int number) { return CompletableFuture.supplyAsync(() -> { int t = 0; try { while (iter.hasNext()) { if (t == number) break; JsonNode currentNode = iter.next(); int year = currentNode.get("year").asInt(); String title = currentNode.get("title").asText(); // Use SqlParameter to avoid SQL injection. List<SqlParameter> parameterList = new ArrayList<>(); String sqlStatement = "INSERT INTO Movies VALUES( :id , :title, :year);"; SqlParameter idParam = SqlParameter.builder() .name("id") .value(String.valueOf(t)) .build(); SqlParameter titleParam = SqlParameter.builder() .name("title") .value(title) .build(); SqlParameter yearParam = SqlParameter.builder() .name("year") .value(String.valueOf(year)) .build(); parameterList.add(idParam); parameterList.add(titleParam); parameterList.add(yearParam); ExecuteStatementRequest insertStatementRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .sql(sqlStatement) .database(databaseName) .dbUser(userName) .parameters(parameterList) .build(); getAsyncDataClient().executeStatement(insertStatementRequest); logger.info("Inserted: " + title + " (" + year + ")"); t++; } } catch (RedshiftDataException e) { throw new RuntimeException("Error inserting data: " + e.getMessage(), e); } return t; }); }

SQL 스테이트먼트를 실행하여 데이터베이스 테이블을 쿼리합니다.

/** * Asynchronously queries movies by a given year from a Redshift database. * * @param database the name of the database to query * @param dbUser the user to connect to the database with * @param year the year to filter the movies by * @param clusterId the identifier of the Redshift cluster to connect to * @return a {@link CompletableFuture} containing the response ID of the executed SQL statement */ public CompletableFuture<String> queryMoviesByYearAsync(String database, String dbUser, int year, String clusterId) { String sqlStatement = "SELECT * FROM Movies WHERE year = :year"; SqlParameter yearParam = SqlParameter.builder() .name("year") .value(String.valueOf(year)) .build(); ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder() .clusterIdentifier(clusterId) .database(database) .dbUser(dbUser) .parameters(yearParam) .sql(sqlStatement) .build(); return CompletableFuture.supplyAsync(() -> { try { ExecuteStatementResponse response = getAsyncDataClient().executeStatement(statementRequest).join(); // Use join() to wait for the result return response.id(); } catch (RedshiftDataException e) { throw new RuntimeException("Error executing statement: " + e.getMessage(), e); } }).exceptionally(exception -> { logger.info("Error: {}", exception.getMessage()); return ""; }); }
  • API 세부 정보는 AWS SDK for Java 2.x API 참조ExecuteStatement를 참조하세요.