Executar UNLOAD consulta - Amazon Timestream

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Executar UNLOAD consulta

Os exemplos de código a seguir chamam uma UNLOAD consulta. Para obter mais informações sobre o UNLOAD, consulte Usando UNLOAD para exportar os resultados da consulta para o S3 do Timestream for LiveAnalytics. Para obter exemplos de UNLOAD consultas, consulteExemplo de caso de uso UNLOAD do Timestream for LiveAnalytics.

Crie e execute uma UNLOAD consulta

Java
// When you have a SELECT like below String QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + DATABASE_NAME + "." + UNLOAD_TABLE_NAME + " WHERE time BETWEEN ago(2d) AND now()"; // You can construct UNLOAD query as follows UnloadQuery unloadQuery = UnloadQuery.builder() .selectQuery(QUERY_1) .bucketName("timestream-sample-<region>-<accountId>") .resultsPrefix("without_partition") .format(CSV) .compression(UnloadQuery.Compression.GZIP) .build(); QueryResult unloadResult = runQuery(unloadQuery.getUnloadQuery()); // Run UNLOAD query (Similar to how you run SELECT query) // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination private QueryResult runQuery(String queryString) { QueryResult queryResult = null; try { QueryRequest queryRequest = new QueryRequest(); queryRequest.setQueryString(queryString); queryResult = queryClient.query(queryRequest); while (true) { parseQueryResult(queryResult); if (queryResult.getNextToken() == null) { break; } queryRequest.setNextToken(queryResult.getNextToken()); queryResult = queryClient.query(queryRequest); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } return queryResult; } // Utility that helps to construct UNLOAD query @Builder static class UnloadQuery { private String selectQuery; private String bucketName; private String resultsPrefix; private Format format; private Compression compression; private EncryptionType encryptionType; private List<String> partitionColumns; private String kmsKey; private Character csvFieldDelimiter; private Character csvEscapeCharacter; public String getUnloadQuery() { String destination = constructDestination(); String withClause = constructOptionalParameters(); return String.format("UNLOAD (%s) TO '%s' %s", selectQuery, destination, withClause); } private String constructDestination() { return "s3://" + this.bucketName + "/" + this.resultsPrefix + "/"; } private String constructOptionalParameters() { boolean isOptionalParametersPresent = Objects.nonNull(format) || Objects.nonNull(compression) || Objects.nonNull(encryptionType) || Objects.nonNull(partitionColumns) || Objects.nonNull(kmsKey) || Objects.nonNull(csvFieldDelimiter) || Objects.nonNull(csvEscapeCharacter); String withClause = ""; if (isOptionalParametersPresent) { StringJoiner optionalParameters = new StringJoiner(","); if (Objects.nonNull(format)) { optionalParameters.add("format = '" + format + "'"); } if (Objects.nonNull(compression)) { optionalParameters.add("compression = '" + compression + "'"); } if (Objects.nonNull(encryptionType)) { optionalParameters.add("encryption = '" + encryptionType + "'"); } if (Objects.nonNull(kmsKey)) { optionalParameters.add("kms_key = '" + kmsKey + "'"); } if (Objects.nonNull(csvFieldDelimiter)) { optionalParameters.add("field_delimiter = '" + csvFieldDelimiter + "'"); } if (Objects.nonNull(csvEscapeCharacter)) { optionalParameters.add("escaped_by = '" + csvEscapeCharacter + "'"); } if (Objects.nonNull(partitionColumns) && !partitionColumns.isEmpty()) { final StringJoiner partitionedByList = new StringJoiner(","); partitionColumns.forEach(column -> partitionedByList.add("'" + column + "'")); optionalParameters.add(String.format("partitioned_by = ARRAY[%s]", partitionedByList)); } withClause = String.format("WITH (%s)", optionalParameters); } return withClause; } public enum Format { CSV, PARQUET } public enum Compression { GZIP, NONE } public enum EncryptionType { SSE_S3, SSE_KMS } @Override public String toString() { return getUnloadQuery(); } }
Java v2
// When you have a SELECT like below String QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + DATABASE_NAME + "." + UNLOAD_TABLE_NAME + " WHERE time BETWEEN ago(2d) AND now()"; //You can construct UNLOAD query as follows UnloadQuery unloadQuery = UnloadQuery.builder() .selectQuery(QUERY_1) .bucketName("timestream-sample-<region>-<accountId>") .resultsPrefix("without_partition") .format(CSV) .compression(UnloadQuery.Compression.GZIP) .build(); QueryResponse unloadResponse = runQuery(unloadQuery.getUnloadQuery()); // Run UNLOAD query (Similar to how you run SELECT query) // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination private QueryResponse runQuery(String queryString) { QueryResponse finalResponse = null; try { QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build(); final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest); for(QueryResponse queryResponse : queryResponseIterator) { parseQueryResult(queryResponse); finalResponse = queryResponse; } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } return finalResponse; } // Utility that helps to construct UNLOAD query @Builder static class UnloadQuery { private String selectQuery; private String bucketName; private String resultsPrefix; private Format format; private Compression compression; private EncryptionType encryptionType; private List<String> partitionColumns; private String kmsKey; private Character csvFieldDelimiter; private Character csvEscapeCharacter; public String getUnloadQuery() { String destination = constructDestination(); String withClause = constructOptionalParameters(); return String.format("UNLOAD (%s) TO '%s' %s", selectQuery, destination, withClause); } private String constructDestination() { return "s3://" + this.bucketName + "/" + this.resultsPrefix + "/"; } private String constructOptionalParameters() { boolean isOptionalParametersPresent = Objects.nonNull(format) || Objects.nonNull(compression) || Objects.nonNull(encryptionType) || Objects.nonNull(partitionColumns) || Objects.nonNull(kmsKey) || Objects.nonNull(csvFieldDelimiter) || Objects.nonNull(csvEscapeCharacter); String withClause = ""; if (isOptionalParametersPresent) { StringJoiner optionalParameters = new StringJoiner(","); if (Objects.nonNull(format)) { optionalParameters.add("format = '" + format + "'"); } if (Objects.nonNull(compression)) { optionalParameters.add("compression = '" + compression + "'"); } if (Objects.nonNull(encryptionType)) { optionalParameters.add("encryption = '" + encryptionType + "'"); } if (Objects.nonNull(kmsKey)) { optionalParameters.add("kms_key = '" + kmsKey + "'"); } if (Objects.nonNull(csvFieldDelimiter)) { optionalParameters.add("field_delimiter = '" + csvFieldDelimiter + "'"); } if (Objects.nonNull(csvEscapeCharacter)) { optionalParameters.add("escaped_by = '" + csvEscapeCharacter + "'"); } if (Objects.nonNull(partitionColumns) && !partitionColumns.isEmpty()) { final StringJoiner partitionedByList = new StringJoiner(","); partitionColumns.forEach(column -> partitionedByList.add("'" + column + "'")); optionalParameters.add(String.format("partitioned_by = ARRAY[%s]", partitionedByList)); } withClause = String.format("WITH (%s)", optionalParameters); } return withClause; } public enum Format { CSV, PARQUET } public enum Compression { GZIP, NONE } public enum EncryptionType { SSE_S3, SSE_KMS } @Override public String toString() { return getUnloadQuery(); } }
Go
// When you have a SELECT like below var Query = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + *databaseName + "." + *tableName + " WHERE time BETWEEN ago(2d) AND now()" // You can construct UNLOAD query as follows var unloadQuery = UnloadQuery{ Query: "SELECT user_id, ip_address, session_id, measure_name, time, query, quantity, product_id, channel, event FROM " + *databaseName + "." + *tableName + " WHERE time BETWEEN ago(2d) AND now()", Partitioned_by: []string{}, Compression: "GZIP", Format: "CSV", S3Location: bucketName, ResultPrefix: "without_partition", } // Run UNLOAD query (Similar to how you run SELECT query) // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination queryInput := &timestreamquery.QueryInput{ QueryString: build_query(unloadQuery), } err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { if (lastPage) { var response = parseQueryResult(page) var unloadFiles = getManifestAndMetadataFiles(s3Svc, response) displayColumns(unloadFiles, unloadQuery.Partitioned_by) displayResults(s3Svc, unloadFiles) } return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } // Utility that helps to construct UNLOAD query type UnloadQuery struct { Query string Partitioned_by []string Format string S3Location string ResultPrefix string Compression string } func build_query(unload_query UnloadQuery) *string { var query_results_s3_path = "'s3://" + unload_query.S3Location + "/" + unload_query.ResultPrefix + "/'" var query = "UNLOAD(" + unload_query.Query + ") TO " + query_results_s3_path + " WITH ( " if (len(unload_query.Partitioned_by) > 0) { query = query + "partitioned_by=ARRAY[" for i, column := range unload_query.Partitioned_by { if i == 0 { query = query + "'" + column + "'" } else { query = query + ",'" + column + "'" } } query = query + "]," } query = query + " format='" + unload_query.Format + "', " query = query + " compression='" + unload_query.Compression + "')" fmt.Println(query) return aws.String(query) }
Python
# When you have a SELECT like below QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + database_name + "." + table_name + " WHERE time BETWEEN ago(2d) AND now()" # You can construct UNLOAD query as follows UNLOAD_QUERY_1 = UnloadQuery(QUERY_1, "timestream-sample-<region>-<accountId>", "without_partition", "CSV", "GZIP", "") # Run UNLOAD query (Similar to how you run SELECT query) # https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination def run_query(self, query_string): try: page_iterator = self.paginator.paginate(QueryString=UNLOAD_QUERY_1) except Exception as err: print("Exception while running query:", err) # Utility that helps to construct UNLOAD query class UnloadQuery: def __init__(self, query, s3_bucket_location, results_prefix, format, compression , partition_by): self.query = query self.s3_bucket_location = s3_bucket_location self.results_prefix = results_prefix self.format = format self.compression = compression self.partition_by = partition_by def build_query(self): query_results_s3_path = "'s3://" + self.s3_bucket_location + "/" + self.results_prefix + "/'" unload_query = "UNLOAD(" unload_query = unload_query + self.query unload_query = unload_query + ") " unload_query = unload_query + " TO " + query_results_s3_path unload_query = unload_query + " WITH ( " if(len(self.partition_by) > 0) : unload_query = unload_query + " partitioned_by = ARRAY" + str(self.partition_by) + "," unload_query = unload_query + " format='" + self.format + "', " unload_query = unload_query + " compression='" + self.compression + "')" return unload_query
Node.js
// When you have a SELECT like below QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + database_name + "." + table_name + " WHERE time BETWEEN ago(2d) AND now()" // You can construct UNLOAD query as follows UNLOAD_QUERY_1 = new UnloadQuery(QUERY_1, "timestream-sample-<region>-<accountId>", "without_partition", "CSV", "GZIP", "") // Run UNLOAD query (Similar to how you run SELECT query) // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination async runQuery(query = UNLOAD_QUERY_1, nextToken) { const params = new QueryCommand({ QueryString: query }); if (nextToken) { params.NextToken = nextToken; } await queryClient.send(params).then( (response) => { if (response.NextToken) { runQuery(queryClient, query, response.NextToken); } else { await parseAndDisplayResults(response); } }, (err) => { console.error("Error while querying:", err); }); } class UnloadQuery { constructor(query, s3_bucket_location, results_prefix, format, compression , partition_by) { this.query = query; this.s3_bucket_location = s3_bucket_location this.results_prefix = results_prefix this.format = format this.compression = compression this.partition_by = partition_by } buildQuery() { const query_results_s3_path = "'s3://" + this.s3_bucket_location + "/" + this.results_prefix + "/'" let unload_query = "UNLOAD(" unload_query = unload_query + this.query unload_query = unload_query + ") " unload_query = unload_query + " TO " + query_results_s3_path unload_query = unload_query + " WITH ( " if(this.partition_by.length > 0) { let partitionBy = "" this.partition_by.forEach((str, i) => { partitionBy = partitionBy + (i ? ",'" : "'") + str + "'" }) unload_query = unload_query + " partitioned_by = ARRAY[" + partitionBy + "]," } unload_query = unload_query + " format='" + this.format + "', " unload_query = unload_query + " compression='" + this.compression + "')" return unload_query } }

