Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Abfrage ausführen
Ergebnisse paginieren
Wenn Sie eine Abfrage ausführen, gibt Timestream die Ergebnismenge paginiert zurück, um die Reaktionsfähigkeit Ihrer Anwendungen zu optimieren. Der folgende Codeausschnitt zeigt, wie Sie den Ergebnissatz paginieren können. Sie müssen alle Seiten der Ergebnismenge in einer Schleife durchgehen, bis Sie auf einen Nullwert stoßen. Paginierungstoken laufen 3 Stunden ab, nachdem sie von Timestream für ausgestellt wurden. LiveAnalytics
Anmerkung
Diese Codefragmente basieren auf vollständigen Beispielanwendungen auf. GitHub
- Java
-
private void runQuery(String queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.setQueryString(queryString); QueryResult queryResult = queryClient.query(queryRequest); while (true) { parseQueryResult(queryResult); if (queryResult.getNextToken() == null) { break; } queryRequest.setNextToken(queryResult.getNextToken()); queryResult = queryClient.query(queryRequest); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } }
- Java v2
-
private void runQuery(String queryString) { try { QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build(); final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest); for(QueryResponse queryResponse : queryResponseIterator) { parseQueryResult(queryResponse); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } }
- Go
-
func runQuery(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) { queryInput := ×treamquery.QueryInput{ QueryString: aws.String(*queryPtr), } fmt.Println("QueryInput:") fmt.Println(queryInput) // execute the query err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { // process query response queryStatus := page.QueryStatus fmt.Println("Current query status:", queryStatus) // query response metadata // includes column names and types metadata := page.ColumnInfo // fmt.Println("Metadata:") fmt.Println(metadata) header := "" for i := 0; i < len(metadata); i++ { header += *metadata[i].Name if i != len(metadata)-1 { header += ", " } } write(f, header) // query response data fmt.Println("Data:") // process rows rows := page.Rows for i := 0; i < len(rows); i++ { data := rows[i].Data value := processRowType(data, metadata) fmt.Println(value) write(f, value) } fmt.Println("Number of rows:", len(page.Rows)) return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } }
- Python
-
def run_query(self, query_string): try: page_iterator = self.paginator.paginate(QueryString=query_string) for page in page_iterator: self._parse_query_result(page) except Exception as err: print("Exception while running query:", err)
- Node.js
-
Der folgende Codeausschnitt verwendet den AWS SDK for JavaScript V2-Stil. Es basiert auf der Beispielanwendung unter Node.js Beispiel Amazon Timestream für die LiveAnalytics Anwendung auf GitHub
. async function getAllRows(query, nextToken) { const params = { QueryString: query }; if (nextToken) { params.NextToken = nextToken; } await queryClient.query(params).promise() .then( (response) => { parseQueryResult(response); if (response.NextToken) { getAllRows(query, response.NextToken); } }, (err) => { console.error("Error while querying:", err); }); }
- .NET
-
private async Task RunQueryAsync(string queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.QueryString = queryString; QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest); while (true) { ParseQueryResult(queryResponse); if (queryResponse.NextToken == null) { break; } queryRequest.NextToken = queryResponse.NextToken; queryResponse = await queryClient.QueryAsync(queryRequest); } } catch(Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries Console.WriteLine(e.ToString()); } }
Analysieren von Ergebnismengen
Sie können die folgenden Codefragmente verwenden, um Daten aus der Ergebnismenge zu extrahieren. Auf die Abfrageergebnisse kann bis zu 24 Stunden nach Abschluss einer Abfrage zugegriffen werden.
Anmerkung
Diese Codefragmente basieren auf vollständigen Beispielanwendungen auf. GitHub
- Java
-
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSSSS"); private static final long ONE_GB_IN_BYTES = 1073741824L; private void parseQueryResult(QueryResult response) { final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%"); double bytesScannedSoFar = ((double) currentStatusOfQuery.getCumulativeBytesScanned() / ONE_GB_IN_BYTES); System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); List<ColumnInfo> columnInfo = response.getColumnInfo(); List<Row> rows = response.getRows(); System.out.println("Metadata: " + columnInfo); System.out.println("Data: "); // iterate every row for (Row row : rows) { System.out.println(parseRow(columnInfo, row)); } } private String parseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.getData(); List<String> rowOutput = new ArrayList<>(); // iterate every column per row for (int j = 0; j < data.size(); j++) { ColumnInfo info = columnInfo.get(j); Datum datum = data.get(j); rowOutput.add(parseDatum(info, datum)); } return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseDatum(ColumnInfo info, Datum datum) { if (datum.isNullValue() != null && datum.isNullValue()) { return info.getName() + "=" + "NULL"; } Type columnType = info.getType(); // If the column is of TimeSeries Type if (columnType.getTimeSeriesMeasureValueColumnInfo() != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.getArrayColumnInfo() != null) { List<Datum> arrayValues = datum.getArrayValue(); return info.getName() + "=" + parseArray(info.getType().getArrayColumnInfo(), arrayValues); } // If the column is of Row Type else if (columnType.getRowColumnInfo() != null) { List<ColumnInfo> rowColumnInfo = info.getType().getRowColumnInfo(); Row rowValues = datum.getRowValue(); return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } private String parseTimeSeries(ColumnInfo info, Datum datum) { List<String> timeSeriesOutput = new ArrayList<>(); for (TimeSeriesDataPoint dataPoint : datum.getTimeSeriesValue()) { timeSeriesOutput.add("{time=" + dataPoint.getTime() + ", value=" + parseDatum(info.getType().getTimeSeriesMeasureValueColumnInfo(), dataPoint.getValue()) + "}"); } return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseScalarType(ColumnInfo info, Datum datum) { switch (ScalarType.fromValue(info.getType().getScalarType())) { case VARCHAR: return parseColumnName(info) + datum.getScalarValue(); case BIGINT: Long longValue = Long.valueOf(datum.getScalarValue()); return parseColumnName(info) + longValue; case INTEGER: Integer intValue = Integer.valueOf(datum.getScalarValue()); return parseColumnName(info) + intValue; case BOOLEAN: Boolean booleanValue = Boolean.valueOf(datum.getScalarValue()); return parseColumnName(info) + booleanValue; case DOUBLE: Double doubleValue = Double.valueOf(datum.getScalarValue()); return parseColumnName(info) + doubleValue; case TIMESTAMP: return parseColumnName(info) + LocalDateTime.parse(datum.getScalarValue(), TIMESTAMP_FORMATTER); case DATE: return parseColumnName(info) + LocalDate.parse(datum.getScalarValue(), DATE_FORMATTER); case TIME: return parseColumnName(info) + LocalTime.parse(datum.getScalarValue(), TIME_FORMATTER); case INTERVAL_DAY_TO_SECOND: case INTERVAL_YEAR_TO_MONTH: return parseColumnName(info) + datum.getScalarValue(); case UNKNOWN: return parseColumnName(info) + datum.getScalarValue(); default: throw new IllegalArgumentException("Given type is not valid: " + info.getType().getScalarType()); } } private String parseColumnName(ColumnInfo info) { return info.getName() == null ? "" : info.getName() + "="; } private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { List<String> arrayOutput = new ArrayList<>(); for (Datum datum : arrayValues) { arrayOutput.add(parseDatum(arrayColumnInfo, datum)); } return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(","))); }
- Java v2
-
private static final long ONE_GB_IN_BYTES = 1073741824L; private void parseQueryResult(QueryResponse response) { final QueryStatus currentStatusOfQuery = response.queryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%"); double bytesScannedSoFar = ((double) currentStatusOfQuery.cumulativeBytesScanned() / ONE_GB_IN_BYTES); System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); List<ColumnInfo> columnInfo = response.columnInfo(); List<Row> rows = response.rows(); System.out.println("Metadata: " + columnInfo); System.out.println("Data: "); // iterate every row for (Row row : rows) { System.out.println(parseRow(columnInfo, row)); } } private String parseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.data(); List<String> rowOutput = new ArrayList<>(); // iterate every column per row for (int j = 0; j < data.size(); j++) { ColumnInfo info = columnInfo.get(j); Datum datum = data.get(j); rowOutput.add(parseDatum(info, datum)); } return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseDatum(ColumnInfo info, Datum datum) { if (datum.nullValue() != null && datum.nullValue()) { return info.name() + "=" + "NULL"; } Type columnType = info.type(); // If the column is of TimeSeries Type if (columnType.timeSeriesMeasureValueColumnInfo() != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.arrayColumnInfo() != null) { List<Datum> arrayValues = datum.arrayValue(); return info.name() + "=" + parseArray(info.type().arrayColumnInfo(), arrayValues); } // If the column is of Row Type else if (columnType.rowColumnInfo() != null && columnType.rowColumnInfo().size() > 0) { List<ColumnInfo> rowColumnInfo = info.type().rowColumnInfo(); Row rowValues = datum.rowValue(); return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } private String parseTimeSeries(ColumnInfo info, Datum datum) { List<String> timeSeriesOutput = new ArrayList<>(); for (TimeSeriesDataPoint dataPoint : datum.timeSeriesValue()) { timeSeriesOutput.add("{time=" + dataPoint.time() + ", value=" + parseDatum(info.type().timeSeriesMeasureValueColumnInfo(), dataPoint.value()) + "}"); } return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseScalarType(ColumnInfo info, Datum datum) { return parseColumnName(info) + datum.scalarValue(); } private String parseColumnName(ColumnInfo info) { return info.name() == null ? "" : info.name() + "="; } private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { List<String> arrayOutput = new ArrayList<>(); for (Datum datum : arrayValues) { arrayOutput.add(parseDatum(arrayColumnInfo, datum)); } return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(","))); }
- Go
-
func processScalarType(data *timestreamquery.Datum) string { return *data.ScalarValue } func processTimeSeriesType(data []*timestreamquery.TimeSeriesDataPoint, columnInfo *timestreamquery.ColumnInfo) string { value := "" for k := 0; k < len(data); k++ { time := data[k].Time value += *time + ":" if columnInfo.Type.ScalarType != nil { value += processScalarType(data[k].Value) } else if columnInfo.Type.ArrayColumnInfo != nil { value += processArrayType(data[k].Value.ArrayValue, columnInfo.Type.ArrayColumnInfo) } else if columnInfo.Type.RowColumnInfo != nil { value += processRowType(data[k].Value.RowValue.Data, columnInfo.Type.RowColumnInfo) } else { fail("Bad data type") } if k != len(data)-1 { value += ", " } } return value } func processArrayType(datumList []*timestreamquery.Datum, columnInfo *timestreamquery.ColumnInfo) string { value := "" for k := 0; k < len(datumList); k++ { if columnInfo.Type.ScalarType != nil { value += processScalarType(datumList[k]) } else if columnInfo.Type.TimeSeriesMeasureValueColumnInfo != nil { value += processTimeSeriesType(datumList[k].TimeSeriesValue, columnInfo.Type.TimeSeriesMeasureValueColumnInfo) } else if columnInfo.Type.ArrayColumnInfo != nil { value += "[" value += processArrayType(datumList[k].ArrayValue, columnInfo.Type.ArrayColumnInfo) value += "]" } else if columnInfo.Type.RowColumnInfo != nil { value += "[" value += processRowType(datumList[k].RowValue.Data, columnInfo.Type.RowColumnInfo) value += "]" } else { fail("Bad column type") } if k != len(datumList)-1 { value += ", " } } return value } func processRowType(data []*timestreamquery.Datum, metadata []*timestreamquery.ColumnInfo) string { value := "" for j := 0; j < len(data); j++ { if metadata[j].Type.ScalarType != nil { // process simple data types value += processScalarType(data[j]) } else if metadata[j].Type.TimeSeriesMeasureValueColumnInfo != nil { // fmt.Println("Timeseries measure value column info") // fmt.Println(metadata[j].Type.TimeSeriesMeasureValueColumnInfo.Type) datapointList := data[j].TimeSeriesValue value += "[" value += processTimeSeriesType(datapointList, metadata[j].Type.TimeSeriesMeasureValueColumnInfo) value += "]" } else if metadata[j].Type.ArrayColumnInfo != nil { columnInfo := metadata[j].Type.ArrayColumnInfo // fmt.Println("Array column info") // fmt.Println(columnInfo) datumList := data[j].ArrayValue value += "[" value += processArrayType(datumList, columnInfo) value += "]" } else if metadata[j].Type.RowColumnInfo != nil { columnInfo := metadata[j].Type.RowColumnInfo datumList := data[j].RowValue.Data value += "[" value += processRowType(datumList, columnInfo) value += "]" } else { panic("Bad column type") } // comma seperated column values if j != len(data)-1 { value += ", " } } return value }
- Python
-
def _parse_query_result(self, query_result): query_status = query_result["QueryStatus"] progress_percentage = query_status["ProgressPercentage"] print(f"Query progress so far: {progress_percentage}%") bytes_scanned = float(query_status["CumulativeBytesScanned"]) / ONE_GB_IN_BYTES print(f"Data scanned so far: {bytes_scanned} GB") bytes_metered = float(query_status["CumulativeBytesMetered"]) / ONE_GB_IN_BYTES print(f"Data metered so far: {bytes_metered} GB") column_info = query_result['ColumnInfo'] print("Metadata: %s" % column_info) print("Data: ") for row in query_result['Rows']: print(self._parse_row(column_info, row)) def _parse_row(self, column_info, row): data = row['Data'] row_output = [] for j in range(len(data)): info = column_info[j] datum = data[j] row_output.append(self._parse_datum(info, datum)) return "{%s}" % str(row_output) def _parse_datum(self, info, datum): if datum.get('NullValue', False): return "%s=NULL" % info['Name'], column_type = info['Type'] # If the column is of TimeSeries Type if 'TimeSeriesMeasureValueColumnInfo' in column_type: return self._parse_time_series(info, datum) # If the column is of Array Type elif 'ArrayColumnInfo' in column_type: array_values = datum['ArrayValue'] return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values)) # If the column is of Row Type elif 'RowColumnInfo' in column_type: row_column_info = info['Type']['RowColumnInfo'] row_values = datum['RowValue'] return self._parse_row(row_column_info, row_values) # If the column is of Scalar Type else: return self._parse_column_name(info) + datum['ScalarValue'] def _parse_time_series(self, info, datum): time_series_output = [] for data_point in datum['TimeSeriesValue']: time_series_output.append("{time=%s, value=%s}" % (data_point['Time'], self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'], data_point['Value']))) return "[%s]" % str(time_series_output) def _parse_array(self, array_column_info, array_values): array_output = [] for datum in array_values: array_output.append(self._parse_datum(array_column_info, datum)) return "[%s]" % str(array_output) @staticmethod def _parse_column_name(info): if 'Name' in info: return info['Name'] + "=" else: return ""
- Node.js
-
Der folgende Codeausschnitt verwendet den AWS SDK for JavaScript V2-Stil. Es basiert auf der Beispielanwendung unter Node.js Beispiel Amazon Timestream für die LiveAnalytics Anwendung auf GitHub
. function parseQueryResult(response) { const queryStatus = response.QueryStatus; console.log("Current query status: " + JSON.stringify(queryStatus)); const columnInfo = response.ColumnInfo; const rows = response.Rows; console.log("Metadata: " + JSON.stringify(columnInfo)); console.log("Data: "); rows.forEach(function (row) { console.log(parseRow(columnInfo, row)); }); } function parseRow(columnInfo, row) { const data = row.Data; const rowOutput = []; var i; for ( i = 0; i < data.length; i++ ) { info = columnInfo[i]; datum = data[i]; rowOutput.push(parseDatum(info, datum)); } return `{${rowOutput.join(", ")}}` } function parseDatum(info, datum) { if (datum.NullValue != null && datum.NullValue === true) { return `${info.Name}=NULL`; } const columnType = info.Type; // If the column is of TimeSeries Type if (columnType.TimeSeriesMeasureValueColumnInfo != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.ArrayColumnInfo != null) { const arrayValues = datum.ArrayValue; return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`; } // If the column is of Row Type else if (columnType.RowColumnInfo != null) { const rowColumnInfo = info.Type.RowColumnInfo; const rowValues = datum.RowValue; return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } function parseTimeSeries(info, datum) { const timeSeriesOutput = []; datum.TimeSeriesValue.forEach(function (dataPoint) { timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`) }); return `[${timeSeriesOutput.join(", ")}]` } function parseScalarType(info, datum) { return parseColumnName(info) + datum.ScalarValue; } function parseColumnName(info) { return info.Name == null ? "" : `${info.Name}=`; } function parseArray(arrayColumnInfo, arrayValues) { const arrayOutput = []; arrayValues.forEach(function (datum) { arrayOutput.push(parseDatum(arrayColumnInfo, datum)); }); return `[${arrayOutput.join(", ")}]` }
- .NET
-
private void ParseQueryResult(QueryResponse response) { List<ColumnInfo> columnInfo = response.ColumnInfo; var options = new JsonSerializerOptions { IgnoreNullValues = true }; List<String> columnInfoStrings = columnInfo.ConvertAll(x => JsonSerializer.Serialize(x, options)); List<Row> rows = response.Rows; QueryStatus queryStatus = response.QueryStatus; Console.WriteLine("Current Query status:" + JsonSerializer.Serialize(queryStatus, options)); Console.WriteLine("Metadata:" + string.Join(",", columnInfoStrings)); Console.WriteLine("Data:"); foreach (Row row in rows) { Console.WriteLine(ParseRow(columnInfo, row)); } } private string ParseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.Data; List<string> rowOutput = new List<string>(); for (int j = 0; j < data.Count; j++) { ColumnInfo info = columnInfo[j]; Datum datum = data[j]; rowOutput.Add(ParseDatum(info, datum)); } return $"{{{string.Join(",", rowOutput)}}}"; } private string ParseDatum(ColumnInfo info, Datum datum) { if (datum.NullValue) { return $"{info.Name}=NULL"; } Amazon.TimestreamQuery.Model.Type columnType = info.Type; if (columnType.TimeSeriesMeasureValueColumnInfo != null) { return ParseTimeSeries(info, datum); } else if (columnType.ArrayColumnInfo != null) { List<Datum> arrayValues = datum.ArrayValue; return $"{info.Name}={ParseArray(info.Type.ArrayColumnInfo, arrayValues)}"; } else if (columnType.RowColumnInfo != null && columnType.RowColumnInfo.Count > 0) { List<ColumnInfo> rowColumnInfo = info.Type.RowColumnInfo; Row rowValue = datum.RowValue; return ParseRow(rowColumnInfo, rowValue); } else { return ParseScalarType(info, datum); } } private string ParseTimeSeries(ColumnInfo info, Datum datum) { var timeseriesString = datum.TimeSeriesValue .Select(value => $"{{time={value.Time}, value={ParseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, value.Value)}}}") .Aggregate((current, next) => current + "," + next); return $"[{timeseriesString}]"; } private string ParseScalarType(ColumnInfo info, Datum datum) { return ParseColumnName(info) + datum.ScalarValue; } private string ParseColumnName(ColumnInfo info) { return info.Name == null ? "" : (info.Name + "="); } private string ParseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { return $"[{arrayValues.Select(value => ParseDatum(arrayColumnInfo, value)).Aggregate((current, next) => current + "," + next)}]"; }
Zugriff auf den Abfragestatus
Sie können auf den Abfragestatus zugreifenQueryResponse
, der Informationen über den Fortschritt einer Abfrage, die von einer Abfrage gescannten Byte und die von einer Abfrage gemessenen Byte enthält. Die bytesScanned
Werte bytesMetered
und sind kumulativ und werden beim Paging der Abfrageergebnisse kontinuierlich aktualisiert. Sie können diese Informationen verwenden, um zu verstehen, welche Byte von einer einzelnen Abfrage gescannt wurden, und anhand dieser Informationen auch bestimmte Entscheidungen treffen. Wenn Sie beispielsweise annehmen, dass der Abfragepreis 0,01 USD pro gescanntem GB beträgt, möchten Sie möglicherweise Abfragen stornieren, die 25 USD pro Abfrage oder X
GB überschreiten. Der folgende Codeausschnitt zeigt, wie das gemacht werden kann.
Anmerkung
Diese Codefragmente basieren auf vollständigen Beispielanwendungen auf. GitHub
- Java
-
private static final long ONE_GB_IN_BYTES = 1073741824L; private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB public void cancelQueryBasedOnQueryStatus() { System.out.println("Starting query: " + SELECT_ALL_QUERY); QueryRequest queryRequest = new QueryRequest(); queryRequest.setQueryString(SELECT_ALL_QUERY); QueryResult queryResult = queryClient.query(queryRequest); while (true) { final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); // Cancel query if its costing more than 1 cent if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) { cancelQuery(queryResult); break; } if (queryResult.getNextToken() == null) { break; } queryRequest.setNextToken(queryResult.getNextToken()); queryResult = queryClient.query(queryRequest); } }
- Java v2
-
private static final long ONE_GB_IN_BYTES = 1073741824L; private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB public void cancelQueryBasedOnQueryStatus() { System.out.println("Starting query: " + SELECT_ALL_QUERY); QueryRequest queryRequest = QueryRequest.builder().queryString(SELECT_ALL_QUERY).build(); final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest); for(QueryResponse queryResponse : queryResponseIterator) { final QueryStatus currentStatusOfQuery = queryResponse.queryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + "GB"); // Cancel query if its costing more than 1 cent if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) { cancelQuery(queryResponse); break; } } }
- Go
-
const OneGbInBytes = 1073741824 // Assuming the price of query is $0.01 per GB const QueryCostPerGbInDollars = 0.01 func cancelQueryBasedOnQueryStatus(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) { queryInput := ×treamquery.QueryInput{ QueryString: aws.String(*queryPtr), } fmt.Println("QueryInput:") fmt.Println(queryInput) // execute the query err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { // process query response queryStatus := page.QueryStatus fmt.Println("Current query status:", queryStatus) bytes_metered := float64(*queryStatus.CumulativeBytesMetered) / float64(ONE_GB_IN_BYTES) if bytes_metered * QUERY_COST_PER_GB_IN_DOLLARS > 0.01 { cancelQuery(page, querySvc) return true } // query response metadata // includes column names and types metadata := page.ColumnInfo // fmt.Println("Metadata:") fmt.Println(metadata) header := "" for i := 0; i < len(metadata); i++ { header += *metadata[i].Name if i != len(metadata)-1 { header += ", " } } write(f, header) // query response data fmt.Println("Data:") // process rows rows := page.Rows for i := 0; i < len(rows); i++ { data := rows[i].Data value := processRowType(data, metadata) fmt.Println(value) write(f, value) } fmt.Println("Number of rows:", len(page.Rows)) return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } }
- Python
-
ONE_GB_IN_BYTES = 1073741824 # Assuming the price of query is $0.01 per GB QUERY_COST_PER_GB_IN_DOLLARS = 0.01 def cancel_query_based_on_query_status(self): try: print("Starting query: " + self.SELECT_ALL) page_iterator = self.paginator.paginate(QueryString=self.SELECT_ALL) for page in page_iterator: query_status = page["QueryStatus"] progress_percentage = query_status["ProgressPercentage"] print("Query progress so far: " + str(progress_percentage) + "%") bytes_metered = query_status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES print("Bytes Metered so far: " + str(bytes_metered) + " GB") if bytes_metered * self.QUERY_COST_PER_GB_IN_DOLLARS > 0.01: self.cancel_query_for(page) break except Exception as err: print("Exception while running query:", err) traceback.print_exc(file=sys.stderr)
- Node.js
-
Der folgende Codeausschnitt verwendet den AWS SDK for JavaScript V2-Stil. Es basiert auf der Beispielanwendung unter Node.js Beispiel Amazon Timestream für die LiveAnalytics Anwendung auf GitHub
. function parseQueryResult(response) { const queryStatus = response.QueryStatus; console.log("Current query status: " + JSON.stringify(queryStatus)); const columnInfo = response.ColumnInfo; const rows = response.Rows; console.log("Metadata: " + JSON.stringify(columnInfo)); console.log("Data: "); rows.forEach(function (row) { console.log(parseRow(columnInfo, row)); }); } function parseRow(columnInfo, row) { const data = row.Data; const rowOutput = []; var i; for ( i = 0; i < data.length; i++ ) { info = columnInfo[i]; datum = data[i]; rowOutput.push(parseDatum(info, datum)); } return `{${rowOutput.join(", ")}}` } function parseDatum(info, datum) { if (datum.NullValue != null && datum.NullValue === true) { return `${info.Name}=NULL`; } const columnType = info.Type; // If the column is of TimeSeries Type if (columnType.TimeSeriesMeasureValueColumnInfo != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.ArrayColumnInfo != null) { const arrayValues = datum.ArrayValue; return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`; } // If the column is of Row Type else if (columnType.RowColumnInfo != null) { const rowColumnInfo = info.Type.RowColumnInfo; const rowValues = datum.RowValue; return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } function parseTimeSeries(info, datum) { const timeSeriesOutput = []; datum.TimeSeriesValue.forEach(function (dataPoint) { timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`) }); return `[${timeSeriesOutput.join(", ")}]` } function parseScalarType(info, datum) { return parseColumnName(info) + datum.ScalarValue; } function parseColumnName(info) { return info.Name == null ? "" : `${info.Name}=`; } function parseArray(arrayColumnInfo, arrayValues) { const arrayOutput = []; arrayValues.forEach(function (datum) { arrayOutput.push(parseDatum(arrayColumnInfo, datum)); }); return `[${arrayOutput.join(", ")}]` }
- .NET
-
private static readonly long ONE_GB_IN_BYTES = 1073741824L; private static readonly double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB private async Task CancelQueryBasedOnQueryStatus(string queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.QueryString = queryString; QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest); while (true) { QueryStatus queryStatus = queryResponse.QueryStatus; double bytesMeteredSoFar = ((double) queryStatus.CumulativeBytesMetered / ONE_GB_IN_BYTES); // Cancel query if its costing more than 1 cent if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) { await CancelQuery(queryResponse); break; } ParseQueryResult(queryResponse); if (queryResponse.NextToken == null) { break; } queryRequest.NextToken = queryResponse.NextToken; queryResponse = await queryClient.QueryAsync(queryRequest); } } catch(Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries Console.WriteLine(e.ToString()); } }
Weitere Informationen zum Stornieren einer Abfrage finden Sie unterAnfrage abbrechen.