

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Tangkapan data perubahan DynamoDB Streams
<a name="Streams"></a>

 DynamoDB Streams menangkap urutan waktu modifikasi tingkat item di tabel DynamoDB mana pun dan menyimpan informasi ini dalam log hingga 24 jam. Aplikasi dapat mengakses log ini dan melihat item data seperti yang muncul sebelum dan setelah diubah, hampir secara waktu nyata.

 Enkripsi saat istirahat mengenkripsi data dalam DynamoDB streams. Untuk informasi selengkapnya, lihat [Enkripsi DynamoDB saat diam](EncryptionAtRest.md).

*Streami DynamoDB* adalah aliran berurutan informasi tentang perubahan item dalam tabel DynamoDB. Saat Anda mengaktifkan aliran pada tabel, DynamoDB menangkap informasi tentang setiap modifikasi pada item data dalam tabel.

Setiap kali aplikasi membuat, memperbarui, atau menghapus item dalam tabel, DynamoDB Streams menulis rekaman aliran dengan atribut kunci primer dari item yang diubah. *Catatan stream* berisi informasi tentang modifikasi data pada satu item dalam tabel DynamoDB. Anda dapat mengonfigurasi aliran sehingga rekaman aliran menangkap informasi tambahan, seperti gambar "sebelum" dan "sesudah" dari item yang diubah.

DynamoDB Streams membantu memastikan hal berikut:
+ Setiap catatan stream muncul tepat sekali dalam stream.
+ Untuk setiap item yang diubah dalam tabel DynamoDB, catatan stream muncul dalam urutan yang sama seperti modifikasi aktual untuk item.

DynamoDB Streams menulis catatan aliran hampir secara real-time sehingga Anda dapat membangun aplikasi yang menggunakan aliran ini dan mengambil tindakan berdasarkan kontennya.