Analise a UNLOAD resposta e obtenha a contagem de linhas, o link do manifesto e o link de metadados

Java
// Parsing UNLOAD query response is similar to how you parse SELECT query response: // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing // But unlike SELECT, UNLOAD only has 1 row * 3 columns outputed // (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR) public UnloadResponse parseResult(QueryResult queryResult) { Map<String, String> outputMap = new HashMap<>(); for (int i = 0; i < queryResult.getColumnInfo().size(); i++) { outputMap.put(queryResult.getColumnInfo().get(i).getName(), queryResult.getRows().get(0).getData().get(i).getScalarValue()); } return new UnloadResponse(outputMap); } @Getter class UnloadResponse { private final String metadataFile; private final String manifestFile; private final int rows; public UnloadResponse(Map<String, String> unloadResponse) { this.metadataFile = unloadResponse.get("metadataFile"); this.manifestFile = unloadResponse.get("manifestFile"); this.rows = Integer.parseInt(unloadResponse.get("rows")); } }
Java v2
// Parsing UNLOAD query response is similar to how you parse SELECT query response: // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing // But unlike SELECT, UNLOAD only has 1 row * 3 columns outputed // (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR) public UnloadResponse parseResult(QueryResponse queryResponse) { Map<String, String> outputMap = new HashMap<>(); for (int i = 0; i < queryResponse.columnInfo().size(); i++) { outputMap.put(queryResponse.columnInfo().get(i).name(), queryResponse.rows().get(0).data().get(i).scalarValue()); } return new UnloadResponse(outputMap); } @Getter class UnloadResponse { private final String metadataFile; private final String manifestFile; private final int rows; public UnloadResponse(Map<String, String> unloadResponse) { this.metadataFile = unloadResponse.get("metadataFile"); this.manifestFile = unloadResponse.get("manifestFile"); this.rows = Integer.parseInt(unloadResponse.get("rows")); } }
Go
// Parsing UNLOAD query response is similar to how you parse SELECT query response: // https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing // But unlike SELECT, UNLOAD only has 1 row * 3 columns outputed // (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR) func parseQueryResult(queryOutput *timestreamquery.QueryOutput) map[string]string { var columnInfo = queryOutput.ColumnInfo; fmt.Println("ColumnInfo", columnInfo) fmt.Println("QueryId", queryOutput.QueryId) fmt.Println("QueryStatus", queryOutput.QueryStatus) return parseResponse(columnInfo, queryOutput.Rows[0]) } func parseResponse(columnInfo []*timestreamquery.ColumnInfo, row *timestreamquery.Row) map[string]string { var datum = row.Data response := make(map[string]string) for i, column := range columnInfo { response[*column.Name] = *datum[i].ScalarValue } return response }
Python
# Parsing UNLOAD query response is similar to how you parse SELECT query response: # https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing # But unlike SELECT, UNLOAD only has 1 row * 3 columns outputed # (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR) for page in page_iterator: last_page = page response = self._parse_unload_query_result(last_page) def _parse_unload_query_result(self, query_result): column_info = query_result['ColumnInfo'] print("ColumnInfo: %s" % column_info) print("QueryId: %s" % query_result['QueryId']) print("QueryStatus:%s" % query_result['QueryStatus']) return self.parse_unload_response(column_info, query_result['Rows'][0]) def parse_unload_response(self, column_info, row): response = {} data = row['Data'] for i, column in enumerate(column_info): response[column['Name']] = data[i]['ScalarValue'] print("Rows: %s" % response['rows']) print("Metadata File location: %s" % response['metadataFile']) print("Manifest File location: %s" % response['manifestFile']) return response
Node.js
# Parsing UNLOAD query response is similar to how you parse SELECT query response: # https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing # But unlike SELECT, UNLOAD only has 1 row * 3 columns outputed # (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR) async parseAndDisplayResults(data, query) { const columnInfo = data['ColumnInfo']; console.log("ColumnInfo:", columnInfo) console.log("QueryId: %s", data['QueryId']) console.log("QueryStatus:", data['QueryStatus']) await this.parseResponse(columnInfo, data['Rows'][0], query) } async parseResponse(columnInfo, row, query) { let response = {} const data = row['Data'] columnInfo.forEach((column, i) => { response[column['Name']] = data[i]['ScalarValue'] }) console.log("Manifest file", response['manifestFile']); console.log("Metadata file", response['metadataFile']); return response }

