Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Panduan: Adaptor DynamoDB Streams Kinesis
Bagian ini adalah panduan aplikasi Java yang menggunakan Perpustakaan Klien Amazon Kinesis dan Adaptor Amazon DynamoDB Streams Kinesis. Aplikasi ini memperlihatkan contoh replikasi data, di mana aktivitas penulisan dari satu tabel diterapkan ke tabel kedua, dengan konten kedua tabel tetap sinkron. Untuk kode sumber, lihat Program lengkap: Adaptor DynamoDB Streams Kinesis.
Program ini melakukan hal berikut:
-
Menciptakan dua tabel DynamoDB bernama
KCL-Demo-src
danKCL-Demo-dst
. Masing-masing tabel ini memiliki stream yang diaktifkan. -
Menghasilkan aktivitas pembaruan dalam tabel sumber dengan menambahkan, memperbarui, dan menghapus item. Hal ini menyebabkan data akan ditulis ke stream tabel.
-
Membaca catatan dari stream, merekonstruksinya sebagai permintaan DynamoDB, dan menerapkan permintaan ke tabel tujuan.
-
Memindai tabel sumber dan tujuan untuk memastikan bahwa isinya identik.
-
Membersihkan dengan menghapus tabel.
Langkah-langkah ini dijelaskan di bagian berikut, dan aplikasi lengkap ditampilkan di akhir panduan.
Topik
Langkah 1: Buat tabel DynamoDB
Langkah pertama adalah membuat dua tabel DynamoDB—tabel sumber dan tabel tujuan. StreamViewType
pada aliran tabel sumber adalah NEW_IMAGE
. Ini berarti bahwa setiap kali item diubah dalam tabel ini, gambar "setelah" item tersebut ditulis ke aliran. Dengan cara ini, aliran melacak semua aktivitas penulisan di tabel.
Contoh berikut menunjukkan kode yang digunakan untuk membuat kedua tabel.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber
Langkah selanjutnya adalah menghasilkan beberapa aktivitas menulis pada tabel sumber. Saat aktivitas ini berlangsung, aliran tabel sumber juga diperbarui hampir secara waktu nyata.
Aplikasi mendefinisikan kelas pembantu dengan metode yang memanggilPutItem
,UpdateItem
, dan DeleteItem
API operasi untuk menulis data. Contoh kode berikut menunjukkan bagaimana metode ini digunakan.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Langkah 3: Proses alirannya
Sekarang program mulai memproses aliran. Adaptor Kinesis DynamoDB Streams bertindak sebagai lapisan transparan antara titik akhir DynamoDB Streams KCL dan DynamoDB, sehingga kode dapat digunakan sepenuhnya daripada harus melakukan panggilan DynamoDB Streams tingkat rendah. KCL Program ini melakukan tugas-tugas berikut:
-
Ini mendefinisikan kelas prosesor rekaman,
StreamsRecordProcessor
, dengan metode yang sesuai dengan definisi KCL antarmuka:initialize
,processRecords
, danshutdown
. MetodeprocessRecords
berisi logika yang diperlukan untuk membaca dari stream tabel sumber dan menulis ke tabel tujuan. -
Ini mendefinisikan sebuah pabrik kelas untuk kelas prosesor catatan (
StreamsRecordProcessorFactory
). Ini diperlukan untuk program Java yang menggunakan fileKCL. -
Ini membuat instance baru KCL
Worker
, yang terkait dengan pabrik kelas. -
Ini mematikan
Worker
saat pemrosesan catatan selesai.
Untuk mempelajari lebih lanjut tentang definisi KCL antarmuka, lihat Mengembangkan konsumen menggunakan pustaka klien Kinesis di Panduan Pengembang Amazon Kinesis Data Streams.
Contoh kode berikut menunjukkan loop utama dalam StreamsRecordProcessor
. Pernyataan case
menentukan apa tindakan apa yang harus dilakukan, berdasarkan OperationType
yang muncul dalam catatan stream.
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik
Pada titik ini, isi sumber dan tujuan tabel tersinkronisasi. Aplikasi menerbitkan permintaan Scan
terhadap kedua tabel untuk memverifikasi bahwa isinya, pada kenyataannya, identik.
DemoHelper
Kelas berisi ScanTable
metode yang memanggil tingkat rendah Scan
API. Contoh berikut menunjukkan cara penggunaannya.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
Langkah 5: Bersihkan
Demo selesai, sehingga aplikasi menghapus tabel sumber dan tujuan. Lihat contoh kode berikut. Bahkan setelah tabel dihapus, alirannya tetap tersedia hingga 24 jam, setelah itu tabel akan dihapus secara otomatis.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));