Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Écrire des données (insertions et insertions)
Rubriques
Écrire des lots d'enregistrements
Vous pouvez utiliser les extraits de code suivants pour écrire des données dans une table Amazon Timestream. L'écriture de données par lots permet d'optimiser le coût des écritures. Pour plus d’informations, consultez Calcul du nombre d'écritures.
Note
Ces extraits de code sont basés sur des exemples complets d'applications sur. GitHub
- Java
-
public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record cpuUtilization = new Record() .withDimensions(dimensions) .withMeasureName("cpu_utilization") .withMeasureValue("13.5") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record memoryUtilization = new Record() .withDimensions(dimensions) .withMeasureName("memory_utilization") .withMeasureValue("40") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Java v2
-
public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record cpuUtilization = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .measureName("cpu_utilization") .measureValue("13.5") .time(String.valueOf(time)).build(); Record memoryUtilization = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .measureName("memory_utilization") .measureValue("40") .time(String.valueOf(time)).build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME).tableName(TABLE_NAME).records(records).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Go
-
now := time.Now() currentTimeInSeconds := now.Unix() writeRecordsInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), Records: []*timestreamwrite.Record{ ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, }, } _, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
- Python
-
def write_records(self): print("Writing records") current_time = self._current_milli_time() dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] cpu_utilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5', 'MeasureValueType': 'DOUBLE', 'Time': current_time } memory_utilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': '40', 'MeasureValueType': 'DOUBLE', 'Time': current_time } records = [cpu_utilization, memory_utilization] try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes={}) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _print_rejected_records_exceptions(err): print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) if "ExistingVersion" in rr: print("Rejected record existing version: ", rr["ExistingVersion"]) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
- Node.js
-
L'extrait suivant utilise le style AWS SDK for JavaScript V2. Il est basé sur l'exemple d'application disponible sur Node.js, un exemple d'Amazon Timestream LiveAnalytics
pour une application sur. GitHub async function writeRecords() { console.log("Writing records"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const cpuUtilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5', 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const memoryUtilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': '40', 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records }; const request = writeClient.writeRecords(params); await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } ); }
- .NET
-
public async Task WriteRecords() { Console.WriteLine("Writing records"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var cpuUtilization = new Record { Dimensions = dimensions, MeasureName = "cpu_utilization", MeasureValue = "13.6", MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; var memoryUtilization = new Record { Dimensions = dimensions, MeasureName = "memory_utilization", MeasureValue = "40", MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; List<Record> records = new List<Record> { cpuUtilization, memoryUtilization }; try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }
Écrire des lots d'enregistrements avec des attributs communs
Si les données de vos séries chronologiques comportent des mesures et/ou des dimensions communes à de nombreux points de données, vous pouvez également utiliser la version optimisée suivante writeRecords API pour insérer des données dans Timestream pour. LiveAnalytics L'utilisation d'attributs communs avec le traitement par lots permet d'optimiser davantage le coût des écritures, comme décrit dansCalcul du nombre d'écritures.
Note
Ces extraits de code sont basés sur des exemples complets d'applications sur. GitHub
- Java
-
public void writeRecordsWithCommonAttributes() { System.out.println("Writing records with extracting common attributes"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record cpuUtilization = new Record() .withMeasureName("cpu_utilization") .withMeasureValue("13.5"); Record memoryUtilization = new Record() .withMeasureName("memory_utilization") .withMeasureValue("40"); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsRequest.setRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Java v2
-
public void writeRecordsWithCommonAttributes() { System.out.println("Writing records with extracting common attributes"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)).build(); Record cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("13.5").build(); Record memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("40").build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Go
-
now = time.Now() currentTimeInSeconds = now.Unix() writeRecordsCommonAttributesInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), CommonAttributes: ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, Records: []*timestreamwrite.Record{ ×treamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), }, ×treamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), }, }, } _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Ingest records is successful") }
- Python
-
def write_records_with_common_attributes(self): print("Writing records extracting common attributes") current_time = self._current_milli_time() dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] common_attributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': current_time } cpu_utilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' } memory_utilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' } records = [cpu_utilization, memory_utilization] try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _print_rejected_records_exceptions(err): print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) if "ExistingVersion" in rr: print("Rejected record existing version: ", rr["ExistingVersion"]) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
- Node.js
-
L'extrait suivant utilise le style AWS SDK for JavaScript V2. Il est basé sur l'exemple d'application disponible sur Node.js, un exemple d'Amazon Timestream LiveAnalytics
pour une application sur. GitHub async function writeRecordsWithCommonAttributes() { console.log("Writing records with common attributes"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const commonAttributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const cpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' }; const memoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records, CommonAttributes: commonAttributes }; const request = writeClient.writeRecords(params); await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } ); }
- .NET
-
public async Task WriteRecordsWithCommonAttributes() { Console.WriteLine("Writing records with common attributes"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; var cpuUtilization = new Record { MeasureName = "cpu_utilization", MeasureValue = "13.6" }; var memoryUtilization = new Record { MeasureName = "memory_utilization", MeasureValue = "40" }; List<Record> records = new List<Record>(); records.Add(cpuUtilization); records.Add(memoryUtilization); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }
Upserting records
Alors que l'écriture par défaut dans Amazon Timestream suit la sémantique du premier rédacteur gagne, où les données sont stockées sous forme d'ajout uniquement et les enregistrements dupliqués sont rejetés, certaines applications nécessitent la possibilité d'écrire des données dans Amazon Timestream en utilisant la sémantique du dernier rédacteur gagnant, où l'enregistrement contenant la version la plus élevée est stocké dans le système. Certaines applications nécessitent également la possibilité de mettre à jour les enregistrements existants. Pour répondre à ces scénarios, Amazon Timestream permet de modifier les données. Upsert est une opération qui insère un enregistrement dans le système lorsqu'il n'existe pas ou met à jour l'enregistrement lorsqu'il en existe un.
Vous pouvez modifier des enregistrements en incluant la définition d'enregistrement Version
lors de l'envoi d'une WriteRecords
demande. Amazon Timestream stockera l'enregistrement avec l'enregistrement le plus élevé. Version
L'exemple de code ci-dessous montre comment vous pouvez modifier des données :
Note
Ces extraits de code sont basés sur des exemples complets d'applications sur. GitHub
- Java
-
public void writeRecordsWithUpsert() { System.out.println("Writing records with upsert"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)) .withVersion(version); Record cpuUtilization = new Record() .withMeasureName("cpu_utilization") .withMeasureValue("13.5"); Record memoryUtilization = new Record() .withMeasureName("memory_utilization") .withMeasureValue("40"); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsRequest.setRecords(records); // write records for first time try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for first time: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for retry: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1; commonAttributes.setVersion(version); cpuUtilization.setMeasureValue("14.5"); memoryUtilization.setMeasureValue("50"); List<Record> upsertedRecords = new ArrayList<>(); upsertedRecords.add(cpuUtilization); upsertedRecords.add(memoryUtilization); WriteRecordsRequest writeRecordsUpsertRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsUpsertRequest.setRecords(upsertedRecords); try { WriteRecordsResult writeRecordsUpsertResult = amazonTimestreamWrite.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with lower version: " + writeRecordsUpsertResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("WriteRecords Status for upsert with lower version: "); printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with higher version as new data in generated version = System.currentTimeMillis(); commonAttributes.setVersion(version); writeRecordsUpsertRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsUpsertRequest.setRecords(upsertedRecords); try { WriteRecordsResult writeRecordsUpsertResult = amazonTimestreamWrite.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with higher version: " + writeRecordsUpsertResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } }
- Java v2
-
public void writeRecordsWithUpsert() { System.out.println("Writing records with upsert"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); Record cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("13.5").build(); Record memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("40").build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for first time: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for retry: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1; commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("14.5").build(); memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("50").build(); List<Record> upsertedRecords = new ArrayList<>(); upsertedRecords.add(cpuUtilization); upsertedRecords.add(memoryUtilization); WriteRecordsRequest writeRecordsUpsertRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(upsertedRecords).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with lower version: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("WriteRecords Status for upsert with lower version: "); printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with higher version as new data in generated version = System.currentTimeMillis(); commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); writeRecordsUpsertRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(upsertedRecords).build(); try { WriteRecordsResponse writeRecordsUpsertResponse = timestreamWriteClient.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with higher version: " + writeRecordsUpsertResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } }
- Go
-
// Below code will ingest and upsert cpu_utilization and memory_utilization metric for a host on // region=us-east-1, az=az1, and hostname=host1 fmt.Println("Ingesting records and set version as currentTimeInMills, hit enter to continue") reader.ReadString('\n') // Get current time in seconds. now = time.Now() currentTimeInSeconds = now.Unix() // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source version := time.Now().Round(time.Millisecond).UnixNano() / 1e6 // set version as currentTimeInMills writeRecordsCommonAttributesUpsertInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), CommonAttributes: ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), Version: &version, }, Records: []*timestreamwrite.Record{ ×treamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), }, ×treamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), }, }, } // write records for first time _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Frist-time write records is successful") } fmt.Println("Retry same writeRecordsRequest with same records and versions. Because writeRecords API is idempotent, this will success. hit enter to continue") reader.ReadString('\n') _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Retry write records for same request is successful") } fmt.Println("Upsert with lower version, this would fail because a higher version is required to update the measure value. hit enter to continue") reader.ReadString('\n') version -= 1 writeRecordsCommonAttributesUpsertInput.CommonAttributes.Version = &version updated_cpu_utilization := ×treamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("14.5"), } updated_memory_utilization := ×treamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("50"), } writeRecordsCommonAttributesUpsertInput.Records = []*timestreamwrite.Record{ updated_cpu_utilization, updated_memory_utilization, } _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records with lower version is successful") } fmt.Println("Upsert with higher version as new data in generated, this would success. hit enter to continue") reader.ReadString('\n') version = time.Now().Round(time.Millisecond).UnixNano() / 1e6 // set version as currentTimeInMills writeRecordsCommonAttributesUpsertInput.CommonAttributes.Version = &version _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records with higher version is successful") }
- Python
-
def write_records_with_upsert(self): print("Writing records with upsert") current_time = self._current_milli_time() # To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source version = int(self._current_milli_time()) dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] common_attributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': current_time, 'Version': version } cpu_utilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' } memory_utilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' } records = [cpu_utilization, memory_utilization] # write records for first time try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status for first time: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status for retry: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1 common_attributes["Version"] = version cpu_utilization["MeasureValue"] = '14.5' memory_utilization["MeasureValue"] = '50' upsertedRecords = [cpu_utilization, memory_utilization] try: upsertedResult = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=upsertedRecords, CommonAttributes=common_attributes) print("WriteRecords Status for upsert with lower version: [%s]" % upsertedResult['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # upsert with higher version as new data is generated version = int(self._current_milli_time()) common_attributes["Version"] = version try: upsertedResult = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=upsertedRecords, CommonAttributes=common_attributes) print("WriteRecords Upsert Status: [%s]" % upsertedResult['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
- Node.js
-
L'extrait suivant utilise le style AWS SDK for JavaScript V2. Il est basé sur l'exemple d'application disponible sur Node.js, un exemple d'Amazon Timestream LiveAnalytics
pour une application sur. GitHub async function writeRecordsWithUpsert() { console.log("Writing records with upsert"); const currentTime = Date.now().toString(); // Unix time in milliseconds // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source let version = Date.now(); const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const commonAttributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const cpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' }; const memoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records, CommonAttributes: commonAttributes }; const request = writeClient.writeRecords(params); // write records for first time await request.promise().then( (data) => { console.log("Write records successful for first time."); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(request); } } ); // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. await request.promise().then( (data) => { console.log("Write records successful for retry."); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(request); } } ); // upsert with lower version, this would fail because a higher version is required to update the measure value. version--; const commonAttributesWithLowerVersion = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const updatedCpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '14.5' }; const updatedMemoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '50' }; const upsertedRecords = [updatedCpuUtilization, updatedMemoryUtilization]; const upsertedParamsWithLowerVersion = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: upsertedRecords, CommonAttributes: commonAttributesWithLowerVersion }; const upsertRequestWithLowerVersion = writeClient.writeRecords(upsertedParamsWithLowerVersion); await upsertRequestWithLowerVersion.promise().then( (data) => { console.log("Write records for upsert with lower version successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(upsertRequestWithLowerVersion); } } ); // upsert with higher version as new data in generated version = Date.now(); const commonAttributesWithHigherVersion = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const upsertedParamsWithHigherVerion = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: upsertedRecords, CommonAttributes: commonAttributesWithHigherVersion }; const upsertRequestWithHigherVersion = writeClient.writeRecords(upsertedParamsWithHigherVerion); await upsertRequestWithHigherVersion.promise().then( (data) => { console.log("Write records upsert successful with higher version"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(upsertedParamsWithHigherVerion); } } ); }
- .NET
-
public async Task WriteRecordsWithUpsert() { Console.WriteLine("Writing records with upsert"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = now.ToUnixTimeMilliseconds(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString, Version = version }; var cpuUtilization = new Record { MeasureName = "cpu_utilization", MeasureValue = "13.6" }; var memoryUtilization = new Record { MeasureName = "memory_utilization", MeasureValue = "40" }; List<Record> records = new List<Record>(); records.Add(cpuUtilization); records.Add(memoryUtilization); // write records for first time try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"WriteRecords Status for first time: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"WriteRecords Status for retry: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version--; Type recordType = typeof(Record); recordType.GetProperty("Version").SetValue(commonAttributes, version); recordType.GetProperty("MeasureValue").SetValue(cpuUtilization, "14.6"); recordType.GetProperty("MeasureValue").SetValue(memoryUtilization, "50"); List<Record> upsertedRecords = new List<Record> { cpuUtilization, memoryUtilization }; try { var writeRecordsUpsertRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = upsertedRecords, CommonAttributes = commonAttributes }; WriteRecordsResponse upsertResponse = await writeClient.WriteRecordsAsync(writeRecordsUpsertRequest); Console.WriteLine($"WriteRecords Status for upsert with lower version: {upsertResponse.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // upsert with higher version as new data in generated now = DateTimeOffset.UtcNow; version = now.ToUnixTimeMilliseconds(); recordType.GetProperty("Version").SetValue(commonAttributes, version); try { var writeRecordsUpsertRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = upsertedRecords, CommonAttributes = commonAttributes }; WriteRecordsResponse upsertResponse = await writeClient.WriteRecordsAsync(writeRecordsUpsertRequest); Console.WriteLine($"WriteRecords Status for upsert with higher version: {upsertResponse.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }
Exemple d'attribut à mesures multiples
Cet exemple illustre l'écriture d'attributs à plusieurs mesures. Les attributs à mesures multiples sont utiles lorsqu'un appareil ou une application que vous suivez émet plusieurs mesures ou événements au même horodatage.
Note
Ces extraits de code sont basés sur des exemples complets d'applications sur. GitHub
- Java
-
package com.amazonaws.services.timestream; import static com.amazonaws.services.timestream.Main.DATABASE_NAME; import static com.amazonaws.services.timestream.Main.REGION; import static com.amazonaws.services.timestream.Main.TABLE_NAME; import java.util.ArrayList; import java.util.List; import com.amazonaws.services.timestreamwrite.AmazonTimestreamWrite; import com.amazonaws.services.timestreamwrite.model.Dimension; import com.amazonaws.services.timestreamwrite.model.MeasureValue; import com.amazonaws.services.timestreamwrite.model.MeasureValueType; import com.amazonaws.services.timestreamwrite.model.Record; import com.amazonaws.services.timestreamwrite.model.RejectedRecordsException; import com.amazonaws.services.timestreamwrite.model.WriteRecordsRequest; import com.amazonaws.services.timestreamwrite.model.WriteRecordsResult; public class multimeasureAttributeExample { AmazonTimestreamWrite timestreamWriteClient; public multimeasureAttributeExample(AmazonTimestreamWrite client) { this.timestreamWriteClient = client; } public void writeRecordsMultiMeasureValueSingleRecord() { System.out.println("Writing records with multi value attributes"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue(REGION); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withTime(String.valueOf(time)) .withVersion(version); MeasureValue cpuUtilization = new MeasureValue() .withName("cpu_utilization") .withType(MeasureValueType.DOUBLE) .withValue("13.5"); MeasureValue memoryUtilization = new MeasureValue() .withName("memory_utilization") .withType(MeasureValueType.DOUBLE) .withValue("40"); Record computationalResources = new Record() .withMeasureName("cpu_memory") .withMeasureValues(cpuUtilization, memoryUtilization) .withMeasureValueType(MeasureValueType.MULTI); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes) .withRecords(records); // write records for first time try { WriteRecordsResult writeRecordResult = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordResult .getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } public void writeRecordsMultiMeasureValueMultipleRecords() { System.out.println( "Writing records with multi value attributes mixture type"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue(REGION); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withTime(String.valueOf(time)) .withVersion(version); MeasureValue cpuUtilization = new MeasureValue() .withName("cpu_utilization") .withType(MeasureValueType.DOUBLE) .withValue("13"); MeasureValue memoryUtilization =new MeasureValue() .withName("memory_utilization") .withType(MeasureValueType.DOUBLE) .withValue("40"); MeasureValue activeCores = new MeasureValue() .withName("active_cores") .withType(MeasureValueType.BIGINT) .withValue("4"); Record computationalResources = new Record() .withMeasureName("computational_utilization") .withMeasureValues(cpuUtilization, memoryUtilization, activeCores) .withMeasureValueType(MeasureValueType.MULTI); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes) .withRecords(records); // write records for first time try { WriteRecordsResult writeRecordResult = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordResult .getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } private void printRejectedRecordsException(RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); e.getRejectedRecords().forEach(System.out::println); } }
- Java v2
-
package com.amazonaws.services.timestream; import java.util.ArrayList; import java.util.List; import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient; import software.amazon.awssdk.services.timestreamwrite.model.Dimension; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValue; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValueType; import software.amazon.awssdk.services.timestreamwrite.model.Record; import software.amazon.awssdk.services.timestreamwrite.model.RejectedRecordsException; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsRequest; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsResponse; import static com.amazonaws.services.timestream.Main.DATABASE_NAME; import static com.amazonaws.services.timestream.Main.TABLE_NAME; public class multimeasureAttributeExample { TimestreamWriteClient timestreamWriteClient; public multimeasureAttributeExample(TimestreamWriteClient client) { this.timestreamWriteClient = client; } public void writeRecordsMultiMeasureValueSingleRecord() { System.out.println("Writing records with multi value attributes"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .time(String.valueOf(time)) .version(version) .build(); MeasureValue cpuUtilization = MeasureValue.builder() .name("cpu_utilization") .type(MeasureValueType.DOUBLE) .value("13.5").build(); MeasureValue memoryUtilization = MeasureValue.builder() .name("memory_utilization") .type(MeasureValueType.DOUBLE) .value("40").build(); Record computationalResources = Record .builder() .measureName("cpu_memory") .measureValues(cpuUtilization, memoryUtilization) .measureValueType(MeasureValueType.MULTI) .build(); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordsResponse .sdkHttpResponse() .statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } public void writeRecordsMultiMeasureValueMultipleRecords() { System.out.println( "Writing records with multi value attributes mixture type"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .time(String.valueOf(time)) .version(version) .build(); MeasureValue cpuUtilization = MeasureValue.builder() .name("cpu_utilization") .type(MeasureValueType.DOUBLE) .value("13.5").build(); MeasureValue memoryUtilization = MeasureValue.builder() .name("memory_utilization") .type(MeasureValueType.DOUBLE) .value("40").build(); MeasureValue activeCores = MeasureValue.builder() .name("active_cores") .type(MeasureValueType.BIGINT) .value("4").build(); Record computationalResources = Record .builder() .measureName("computational_utilization") .measureValues(cpuUtilization, memoryUtilization, activeCores) .measureValueType(MeasureValueType.MULTI) .build(); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordsResponse .sdkHttpResponse() .statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } private void printRejectedRecordsException(RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); e.rejectedRecords().forEach(System.out::println); } }
- Go
-
now := time.Now() currentTimeInSeconds := now.Unix() writeRecordsInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), Records: []*timestreamwrite.Record{ ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("metrics"), MeasureValueType: aws.String("MULTI"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), MeasureValues: []*timestreamwrite.MeasureValue{ ×treamwrite.MeasureValue{ Name: aws.String("cpu_utilization"), Value: aws.String("13.5"), Type: aws.String("DOUBLE"), }, ×treamwrite.MeasureValue{ Name: aws.String("memory_utilization"), Value: aws.String("40"), Type: aws.String("DOUBLE"), }, }, }, }, } _, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
- Python
-
import time import boto3 import psutil import os from botocore.config import Config DATABASE_NAME = os.environ['DATABASE_NAME'] TABLE_NAME = os.environ['TABLE_NAME'] COUNTRY = "UK" CITY = "London" HOSTNAME = "MyHostname" # You can make it dynamic using socket.gethostname() INTERVAL = 1 # Seconds def prepare_common_attributes(): common_attributes = { 'Dimensions': [ {'Name': 'country', 'Value': COUNTRY}, {'Name': 'city', 'Value': CITY}, {'Name': 'hostname', 'Value': HOSTNAME} ], 'MeasureName': 'utilization', 'MeasureValueType': 'MULTI' } return common_attributes def prepare_record(current_time): record = { 'Time': str(current_time), 'MeasureValues': [] } return record def prepare_measure(measure_name, measure_value): measure = { 'Name': measure_name, 'Value': str(measure_value), 'Type': 'DOUBLE' } return measure def write_records(records, common_attributes): try: result = write_client.write_records(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME, CommonAttributes=common_attributes, Records=records) status = result['ResponseMetadata']['HTTPStatusCode'] print("Processed %d records. WriteRecords HTTPStatusCode: %s" % (len(records), status)) except Exception as err: print("Error:", err) if __name__ == '__main__': print("writing data to database {} table {}".format( DATABASE_NAME, TABLE_NAME)) session = boto3.Session() write_client = session.client('timestream-write', config=Config( read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10})) query_client = session.client('timestream-query') # Not used common_attributes = prepare_common_attributes() records = [] while True: current_time = int(time.time() * 1000) cpu_utilization = psutil.cpu_percent() memory_utilization = psutil.virtual_memory().percent swap_utilization = psutil.swap_memory().percent disk_utilization = psutil.disk_usage('/').percent record = prepare_record(current_time) record['MeasureValues'].append(prepare_measure('cpu', cpu_utilization)) record['MeasureValues'].append(prepare_measure('memory', memory_utilization)) record['MeasureValues'].append(prepare_measure('swap', swap_utilization)) record['MeasureValues'].append(prepare_measure('disk', disk_utilization)) records.append(record) print("records {} - cpu {} - memory {} - swap {} - disk {}".format( len(records), cpu_utilization, memory_utilization, swap_utilization, disk_utilization)) if len(records) == 100: write_records(records, common_attributes) records = [] time.sleep(INTERVAL)
- Node.js
-
L'extrait suivant utilise le style AWS SDK for JavaScript V2. Il est basé sur l'exemple d'application disponible sur Node.js, un exemple d'Amazon Timestream LiveAnalytics
pour une application sur. GitHub async function writeRecords() { console.log("Writing records"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const record = { 'Dimensions': dimensions, 'MeasureName': 'metrics', 'MeasureValues': [ { 'Name': 'cpu_utilization', 'Value': '40', 'Type': 'DOUBLE', }, { 'Name': 'memory_utilization', 'Value': '13.5', 'Type': 'DOUBLE', }, ], 'MeasureValueType': 'MULTI', 'Time': currentTime.toString() } const records = [record]; const params = { DatabaseName: 'DatabaseName', TableName: 'TableName', Records: records }; const response = await writeClient.writeRecords(params); console.log(response); }
- .NET
-
using System; using System.IO; using System.Collections.Generic; using Amazon.TimestreamWrite; using Amazon.TimestreamWrite.Model; using System.Threading.Tasks; namespace TimestreamDotNetSample { static class MultiMeasureValueConstants { public const string MultiMeasureValueSampleDb = "multiMeasureValueSampleDb"; public const string MultiMeasureValueSampleTable = "multiMeasureValueSampleTable"; } public class MultiValueAttributesExample { private readonly AmazonTimestreamWriteClient writeClient; public MultiValueAttributesExample(AmazonTimestreamWriteClient writeClient) { this.writeClient = writeClient; } public async Task WriteRecordsMultiMeasureValueSingleRecord() { Console.WriteLine("Writing records with multi value attributes"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, Time = currentTimeString }; var cpuUtilization = new MeasureValue { Name = "cpu_utilization", Value = "13.6", Type = "DOUBLE" }; var memoryUtilization = new MeasureValue { Name = "memory_utilization", Value = "40", Type = "DOUBLE" }; var computationalRecord = new Record { MeasureName = "cpu_memory", MeasureValues = new List<MeasureValue> {cpuUtilization, memoryUtilization}, MeasureValueType = "MULTI" }; List<Record> records = new List<Record>(); records.Add(computationalRecord); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = MultiMeasureValueConstants.MultiMeasureValueSampleDb, TableName = MultiMeasureValueConstants.MultiMeasureValueSampleTable, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } } public async Task WriteRecordsMultiMeasureValueMultipleRecords() { Console.WriteLine("Writing records with multi value attributes mixture type"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, Time = currentTimeString }; var cpuUtilization = new MeasureValue { Name = "cpu_utilization", Value = "13.6", Type = "DOUBLE" }; var memoryUtilization = new MeasureValue { Name = "memory_utilization", Value = "40", Type = "DOUBLE" }; var activeCores = new MeasureValue { Name = "active_cores", Value = "4", Type = "BIGINT" }; var computationalRecord = new Record { MeasureName = "computational_utilization", MeasureValues = new List<MeasureValue> {cpuUtilization, memoryUtilization, activeCores}, MeasureValueType = "MULTI" }; var aliveRecord = new Record { MeasureName = "is_healthy", MeasureValue = "true", MeasureValueType = "BOOLEAN" }; List<Record> records = new List<Record>(); records.Add(computationalRecord); records.Add(aliveRecord); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = MultiMeasureValueConstants.MultiMeasureValueSampleDb, TableName = MultiMeasureValueConstants.MultiMeasureValueSampleTable, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } } } }
Gestion des échecs d'écriture
Les écritures dans Amazon Timestream peuvent échouer pour une ou plusieurs des raisons suivantes :
Il existe des enregistrements dont l'horodatage se situe en dehors de la durée de conservation de la mémoire.
Certains enregistrements contiennent des dimensions et/ou des mesures qui dépassent les limites définies par Timestream.
-
Amazon Timestream a détecté des enregistrements dupliqués. Les enregistrements sont marqués comme doublons lorsque plusieurs enregistrements ont les mêmes dimensions, horodatages et noms de mesures, mais que :
Les valeurs de mesure sont différentes.
La version n'est pas présente dans la demande ou la valeur de la version dans le nouvel enregistrement est égale ou inférieure à la valeur existante. Si Amazon Timestream rejette les données pour cette raison,
ExistingVersion
le champ du contiendra la version actuelle deRejectedRecords
l'enregistrement telle qu'elle est stockée dans Amazon Timestream. Pour forcer une mise à jour, vous pouvez renvoyer la demande avec une version de l'enregistrement définie sur une valeur supérieure à.ExistingVersion
Pour plus d'informations sur les erreurs et les enregistrements rejetés, consultez la section Erreurs et RejectedRecord.
Si votre application reçoit un message RejectedRecordsException
lorsqu'elle tente d'écrire des enregistrements dans Timestream, vous pouvez analyser les enregistrements rejetés pour en savoir plus sur les échecs d'écriture, comme indiqué ci-dessous.
Note
Ces extraits de code sont basés sur des exemples complets d'applications sur. GitHub
- Java
-
try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); }
- Java v2
-
try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); }
- Go
-
_, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
- Python
-
try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) print("Other records were written successfully. ") except Exception as err: print("Error:", err)
- Node.js
-
L'extrait suivant utilise le style AWS SDK for JavaScript V2. Il est basé sur l'exemple d'application disponible sur Node.js, un exemple d'Amazon Timestream LiveAnalytics
pour une application sur. GitHub await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } );
- .NET
-
try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); }