Leia e analise o conteúdo do manifesto

Java
// Read and parse manifest content public UnloadManifest getUnloadManifest(UnloadResponse unloadResponse) throws IOException { AmazonS3URI s3URI = new AmazonS3URI(unloadResponse.getManifestFile()); S3Object s3Object = s3Client.getObject(s3URI.getBucket(), s3URI.getKey()); String manifestFileContent = new String(IOUtils.toByteArray(s3Object.getObjectContent()), StandardCharsets.UTF_8); return new Gson().fromJson(manifestFileContent, UnloadManifest.class); } class UnloadManifest { @Getter public class FileMetadata { long content_length_in_bytes; long row_count; } @Getter public class ResultFile { String url; FileMetadata file_metadata; } @Getter public class QueryMetadata { long total_content_length_in_bytes; long total_row_count; String result_format; String result_version; } @Getter public class Author { String name; String manifest_file_version; } @Getter private List<ResultFile> result_files; @Getter private QueryMetadata query_metadata; @Getter private Author author; }
Java v2
// Read and parse manifest content public UnloadManifest getUnloadManifest(UnloadResponse unloadResponse) throws URISyntaxException { // Space needs to encoded to use S3 parseUri function S3Uri s3Uri = s3Utilities.parseUri(URI.create(unloadResponse.getManifestFile().replace(" ", "%20"))); ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(GetObjectRequest.builder() .bucket(s3Uri.bucket().orElseThrow(() -> new URISyntaxException(unloadResponse.getManifestFile(), "Invalid S3 URI"))) .key(s3Uri.key().orElseThrow(() -> new URISyntaxException(unloadResponse.getManifestFile(), "Invalid S3 URI"))) .build()); String manifestFileContent = new String(objectBytes.asByteArray(), StandardCharsets.UTF_8); return new Gson().fromJson(manifestFileContent, UnloadManifest.class); } class UnloadManifest { @Getter public class FileMetadata { long content_length_in_bytes; long row_count; } @Getter public class ResultFile { String url; FileMetadata file_metadata; } @Getter public class QueryMetadata { long total_content_length_in_bytes; long total_row_count; String result_format; String result_version; } @Getter public class Author { String name; String manifest_file_version; } @Getter private List<ResultFile> result_files; @Getter private QueryMetadata query_metadata; @Getter private Author author; }
Go
// Read and parse manifest content func getManifestFile(s3Svc *s3.S3, response map[string]string) Manifest { var manifestBuf = getObject(s3Svc, response["manifestFile"]) var manifest Manifest json.Unmarshal(manifestBuf.Bytes(), &manifest) return manifest } func getObject(s3Svc *s3.S3, s3Uri string) *bytes.Buffer { u,_ := url.Parse(s3Uri) getObjectInput := &s3.GetObjectInput{ Key: aws.String(u.Path), Bucket: aws.String(u.Host), } getObjectOutput, err := s3Svc.GetObject(getObjectInput) if err != nil { fmt.Println("Error: %s\n", err.Error()) } buf := new(bytes.Buffer) buf.ReadFrom(getObjectOutput.Body) return buf } // Unload's Manifest structure type Manifest struct { Author interface{} Query_metadata map[string]any Result_files []struct { File_metadata interface{} Url string } }}
Python
def __get_manifest_file(self, response): manifest = self.get_object(response['manifestFile']).read().decode('utf-8') parsed_manifest = json.loads(manifest) print("Manifest contents: \n%s" % parsed_manifest) def get_object(self, uri): try: bucket, key = uri.replace("s3://", "").split("/", 1) s3_client = boto3.client('s3', region_name=<region>) response = s3_client.get_object(Bucket=bucket, Key=key) return response['Body'] except Exception as err: print("Failed to get the object for URI:", uri) raise err
Node.js
// Read and parse manifest content async getManifestFile(response) { let manifest; await this.getS3Object(response['manifestFile']).then( (data) => { manifest = JSON.parse(data); } ); return manifest; } async getS3Object(uri) { const {bucketName, key} = this.getBucketAndKey(uri); const params = new GetObjectCommand({ Bucket: bucketName, Key: key }) const response = await this.s3Client.send(params); return await response.Body.transformToString(); } getBucketAndKey(uri) { const [bucketName] = uri.replace("s3://", "").split("/", 1); const key = uri.replace("s3://", "").split('/').slice(1).join('/'); return {bucketName, key}; }

