与 AWS SDK或SelectObjectContent一起使用 CLI - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

与 AWS SDK或SelectObjectContent一起使用 CLI

以下代码示例演示如何使用 SelectObjectContent

CLI
AWS CLI

根据SQL语句筛选 Amazon S3 对象的内容

以下select-object-content示例my-data-file.csv使用指定的SQL语句筛选对象并将输出发送到文件。

aws s3api select-object-content \ --bucket my-bucket \ --key my-data-file.csv \ --expression "select * from s3object limit 100" \ --expression-type 'SQL' \ --input-serialization '{"CSV": {}, "CompressionType": "NONE"}' \ --output-serialization '{"CSV": {}}' "output.csv"

此命令不生成任何输出。

Java
SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

以下示例显示了使用JSON对象的查询。完整的示例还显示了CSV对象的用法。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CSVInput; import software.amazon.awssdk.services.s3.model.CSVOutput; import software.amazon.awssdk.services.s3.model.CompressionType; import software.amazon.awssdk.services.s3.model.ExpressionType; import software.amazon.awssdk.services.s3.model.FileHeaderInfo; import software.amazon.awssdk.services.s3.model.InputSerialization; import software.amazon.awssdk.services.s3.model.JSONInput; import software.amazon.awssdk.services.s3.model.JSONOutput; import software.amazon.awssdk.services.s3.model.JSONType; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.OutputSerialization; import software.amazon.awssdk.services.s3.model.Progress; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest; import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler; import software.amazon.awssdk.services.s3.model.Stats; import java.io.IOException; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; public class SelectObjectContentExample { static final Logger logger = LoggerFactory.getLogger(SelectObjectContentExample.class); static final String BUCKET_NAME = "amzn-s3-demo-bucket-" + UUID.randomUUID(); static final S3AsyncClient s3AsyncClient = S3AsyncClient.create(); static String FILE_CSV = "csv"; static String FILE_JSON = "json"; static String URL_CSV = "https://raw.githubusercontent.com/mledoze/countries/master/dist/countries.csv"; static String URL_JSON = "https://raw.githubusercontent.com/mledoze/countries/master/dist/countries.json"; public static void main(String[] args) { SelectObjectContentExample selectObjectContentExample = new SelectObjectContentExample(); try { SelectObjectContentExample.setUp(); selectObjectContentExample.runSelectObjectContentMethodForJSON(); selectObjectContentExample.runSelectObjectContentMethodForCSV(); } catch (SdkException e) { logger.error(e.getMessage(), e); System.exit(1); } finally { SelectObjectContentExample.tearDown(); } } EventStreamInfo runSelectObjectContentMethodForJSON() { // Set up request parameters. final String queryExpression = "select * from s3object[*][*] c where c.area < 350000"; final String fileType = FILE_JSON; InputSerialization inputSerialization = InputSerialization.builder() .json(JSONInput.builder().type(JSONType.DOCUMENT).build()) .compressionType(CompressionType.NONE) .build(); OutputSerialization outputSerialization = OutputSerialization.builder() .json(JSONOutput.builder().recordDelimiter(null).build()) .build(); // Build the SelectObjectContentRequest. SelectObjectContentRequest select = SelectObjectContentRequest.builder() .bucket(BUCKET_NAME) .key(FILE_JSON) .expression(queryExpression) .expressionType(ExpressionType.SQL) .inputSerialization(inputSerialization) .outputSerialization(outputSerialization) .build(); EventStreamInfo eventStreamInfo = new EventStreamInfo(); // Call the selectObjectContent method with the request and a response handler. // Supply an EventStreamInfo object to the response handler to gather records and information from the response. s3AsyncClient.selectObjectContent(select, buildResponseHandler(eventStreamInfo)).join(); // Log out information gathered while processing the response stream. long recordCount = eventStreamInfo.getRecords().stream().mapToInt(record -> record.split("\n").length ).sum(); logger.info("Total records {}: {}", fileType, recordCount); logger.info("Visitor onRecords for fileType {} called {} times", fileType, eventStreamInfo.getCountOnRecordsCalled()); logger.info("Visitor onStats for fileType {}, {}", fileType, eventStreamInfo.getStats()); logger.info("Visitor onContinuations for fileType {}, {}", fileType, eventStreamInfo.getCountContinuationEvents()); return eventStreamInfo; } static SelectObjectContentResponseHandler buildResponseHandler(EventStreamInfo eventStreamInfo) { // Use a Visitor to process the response stream. This visitor logs information and gathers details while processing. final SelectObjectContentResponseHandler.Visitor visitor = SelectObjectContentResponseHandler.Visitor.builder() .onRecords(r -> { logger.info("Record event received."); eventStreamInfo.addRecord(r.payload().asUtf8String()); eventStreamInfo.incrementOnRecordsCalled(); }) .onCont(ce -> { logger.info("Continuation event received."); eventStreamInfo.incrementContinuationEvents(); }) .onProgress(pe -> { Progress progress = pe.details(); logger.info("Progress event received:\n bytesScanned:{}\nbytesProcessed: {}\nbytesReturned:{}", progress.bytesScanned(), progress.bytesProcessed(), progress.bytesReturned()); }) .onEnd(ee -> logger.info("End event received.")) .onStats(se -> { logger.info("Stats event received."); eventStreamInfo.addStats(se.details()); }) .build(); // Build the SelectObjectContentResponseHandler with the visitor that processes the stream. return SelectObjectContentResponseHandler.builder() .subscriber(visitor).build(); } // The EventStreamInfo class is used to store information gathered while processing the response stream. static class EventStreamInfo { private final List<String> records = new ArrayList<>(); private Integer countOnRecordsCalled = 0; private Integer countContinuationEvents = 0; private Stats stats; void incrementOnRecordsCalled() { countOnRecordsCalled++; } void incrementContinuationEvents() { countContinuationEvents++; } void addRecord(String record) { records.add(record); } void addStats(Stats stats) { this.stats = stats; } public List<String> getRecords() { return records; } public Integer getCountOnRecordsCalled() { return countOnRecordsCalled; } public Integer getCountContinuationEvents() { return countContinuationEvents; } public Stats getStats() { return stats; } }
  • 有关API详细信息,请参阅 “AWS SDK for Java 2.x API参考 SelectObjectContent” 中的。