

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Panduan: Adaptor DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough"></a>

Bagian ini adalah panduan aplikasi Java yang menggunakan Perpustakaan Klien Amazon Kinesis dan Adaptor Amazon DynamoDB Streams Kinesis. Aplikasi ini memperlihatkan contoh replikasi data, di mana aktivitas penulisan dari satu tabel diterapkan ke tabel kedua, dengan konten kedua tabel tetap sinkron. Untuk kode sumber, lihat [Program lengkap: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Program ini melakukan hal berikut:

1. Menciptakan dua tabel DynamoDB bernama `KCL-Demo-src` dan `KCL-Demo-dst`. Masing-masing tabel ini memiliki stream yang diaktifkan.

1. Menghasilkan aktivitas pembaruan dalam tabel sumber dengan menambahkan, memperbarui, dan menghapus item. Hal ini menyebabkan data akan ditulis ke stream tabel.

1. Membaca catatan dari stream, merekonstruksinya sebagai permintaan DynamoDB, dan menerapkan permintaan ke tabel tujuan.

1. Memindai tabel sumber dan tujuan untuk memastikan bahwa isinya identik.

1. Membersihkan dengan menghapus tabel.

Langkah-langkah ini dijelaskan di bagian berikut, dan aplikasi lengkap ditampilkan di akhir panduan.

**Topics**
+ [Langkah 1: Buat tabel DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Langkah 3: Proses alirannya](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Langkah 5: Bersihkan](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Program lengkap: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Langkah 1: Buat tabel DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Langkah pertama adalah membuat dua tabel DynamoDB—tabel sumber dan tabel tujuan. `StreamViewType` pada aliran tabel sumber adalah `NEW_IMAGE`. Ini berarti bahwa setiap kali item diubah dalam tabel ini, gambar "setelah" item tersebut ditulis ke aliran. Dengan cara ini, aliran melacak semua aktivitas penulisan di tabel.

Contoh berikut menunjukkan kode yang digunakan untuk membuat kedua tabel.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

Langkah selanjutnya adalah menghasilkan beberapa aktivitas menulis pada tabel sumber. Saat aktivitas ini berlangsung, aliran tabel sumber juga diperbarui hampir secara waktu nyata.

Aplikasi ini mendefinisikan kelas pembantu dengan metode yang memanggil operasi `PutItem`, `UpdateItem`, dan API `DeleteItem` untuk menulis data. Contoh kode berikut menunjukkan bagaimana metode ini digunakan.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Langkah 3: Proses alirannya
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Sekarang program mulai memproses aliran. Adaptor Kinesis DynamoDB Streams bertindak sebagai lapisan transparan antara KCL dan titik akhir DynamoDB Streams, sehingga kode dapat sepenuhnya menggunakan KCL daripada harus melakukan panggilan DynamoDB Streams tingkat rendah. Program ini melakukan tugas-tugas berikut:
+ Ini mendefinisikan kelas prosesor catatan, `StreamsRecordProcessor`, dengan metode yang sesuai dengan definisi antarmuka KCL: `initialize`, `processRecords`, dan `shutdown`. Metode `processRecords` berisi logika yang diperlukan untuk membaca dari stream tabel sumber dan menulis ke tabel tujuan.
+ Ini mendefinisikan sebuah pabrik kelas untuk kelas prosesor catatan (`StreamsRecordProcessorFactory`). Hal ini diperlukan untuk program Java yang menggunakan KCL.
+ Ini menginstanskan KCL `Worker` baru, yang terkait dengan pabrik kelas.
+ Ini mematikan `Worker` saat pemrosesan catatan selesai.

Secara opsional, aktifkan mode catch-up dalam konfigurasi Adaptor KCL Streams Anda untuk secara otomatis menskalakan laju panggilan GetRecords API sebesar 3x (default) saat kelambatan pemrosesan streaming melebihi satu menit (default), membantu konsumen streaming Anda menangani lonjakan throughput tinggi di tabel Anda.

Untuk mempelajari lebih lanjut tentang definisi antarmuka KCL, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) di *Panduan Pengembang Amazon Kinesis Data Streams*. 

Contoh kode berikut menunjukkan loop utama dalam `StreamsRecordProcessor`. Pernyataan `case` menentukan apa tindakan apa yang harus dilakukan, berdasarkan `OperationType` yang muncul dalam catatan stream.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

Pada titik ini, isi sumber dan tujuan tabel tersinkronisasi. Aplikasi menerbitkan permintaan `Scan` terhadap kedua tabel untuk memverifikasi bahwa isinya, pada kenyataannya, identik.

Kelas `DemoHelper` berisi metode `ScanTable` yang memanggil API `Scan` tingkat rendah. Contoh berikut menunjukkan cara penggunaannya.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Langkah 5: Bersihkan
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Demo selesai, sehingga aplikasi menghapus tabel sumber dan tujuan. Lihat contoh kode berikut. Bahkan setelah tabel dihapus, alirannya tetap tersedia hingga 24 jam, setelah itu tabel akan dihapus secara otomatis.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```