Memulai Kinesis Data Streams untuk Amazon DynamoDB - Amazon DynamoDB

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memulai Kinesis Data Streams untuk Amazon DynamoDB

Bagian ini menjelaskan cara menggunakan Kinesis Data Streams untuk tabel Amazon DynamoDB dengan konsol Amazon DynamoDB, (), dan. AWS Command Line Interface AWS CLI API

Membuat aliran data Amazon Kinesis yang aktif

Semua contoh ini menggunakan tabel Music DynamoDB yang dibuat sebagai bagian dari tutorial Memulai dengan DynamoDB.

Untuk mempelajari lebih lanjut tentang cara membangun konsumen dan menghubungkan aliran data Kinesis Anda ke layanan AWS lain, lihat Membaca data dari Kinesis Data Streams dalam panduan pengembang Amazon Kinesis Data Streams.

catatan

Saat pertama kali menggunakan KDS pecahan, sebaiknya atur pecahan Anda agar naik dan turun dengan pola penggunaan. Setelah mengumpulkan lebih banyak data tentang pola penggunaan, Anda dapat menyesuaikan serpihan di aliran agar sesuai.

Console
  1. Masuk ke AWS Management Console dan buka konsol Kinesis di. https://console.aws.amazon.com/kinesis/

  2. Pilih Buat aliran data dan ikuti petunjuk untuk membuat stream yang disebut samplestream.

  3. Buka konsol DynamoDB di. https://console.aws.amazon.com/dynamodb/

  4. Di panel navigasi di sisi kiri konsol, pilih Tabel.

  5. Pilih tabel Musik.

  6. Pilih tab Ekspor dan aliran.

  7. (Opsional) Di bawah detail aliran data Amazon Kinesis, Anda dapat mengubah presisi stempel waktu rekaman dari mikrodetik (default) menjadi milidetik.

  8. Pilih samplestream dari daftar menurun.

  9. Pilih tombol Hidupkan.

AWS CLI
  1. Buat Kinesis Data Streams bernama samplestream dengan menggunakan perintah create-stream.

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    Lihat Pertimbangan manajemen pecahan untuk Kinesis Data Streams sebelum menetapkan jumlah serpihan untuk Kinesis data stream.

  2. Periksa apakah stream Kinesis aktif dan siap digunakan dengan menggunakan perintah describe-stream.

    aws kinesis describe-stream --stream-name samplestream
  3. Aktifkan Kinesis streaming pada tabel DynamoDB dengan menggunakan perintah enable-kinesis-streaming-destination DynamoDB. Ganti stream-arn nilai dengan yang ditampilkan describe-stream pada langkah sebelumnya. Secara opsional, aktifkan streaming dengan presisi nilai stempel waktu yang lebih granular (mikrodetik) yang dikembalikan pada setiap catatan.

    Aktifkan streaming dengan presisi stempel waktu mikrodetik:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    Atau aktifkan streaming dengan presisi stempel waktu default (milidetik):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. Periksa apakah streaming Kinesis aktif pada tabel DynamoDB dengan menggunakan perintah describe-kinesis-streaming-destination DynamoDB.

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. Menulis data ke Daftar Tabel DynamoDB dengan menggunakan perintah put-item, seperti yang dijelaskan dalam Panduan Developer DynamoDB.

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. Gunakan CLI perintah Kinesis get-records untuk mengambil konten aliran Kinesis. Kemudian gunakan potongan kode berikut untuk melakukan deserialisasi konten stream.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. Ikuti petunjuk dalam panduan developer Kinesis Data Streams untuk menciptakan Kinesis data stream bernama samplestream menggunakan Java.

    Lihat Pertimbangan manajemen pecahan untuk Kinesis Data Streams sebelum menetapkan jumlah serpihan untuk Kinesis data stream.

  2. Gunakan cuplikan kode berikut untuk mengaktifkan Kinesis streaming pada tabel DynamoDB. Secara opsional, aktifkan streaming dengan presisi nilai stempel waktu yang lebih granular (mikrodetik) yang dikembalikan pada setiap catatan.

    Aktifkan streaming dengan presisi stempel waktu mikrodetik:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    Atau aktifkan streaming dengan presisi stempel waktu default (milidetik):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. Ikuti instruksi dalam panduan pengembang Kinesis Data Streams untuk membaca dari aliran data yang dibuat.

  4. Gunakan potongan kode berikut untuk melakukan deserialisasi konten stream

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }

Membuat perubahan pada aliran data Amazon Kinesis yang aktif

Bagian ini menjelaskan cara membuat perubahan pada Kinesis Data Streams aktif untuk penyiapan DynamoDB dengan menggunakan konsol, dan. AWS CLI API

AWS Management Console

  1. Buka konsol DynamoDB di https://console.aws.amazon.com/dynamodb/

  2. Pergi ke meja Anda.

  3. Pilih Ekspor dan Aliran.

AWS CLI

  1. Hubungi describe-kinesis-streaming-destination untuk mengonfirmasi bahwa streaming tersebutACTIVE.

  2. PanggilanUpdateKinesisStreamingDestination, seperti dalam contoh ini:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. Hubungi describe-kinesis-streaming-destination untuk mengonfirmasi bahwa streaming tersebutUPDATING.

  4. Panggil describe-kinesis-streaming-destination secara berkala hingga status streaming ACTIVE kembali. Biasanya diperlukan waktu hingga 5 menit agar pembaruan presisi stempel waktu diterapkan. Setelah status ini diperbarui, itu menunjukkan bahwa pembaruan selesai dan nilai presisi baru akan diterapkan pada catatan masa depan.

  5. Tulis ke meja menggunakanputItem.

  6. Gunakan get-records perintah Kinesis untuk mendapatkan konten aliran.

  7. Konfirmasikan bahwa penulisan memiliki presisi yang diinginkan. ApproximateCreationDateTime

Jawa API

  1. Berikan cuplikan kode yang membuat UpdateKinesisStreamingDestination permintaan dan respons. UpdateKinesisStreamingDestination

  2. Berikan cuplikan kode yang membuat DescribeKinesisStreamingDestination permintaan dan file. DescribeKinesisStreamingDestination response

  3. Panggil describe-kinesis-streaming-destination secara berkala hingga status streaming ACTIVE kembali, menunjukkan bahwa pembaruan selesai dan nilai presisi baru akan diterapkan pada catatan masa depan.

  4. Lakukan menulis ke meja.

  5. Baca dari aliran dan deserialisasi konten streaming.

  6. Konfirmasikan bahwa penulisan memiliki presisi yang diinginkan. ApproximateCreationDateTime