**Topics**
+ [Titik akhir untuk DynamoDB Streams](#Streams.Endpoints)
+ [Mengaktifkan aliran](#Streams.Enabling)
+ [Membaca dan memproses aliran](#Streams.Processing)
+ [DynamoDB Streams dan Waktu untuk Tayang](time-to-live-ttl-streams.md)
+ [Menggunakan adaptor DynamoDB Streams Kinesis untuk memproses catatan aliran](Streams.KCLAdapter.md)
+ [API tingkat rendah DynamoDB Streams: Contoh Java](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB Streams dan pemicu AWS Lambda](Streams.Lambda.md)
+ [DynamoDB Streams dan Apache Flink](StreamsApacheFlink.xml.md)

## Titik akhir untuk DynamoDB Streams
<a name="Streams.Endpoints"></a>

AWS mempertahankan endpoint terpisah untuk DynamoDB dan DynamoDB Streams. Untuk bekerja dengan tabel dan indeks basis data, aplikasi Anda harus mengakses titik akhir DynamoDB. Untuk membaca dan memproses catatan DynamoDB Streams, aplikasi Anda harus mengakses titik akhir DynamoDB Streams di Wilayah yang sama.

DynamoDB Streams menawarkan dua set endpoint. File tersebut adalah:
+ **IPv4-only endpoint: Titik** akhir dengan konvensi `streams.dynamodb.<region>.amazonaws.com` penamaan.
+ **Dual-stack endpoint**: Endpoint baru yang kompatibel dengan keduanya IPv4 dan IPv6 dan mengikuti konvensi penamaan. `streams-dynamodb.<region>.api.aws`

**catatan**  
Untuk daftar lengkap Wilayah dan titik akhir DynamoDB dan DynamoDB Streams, lihat [Wilayah dan titik akhir](https://docs.aws.amazon.com/general/latest/gr/rande.html) di *Referensi Umum AWS*.

 AWS SDKs Menyediakan klien terpisah untuk DynamoDB dan DynamoDB Streams. Bergantung pada kebutuhan Anda, aplikasi Anda dapat mengakses titik akhir DynamoDB, titik akhir DynamoDB Streams, atau keduanya secara bersamaan. Untuk terhubung ke kedua titik akhir, aplikasi Anda harus membuat instance dua klien—satu untuk DynamoDB dan satu lagi untuk DynamoDB Streams.

## Mengaktifkan aliran
<a name="Streams.Enabling"></a>

Anda dapat mengaktifkan aliran pada tabel baru saat Anda membuatnya menggunakan AWS CLI atau salah satu tabel AWS SDKs. Anda juga dapat mengaktifkan atau menonaktifkan aliran pada tabel yang sudah ada, atau mengubah pengaturan aliran. DynamoDB Streams beroperasi secara asinkron, sehingga tidak ada dampak kinerja pada tabel jika Anda mengaktifkan aliran.

Cara termudah untuk mengelola DynamoDB Streams adalah dengan menggunakan Konsol Manajemen AWS.

1. Masuk ke Konsol Manajemen AWS dan buka konsol DynamoDB di. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Pada dasbor konsol DynamoDB, pilih **Tabel** dan pilih tabel yang ada.

1. Pilih tab **Ekspor dan aliran**.

1. **Di bagian detail **aliran DynamoDB**, pilih Aktifkan.**

1. Pada halaman Aliran **DynamoDB**, pilih informasi yang akan ditulis ke aliran setiap kali data dalam tabel diubah:
   + **Atribut kunci saja** — Hanya atribut kunci dari item yang dimodifikasi.
   + **Gambar baru** — Seluruh item, saat muncul setelah diubah.
   + **Gambar lama** — Seluruh item, saat muncul sebelum diubah.
   + **Gambar baru dan lama** — Baik gambar baru dan lama dari item.

   Ketika pengaturan seperti yang Anda inginkan, pilih **Aktifkan aliran**.

1. (Opsional) Untuk menonaktifkan aliran yang ada, pilih **Matikan di bawah detail** aliran **DynamoDB**.

Anda juga dapat menggunakan operasi `CreateTable` atau `UpdateTable` API untuk mengaktifkan atau memodifikasi aliran. Parameter `StreamSpecification` menentukan bagaimana aliran dikonfigurasi:
+ `StreamEnabled` — Menentukan apakah stream diaktifkan (`true`) atau dinonaktifkan (`false`) untuk tabel.
+ `StreamViewType` — Menentukan informasi yang akan ditulis ke stream setiap kali data dalam tabel dimodifikasi:
  + `KEYS_ONLY` — Hanya atribut kunci dari item yang dimodifikasi.
  + `NEW_IMAGE` — Keseluruhan item, seperti yang muncul setelah diubah.
  + `OLD_IMAGE` — Keseluruhan item, seperti yang terlihat sebelum diubah.
  + `NEW_AND_OLD_IMAGES` — Baik gambar baru dan lama dari item.

Anda dapat mengaktifkan atau menonaktifkan stream kapan saja. Namun, Anda menerima `ValidationException` jika Anda mencoba untuk mengaktifkan stream pada tabel yang sudah memiliki stream. Anda juga menerima `ValidationException` jika Anda mencoba menonaktifkan aliran pada tabel yang tidak memiliki aliran.

Ketika Anda mengatur `StreamEnabled` ke `true`, DynamoDB menciptakan stream baru dengan deskriptor stream unik yang ditugaskan untuk itu. Jika Anda menonaktifkan dan kemudian mengaktifkan kembali stream pada tabel, stream baru dibuat dengan deskriptor stream yang berbeda.

Setiap stream diidentifikasi secara unik oleh Amazon Resource Name (ARN). Berikut ini adalah contoh ARN untuk stream pada tabel DynamoDB bernama `TestTable`.

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

Untuk menentukan deskriptor stream terbaru untuk tabel, terbitkan permintaan `DescribeTable` DynamoDB dan cari elemen `LatestStreamArn` dalam respons.

**catatan**  
Hal ini tidak mungkin untuk mengedit `StreamViewType` setelah stream telah diatur. Jika Anda perlu melakukan perubahan pada stream setelah pengaturan, Anda harus menonaktifkan stream saat ini dan membuat yang baru.

## Membaca dan memproses aliran
<a name="Streams.Processing"></a>

Untuk membaca dan memproses aliran, aplikasi Anda harus terhubung ke titik akhir DynamoDB Streams dan menerbitkan permintaan API.

Stream terdiri dari *catatan aliran*. Setiap catatan stream mewakili modifikasi data tunggal dalam tabel DynamoDB di mana aliran berada. Setiap catatan stream ditugaskan nomor urut, mencerminkan urutan catatan diterbitkan ke aliran.

Rekaman streaming disusun ke dalam grup, atau *serpihan*. Setiap serpihan bertindak sebagai kontainer untuk beberapa catatan aliran, dan berisi informasi yang diperlukan untuk mengakses dan iterasi melalui catatan-catatan ini. Catatan stream dalam serpihan dihapus secara otomatis setelah 24 jam.

Serpihan bersifat fana: Mereka dibuat dan dihapus secara otomatis, sesuai kebutuhan. Setiap serpihan juga dapat dibagi menjadi beberapa serpihan baru; ini juga terjadi secara otomatis. (Pecahan induk juga mungkin hanya memiliki satu pecahan turunan.) Sebuah pecahan mungkin terpecah sebagai respons terhadap aktivitas tulis tingkat tinggi pada tabel induknya, sehingga aplikasi dapat memproses rekaman dari beberapa pecahan secara paralel.

Jika Anda menonaktifkan aliran, semua pecahan yang terbuka akan ditutup. Data dalam aliran akan terus dapat dibaca selama 24 jam.

Karena serpihan memiliki garis keturunan (induk dan turunan), aplikasi harus selalu memproses serpihan induk sebelum memproses serpihan anak. Hal ini membantu memastikan bahwa rekaman aliran juga diproses dalam urutan yang benar. (Jika Anda menggunakan DynamoDB Streams Kinesis Adapter, hal ini akan ditangani untuk Anda. Aplikasi Anda memproses pecahan dan catatan aliran dalam urutan yang benar. Secara otomatis menangani pecahan baru atau yang sudah habis masa berlakunya, selain pecahan yang terpecah saat aplikasi berjalan. Misalnya informasi lebih lanjut, lihat [Menggunakan adaptor DynamoDB Streams Kinesis untuk memproses catatan aliran](Streams.KCLAdapter.md).)

Diagram berikut menunjukkan hubungan antara aliran, serpihan dalam aliran, dan rekaman aliran dalam serpihan.

![\[Struktur DynamoDB Streams. Catatan aliran yang mewakili modifikasi data diatur ke dalam pecahan.\]](http://docs.aws.amazon.com/id_id/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**catatan**  
Jika Anda melakukan operasi `PutItem` atau `UpdateItem` yang tidak mengubah data dalam item, DynamoDB Streams *tidak* menulis catatan stream untuk operasi itu.

Untuk mengakses aliran dan memproses rekaman aliran di dalamnya, Anda harus melakukan hal berikut:
+ Tentukan ARN unik aliran yang ingin Anda akses.
+ Tentukan pecahan mana dalam aliran yang berisi rekaman aliran yang Anda minati.
+ Akses pecahan dan ambil rekaman aliran yang Anda inginkan.

**catatan**  
Maksimum dua proses harus membaca dari pecahan aliran yang sama pada waktu yang sama. Memiliki lebih dari dua pembaca per shard dapat mengakibatkan throttling.

DynamoDB Streams API menyediakan tindakan berikut untuk digunakan oleh program aplikasi:
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)` — Mengembalikan daftar deskriptor stream untuk akun dan titik akhir saat ini. Anda secara opsional dapat meminta hanya deskriptor stream untuk nama tabel tertentu.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)`— Mengembalikan informasi tentang aliran, termasuk status aliran saat ini, Nama Sumber Daya Amazon (ARN), komposisi pecahannya, dan tabel DynamoDB yang sesuai. Anda dapat menggunakan `ShardFilter` bidang ini secara opsional untuk mengambil pecahan anak yang ada yang terkait dengan pecahan induk.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)` — Mengembalikan *iterator serpihan*, yang menggambarkan lokasi di dalam serpihan. Anda dapat meminta iterator yang menyediakan akses ke titik terlama, titik terbaru, atau titik tertentu dalam serpihan.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)` - Mengembalikan catatan aliran dari dalam serpihan yang diberikan. Anda harus memberikan iterator serpihan yang dikembalikan dari permintaan `GetShardIterator`.

Untuk deskripsi lengkap operasi API ini, termasuk contoh permintaan dan tanggapan, lihat [Referensi API Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html).

### Penemuan pecahan
<a name="Streams.ShardDiscovery"></a>



Temukan pecahan baru di aliran DynamoDB Anda dengan dua metode canggih. Sebagai pengguna Amazon DynamoDB Streams, Anda memiliki dua cara efektif untuk melacak dan mengidentifikasi pecahan baru:

**Polling seluruh topologi aliran**  
Gunakan `DescribeStream` API untuk melakukan polling streaming secara teratur. Ini mengembalikan semua pecahan dalam aliran, termasuk pecahan baru yang telah dibuat. Dengan membandingkan hasil dari waktu ke waktu, Anda dapat mendeteksi pecahan yang baru ditambahkan.

**Menemukan pecahan anak**  
Gunakan `DescribeStream` API dengan `ShardFilter` parameter untuk menemukan subset pecahan. Dengan menentukan pecahan induk dalam permintaan, DynamoDB Streams akan mengembalikan pecahan turunan langsungnya. Pendekatan ini berguna ketika Anda hanya perlu melacak garis keturunan pecahan tanpa memindai seluruh aliran.   
Aplikasi yang mengkonsumsi data dari DynamoDB Streams dapat secara efisien bertransisi dari membaca pecahan tertutup ke pecahan turunannya menggunakan parameter `ShardFilter` ini, menghindari panggilan berulang ke API untuk mengambil dan melintasi `DescribeStream` peta pecahan untuk semua pecahan tertutup dan terbuka. Ini membantu menemukan pecahan anak dengan cepat setelah pecahan induk ditutup, membuat aplikasi pemrosesan aliran Anda lebih responsif dan hemat biaya.

Kedua metode memberdayakan Anda untuk tetap berada di atas struktur DynamoDB Streams yang berkembang, memastikan Anda tidak pernah melewatkan pembaruan data penting atau modifikasi pecahan.

### Batas retensi data untuk DynamoDB Streams
<a name="Streams.DataRetention"></a>

Semua data dalam DynamoDB Streams tunduk pada masa hidup 24 jam. Anda dapat mengambil dan menganalisis 24 jam terakhir aktivitas untuk setiap tabel yang ditentukan. Namun, data yang lebih lama dari 24 jam rentan terhadap pemangkasan (penghapusan) setiap saat.

Jika Anda menonaktifkan stream pada tabel, data di stream terus dapat dibaca selama 24 jam. Setelah waktu ini, data kedaluwarsa dan catatan stream secara otomatis dihapus. Tidak ada mekanisme untuk menghapus stream yang ada secara manual. Anda harus menunggu hingga batas retensi berakhir (24 jam), dan semua catatan stream akan dihapus.

# DynamoDB Streams dan Waktu untuk Tayang
<a name="time-to-live-ttl-streams"></a>

Anda dapat mencadangkan, atau memproses, item yang dihapus oleh [Waktu untuk Tayang](TTL.md) (TTL) dengan mengaktifkan Amazon DynamoDB Stream pada tabel dan memproses catatan stream dari item yang kedaluwarsa. Untuk informasi selengkapnya, lihat [Membaca dan memproses aliran](Streams.md#Streams.Processing).

Catatan stream berisi bidang identitas pengguna `Records[<index>].userIdentity`.

Item yang dihapus oleh proses Waktu untuk Tayang setelah kedaluwarsa memiliki bidang berikut:
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**catatan**  
Saat Anda menggunakan TTL dalam tabel global, wilayah tempat TTL dilakukan akan memiliki `userIdentity` bidang yang ditetapkan. Bidang ini tidak akan disetel di wilayah lain saat penghapusan direplikasi.

JSON berikut menunjukkan bagian yang relevan dari catatan stream tunggal.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## Menggunakan DynamoDB Streams dan Lambda untuk mengarsipkan item TTL yang dihapus
<a name="streams-archive-ttl-deleted-items"></a>

Menggabungkan [Waktu untuk Tayang (TTL) DynamoDB, DynamoDB Streams,](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html) dan [AWS Lambda](https://aws.amazon.com/lambda/) dapat membantu menyederhanakan pengarsipan [data, mengurangi biaya penyimpanan DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), dan mengurangi kompleksitas kode. Menggunakan Lambda sebagai konsumen aliran memberikan banyak keuntungan, terutama pengurangan biaya dibandingkan dengan konsumen lain seperti Kinesis Client Library (KCL). Anda tidak dikenakan biaya untuk panggilan API `GetRecords` di aliran DynamoDB saat menggunakan Lambda untuk mengkonsumsi peristiwa, dan Lambda dapat menyediakan pemfilteran peristiwa dengan mengidentifikasi pola JSON dalam peristiwa streaming. Dengan pemfilteran konten pola peristiwa, Anda dapat menentukan hingga lima filter berbeda untuk mengontrol peristiwa mana yang dikirim ke Lambda untuk diproses. Hal ini membantu mengurangi pemanggilan fungsi Lambda Anda, menyederhanakan kode, dan mengurangi biaya keseluruhan.

Meskipun DynamoDB Streams berisi semua modifikasi data, seperti tindakan `Create`, `Modify`, dan `Remove`, hal ini dapat mengakibatkan pemanggilan yang tidak diinginkan pada fungsi Lambda arsip Anda. Misalnya, Anda memiliki tabel dengan 2 juta modifikasi data per jam yang mengalir ke aliran, namun kurang dari 5 persen di antaranya adalah penghapusan item yang akan kedaluwarsa melalui proses TTL dan perlu diarsipkan. Dengan [filter sumber acara Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), fungsi Lambda hanya akan dipanggil 100.000 kali per jam. Hasil dari pemfilteran peristiwa adalah Anda hanya dikenakan biaya untuk pemanggilan yang diperlukan, bukan 2 juta pemanggilan yang akan Anda dapatkan tanpa pemfilteran peristiwa.

Pemfilteran peristiwa diterapkan pada [pemetaan sumber peristiwa Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html), yang merupakan sumber daya yang membaca dari peristiwa yang dipilih—aliran DynamoDB—dan memanggil fungsi Lambda. Dalam diagram berikut, Anda dapat melihat bagaimana item Time to Live yang dihapus digunakan oleh fungsi Lambda menggunakan aliran dan filter peristiwa.

![\[Item yang dihapus melalui proses TTL memulai fungsi Lambda yang menggunakan aliran dan filter acara.\]](http://docs.aws.amazon.com/id_id/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### Pola filter peristiwa DynamoDB Waktu untuk Tayang
<a name="ttl-event-filter-pattern"></a>

Menambahkan JSON berikut ke [kriteria filter](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html) pemetaan sumber peristiwa Anda memungkinkan invokasi fungsi Lambda Anda hanya untuk item yang dihapus TTL:

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### Membuat pemetaan sumber AWS Lambda acara
<a name="create-event-source-mapping"></a>

Gunakan cuplikan kode berikut untuk membuat pemetaan sumber peristiwa terfilter yang dapat Anda sambungkan ke aliran DynamoDB tabel. Setiap blok kode menyertakan pola filter peristiwa.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# Menggunakan adaptor DynamoDB Streams Kinesis untuk memproses catatan aliran
<a name="Streams.KCLAdapter"></a>

Menggunakan Adaptor Amazon Kinesis adalah cara yang disarankan untuk menggunakan aliran dari Amazon DynamoDB. DynamoDB Streams API sengaja mirip dengan Kinesis Data Streams. Di kedua layanan, aliran data terdiri dari pecahan, yang merupakan wadah untuk rekaman aliran. Kedua layanan APIs berisi`ListStreams`,, `DescribeStream``GetShards`, dan `GetShardIterator` operasi. (Meskipun tindakan DynamoDB Streams ini serupa dengan tindakan serupa di Kinesis Data Streams, tindakan tersebut tidak 100 persen identik.)

Sebagai pengguna DynamoDB Streams, Anda dapat menggunakan pola desain yang ditemukan dalam KCL untuk memproses serpihan dan rekaman aliran DynamoDB Streams. Untuk melakukan ini, Anda menggunakan Adaptor DynamoDB Streams Kinesis. Adaptor Kinesis mengimplementasikan antarmuka Kinesis Data Streams sehingga KCL dapat digunakan untuk menggunakan dan memproses catatan dari DynamoDB Streams. [Untuk petunjuk tentang cara mengatur dan menginstal Adaptor Kinesis DynamoDB Streams, lihat repositori. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

Anda dapat menulis aplikasi untuk Kinesis Data Streams menggunakan Kinesis Client Library (KCL). KCL menyederhanakan pengodean dengan menyediakan abstraksi yang berguna di atas Kinesis Data Streams API tingkat rendah. Untuk informasi selengkapnya tentang KCL, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dalam *Panduan Developer Amazon Kinesis Data Streams*.

DynamoDB merekomendasikan penggunaan KCL versi 3.x dengan SDK AWS for Java v2.x. [Adaptor Kinesis DynamoDB Streams versi AWS 1.x saat ini dengan AWS SDK untuk Java SDK untuk v1.x akan terus didukung sepenuhnya sepanjang siklus hidupnya sebagaimana dimaksud selama periode transisi sesuai dengan kebijakan pemeliharaan dan Tools.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**catatan**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami sangat menyarankan Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman Perpustakaan [Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat [Menggunakan Perpustakaan Klien Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat Migrasi dari KCL 1.x ke KCL 3.x.

Diagram berikut menunjukkan bagaimana perpustakaan ini berinteraksi satu sama lain.

![\[Interaksi antara DynamoDB Streams, Kinesis Data Streams, dan KCL untuk memproses rekaman DynamoDB Streams.\]](http://docs.aws.amazon.com/id_id/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Dengan Adaptor Kinesis DynamoDB Streams, Anda dapat mulai mengembangkan antarmuka KCL, dengan panggilan API diarahkan secara mulus ke titik akhir DynamoDB Streams.

Saat aplikasi Anda dimulai, aplikasi akan memanggil KCL untuk membuat instance pekerja. Anda harus memberi pekerja informasi konfigurasi untuk aplikasi, seperti deskriptor aliran dan AWS kredensil, dan nama kelas prosesor rekaman yang Anda berikan. Saat menjalankan kode di pemroses rekaman, pekerja melakukan tugas-tugas berikut:
+ Menghubungkan ke aliran
+ Menghitung pecahan dalam aliran
+ Memeriksa dan menghitung pecahan anak dari pecahan induk tertutup di dalam aliran
+ Mengkoordinasikan asosiasi serpihan dengan pekerja lain (jika ada)
+ Membuat instance pemroses rekaman untuk setiap pecahan yang dikelolanya
+ Menarik catatan dari aliran
+ Menskalakan tingkat panggilan GetRecords API selama throughput tinggi (jika mode catch-up dikonfigurasi)
+ Mendorong rekaman ke pemroses rekaman yang sesuai
+ Catatan yang diproses di pos pemeriksaan
+ Menyeimbangkan asosiasi pekerja pecahan ketika jumlah instans pekerja berubah
+ Menyeimbangkan asosiasi pekerja pecahan saat pecahan dipisahkan

Adaptor KCL mendukung mode catch-up, fitur penyesuaian laju panggilan otomatis untuk menangani peningkatan throughput sementara. Ketika kelambatan pemrosesan aliran melebihi ambang batas yang dapat dikonfigurasi (default satu menit), mode catch-up menskalakan frekuensi panggilan GetRecords API dengan nilai yang dapat dikonfigurasi (default 3x) untuk mengambil catatan lebih cepat, lalu kembali normal setelah jeda turun. Ini berharga selama periode throughput tinggi di mana aktivitas penulisan DynamoDB dapat membanjiri konsumen menggunakan tingkat polling default. Mode catch-up dapat diaktifkan melalui parameter `catchupEnabled` konfigurasi (default false).

**catatan**  
Untuk deskripsi konsep KCL yang tercantum di sini, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) di *Panduan Pengembang Amazon Kinesis Data Streams*.  
Untuk informasi lebih lanjut tentang menggunakan stream dengan lihat AWS Lambda [DynamoDB Streams dan pemicu AWS Lambda](Streams.Lambda.md)

# Migrasi dari KCL 1.x ke KCL 3.x
<a name="streams-migrating-kcl"></a>

## Ikhtisar
<a name="migrating-kcl-overview"></a>

Panduan ini memberikan petunjuk untuk memigrasikan aplikasi konsumen Anda dari KCL 1.x ke KCL 3.x. Karena perbedaan arsitektur antara KCL 1.x dan KCL 3.x, migrasi memerlukan pembaruan beberapa komponen untuk memastikan kompatibilitas.

KCL 1.x menggunakan kelas dan antarmuka yang berbeda dibandingkan dengan KCL 3.x. Anda harus memigrasikan prosesor rekaman, pabrik prosesor rekaman, dan kelas pekerja ke format yang kompatibel dengan KCL 3.x terlebih dahulu, dan ikuti langkah-langkah migrasi untuk migrasi KCL 1.x ke KCL 3.x.

## Langkah migrasi
<a name="migration-steps"></a>

**Topics**
+ [Langkah 1: Migrasikan prosesor rekaman](#step1-record-processor)
+ [Langkah 2: Migrasikan pabrik prosesor rekaman](#step2-record-processor-factory)
+ [Langkah 3: Migrasikan pekerja](#step3-worker-migration)
+ [Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x](#step4-configuration-migration)
+ [Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x](#step5-kcl2-to-kcl3)

### Langkah 1: Migrasikan prosesor rekaman
<a name="step1-record-processor"></a>

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk adaptor KCL 1.x DynamoDB Streams Kinesis:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Untuk memigrasikan kelas RecordProcessor**

1. Ubah antarmuka dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` dan `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` menjadi `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` sebagai berikut:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Perbarui pernyataan impor untuk `initialize` dan `processRecords` metode:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Ganti `shutdownRequested` metode dengan metode baru berikut:`leaseLost`,`shardEnded`, dan`shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Berikut ini adalah versi terbaru dari kelas prosesor rekaman:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**catatan**  
DynamoDB Streams Kinesis Adapter sekarang menggunakan model Record. SDKv2 Dalam SDKv2, `AttributeValue` objek kompleks (`BS`,`NS`, `M``L`,`SS`) tidak pernah mengembalikan null. Gunakan`hasBs()`,`hasNs()`,`hasM()`,`hasL()`, `hasSs()` metode untuk memverifikasi apakah nilai-nilai ini ada.

### Langkah 2: Migrasikan pabrik prosesor rekaman
<a name="step2-record-processor-factory"></a>

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Untuk memigrasikan `RecordProcessorFactory`**
+ Ubah antarmuka yang diimplementasikan dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` ke`software.amazon.kinesis.processor.ShardRecordProcessorFactory`, sebagai berikut:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Berikut ini adalah contoh pabrik prosesor rekaman di 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Langkah 3: Migrasikan pekerja
<a name="step3-worker-migration"></a>

**Dalam versi 3.0 dari KCL, kelas baru, yang disebut **Scheduler**, menggantikan kelas Worker.** Berikut ini adalah contoh pekerja KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Untuk memigrasikan pekerja**

1. Ubah `import` pernyataan untuk `Worker` kelas ke pernyataan impor untuk `Scheduler` dan `ConfigsBuilder` kelas.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Impor `StreamTracker` dan ubah impor `StreamsWorkerFactory` ke`StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Pilih posisi untuk memulai aplikasi. Bisa jadi `TRIM_HORIZON` atau`LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Buat sebuah `StreamTracker` instance.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Buat `AmazonDynamoDBStreamsAdapterClient` objek.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Buat `ConfigsBuilder` objek.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Buat `Scheduler` menggunakan `ConfigsBuilder` seperti yang ditunjukkan pada contoh berikut:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**penting**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`Pengaturan mempertahankan kompatibilitas antara DynamoDB Streams Kinesis Adapter untuk KCL v3 dan KCL v1, bukan antara KCL v2 dan v3.

### Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x
<a name="step4-configuration-migration"></a>

[Untuk penjelasan rinci tentang konfigurasi yang diperkenalkan setelah KCL 1.x yang relevan di KCL 3.x lihat konfigurasi KCL dan [konfigurasi klien migrasi KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html).](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)

**penting**  
Alih-alih langsung membuat objek`checkpointConfig`,,,`coordinatorConfig`, `processorConfig` dan `leaseManagementConfig` `metricsConfig``retrievalConfig`, kami sarankan menggunakan `ConfigsBuilder` untuk mengatur konfigurasi di KCL 3.x dan versi yang lebih baru untuk menghindari masalah inisialisasi Scheduler. `ConfigsBuilder`menyediakan cara yang lebih fleksibel dan dapat dipelihara untuk mengkonfigurasi aplikasi KCL Anda.

#### Konfigurasi dengan nilai default pembaruan di KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Dalam KCL versi 1.x, nilai default untuk `billingMode` diatur ke. `PROVISIONED` Namun, dengan KCL versi 3.x, defaultnya `billingMode` adalah `PAY_PER_REQUEST` (mode on-demand). Kami menyarankan Anda menggunakan mode kapasitas sesuai permintaan untuk tabel sewa Anda untuk secara otomatis menyesuaikan kapasitas berdasarkan penggunaan Anda. Untuk panduan tentang penggunaan kapasitas yang disediakan untuk tabel sewa Anda, lihat [Praktik terbaik untuk tabel sewa dengan mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html) kapasitas yang disediakan.

`idleTimeBetweenReadsInMillis`  
Dalam KCL versi 1.x, nilai default untuk diatur ke `idleTimeBetweenReadsInMillis` adalah 1.000 (atau 1 detik). KCL versi 3.x menetapkan nilai default `dleTimeBetweenReadsInMillis` untuk i menjadi 1.500 (atau 1,5 detik), tetapi Amazon DynamoDB Streams Kinesis Adapter mengganti nilai default menjadi 1.000 (atau 1 detik).

#### Konfigurasi baru di KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Konfigurasi ini mendefinisikan interval waktu sebelum pecahan yang baru ditemukan mulai diproses, dan dihitung sebagai 1,5 ×. `leaseAssignmentIntervalMillis` Jika pengaturan ini tidak dikonfigurasi secara eksplisit, interval waktu default menjadi 1,5 ×. `failoverTimeMillis` Memproses pecahan baru melibatkan pemindaian tabel sewa dan menanyakan indeks sekunder global (GSI) pada tabel sewa. Menurunkan `leaseAssignmentIntervalMillis` peningkatan frekuensi operasi pemindaian dan kueri ini, menghasilkan biaya DynamoDB yang lebih tinggi. Kami merekomendasikan pengaturan nilai ini ke 2000 (atau 2 detik) untuk meminimalkan keterlambatan dalam memproses pecahan baru.

`shardConsumerDispatchPollIntervalMillis`  
Konfigurasi ini mendefinisikan interval antara jajak pendapat berturut-turut oleh konsumen shard untuk memicu transisi status. Di KCL versi 1.x, perilaku ini dikendalikan oleh `idleTimeInMillis` parameter, yang tidak diekspos sebagai pengaturan yang dapat dikonfigurasi. Dengan KCL versi 3.x, kami sarankan untuk mengatur konfigurasi ini agar sesuai dengan nilai yang digunakan ` idleTimeInMillis` dalam pengaturan KCL versi 1.x Anda.

### Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Untuk memastikan kelancaran transisi dan kompatibilitas dengan versi Kinesis Client Library (KCL) terbaru, ikuti langkah 5-8 dalam petunjuk panduan migrasi untuk [meningkatkan dari KCL 2.x ke KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

[Untuk masalah pemecahan masalah umum KCL 3.x, lihat Memecahkan masalah aplikasi konsumen KCL.](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)

# Putar kembali ke versi KCL sebelumnya
<a name="kcl-migration-rollback"></a>

Topik ini menjelaskan cara mengembalikan aplikasi konsumen Anda ke versi KCL sebelumnya. Proses roll-back terdiri dari dua langkah:

1. Jalankan [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Menerapkan ulang kode versi KCL sebelumnya.

## Langkah 1: Jalankan Alat Migrasi KCL
<a name="kcl-migration-rollback-step1"></a>

Ketika Anda perlu memutar kembali ke versi KCL sebelumnya, Anda harus menjalankan Alat Migrasi KCL. Alat ini melakukan dua tugas penting:
+ Ini menghapus tabel metadata yang disebut tabel metrik pekerja dan indeks sekunder global pada tabel sewa di DynamoDB. Artefak ini dibuat oleh KCL 3.x tetapi tidak diperlukan saat Anda memutar kembali ke versi sebelumnya.
+ Itu membuat semua pekerja berjalan dalam mode yang kompatibel dengan KCL 1.x dan mulai menggunakan algoritma load balancing yang digunakan dalam versi KCL sebelumnya. Jika Anda memiliki masalah dengan algoritme penyeimbangan beban baru di KCL 3.x, ini akan segera mengurangi masalah.

**penting**  
Tabel status koordinator di DynamoDB harus ada dan tidak boleh dihapus selama proses migrasi, rollback, dan rollforward.

**catatan**  
Sangat penting bahwa semua pekerja dalam aplikasi konsumen Anda menggunakan algoritma load balancing yang sama pada waktu tertentu. Alat Migrasi KCL memastikan bahwa semua pekerja di aplikasi konsumen KCL 3.x Anda beralih ke mode yang kompatibel dengan KCL 1.x sehingga semua pekerja menjalankan algoritma load balancing yang sama selama rollback aplikasi ke versi KCL sebelumnya.

Anda dapat mengunduh [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) di direktori skrip repositori [ GitHubKCL](https://github.com/awslabs/amazon-kinesis-client/tree/master). Jalankan skrip dari pekerja atau host dengan izin yang sesuai untuk menulis ke tabel status koordinator, tabel metrik pekerja, dan tabel sewa. Pastikan [izin IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) yang sesuai dikonfigurasi untuk aplikasi konsumen KCL. Jalankan skrip hanya sekali per aplikasi KCL menggunakan perintah yang ditentukan:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameter
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Ganti *region* dengan Anda Wilayah AWS.

`--application_name`  
Parameter ini diperlukan jika Anda menggunakan nama default untuk tabel metadata DynamoDB Anda (tabel sewa, tabel status koordinator, dan tabel metrik pekerja). Jika Anda telah menentukan nama kustom untuk tabel ini, Anda dapat menghilangkan parameter ini. Ganti *applicationName* dengan nama aplikasi KCL Anda yang sebenarnya. Alat ini menggunakan nama ini untuk mendapatkan nama tabel default jika nama kustom tidak disediakan.

`--lease_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel sewa dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *leaseTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel sewa Anda.

`--coordinator_state_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel status koordinator dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *coordinatorStateTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel status koordinator Anda.

`--worker_metrics_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama khusus untuk tabel metrik pekerja dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *workerMetricsTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel metrik pekerja Anda.

## Langkah 2: Menerapkan ulang kode dengan versi KCL sebelumnya
<a name="kcl-migration-rollback-step2"></a>

**penting**  
Setiap penyebutan versi 2.x dalam output yang dihasilkan oleh Alat Migrasi KCL harus ditafsirkan sebagai mengacu pada KCL versi 1.x. Menjalankan skrip tidak melakukan rollback lengkap, itu hanya mengalihkan algoritma load balancing ke yang digunakan dalam KCL versi 1.x.

Setelah menjalankan KCL Migration Tool untuk rollback, Anda akan melihat salah satu pesan berikut:

Pesan 1  
“Rollback selesai. Aplikasi Anda menjalankan fungsionalitas yang kompatibel dengan 2x. Harap kembalikan ke binari aplikasi Anda sebelumnya dengan menerapkan kode dengan versi KCL Anda sebelumnya.”  
**Tindakan yang diperlukan:** Ini berarti pekerja Anda berjalan dalam mode kompatibel KCL 1.x. Menerapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

Pesan 2  
“Rollback selesai. Aplikasi KCL Anda menjalankan fungsionalitas 3x dan akan mengembalikan ke fungsionalitas yang kompatibel dengan 2x. Jika Anda tidak melihat mitigasi setelah periode waktu yang singkat, harap kembalikan ke binari aplikasi Anda sebelumnya dengan menerapkan kode dengan versi KCL Anda sebelumnya.  
**Tindakan yang diperlukan:** Ini berarti pekerja Anda berjalan dalam mode KCL 3.x dan Alat Migrasi KCL mengalihkan semua pekerja ke mode yang kompatibel dengan KCL 1.x. Menerapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

Pesan 3  
“Aplikasi sudah digulung kembali. KCLv3 Sumber daya apa pun yang dapat dihapus dibersihkan untuk menghindari biaya hingga aplikasi dapat digulirkan dengan migrasi.  
**Tindakan yang diperlukan:** Ini berarti bahwa pekerja Anda sudah diputar kembali untuk berjalan dalam mode kompatibel KCL 1.x. Menerapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

# Gulung maju ke KCL 3.x setelah rollback
<a name="kcl-migration-rollforward"></a>

Topik ini menjelaskan cara meneruskan aplikasi konsumen Anda ke KCL 3.x setelah rollback. Ketika Anda perlu maju, Anda harus menyelesaikan proses dua langkah:

1. Jalankan [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Terapkan kode dengan KCL 3.x.

## Langkah 1: Jalankan Alat Migrasi KCL
<a name="kcl-migration-rollforward-step1"></a>

Jalankan KCL Migration Tool dengan perintah berikut untuk maju ke KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameter
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Ganti *region* dengan Anda Wilayah AWS.

`--application_name`  
Parameter ini diperlukan jika Anda menggunakan nama default untuk tabel status koordinator Anda. Jika Anda telah menentukan nama kustom untuk tabel status koordinator, Anda dapat menghilangkan parameter ini. Ganti *applicationName* dengan nama aplikasi KCL Anda yang sebenarnya. Alat ini menggunakan nama ini untuk mendapatkan nama tabel default jika nama kustom tidak disediakan.

`--coordinator_state_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel status koordinator dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *coordinatorStateTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel status koordinator Anda.

Setelah Anda menjalankan alat migrasi dalam mode roll-forward, KCL membuat sumber daya DynamoDB berikut yang diperlukan untuk KCL 3.x:
+ Indeks Sekunder Global pada tabel sewa
+ Tabel metrik pekerja

## Langkah 2: Menyebarkan kode dengan KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Setelah menjalankan Alat Migrasi KCL untuk maju, terapkan kode Anda dengan KCL 3.x ke pekerja Anda. Untuk menyelesaikan migrasi, lihat [Langkah 8: Selesaikan migrasi](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Panduan: Adaptor DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough"></a>

Bagian ini adalah panduan aplikasi Java yang menggunakan Perpustakaan Klien Amazon Kinesis dan Adaptor Amazon DynamoDB Streams Kinesis. Aplikasi ini memperlihatkan contoh replikasi data, di mana aktivitas penulisan dari satu tabel diterapkan ke tabel kedua, dengan konten kedua tabel tetap sinkron. Untuk kode sumber, lihat [Program lengkap: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Program ini melakukan hal berikut:

1. Menciptakan dua tabel DynamoDB bernama `KCL-Demo-src` dan `KCL-Demo-dst`. Masing-masing tabel ini memiliki stream yang diaktifkan.

1. Menghasilkan aktivitas pembaruan dalam tabel sumber dengan menambahkan, memperbarui, dan menghapus item. Hal ini menyebabkan data akan ditulis ke stream tabel.

1. Membaca catatan dari stream, merekonstruksinya sebagai permintaan DynamoDB, dan menerapkan permintaan ke tabel tujuan.

1. Memindai tabel sumber dan tujuan untuk memastikan bahwa isinya identik.

1. Membersihkan dengan menghapus tabel.

Langkah-langkah ini dijelaskan di bagian berikut, dan aplikasi lengkap ditampilkan di akhir panduan.

**Topics**
+ [Langkah 1: Buat tabel DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Langkah 3: Proses alirannya](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Langkah 5: Bersihkan](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Program lengkap: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Langkah 1: Buat tabel DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Langkah pertama adalah membuat dua tabel DynamoDB—tabel sumber dan tabel tujuan. `StreamViewType` pada aliran tabel sumber adalah `NEW_IMAGE`. Ini berarti bahwa setiap kali item diubah dalam tabel ini, gambar "setelah" item tersebut ditulis ke aliran. Dengan cara ini, aliran melacak semua aktivitas penulisan di tabel.

Contoh berikut menunjukkan kode yang digunakan untuk membuat kedua tabel.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

Langkah selanjutnya adalah menghasilkan beberapa aktivitas menulis pada tabel sumber. Saat aktivitas ini berlangsung, aliran tabel sumber juga diperbarui hampir secara waktu nyata.

Aplikasi ini mendefinisikan kelas pembantu dengan metode yang memanggil operasi `PutItem`, `UpdateItem`, dan API `DeleteItem` untuk menulis data. Contoh kode berikut menunjukkan bagaimana metode ini digunakan.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Langkah 3: Proses alirannya
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Sekarang program mulai memproses aliran. Adaptor Kinesis DynamoDB Streams bertindak sebagai lapisan transparan antara KCL dan titik akhir DynamoDB Streams, sehingga kode dapat sepenuhnya menggunakan KCL daripada harus melakukan panggilan DynamoDB Streams tingkat rendah. Program ini melakukan tugas-tugas berikut:
+ Ini mendefinisikan kelas prosesor catatan, `StreamsRecordProcessor`, dengan metode yang sesuai dengan definisi antarmuka KCL: `initialize`, `processRecords`, dan `shutdown`. Metode `processRecords` berisi logika yang diperlukan untuk membaca dari stream tabel sumber dan menulis ke tabel tujuan.
+ Ini mendefinisikan sebuah pabrik kelas untuk kelas prosesor catatan (`StreamsRecordProcessorFactory`). Hal ini diperlukan untuk program Java yang menggunakan KCL.
+ Ini menginstanskan KCL `Worker` baru, yang terkait dengan pabrik kelas.
+ Ini mematikan `Worker` saat pemrosesan catatan selesai.

Secara opsional, aktifkan mode catch-up dalam konfigurasi Adaptor KCL Streams Anda untuk secara otomatis menskalakan laju panggilan GetRecords API sebesar 3x (default) saat kelambatan pemrosesan streaming melebihi satu menit (default), membantu konsumen streaming Anda menangani lonjakan throughput tinggi di tabel Anda.

Untuk mempelajari lebih lanjut tentang definisi antarmuka KCL, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) di *Panduan Pengembang Amazon Kinesis Data Streams*. 

Contoh kode berikut menunjukkan loop utama dalam `StreamsRecordProcessor`. Pernyataan `case` menentukan apa tindakan apa yang harus dilakukan, berdasarkan `OperationType` yang muncul dalam catatan stream.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

Pada titik ini, isi sumber dan tujuan tabel tersinkronisasi. Aplikasi menerbitkan permintaan `Scan` terhadap kedua tabel untuk memverifikasi bahwa isinya, pada kenyataannya, identik.

Kelas `DemoHelper` berisi metode `ScanTable` yang memanggil API `Scan` tingkat rendah. Contoh berikut menunjukkan cara penggunaannya.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Langkah 5: Bersihkan
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Demo selesai, sehingga aplikasi menghapus tabel sumber dan tujuan. Lihat contoh kode berikut. Bahkan setelah tabel dihapus, alirannya tetap tersedia hingga 24 jam, setelah itu tabel akan dihapus secara otomatis.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Program lengkap: Adaptor DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Berikut ini adalah program Java lengkap yang melakukan tugas yang dijelaskan dalam [Panduan: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.md). Saat Anda menjalankannya, Anda akan melihat output yang serupa dengan yang seperti berikut.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**penting**  
 Untuk menjalankan program ini, pastikan bahwa aplikasi klien memiliki akses ke DynamoDB dan CloudWatch Amazon menggunakan kebijakan. Untuk informasi selengkapnya, lihat [Kebijakan berbasis identitas untuk DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Kode sumber terdiri dari empat `.java` file. Untuk membangun program ini, tambahkan dependensi berikut, yang mencakup Amazon Kinesis Client Library (KCL) 3.x dan SDK AWS for Java v2 sebagai dependensi transitif:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

File sumbernya adalah:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# API tingkat rendah DynamoDB Streams: Contoh Java
<a name="Streams.LowLevel.Walkthrough"></a>

**catatan**  
Kode di halaman ini tidak lengkap dan tidak menangani semua skenario untuk menggunakan Amazon DynamoDB Streams. Cara yang disarankan untuk menggunakan catatan aliran dari DynamoDB adalah melalui Adaptor Amazon Kinesis menggunakan Kinesis Client Library (KCL), seperti yang dijelaskan dalam [Menggunakan adaptor DynamoDB Streams Kinesis untuk memproses catatan aliran](Streams.KCLAdapter.md).

Bagian ini berisi program Java yang menunjukkan aksi DynamoDB Streams. Program ini melakukan hal berikut. Program ini melakukan hal berikut:

1. Membuat tabel DynamoDB dengan aliran diaktifkan.

1. Menjelaskan pengaturan aliran untuk tabel ini.

1. Memodifikasi data dalam tabel.

1. Menjelaskan pecahan di aliran.

1. Membaca catatan aliran dari serpihan.

1. Mengambil pecahan anak dan melanjutkan membaca catatan.

1. Membersihkan.

Saat Anda menjalankan program, Anda akan melihat output seperti berikut.

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example Contoh**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams dan pemicu AWS Lambda
<a name="Streams.Lambda"></a>

Amazon DynamoDB terintegrasi AWS Lambda sehingga Anda dapat *membuat* pemicu —potongan kode yang secara otomatis merespons peristiwa di DynamoDB Streams. Dengan pemicu, Anda dapat membangun aplikasi yang bereaksi terhadap modifikasi data di tabel DynamoDB.

**Topics**
+ [Tutorial \$11: Menggunakan filter untuk memproses semua peristiwa dengan Amazon AWS Lambda DynamoDB dan menggunakan AWS CLI](Streams.Lambda.Tutorial.md)
+ [Tutorial \$12: Menggunakan filter untuk memproses beberapa peristiwa dengan DynamoDB dan Lambda](Streams.Lambda.Tutorial2.md)
+ [Praktik terbaik menggunakan DynamoDB Streams dengan Lambda](Streams.Lambda.BestPracticesWithDynamoDB.md)

Jika Anda mengaktifkan DynamoDB Streams pada tabel, Anda dapat mengaitkan aliran Amazon Resource Name (ARN) dengan fungsi yang Anda tulis. AWS Lambda Semua tindakan mutasi pada tabel DynamoDB tersebut kemudian dapat ditangkap sebagai item di aliran. Misalnya, Anda dapat menyetel pemicu sehingga ketika item dalam tabel diubah, rekaman baru segera muncul di aliran tabel tersebut. 

**catatan**  
Jika Anda berlangganan lebih dari dua fungsi Lambda ke satu aliran DynamoDB, pelambatan baca mungkin terjadi.

Layanan [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) ini melakukan polling aliran untuk catatan baru empat kali per detik. Ketika rekaman aliran baru tersedia, fungsi Lambda Anda dipanggil secara sinkron. Anda dapat berlangganan hingga dua fungsi Lambda ke aliran DynamoDB yang sama. Jika Anda berlangganan lebih dari dua fungsi Lambda ke aliran DynamoDB yang sama, pelambatan baca mungkin terjadi.

Fungsi Lambda dapat mengirimkan pemberitahuan, memulai alur kerja, atau melakukan banyak tindakan lain yang Anda tentukan. Anda dapat menulis fungsi Lambda dengan mudah menyalin setiap catatan aliran ke penyimpanan persisten, seperti Amazon S3 File Gateway (Amazon S3), dan membuat jejak audit permanen dari aktivitas penulisan di tabel Anda. Selain itu, misalkan Anda mempunyai aplikasi game seluler yang menulis ke tabel `GameScores`. Setiap kali atribut `TopScore` dari tabel `GameScores` diperbarui, catatan stream yang sesuai ditulis ke stream tabel ini. Peristiwa ini kemudian dapat memicu fungsi Lambda yang memposting pesan ucapan selamat di jaringan media sosial. Fungsi ini juga dapat ditulis untuk mengabaikan catatan stream yang tidak memperbarui `GameScores`, atau yang tidak memodifikasi atribut `TopScore`.

Jika fungsi Anda mengembalikan kesalahan, Lambda akan mencoba ulang batch tersebut hingga berhasil diproses atau datanya habis masa berlakunya. Anda juga dapat mengonfigurasi Lambda untuk mencoba ulang dengan batch yang lebih kecil, membatasi jumlah percobaan ulang, membuang catatan setelah terlalu lama, dan opsi lainnya.

Sebagai praktik terbaik kinerja, fungsi Lambda harus berumur pendek. Untuk menghindari penundaan pemrosesan yang tidak perlu, logika yang rumit juga tidak boleh dijalankan. Khususnya untuk aliran kecepatan tinggi, lebih baik memicu alur kerja fungsi langkah pasca-pemrosesan asinkron daripada Lambdas yang berjalan lama secara sinkron.

 Anda dapat menggunakan pemicu Lambda di berbagai AWS akun dengan mengonfigurasi kebijakan berbasis sumber daya pada aliran DynamoDB untuk memberikan akses baca lintas akun ke fungsi Lambda. Untuk mempelajari lebih lanjut tentang cara mengonfigurasi streaming agar memungkinkan akses lintas akun, lihat [Berbagi akses dengan fungsi AWS Lambda lintas akun di](rbac-cross-account-access.md#shared-access-cross-acount-lambda) Panduan Pengembang DynamoDB.

Untuk informasi selengkapnya AWS Lambda, lihat [Panduan AWS Lambda Pengembang](https://docs.aws.amazon.com/lambda/latest/dg/).

# Tutorial \$11: Menggunakan filter untuk memproses semua peristiwa dengan Amazon AWS Lambda DynamoDB dan menggunakan AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

Dalam tutorial ini, Anda akan membuat AWS Lambda pemicu untuk memproses aliran dari tabel DynamoDB.

**Topics**
+ [Langkah 1: Buat tabel DynamoDB dengan aliran diaktifkan](#Streams.Lambda.Tutorial.CreateTable)
+ [Langkah 2: Buat peran eksekusi Lambda](#Streams.Lambda.Tutorial.CreateRole)
+ [Langkah 3: Buat Topik Amazon SNS](#Streams.Lambda.Tutorial.SNSTopic)
+ [Langkah 4: Buat dan uji fungsi Lambda](#Streams.Lambda.Tutorial.LambdaFunction)
+ [Langkah 5: Buat dan uji pemicu](#Streams.Lambda.Tutorial.CreateTrigger)

Skenario untuk tutorial ini adalah Woofer, sebuah jejaring sosial sederhana. Pengguna Woofer berkomunikasi menggunakan *bark* (pesan teks singkat) yang dikirim ke pengguna Woofer lainnya. Diagram berikut menunjukkan komponen dan alur kerja untuk aplikasi ini.

![\[Alur kerja aplikasi Woofer dari tabel DynamoDB, catatan aliran, fungsi Lambda, dan topik Amazon SNS.\]](http://docs.aws.amazon.com/id_id/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. Seorang pengguna menulis item ke tabel DynamoDB (`BarkTable`). Setiap item dalam tabel mewakili bark.

1. Sebuah catatan stream baru ditulis untuk mencerminkan bahwa item baru telah ditambahkan ke `BarkTable`.

1. Rekaman aliran baru memicu AWS Lambda fungsi (`publishNewBark`).

1. Jika catatan stream menunjukkan bahwa item baru ditambahkan ke `BarkTable`, fungsi Lambda membaca data dari catatan stream dan menerbitkan pesan ke topik di Amazon Simple Notification Service (Amazon SNS).

1. Pesan diterima oleh pelanggan untuk topik Amazon SNS. (Dalam tutorial ini, satu-satunya pelanggan adalah alamat email.)

**Sebelum Anda Memulai**  
Tutorial ini menggunakan AWS Command Line Interface AWS CLI. Jika Anda belum melakukannya, ikuti petunjuk di [Panduan Pengguna AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/) untuk menginstal dan mengkonfigurasi AWS CLI.

## Langkah 1: Buat tabel DynamoDB dengan aliran diaktifkan
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

Pada langkah ini, Anda membuat tabel DynamoDB (`BarkTable`) untuk menyimpan semua bark dari pengguna Woofer. Kunci primer terdiri dari `Username` (kunci partisi) dan `Timestamp` (kunci urutan). Kedua atribut ini berjenis string.

`BarkTable` memiliki aliran yang diaktifkan. Kemudian dalam tutorial ini, Anda membuat pemicu dengan mengaitkan AWS Lambda fungsi dengan aliran.

1. Masukkan perintah berikut untuk membuat tabel.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. Dalam output, cari `LatestStreamArn`.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   Buat catatan tentang `region` dan `accountID`, karena Anda membutuhkannya untuk langkah-langkah lain dalam tutorial ini.

## Langkah 2: Buat peran eksekusi Lambda
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

Pada langkah ini, Anda membuat peran AWS Identity and Access Management (IAM) (`WooferLambdaRole`) dan menetapkan izin untuk itu. Peran ini digunakan oleh fungsi Lambda yang Anda buat di [Langkah 4: Buat dan uji fungsi Lambda](#Streams.Lambda.Tutorial.LambdaFunction). 

Anda juga membuat kebijakan untuk peran. Kebijakan ini berisi semua izin yang dibutuhkan fungsi Lambda pada saat waktu aktif.

1. Buat file bernama `trust-relationship.json` dengan isi berikut ini.

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. Masukkan perintah berikut untuk membuat `WooferLambdaRole`.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. Buat file bernama `role-policy.json` dengan isi berikut ini. (Ganti `region` dan `accountID` dengan AWS Wilayah dan ID akun Anda.)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   Kebijakan ini memiliki empat pernyataan yang memungkinkan `WooferLambdaRole` untuk melakukan hal berikut:
   + Menjalankan fungsi Lambda (`publishNewBark`). Anda membuat fungsi nanti dalam tutorial ini.
   + Akses CloudWatch Log Amazon. Fungsi Lambda menulis diagnostik ke CloudWatch Log saat runtime.
   + Membaca data dari stream DynamoDB untuk `BarkTable`.
   + Memublikasikan pesan ke Amazon SNS.

1. Masukkan perintah berikut untuk melampirkan kebijakan ke `WooferLambdaRole`.

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## Langkah 3: Buat Topik Amazon SNS
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

Pada langkah ini, Anda membuat topik Amazon SNS (`wooferTopic`) dan mendaftarkan langganan alamat email untuk itu. Fungsi Lambda Anda menggunakan topik ini untuk memublikasikan bark baru dari pengguna Woofer.

1. Masukkan perintah berikut untuk membuat topik Amazon SNS.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. Masukkan perintah berikut untuk mendaftarkan langganan alamat email ke `wooferTopic`. (Ganti `region` dan `accountID` dengan Wilayah AWS dan ID akun Anda, dan ganti `example@example.com` dengan alamat email yang valid.)

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS mengirimkan pesan konfirmasi ke alamat email Anda. Pilih tautan **Konfirmasi langganan** di pesan tersebut untuk menyelesaikan proses berlangganan.

## Langkah 4: Buat dan uji fungsi Lambda
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

Pada langkah ini, Anda membuat AWS Lambda fungsi (`publishNewBark`) untuk memproses catatan aliran dari`BarkTable`.

Fungsi `publishNewBark` hanya memproses hanya peristiwa stream yang sesuai dengan item baru di `BarkTable`. Fungsi membaca data dari peristiwa tersebut, dan kemudian memanggil Amazon SNS untuk memublikasikannya.

1. Buat file bernama `publishNewBark.js` dengan isi berikut ini. Ganti `region` dan `accountID` dengan AWS Wilayah dan ID akun Anda.

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. Buat file zip untuk menampung `publishNewBark.js`. Jika Anda memiliki utilitas baris perintah zip, Anda dapat memasukkan perintah berikut untuk melakukan hal ini.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Ketika Anda membuat fungsi Lambda, Anda menentukan Amazon Resource Name (ARN) untuk `WooferLambdaRole`, yang Anda buat di [Langkah 2: Buat peran eksekusi Lambda](#Streams.Lambda.Tutorial.CreateRole). Masukkan perintah berikut untuk mengambil ARN ini.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   Dalam output, cari ARN untuk `WooferLambdaRole`.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Masukkan perintah berikut untuk membuat fungsi Lambda. Ganti *roleARN* dengan ARN untuk. `WooferLambdaRole`

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. Sekarang uji `publishNewBark` untuk memverifikasi bahwa ia bekerja. Untuk melakukannya, Anda memberikan input yang menyerupai catatan nyata dari DynamoDB Streams.

   Buat file bernama `payload.json` dengan isi berikut ini. Ganti `region` dan `accountID` dengan ID akun Wilayah AWS dan Anda.

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   Masukkan perintah berikut untuk menguji fungsi `publishNewBark`.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   Jika tes berhasil, Anda akan melihat output sebagai berikut.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   Selain itu, file `output.txt` akan berisi teks berikut.

   ```
   "Successfully processed 1 records."
   ```

   Anda juga akan menerima pesan email baru dalam beberapa menit.
**catatan**  
AWS Lambda menulis informasi diagnostik ke Amazon CloudWatch Logs. Jika Anda mengalami kesalahan dengan fungsi Lambda Anda, Anda dapat menggunakan diagnostik ini untuk tujuan pemecahan masalah:  
Buka CloudWatch konsol di [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Pilih **Log** di panel navigasi.
Pilih grup log berikut ini: `/aws/lambda/publishNewBark`
Pilih stream log terbaru untuk melihat output (dan kesalahan) dari fungsi.

## Langkah 5: Buat dan uji pemicu
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

Dalam [Langkah 4: Buat dan uji fungsi Lambda](#Streams.Lambda.Tutorial.LambdaFunction), Anda menguji fungsi Lambda untuk memastikan bahwa itu berjalan dengan benar. Pada langkah ini, Anda membuat *pemicu* dengan mengaitkan fungsi Lambda (`publishNewBark`) dengan sumber peristiwa (aliran `BarkTable`).

1. Saat Anda membuat pemicu, Anda perlu menentukan ARN untuk stream `BarkTable`. Masukkan perintah berikut untuk mengambil ARN ini.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   Dalam output, cari `LatestStreamArn`.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. Masukkan perintah berikut untuk membuat pemicu. Ganti `streamARN` dengan ARN stream yang sebenarnya.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. Uji pemicu. Masukkan perintah berikut untuk menambahkan item ke `BarkTable`.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   Anda akan menerima pesan email baru dalam beberapa menit.

1. Buka konsol DynamoDB dan tambahkan beberapa item lagi ke `BarkTable`. Anda harus menentukan nilai untuk atribut `Username` dan `Timestamp`. (Anda juga harus menentukan nilai untuk `Message`, meskipun tidak diperlukan.) Anda akan menerima pesan email baru untuk setiap item yang ditambahkan ke `BarkTable`.

   Fungsi Lambda memproses hanya item baru yang Anda tambahkan ke `BarkTable`. Jika Anda memperbarui atau menghapus item dalam tabel, fungsi tidak melakukan apa pun.

**catatan**  
AWS Lambda menulis informasi diagnostik ke Amazon CloudWatch Logs. Jika Anda mengalami kesalahan dengan fungsi Lambda Anda, Anda dapat menggunakan diagnostik ini untuk tujuan pemecahan masalah.  
Buka CloudWatch konsol di [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Pilih **Log** di panel navigasi.
Pilih grup log berikut ini: `/aws/lambda/publishNewBark`
Pilih stream log terbaru untuk melihat output (dan kesalahan) dari fungsi.

# Tutorial \$12: Menggunakan filter untuk memproses beberapa peristiwa dengan DynamoDB dan Lambda
<a name="Streams.Lambda.Tutorial2"></a>

Dalam tutorial ini, Anda akan membuat AWS Lambda pemicu untuk memproses hanya beberapa peristiwa dalam aliran dari tabel DynamoDB.

**Topics**
+ [Menyatukan semuanya - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Menyatukan semuanya - CDK](#Streams.Lambda.Tutorial2.CDK)

Dengan [pemfilteran peristiwa Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), Anda dapat menggunakan ekspresi filter untuk mengontrol peristiwa mana yang dikirim Lambda ke fungsi Anda untuk diproses. Anda dapat mengonfigurasi hingga 5 filter berbeda per aliran DynamoDB. Jika Anda menggunakan jendela batching, Lambda menerapkan kriteria filter untuk setiap acara baru untuk melihat apakah itu harus disertakan dalam batch saat ini.

Filter diterapkan melalui struktur yang disebut `FilterCriteria`. 3 atribut utama `FilterCriteria` adalah`metadata properties`, `data properties`, dan `filter patterns`. 

Berikut adalah contoh struktur dari acara DynamoDB Streams:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` adalah bidang objek peristiwa. Dalam kasus DynamoDB Streams, `metadata properties` adalah bidang seperti `dynamodb` atau `eventName`. 

`data properties` adalah bidang badan peristiwa. Untuk memfilter `data properties`, pastikan untuk memasukkannya ke `FilterCriteria` dalam kunci yang tepat. Untuk sumber peristiwa DynamoDB, kunci data adalah `NewImage` atau `OldImage`.

Akhirnya, aturan filter akan menentukan ekspresi filter yang ingin Anda terapkan ke properti tertentu. Berikut ini adalah beberapa contohnya:


| Operator perbandingan | Contoh | Sintaks aturan (Sebagian) | 
| --- | --- | --- | 
|  Null  |  Jenis Produk adalah null  |  `{ "product_type": { "S": null } } `  | 
|  Kosong  |  Nama produk kosong  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Sama dengan  |  Negara bagian sama dengan Florida  |  `{ "state": { "S": ["FL"] } } `  | 
|  Dan  |  Negara bagian produk sama dengan Florida dan kategori produk Cokelat  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Atau  |  Negara bagian produk adalah Florida atau California  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Bukan  |  Negara bagian produk bukan Florida  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Exists  |  Produk Rumahan ada  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  Tidak ada  |  Produk Rumahan tidak ada  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Dimulai dengan  |  PK dimulai dengan PERUSAHAAN  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Anda dapat menentukan hingga 5 pola penyaringan peristiwa untuk fungsi Lambda. Perhatikan bahwa masing-masing dari 5 peristiwa tersebut akan dievaluasi sebagai OR logis. Jadi jika Anda mengkonfigurasi dua filter bernama `Filter_One` dan`Filter_Two`, fungsi Lambda akan mengeksekusi `Filter_One` OR `Filter_Two`.

**catatan**  
Di halaman [pemfilteran acara Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) ada beberapa opsi untuk memfilter dan membandingkan nilai numerik, namun dalam kasus peristiwa filter DynamoDB itu tidak berlaku karena angka di DynamoDB disimpan sebagai string. Misalnya ` "quantity": { "N": "50" }`, kita tahu itu nomor karena properti `"N"`.

## Menyatukan semuanya - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Untuk menampilkan fungsionalitas pemfilteran acara dalam praktiknya, berikut adalah contoh CloudFormation template. Templat ini akan menghasilkan tabel DynamoDB Sederhana dengan PK Kunci Partisi dan SK Kunci Urutan dengan Amazon DynamoDB Streams diaktifkan. Ini akan membuat fungsi lambda dan peran Eksekusi Lambda sederhana yang memungkinkan penulisan log ke Amazon Cloudwatch, dan membaca peristiwa dari Amazon DynamoDB Stream. Ini juga akan menambahkan pemetaan sumber peristiwa antara DynamoDB Streams dan fungsi Lambda, sehingga fungsi tersebut dapat dijalankan setiap kali ada kejadian di Amazon DynamoDB Stream.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Setelah Anda menerapkan templat pembentukan cloud ini, Anda dapat memasukkan Item Amazon DynamoDB berikut:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Berkat fungsi lambda sederhana yang disertakan sebaris dalam template pembentukan cloud ini, Anda akan melihat peristiwa di grup CloudWatch log Amazon untuk fungsi lambda sebagai berikut:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Contoh Filter**
+ **Hanya produk yang cocok dengan status tertentu**

Contoh ini memodifikasi CloudFormation template untuk menyertakan filter untuk mencocokkan semua produk yang berasal dari Florida, dengan singkatan “FL”.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Setelah Anda menerapkan kembali tumpukan, Anda dapat menambahkan item DynamoDB berikut ke tabel. Perhatikan bahwa itu tidak akan muncul di log fungsi Lambda, karena produk dalam contoh ini berasal dari California.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Hanya item yang dimulai dengan beberapa nilai di PK dan SK**

Contoh ini memodifikasi CloudFormation template untuk menyertakan kondisi berikut:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Perhatikan kondisi AND mengharuskan kondisi berada di dalam pola, di mana PK dan SK Kunci berada dalam ekspresi yang sama dipisahkan oleh koma.

Baik mulai dengan beberapa nilai pada PK dan SK atau dari keadaan tertentu.

Contoh ini memodifikasi CloudFormation template untuk menyertakan kondisi berikut:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Perhatikan kondisi OR ditambahkan dengan memperkenalkan pola baru di bagian filter.

## Menyatukan semuanya - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

Contoh templat pembentukan proyek CDK berikut berjalan melalui fungsionalitas penyaringan acara. Sebelum bekerja dengan proyek CDK ini, Anda perlu [menginstal prasyarat](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html) termasuk [ menjalankan skrip persiapan](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Buat proyek CDK**

Pertama buat AWS CDK proyek baru, dengan memanggil `cdk init` dalam direktori kosong.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

Perintah `cdk init` menggunakan nama folder proyek untuk memberi nama berbagai elemen proyek, termasuk kelas, subfolder, dan file. Tanda hubung apa pun dalam nama folder diubah menjadi garis bawah. Nama tersebut harus mengikuti bentuk pengenal Python. Misalnya, seharusnya tidak dimulai dengan angka atau berisi spasi.

Untuk bekerja dengan proyek baru, aktifkan lingkungan virtualnya. Ini memungkinkan dependensi proyek diinstal secara lokal di folder proyek, bukan secara global.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**catatan**  
Anda mungkin mengenali ini sebagai Mac/Linux perintah untuk mengaktifkan lingkungan virtual. Templat Python menyertakan file batch, `source.bat`, yang memungkinkan perintah yang sama untuk digunakan pada Windows. Perintah Windows tradisional `.venv\Scripts\activate.bat` juga berfungsi. Jika Anda menginisialisasi AWS CDK proyek Anda menggunakan AWS CDK Toolkit v1.70.0 atau yang lebih lama, lingkungan virtual Anda ada di direktori, bukan. `.env` `.venv` 

**Infrastruktur Dasar**

Buka file `./ddb_filters/ddb_filters_stack.py` dengan editor teks pilihan Anda. File ini dibuat secara otomatis saat Anda membuat AWS CDK proyek. 

Selanjutnya, tambahkan fungsi `_create_ddb_table` dan `_set_ddb_trigger_function`. Fungsi-fungsi ini akan membuat tabel DynamoDB dengan kunci partisi PK dan mengurutkan kunci SK dalam mode penyediaan mode sesuai permintaan, dengan Amazon DynamoDB Streams diaktifkan secara default untuk menampilkan gambar Baru dan Lama.

Fungsi Lambda akan disimpan di folder `lambda` di bagian file `app.py`. File ini akan dibuat nanti. Ini akan mencakup variabel lingkungan `APP_TABLE_NAME`, yang akan menjadi nama Tabel Amazon DynamoDB yang dibuat oleh tumpukan ini. Dalam fungsi yang sama kami akan memberikan izin baca aliran ke fungsi Lambda. Akhirnya, hal tersebut akan berlangganan DynamoDB Streams sebagai sumber acara untuk fungsi lambda. 

Di akhir file dalam metode `__init__`, Anda akan memanggil konstruksi masing-masing untuk menginisialisasi mereka dalam tumpukan. Untuk proyek yang lebih besar yang memerlukan komponen dan layanan tambahan, mungkin yang terbaik adalah mendefinisikan konstruksi ini di luar tumpukan dasar. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Sekarang kita akan membuat fungsi lambda yang sangat sederhana yang akan mencetak log ke Amazon CloudWatch. Untuk melakukannya, buat folder baru bernama `lambda`.

```
mkdir lambda
touch app.py
```

Menggunakan editor teks favorit Anda, tambahkan konten berikut ke file `app.py`:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Memastikan Anda berada di folder `/ddb_filters/`, ketikkan perintah berikut untuk membuat aplikasi sampel:

```
cdk deploy
```

Pada titik tertentu Anda akan diminta untuk mengonfirmasi apakah Anda ingin menerapkan solusi tersebut. Terima perubahan dengan mengetik `Y`.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Setelah perubahan diterapkan, buka AWS konsol Anda dan tambahkan satu item ke tabel Anda. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

 CloudWatch Log sekarang harus berisi semua informasi dari entri ini. 

**Contoh Filter**
+ **Hanya produk yang cocok dengan status tertentu**

Buka file `ddb_filters/ddb_filters/ddb_filters_stack.py`, dan modifikasi untuk menyertakan filter yang cocok dengan semua produk yang setara dengan “FL”. Ini dapat direvisi tepat di bawah `event_subscription` di baris 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Hanya item yang dimulai dengan beberapa nilai di PK dan SK**

Ubah skrip python untuk menyertakan kondisi berikut:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **Baik mulai dengan beberapa nilai pada PK dan SK atau dari keadaan tertentu.**

Ubah skrip python untuk menyertakan kondisi berikut:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Perhatikan bahwa kondisi OR ditambahkan dengan menambahkan lebih banyak elemen ke array Filter.

**Pembersihan**

Temukan tumpukan filter di dasar direktori kerja Anda, dan jalankan`cdk destroy`. Anda akan diminta untuk mengonfirmasi penghapusan sumber daya:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Praktik terbaik menggunakan DynamoDB Streams dengan Lambda
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

 AWS Lambda Fungsi berjalan di dalam *wadah* —lingkungan eksekusi yang diisolasi dari fungsi lain. Ketika Anda menjalankan fungsi untuk pertama kalinya, AWS Lambda membuat wadah baru dan mulai mengeksekusi kode fungsi.

Fungsi Lambda memiliki *handler* yang dijalankan sekali per permohonan. Handler berisi logika bisnis utama untuk fungsi. Misalnya, fungsi Lambda yang ditampilkan dalam [Langkah 4: Buat dan uji fungsi Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) memiliki handler yang dapat memproses catatan dalam DynamoDB stream. 

Anda juga dapat memberikan kode inisialisasi yang hanya berjalan satu kali—setelah penampung dibuat, tetapi sebelumnya AWS Lambda menjalankan handler untuk pertama kalinya. Fungsi Lambda yang ditampilkan di [Langkah 4: Buat dan uji fungsi Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) memiliki kode inisialisasi yang mengimpor SDK untuk JavaScript di Node.js, dan membuat klien untuk Amazon SNS. Objek-objek ini hanya boleh didefinisikan sekali, di luar handler.

Setelah fungsi berjalan, AWS Lambda mungkin memilih untuk menggunakan kembali wadah untuk pemanggilan fungsi berikutnya. Dalam kasus ini, handler fungsi Anda mungkin dapat menggunakan kembali sumber daya yang Anda tetapkan dalam kode inisialisasi Anda. (Anda tidak dapat mengontrol berapa lama AWS Lambda akan mempertahankan kontainer, atau apakah kontainer akan digunakan kembali.)

Untuk pemicu DynamoDB AWS Lambda menggunakan, kami merekomendasikan hal berikut:
+ AWS klien layanan harus dipakai dalam kode inisialisasi, bukan di handler. Ini memungkinkan AWS Lambda untuk menggunakan kembali koneksi yang ada, selama masa pakai kontainer.
+ Secara umum, Anda tidak perlu secara eksplisit mengelola koneksi atau menerapkan penyatuan koneksi karena AWS Lambda mengelola ini untuk Anda.

Konsumen Lambda untuk aliran DynamoDB tidak menjamin persis sekali pengiriman dan dapat menyebabkan duplikat sesekali. Pastikan kode fungsi Lambda Anda idempoten untuk mencegah timbulnya masalah tak terduga karena pemrosesan duplikat.

Untuk informasi selengkapnya, lihat [Praktik terbaik untuk bekerja dengan AWS Lambda fungsi](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html) di *Panduan AWS Lambda Pengembang*.

# DynamoDB Streams dan Apache Flink
<a name="StreamsApacheFlink.xml"></a>

Anda dapat menggunakan catatan Amazon DynamoDB Streams dengan Apache Flink. Dengan [Amazon Managed Service untuk Apache Flink](https://aws.amazon.com/managed-service-apache-flink/), Anda dapat mengubah dan menganalisis data streaming secara real time menggunakan Apache Flink. Apache Flink adalah kerangka pemrosesan aliran sumber terbuka untuk memproses data real-time. Konektor Amazon DynamoDB Streams untuk Apache Flink menyederhanakan pembuatan dan pengelolaan beban kerja Apache Flink dan memungkinkan Anda mengintegrasikan aplikasi dengan aplikasi lain. Layanan AWS

Amazon Managed Service untuk Apache Flink membantu Anda membangun aplikasi pemrosesan end-to-end streaming dengan cepat untuk analitik log, analitik clickstream, Internet of Things (IoT), teknologi iklan, game, dan banyak lagi. Empat kasus penggunaan yang paling umum adalah streaming extract-transform-load (ETL), aplikasi berbasis peristiwa, analitik real-time responsif, dan kueri interaktif aliran data. [Untuk informasi selengkapnya tentang menulis ke Apache Flink dari Amazon DynamoDB Streams, lihat Amazon DynamoDB Streams Connector.](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)