翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
データの書き込み (挿入とアップサート)
レコードのバッチの書き込み
次のコードスニペットを使用して、Amazon Timestream テーブルにデータを書き込むことができます。データをバッチで書き込むと、書き込みのコストを最適化するのに役立ちます。詳細については、「書き込み回数の計算」を参照してください。
注記
これらのコードスニペットは、GitHub
- Java
-
public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record cpuUtilization = new Record() .withDimensions(dimensions) .withMeasureName("cpu_utilization") .withMeasureValue("13.5") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record memoryUtilization = new Record() .withDimensions(dimensions) .withMeasureName("memory_utilization") .withMeasureValue("40") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Java v2
-
public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record cpuUtilization = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .measureName("cpu_utilization") .measureValue("13.5") .time(String.valueOf(time)).build(); Record memoryUtilization = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .measureName("memory_utilization") .measureValue("40") .time(String.valueOf(time)).build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME).tableName(TABLE_NAME).records(records).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Go
-
now := time.Now() currentTimeInSeconds := now.Unix() writeRecordsInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), Records: []*timestreamwrite.Record{ ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, }, } _, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
- Python
-
def write_records(self): print("Writing records") current_time = self._current_milli_time() dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] cpu_utilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5', 'MeasureValueType': 'DOUBLE', 'Time': current_time } memory_utilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': '40', 'MeasureValueType': 'DOUBLE', 'Time': current_time } records = [cpu_utilization, memory_utilization] try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes={}) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _print_rejected_records_exceptions(err): print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) if "ExistingVersion" in rr: print("Rejected record existing version: ", rr["ExistingVersion"]) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
- Node.js
-
次のスニペットでは、 AWS SDK for JavaScript V2 スタイルを使用しています。これは、Node.js のサンプル Amazon Timestream for LiveAnalytics アプリケーション on GitHub のサンプルアプリケーション
に基づいています。 async function writeRecords() { console.log("Writing records"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const cpuUtilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5', 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const memoryUtilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': '40', 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records }; const request = writeClient.writeRecords(params); await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } ); }
- .NET
-
public async Task WriteRecords() { Console.WriteLine("Writing records"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var cpuUtilization = new Record { Dimensions = dimensions, MeasureName = "cpu_utilization", MeasureValue = "13.6", MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; var memoryUtilization = new Record { Dimensions = dimensions, MeasureName = "memory_utilization", MeasureValue = "40", MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; List<Record> records = new List<Record> { cpuUtilization, memoryUtilization }; try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }
共通の属性を持つレコードのバッチの書き込み
時系列データに、多くのデータポイントに共通する測定やディメンションがある場合は、次の最適化されたバージョンの writeRecords API Wordを使用して Timestream for LiveAnalytics にデータを挿入することもできます。バッチ処理で共通の属性を使用すると、「」で説明されているように、書き込みのコストをさらに最適化できます書き込み回数の計算。
注記
これらのコードスニペットは、GitHub
- Java
-
public void writeRecordsWithCommonAttributes() { System.out.println("Writing records with extracting common attributes"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record cpuUtilization = new Record() .withMeasureName("cpu_utilization") .withMeasureValue("13.5"); Record memoryUtilization = new Record() .withMeasureName("memory_utilization") .withMeasureValue("40"); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsRequest.setRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Java v2
-
public void writeRecordsWithCommonAttributes() { System.out.println("Writing records with extracting common attributes"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)).build(); Record cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("13.5").build(); Record memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("40").build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
- Go
-
now = time.Now() currentTimeInSeconds = now.Unix() writeRecordsCommonAttributesInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), CommonAttributes: ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, Records: []*timestreamwrite.Record{ ×treamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), }, ×treamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), }, }, } _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Ingest records is successful") }
- Python
-
def write_records_with_common_attributes(self): print("Writing records extracting common attributes") current_time = self._current_milli_time() dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] common_attributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': current_time } cpu_utilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' } memory_utilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' } records = [cpu_utilization, memory_utilization] try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _print_rejected_records_exceptions(err): print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) if "ExistingVersion" in rr: print("Rejected record existing version: ", rr["ExistingVersion"]) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
- Node.js
-
次のスニペットでは、 AWS SDK for JavaScript V2 スタイルを使用しています。これは、Node.js のサンプル Amazon Timestream for LiveAnalytics アプリケーション on GitHub のサンプルアプリケーション
に基づいています。 async function writeRecordsWithCommonAttributes() { console.log("Writing records with common attributes"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const commonAttributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const cpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' }; const memoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records, CommonAttributes: commonAttributes }; const request = writeClient.writeRecords(params); await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } ); }
- .NET
-
public async Task WriteRecordsWithCommonAttributes() { Console.WriteLine("Writing records with common attributes"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; var cpuUtilization = new Record { MeasureName = "cpu_utilization", MeasureValue = "13.6" }; var memoryUtilization = new Record { MeasureName = "memory_utilization", MeasureValue = "40" }; List<Record> records = new List<Record>(); records.Add(cpuUtilization); records.Add(memoryUtilization); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }
レコードの更新
Amazon Timestream のデフォルトの書き込みは、データが追加としてのみ保存され、重複するレコードが拒否される最初のライターがセマンティクスを優先しますが、最後のライターがセマンティクスを優先して Amazon Timestream にデータを書き込む機能を必要とするアプリケーションがあり、バージョンが最も高いレコードがシステムに保存されます。既存のレコードを更新する必要があるアプリケーションもあります。これらのシナリオに対応するために、Amazon Timestream はデータをアップサートする機能を提供します。Upsert は、レコードが存在しない場合はシステムにレコードを挿入し、存在する場合はレコードを更新するオペレーションです。
WriteRecords
リクエストの送信中にレコード定義Version
に を含めることで、レコードをアップサートできます。Amazon Timestream は、レコードが最も高い のレコードを保存しますVersion
。以下のコードサンプルは、データをアップサートする方法を示しています。
注記
これらのコードスニペットは、GitHub
- Java
-
public void writeRecordsWithUpsert() { System.out.println("Writing records with upsert"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)) .withVersion(version); Record cpuUtilization = new Record() .withMeasureName("cpu_utilization") .withMeasureValue("13.5"); Record memoryUtilization = new Record() .withMeasureName("memory_utilization") .withMeasureValue("40"); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsRequest.setRecords(records); // write records for first time try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for first time: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for retry: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1; commonAttributes.setVersion(version); cpuUtilization.setMeasureValue("14.5"); memoryUtilization.setMeasureValue("50"); List<Record> upsertedRecords = new ArrayList<>(); upsertedRecords.add(cpuUtilization); upsertedRecords.add(memoryUtilization); WriteRecordsRequest writeRecordsUpsertRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsUpsertRequest.setRecords(upsertedRecords); try { WriteRecordsResult writeRecordsUpsertResult = amazonTimestreamWrite.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with lower version: " + writeRecordsUpsertResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("WriteRecords Status for upsert with lower version: "); printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with higher version as new data in generated version = System.currentTimeMillis(); commonAttributes.setVersion(version); writeRecordsUpsertRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsUpsertRequest.setRecords(upsertedRecords); try { WriteRecordsResult writeRecordsUpsertResult = amazonTimestreamWrite.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with higher version: " + writeRecordsUpsertResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } }
- Java v2
-
public void writeRecordsWithUpsert() { System.out.println("Writing records with upsert"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); Record cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("13.5").build(); Record memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("40").build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for first time: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for retry: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1; commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("14.5").build(); memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("50").build(); List<Record> upsertedRecords = new ArrayList<>(); upsertedRecords.add(cpuUtilization); upsertedRecords.add(memoryUtilization); WriteRecordsRequest writeRecordsUpsertRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(upsertedRecords).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with lower version: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("WriteRecords Status for upsert with lower version: "); printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with higher version as new data in generated version = System.currentTimeMillis(); commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); writeRecordsUpsertRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(upsertedRecords).build(); try { WriteRecordsResponse writeRecordsUpsertResponse = timestreamWriteClient.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with higher version: " + writeRecordsUpsertResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } }
- Go
-
// Below code will ingest and upsert cpu_utilization and memory_utilization metric for a host on // region=us-east-1, az=az1, and hostname=host1 fmt.Println("Ingesting records and set version as currentTimeInMills, hit enter to continue") reader.ReadString('\n') // Get current time in seconds. now = time.Now() currentTimeInSeconds = now.Unix() // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source version := time.Now().Round(time.Millisecond).UnixNano() / 1e6 // set version as currentTimeInMills writeRecordsCommonAttributesUpsertInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), CommonAttributes: ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), Version: &version, }, Records: []*timestreamwrite.Record{ ×treamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), }, ×treamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), }, }, } // write records for first time _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Frist-time write records is successful") } fmt.Println("Retry same writeRecordsRequest with same records and versions. Because writeRecords API is idempotent, this will success. hit enter to continue") reader.ReadString('\n') _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Retry write records for same request is successful") } fmt.Println("Upsert with lower version, this would fail because a higher version is required to update the measure value. hit enter to continue") reader.ReadString('\n') version -= 1 writeRecordsCommonAttributesUpsertInput.CommonAttributes.Version = &version updated_cpu_utilization := ×treamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("14.5"), } updated_memory_utilization := ×treamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("50"), } writeRecordsCommonAttributesUpsertInput.Records = []*timestreamwrite.Record{ updated_cpu_utilization, updated_memory_utilization, } _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records with lower version is successful") } fmt.Println("Upsert with higher version as new data in generated, this would success. hit enter to continue") reader.ReadString('\n') version = time.Now().Round(time.Millisecond).UnixNano() / 1e6 // set version as currentTimeInMills writeRecordsCommonAttributesUpsertInput.CommonAttributes.Version = &version _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records with higher version is successful") }
- Python
-
def write_records_with_upsert(self): print("Writing records with upsert") current_time = self._current_milli_time() # To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source version = int(self._current_milli_time()) dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] common_attributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': current_time, 'Version': version } cpu_utilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' } memory_utilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' } records = [cpu_utilization, memory_utilization] # write records for first time try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status for first time: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status for retry: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1 common_attributes["Version"] = version cpu_utilization["MeasureValue"] = '14.5' memory_utilization["MeasureValue"] = '50' upsertedRecords = [cpu_utilization, memory_utilization] try: upsertedResult = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=upsertedRecords, CommonAttributes=common_attributes) print("WriteRecords Status for upsert with lower version: [%s]" % upsertedResult['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # upsert with higher version as new data is generated version = int(self._current_milli_time()) common_attributes["Version"] = version try: upsertedResult = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=upsertedRecords, CommonAttributes=common_attributes) print("WriteRecords Upsert Status: [%s]" % upsertedResult['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
- Node.js
-
次のスニペットでは、 AWS SDK for JavaScript V2 スタイルを使用しています。これは、Node.js のサンプル Amazon Timestream for LiveAnalytics アプリケーション on GitHub のサンプルアプリケーション
に基づいています。 async function writeRecordsWithUpsert() { console.log("Writing records with upsert"); const currentTime = Date.now().toString(); // Unix time in milliseconds // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source let version = Date.now(); const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const commonAttributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const cpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' }; const memoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records, CommonAttributes: commonAttributes }; const request = writeClient.writeRecords(params); // write records for first time await request.promise().then( (data) => { console.log("Write records successful for first time."); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(request); } } ); // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. await request.promise().then( (data) => { console.log("Write records successful for retry."); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(request); } } ); // upsert with lower version, this would fail because a higher version is required to update the measure value. version--; const commonAttributesWithLowerVersion = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const updatedCpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '14.5' }; const updatedMemoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '50' }; const upsertedRecords = [updatedCpuUtilization, updatedMemoryUtilization]; const upsertedParamsWithLowerVersion = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: upsertedRecords, CommonAttributes: commonAttributesWithLowerVersion }; const upsertRequestWithLowerVersion = writeClient.writeRecords(upsertedParamsWithLowerVersion); await upsertRequestWithLowerVersion.promise().then( (data) => { console.log("Write records for upsert with lower version successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(upsertRequestWithLowerVersion); } } ); // upsert with higher version as new data in generated version = Date.now(); const commonAttributesWithHigherVersion = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const upsertedParamsWithHigherVerion = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: upsertedRecords, CommonAttributes: commonAttributesWithHigherVersion }; const upsertRequestWithHigherVersion = writeClient.writeRecords(upsertedParamsWithHigherVerion); await upsertRequestWithHigherVersion.promise().then( (data) => { console.log("Write records upsert successful with higher version"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(upsertedParamsWithHigherVerion); } } ); }
- .NET
-
public async Task WriteRecordsWithUpsert() { Console.WriteLine("Writing records with upsert"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = now.ToUnixTimeMilliseconds(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString, Version = version }; var cpuUtilization = new Record { MeasureName = "cpu_utilization", MeasureValue = "13.6" }; var memoryUtilization = new Record { MeasureName = "memory_utilization", MeasureValue = "40" }; List<Record> records = new List<Record>(); records.Add(cpuUtilization); records.Add(memoryUtilization); // write records for first time try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"WriteRecords Status for first time: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"WriteRecords Status for retry: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version--; Type recordType = typeof(Record); recordType.GetProperty("Version").SetValue(commonAttributes, version); recordType.GetProperty("MeasureValue").SetValue(cpuUtilization, "14.6"); recordType.GetProperty("MeasureValue").SetValue(memoryUtilization, "50"); List<Record> upsertedRecords = new List<Record> { cpuUtilization, memoryUtilization }; try { var writeRecordsUpsertRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = upsertedRecords, CommonAttributes = commonAttributes }; WriteRecordsResponse upsertResponse = await writeClient.WriteRecordsAsync(writeRecordsUpsertRequest); Console.WriteLine($"WriteRecords Status for upsert with lower version: {upsertResponse.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // upsert with higher version as new data in generated now = DateTimeOffset.UtcNow; version = now.ToUnixTimeMilliseconds(); recordType.GetProperty("Version").SetValue(commonAttributes, version); try { var writeRecordsUpsertRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = upsertedRecords, CommonAttributes = commonAttributes }; WriteRecordsResponse upsertResponse = await writeClient.WriteRecordsAsync(writeRecordsUpsertRequest); Console.WriteLine($"WriteRecords Status for upsert with higher version: {upsertResponse.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }
複数メジャー属性の例
この例は、マルチティア属性の記述を示しています。複数測定属性は、追跡しているデバイスまたはアプリケーションが複数のメトリクスまたはイベントを同じタイムスタンプで発行する場合に便利です。
注記
これらのコードスニペットは、GitHub
- Java
-
package com.amazonaws.services.timestream; import static com.amazonaws.services.timestream.Main.DATABASE_NAME; import static com.amazonaws.services.timestream.Main.REGION; import static com.amazonaws.services.timestream.Main.TABLE_NAME; import java.util.ArrayList; import java.util.List; import com.amazonaws.services.timestreamwrite.AmazonTimestreamWrite; import com.amazonaws.services.timestreamwrite.model.Dimension; import com.amazonaws.services.timestreamwrite.model.MeasureValue; import com.amazonaws.services.timestreamwrite.model.MeasureValueType; import com.amazonaws.services.timestreamwrite.model.Record; import com.amazonaws.services.timestreamwrite.model.RejectedRecordsException; import com.amazonaws.services.timestreamwrite.model.WriteRecordsRequest; import com.amazonaws.services.timestreamwrite.model.WriteRecordsResult; public class multimeasureAttributeExample { AmazonTimestreamWrite timestreamWriteClient; public multimeasureAttributeExample(AmazonTimestreamWrite client) { this.timestreamWriteClient = client; } public void writeRecordsMultiMeasureValueSingleRecord() { System.out.println("Writing records with multi value attributes"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue(REGION); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withTime(String.valueOf(time)) .withVersion(version); MeasureValue cpuUtilization = new MeasureValue() .withName("cpu_utilization") .withType(MeasureValueType.DOUBLE) .withValue("13.5"); MeasureValue memoryUtilization = new MeasureValue() .withName("memory_utilization") .withType(MeasureValueType.DOUBLE) .withValue("40"); Record computationalResources = new Record() .withMeasureName("cpu_memory") .withMeasureValues(cpuUtilization, memoryUtilization) .withMeasureValueType(MeasureValueType.MULTI); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes) .withRecords(records); // write records for first time try { WriteRecordsResult writeRecordResult = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordResult .getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } public void writeRecordsMultiMeasureValueMultipleRecords() { System.out.println( "Writing records with multi value attributes mixture type"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue(REGION); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withTime(String.valueOf(time)) .withVersion(version); MeasureValue cpuUtilization = new MeasureValue() .withName("cpu_utilization") .withType(MeasureValueType.DOUBLE) .withValue("13"); MeasureValue memoryUtilization =new MeasureValue() .withName("memory_utilization") .withType(MeasureValueType.DOUBLE) .withValue("40"); MeasureValue activeCores = new MeasureValue() .withName("active_cores") .withType(MeasureValueType.BIGINT) .withValue("4"); Record computationalResources = new Record() .withMeasureName("computational_utilization") .withMeasureValues(cpuUtilization, memoryUtilization, activeCores) .withMeasureValueType(MeasureValueType.MULTI); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes) .withRecords(records); // write records for first time try { WriteRecordsResult writeRecordResult = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordResult .getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } private void printRejectedRecordsException(RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); e.getRejectedRecords().forEach(System.out::println); } }
- Java v2
-
package com.amazonaws.services.timestream; import java.util.ArrayList; import java.util.List; import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient; import software.amazon.awssdk.services.timestreamwrite.model.Dimension; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValue; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValueType; import software.amazon.awssdk.services.timestreamwrite.model.Record; import software.amazon.awssdk.services.timestreamwrite.model.RejectedRecordsException; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsRequest; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsResponse; import static com.amazonaws.services.timestream.Main.DATABASE_NAME; import static com.amazonaws.services.timestream.Main.TABLE_NAME; public class multimeasureAttributeExample { TimestreamWriteClient timestreamWriteClient; public multimeasureAttributeExample(TimestreamWriteClient client) { this.timestreamWriteClient = client; } public void writeRecordsMultiMeasureValueSingleRecord() { System.out.println("Writing records with multi value attributes"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .time(String.valueOf(time)) .version(version) .build(); MeasureValue cpuUtilization = MeasureValue.builder() .name("cpu_utilization") .type(MeasureValueType.DOUBLE) .value("13.5").build(); MeasureValue memoryUtilization = MeasureValue.builder() .name("memory_utilization") .type(MeasureValueType.DOUBLE) .value("40").build(); Record computationalResources = Record .builder() .measureName("cpu_memory") .measureValues(cpuUtilization, memoryUtilization) .measureValueType(MeasureValueType.MULTI) .build(); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordsResponse .sdkHttpResponse() .statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } public void writeRecordsMultiMeasureValueMultipleRecords() { System.out.println( "Writing records with multi value attributes mixture type"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .time(String.valueOf(time)) .version(version) .build(); MeasureValue cpuUtilization = MeasureValue.builder() .name("cpu_utilization") .type(MeasureValueType.DOUBLE) .value("13.5").build(); MeasureValue memoryUtilization = MeasureValue.builder() .name("memory_utilization") .type(MeasureValueType.DOUBLE) .value("40").build(); MeasureValue activeCores = MeasureValue.builder() .name("active_cores") .type(MeasureValueType.BIGINT) .value("4").build(); Record computationalResources = Record .builder() .measureName("computational_utilization") .measureValues(cpuUtilization, memoryUtilization, activeCores) .measureValueType(MeasureValueType.MULTI) .build(); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordsResponse .sdkHttpResponse() .statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } private void printRejectedRecordsException(RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); e.rejectedRecords().forEach(System.out::println); } }
- Go
-
now := time.Now() currentTimeInSeconds := now.Unix() writeRecordsInput := ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), Records: []*timestreamwrite.Record{ ×treamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ ×treamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, ×treamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, ×treamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("metrics"), MeasureValueType: aws.String("MULTI"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), MeasureValues: []*timestreamwrite.MeasureValue{ ×treamwrite.MeasureValue{ Name: aws.String("cpu_utilization"), Value: aws.String("13.5"), Type: aws.String("DOUBLE"), }, ×treamwrite.MeasureValue{ Name: aws.String("memory_utilization"), Value: aws.String("40"), Type: aws.String("DOUBLE"), }, }, }, }, } _, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
- Python
-
import time import boto3 import psutil import os from botocore.config import Config DATABASE_NAME = os.environ['DATABASE_NAME'] TABLE_NAME = os.environ['TABLE_NAME'] COUNTRY = "UK" CITY = "London" HOSTNAME = "MyHostname" # You can make it dynamic using socket.gethostname() INTERVAL = 1 # Seconds def prepare_common_attributes(): common_attributes = { 'Dimensions': [ {'Name': 'country', 'Value': COUNTRY}, {'Name': 'city', 'Value': CITY}, {'Name': 'hostname', 'Value': HOSTNAME} ], 'MeasureName': 'utilization', 'MeasureValueType': 'MULTI' } return common_attributes def prepare_record(current_time): record = { 'Time': str(current_time), 'MeasureValues': [] } return record def prepare_measure(measure_name, measure_value): measure = { 'Name': measure_name, 'Value': str(measure_value), 'Type': 'DOUBLE' } return measure def write_records(records, common_attributes): try: result = write_client.write_records(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME, CommonAttributes=common_attributes, Records=records) status = result['ResponseMetadata']['HTTPStatusCode'] print("Processed %d records. WriteRecords HTTPStatusCode: %s" % (len(records), status)) except Exception as err: print("Error:", err) if __name__ == '__main__': print("writing data to database {} table {}".format( DATABASE_NAME, TABLE_NAME)) session = boto3.Session() write_client = session.client('timestream-write', config=Config( read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10})) query_client = session.client('timestream-query') # Not used common_attributes = prepare_common_attributes() records = [] while True: current_time = int(time.time() * 1000) cpu_utilization = psutil.cpu_percent() memory_utilization = psutil.virtual_memory().percent swap_utilization = psutil.swap_memory().percent disk_utilization = psutil.disk_usage('/').percent record = prepare_record(current_time) record['MeasureValues'].append(prepare_measure('cpu', cpu_utilization)) record['MeasureValues'].append(prepare_measure('memory', memory_utilization)) record['MeasureValues'].append(prepare_measure('swap', swap_utilization)) record['MeasureValues'].append(prepare_measure('disk', disk_utilization)) records.append(record) print("records {} - cpu {} - memory {} - swap {} - disk {}".format( len(records), cpu_utilization, memory_utilization, swap_utilization, disk_utilization)) if len(records) == 100: write_records(records, common_attributes) records = [] time.sleep(INTERVAL)
- Node.js
-
次のスニペットでは、 AWS SDK for JavaScript V2 スタイルを使用しています。これは、Node.js のサンプル Amazon Timestream for LiveAnalytics アプリケーション on GitHub のサンプルアプリケーション
に基づいています。 async function writeRecords() { console.log("Writing records"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const record = { 'Dimensions': dimensions, 'MeasureName': 'metrics', 'MeasureValues': [ { 'Name': 'cpu_utilization', 'Value': '40', 'Type': 'DOUBLE', }, { 'Name': 'memory_utilization', 'Value': '13.5', 'Type': 'DOUBLE', }, ], 'MeasureValueType': 'MULTI', 'Time': currentTime.toString() } const records = [record]; const params = { DatabaseName: 'DatabaseName', TableName: 'TableName', Records: records }; const response = await writeClient.writeRecords(params); console.log(response); }
- .NET
-
using System; using System.IO; using System.Collections.Generic; using Amazon.TimestreamWrite; using Amazon.TimestreamWrite.Model; using System.Threading.Tasks; namespace TimestreamDotNetSample { static class MultiMeasureValueConstants { public const string MultiMeasureValueSampleDb = "multiMeasureValueSampleDb"; public const string MultiMeasureValueSampleTable = "multiMeasureValueSampleTable"; } public class MultiValueAttributesExample { private readonly AmazonTimestreamWriteClient writeClient; public MultiValueAttributesExample(AmazonTimestreamWriteClient writeClient) { this.writeClient = writeClient; } public async Task WriteRecordsMultiMeasureValueSingleRecord() { Console.WriteLine("Writing records with multi value attributes"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, Time = currentTimeString }; var cpuUtilization = new MeasureValue { Name = "cpu_utilization", Value = "13.6", Type = "DOUBLE" }; var memoryUtilization = new MeasureValue { Name = "memory_utilization", Value = "40", Type = "DOUBLE" }; var computationalRecord = new Record { MeasureName = "cpu_memory", MeasureValues = new List<MeasureValue> {cpuUtilization, memoryUtilization}, MeasureValueType = "MULTI" }; List<Record> records = new List<Record>(); records.Add(computationalRecord); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = MultiMeasureValueConstants.MultiMeasureValueSampleDb, TableName = MultiMeasureValueConstants.MultiMeasureValueSampleTable, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } } public async Task WriteRecordsMultiMeasureValueMultipleRecords() { Console.WriteLine("Writing records with multi value attributes mixture type"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, Time = currentTimeString }; var cpuUtilization = new MeasureValue { Name = "cpu_utilization", Value = "13.6", Type = "DOUBLE" }; var memoryUtilization = new MeasureValue { Name = "memory_utilization", Value = "40", Type = "DOUBLE" }; var activeCores = new MeasureValue { Name = "active_cores", Value = "4", Type = "BIGINT" }; var computationalRecord = new Record { MeasureName = "computational_utilization", MeasureValues = new List<MeasureValue> {cpuUtilization, memoryUtilization, activeCores}, MeasureValueType = "MULTI" }; var aliveRecord = new Record { MeasureName = "is_healthy", MeasureValue = "true", MeasureValueType = "BOOLEAN" }; List<Record> records = new List<Record>(); records.Add(computationalRecord); records.Add(aliveRecord); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = MultiMeasureValueConstants.MultiMeasureValueSampleDb, TableName = MultiMeasureValueConstants.MultiMeasureValueSampleTable, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } } } }
書き込み失敗の処理
Amazon Timestream での書き込みは、次の 1 つ以上の理由で失敗することがあります。
メモリストアの保持期間外のタイムスタンプを持つレコードがあります。
Timestream で定義された制限を超えるディメンションやメジャーを含むレコードがあります。
-
Amazon Timestream が重複レコードを検出しました。同じディメンション、タイムスタンプ、メジャー名を持つレコードが複数あるが、次の場合、レコードは重複としてマークされます。
測定値は異なります。
リクエストにバージョンが存在しないか、新しいレコードのバージョン の値が既存の値以下です。この理由で Amazon Timestream がデータを拒否
RejectedRecords
した場合、 のExistingVersion
フィールドには、Amazon Timestream に保存されているレコードの最新バージョンが含まれます。更新を強制するには、レコードのバージョンを より大きい値に設定してリクエストを再送信しますExistingVersion
。
エラーと拒否されたレコードの詳細については、「エラーと単語」を参照してください。 RejectedRecord
Timestream にレコードを書き込もうRejectedRecordsException
としたときにアプリケーションが を受け取った場合は、拒否されたレコードを解析して、次のように書き込み失敗の詳細を確認できます。
注記
これらのコードスニペットは、GitHub
- Java
-
try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); }
- Java v2
-
try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); }
- Go
-
_, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
- Python
-
try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) print("Other records were written successfully. ") except Exception as err: print("Error:", err)
- Node.js
-
次のスニペットでは、 AWS SDK for JavaScript V2 スタイルを使用しています。これは、Node.js のサンプル Amazon Timestream for LiveAnalytics アプリケーションの on GitHub のサンプルアプリケーション
に基づいています。 await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } );
- .NET
-
try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); }