Leia e analise o conteúdo de metadados

Java
// Read and parse metadata content public UnloadMetadata getUnloadMetadata(UnloadResponse unloadResponse) throws IOException { AmazonS3URI s3URI = new AmazonS3URI(unloadResponse.getMetadataFile()); S3Object s3Object = s3Client.getObject(s3URI.getBucket(), s3URI.getKey()); String metadataFileContent = new String(IOUtils.toByteArray(s3Object.getObjectContent()), StandardCharsets.UTF_8); final Gson gson = new GsonBuilder() .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE) .create(); return gson.fromJson(metadataFileContent, UnloadMetadata.class); } class UnloadMetadata { @JsonProperty("ColumnInfo") List<ColumnInfo> columnInfo; @JsonProperty("Author") Author author; @Data public class Author { @JsonProperty("Name") String name; @JsonProperty("MetadataFileVersion") String metadataFileVersion; } }
Java v2
// Read and parse metadata content public UnloadMetadata getUnloadMetadata(UnloadResponse unloadResponse) throws URISyntaxException { // Space needs to encoded to use S3 parseUri function S3Uri s3Uri = s3Utilities.parseUri(URI.create(unloadResponse.getMetadataFile().replace(" ", "%20"))); ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(GetObjectRequest.builder() .bucket(s3Uri.bucket().orElseThrow(() -> new URISyntaxException(unloadResponse.getMetadataFile(), "Invalid S3 URI"))) .key(s3Uri.key().orElseThrow(() -> new URISyntaxException(unloadResponse.getMetadataFile(), "Invalid S3 URI"))) .build()); String metadataFileContent = new String(objectBytes.asByteArray(), StandardCharsets.UTF_8); final Gson gson = new GsonBuilder() .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE) .create(); return gson.fromJson(metadataFileContent, UnloadMetadata.class); } class UnloadMetadata { @JsonProperty("ColumnInfo") List<ColumnInfo> columnInfo; @JsonProperty("Author") Author author; @Data public class Author { @JsonProperty("Name") String name; @JsonProperty("MetadataFileVersion") String metadataFileVersion; } }
Go
// Read and parse metadata content func getMetadataFile(s3Svc *s3.S3, response map[string]string) Metadata { var metadataBuf = getObject(s3Svc, response["metadataFile"]) var metadata Metadata json.Unmarshal(metadataBuf.Bytes(), &metadata) return metadata } func getObject(s3Svc *s3.S3, s3Uri string) *bytes.Buffer { u,_ := url.Parse(s3Uri) getObjectInput := &s3.GetObjectInput{ Key: aws.String(u.Path), Bucket: aws.String(u.Host), } getObjectOutput, err := s3Svc.GetObject(getObjectInput) if err != nil { fmt.Println("Error: %s\n", err.Error()) } buf := new(bytes.Buffer) buf.ReadFrom(getObjectOutput.Body) return buf } // Unload's Metadata structure type Metadata struct { Author interface{} ColumnInfo []struct { Name string Type map[string]string } }
Python
def __get_metadata_file(self, response): metadata = self.get_object(response['metadataFile']).read().decode('utf-8') parsed_metadata = json.loads(metadata) print("Metadata contents: \n%s" % parsed_metadata) def get_object(self, uri): try: bucket, key = uri.replace("s3://", "").split("/", 1) s3_client = boto3.client('s3', region_name=<region>) response = s3_client.get_object(Bucket=bucket, Key=key) return response['Body'] except Exception as err: print("Failed to get the object for URI:", uri) raise err
Node.js
// Read and parse metadata content async getMetadataFile(response) { let metadata; await this.getS3Object(response['metadataFile']).then( (data) => { metadata = JSON.parse(data); } ); return metadata; } async getS3Object(uri) { const {bucketName, key} = this.getBucketAndKey(uri); const params = new GetObjectCommand({ Bucket: bucketName, Key: key }) const response = await this.s3Client.send(params); return await response.Body.transformToString(); } getBucketAndKey(uri) { const [bucketName] = uri.replace("s3://", "").split("/", 1); const key = uri.replace("s3://", "").split('/').slice(1).join('/'); return {bucketName, key}; }