Panduan: Adaptor DynamoDB Streams Kinesis - Amazon DynamoDB

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Panduan: Adaptor DynamoDB Streams Kinesis

Bagian ini adalah panduan aplikasi Java yang menggunakan Perpustakaan Klien Amazon Kinesis dan Adaptor Amazon DynamoDB Streams Kinesis. Aplikasi ini memperlihatkan contoh replikasi data, di mana aktivitas penulisan dari satu tabel diterapkan ke tabel kedua, dengan konten kedua tabel tetap sinkron. Untuk kode sumber, lihat Program lengkap: Adaptor DynamoDB Streams Kinesis.

Program ini melakukan hal berikut:

  1. Menciptakan dua tabel DynamoDB bernama KCL-Demo-src dan KCL-Demo-dst. Masing-masing tabel ini memiliki stream yang diaktifkan.

  2. Menghasilkan aktivitas pembaruan dalam tabel sumber dengan menambahkan, memperbarui, dan menghapus item. Hal ini menyebabkan data akan ditulis ke stream tabel.

  3. Membaca catatan dari stream, merekonstruksinya sebagai permintaan DynamoDB, dan menerapkan permintaan ke tabel tujuan.

  4. Memindai tabel sumber dan tujuan untuk memastikan bahwa isinya identik.

  5. Membersihkan dengan menghapus tabel.

Langkah-langkah ini dijelaskan di bagian berikut, dan aplikasi lengkap ditampilkan di akhir panduan.

Langkah 1: Buat tabel DynamoDB

Langkah pertama adalah membuat dua tabel DynamoDB—tabel sumber dan tabel tujuan. StreamViewType pada aliran tabel sumber adalah NEW_IMAGE. Ini berarti bahwa setiap kali item diubah dalam tabel ini, gambar "setelah" item tersebut ditulis ke aliran. Dengan cara ini, aliran melacak semua aktivitas penulisan di tabel.

Contoh berikut menunjukkan kode yang digunakan untuk membuat kedua tabel.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber

Langkah selanjutnya adalah menghasilkan beberapa aktivitas menulis pada tabel sumber. Saat aktivitas ini berlangsung, aliran tabel sumber juga diperbarui hampir secara waktu nyata.

Aplikasi mendefinisikan kelas pembantu dengan metode yang memanggilPutItem,UpdateItem, dan DeleteItem API operasi untuk menulis data. Contoh kode berikut menunjukkan bagaimana metode ini digunakan.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

Langkah 3: Proses alirannya

Sekarang program mulai memproses aliran. Adaptor Kinesis DynamoDB Streams bertindak sebagai lapisan transparan antara titik akhir DynamoDB Streams KCL dan DynamoDB, sehingga kode dapat digunakan sepenuhnya daripada harus melakukan panggilan DynamoDB Streams tingkat rendah. KCL Program ini melakukan tugas-tugas berikut:

  • Ini mendefinisikan kelas prosesor rekaman,StreamsRecordProcessor, dengan metode yang sesuai dengan definisi KCL antarmuka:initialize,processRecords, danshutdown. Metode processRecords berisi logika yang diperlukan untuk membaca dari stream tabel sumber dan menulis ke tabel tujuan.

  • Ini mendefinisikan sebuah pabrik kelas untuk kelas prosesor catatan (StreamsRecordProcessorFactory). Ini diperlukan untuk program Java yang menggunakan fileKCL.

  • Ini membuat instance baru KCLWorker, yang terkait dengan pabrik kelas.

  • Ini mematikan Worker saat pemrosesan catatan selesai.

Untuk mempelajari lebih lanjut tentang definisi KCL antarmuka, lihat Mengembangkan konsumen menggunakan pustaka klien Kinesis di Panduan Pengembang Amazon Kinesis Data Streams.

Contoh kode berikut menunjukkan loop utama dalam StreamsRecordProcessor. Pernyataan case menentukan apa tindakan apa yang harus dilakukan, berdasarkan OperationType yang muncul dalam catatan stream.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik

Pada titik ini, isi sumber dan tujuan tabel tersinkronisasi. Aplikasi menerbitkan permintaan Scan terhadap kedua tabel untuk memverifikasi bahwa isinya, pada kenyataannya, identik.

DemoHelperKelas berisi ScanTable metode yang memanggil tingkat rendah ScanAPI. Contoh berikut menunjukkan cara penggunaannya.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

Langkah 5: Bersihkan

Demo selesai, sehingga aplikasi menghapus tabel sumber dan tujuan. Lihat contoh kode berikut. Bahkan setelah tabel dihapus, alirannya tetap tersedia hingga 24 jam, setelah itu tabel akan dihapus secara otomatis.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));