

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Bekerja dengan aliran change data capture (CDC) di Amazon Keyspaces
<a name="cdc"></a>

Amazon Keyspaces change data capture (CDC) merekam peristiwa perubahan tingkat baris dari tabel Amazon Keyspaces secara real time. 

Amazon Keyspaces CDC memungkinkan kasus penggunaan berbasis peristiwa seperti IoT industri dan deteksi penipuan serta kasus penggunaan pemrosesan data seperti pencarian teks lengkap dan arsip data. Peristiwa perubahan yang ditangkap Amazon Keyspaces CDC dalam aliran dapat dikonsumsi oleh aplikasi hilir yang menjalankan fungsi penting bisnis seperti analisis data, pencarian teks, pelatihan/inferensi ML, dan pencadangan data berkelanjutan untuk arsip. Misalnya, Anda dapat mentransfer data streaming ke layanan AWS analitik dan penyimpanan seperti Amazon OpenSearch Service, Amazon Redshift, dan Amazon S3 untuk diproses lebih lanjut.

Amazon Keyspaces CDC menawarkan catatan perubahan yang diurutkan waktu dan de-duplikasi untuk tabel, dengan penskalaan otomatis throughput data dan waktu retensi hingga 24 jam. 

Aliran CDC Amazon Keyspaces sepenuhnya tanpa server, dan Anda tidak perlu mengelola infrastruktur data untuk menangkap peristiwa perubahan. Selain itu, Amazon Keyspaces CDC tidak menggunakan kapasitas tabel apa pun untuk komputasi atau penyimpanan. Untuk informasi selengkapnya, lihat [Cara kerja stream change data capture (CDC) di Amazon Keyspaces](cdc_how-it-works.md).

Anda dapat menggunakan Amazon Keyspaces Streams API untuk membangun aplikasi yang menggunakan aliran CDC Amazon Keyspaces dan mengambil tindakan berdasarkan konten. Untuk titik akhir yang tersedia, lihat[Cara mengakses titik akhir aliran CDC di Amazon Keyspaces](CDC_access-endpoints.md).

Untuk daftar lengkap semua operasi yang tersedia untuk Amazon Keyspaces di API Streams, lihat Referensi API Amazon [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html) Streams.

**Topics**
+ [Cara kerja stream change data capture (CDC) di Amazon Keyspaces](cdc_how-it-works.md)
+ [Cara menggunakan stream change data capture (CDC) di Amazon Keyspaces](cdc_how-to-use.md)

# Cara kerja stream change data capture (CDC) di Amazon Keyspaces
<a name="cdc_how-it-works"></a>

Bagian ini memberikan gambaran umum tentang cara kerja aliran change data capture (CDC) di Amazon Keyspaces. 

Amazon Keyspaces change data capture (CDC) merekam urutan urutan modifikasi tingkat baris di tabel Amazon Keyspaces dan menyimpan informasi ini dalam log yang *disebut* stream hingga 24 jam. Setiap modifikasi tingkat baris menghasilkan catatan CDC baru yang menyimpan informasi kolom kunci utama serta status “sebelum” dan “setelah” baris termasuk semua kolom. Aplikasi dapat mengakses aliran dan melihat mutasi dalam waktu nyaris nyata.

Saat Anda mengaktifkan CDC di meja Anda, Amazon Keyspaces membuat aliran CDC baru dan mulai menangkap informasi tentang setiap modifikasi dalam tabel. Aliran CDC memiliki Nama Sumber Daya Amazon (ARN) dengan format berikut: 

```
arn:${Partition}:cassandra:{Region}:${Account}:/keyspace/${keyspaceName}/table/${tableName}/stream/${streamLabel}
```

Anda dapat memilih jenis informasi atau *jenis tampilan* yang dikumpulkan oleh aliran CDC untuk setiap rekaman saat pertama kali mengaktifkan aliran CDC. Anda tidak dapat mengubah jenis tampilan aliran sesudahnya. Amazon Keyspaces mendukung jenis tampilan berikut:
+ `NEW_AND_OLD_IMAGES`— Menangkap versi baris sebelum dan sesudah mutasi. Ini adalah opsi default.
+ `NEW_IMAGE`— Menangkap versi baris setelah mutasi.
+ `OLD_IMAGE`— Menangkap versi baris sebelum mutasi.
+ `KEYS_ONLY`— Menangkap kunci partisi dan pengelompokan baris yang bermutasi.

Setiap aliran CDC terdiri dari catatan. Setiap catatan mewakili modifikasi baris tunggal dalam tabel Amazon Keyspaces. Catatan secara logis diatur ke dalam kelompok-kelompok yang dikenal sebagai *pecahan.* Kelompok-kelompok ini secara logis diatur oleh rentang kunci utama (kombinasi kunci partisi, rentang kunci pengelompokan) dan merupakan konstruksi internal Amazon Keyspaces. Setiap pecahan bertindak sebagai wadah untuk beberapa catatan, dan berisi informasi yang diperlukan untuk mengakses dan mengulang melalui catatan ini.

![\[Aliran CDC Amazon Keyspaces terdiri dari pecahan yang mewakili catatan CDC dari kumpulan mutasi baris.\]](http://docs.aws.amazon.com/id_id/keyspaces/latest/devguide/images/keyspaces_cdc.png)


Setiap catatan CDC diberi nomor urut, yang mencerminkan urutan catatan diterbitkan dalam pecahan. Nomor urut dijamin akan meningkat dan unik dalam setiap pecahan.

Amazon Keyspaces membuat dan menghapus pecahan secara otomatis. Berdasarkan beban lalu lintas Amazon Keyspaces juga dapat membagi atau menggabungkan pecahan dari waktu ke waktu. Misalnya, Amazon Keyspaces dapat membagi satu pecahan menjadi beberapa pecahan baru atau menggabungkan pecahan menjadi pecahan tunggal baru. Amazon Keyspaces APIs mempublikasikan informasi aliran shard dan CDC untuk memungkinkan aplikasi yang mengkonsumsi memproses catatan dalam urutan yang benar dengan mengakses seluruh grafik garis keturunan pecahan. 

Amazon Keyspaces CDC didasarkan pada prinsip-prinsip berikut yang dapat Anda andalkan saat membuat aplikasi:
+ Setiap catatan mutasi tingkat baris muncul tepat sekali dalam aliran CDC.
+ Saat Anda mengkonsumsi pecahan dalam urutan garis keturunan, setiap catatan mutasi tingkat baris muncul dalam urutan yang sama dengan urutan mutasi aktual pada kunci utama.

**Topics**
+ [Retensi data](#CDC_how-it-works-data-retention)
+ [Kedaluwarsa data TTL](#CDC_how-it-works-ttl)
+ [Operasi batch](#CDC_how-it-works-batch-operations)
+ [Kolom statis](#CDC_how-it-works-static)
+ [Enkripsi saat diam](#CDC_how-it-works-encryption)
+ [Replikasi multi-Region](#CDC_how-it-works-mrr)
+ [Integrasi dengan AWS layanan](#howitworks_integration)

## Cara kerja retensi data untuk aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-data-retention"></a>

Amazon Keyspaces menyimpan catatan dalam aliran CDC selama 24 jam. Anda tidak dapat mengubah periode retensi. Jika Anda menonaktifkan CDC di atas meja, data dalam aliran terus dapat dibaca selama 24 jam. Setelah waktu ini, data kedaluwarsa dan catatan dihapus secara otomatis. 

## Cara kedaluwarsa data Time to Live (TTL) bekerja dengan aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-ttl"></a>

Amazon Keyspaces menunjukkan waktu kedaluwarsa pada column/cell tingkat serta tingkat baris dalam bidang metadata yang disebut `expirationTime` dalam catatan perubahan CDC. Saat Amazon Keyspaces TTL mendeteksi kedaluwarsa sel, CDC membuat catatan perubahan baru yang menunjukkan TTL sebagai asal perubahan. Untuk informasi lebih lanjut tentang TTL, lihat[Data kedaluwarsa dengan Time to Live (TTL) untuk Amazon Keyspaces (untuk Apache Cassandra)](TTL.md).

## Cara kerja operasi batch untuk aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-batch-operations"></a>

Operasi Batch secara internal dibagi menjadi modifikasi tingkat baris individu. Amazon Keyspaces menyimpan semua catatan dalam aliran CDC pada tingkat baris, bahkan jika modifikasi terjadi dalam operasi batch. Amazon Keyspaces mempertahankan urutan catatan dalam aliran CDC dalam urutan yang sama dengan urutan mutasi yang terjadi pada tingkat baris atau pada kunci utama.

## Cara kerja kolom statis di aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-static"></a>

Nilai kolom statis dibagi di antara semua baris dalam partisi di Cassandra. Karena perilaku ini, Amazon Keyspaces menangkap pembaruan apa pun ke kolom statis sebagai catatan terpisah dalam aliran CDC. Contoh berikut merangkum perilaku mutasi kolom statis: 
+ Ketika hanya kolom statis diperbarui, aliran CDC berisi modifikasi baris untuk kolom statis sebagai satu-satunya kolom di baris.
+ Ketika baris diperbarui tanpa perubahan pada kolom statis, aliran CDC berisi modifikasi baris yang berisi semua kolom kecuali kolom statis.
+ Ketika baris diperbarui bersama dengan kolom statis, aliran CDC berisi dua modifikasi baris terpisah, satu untuk kolom statis dan yang lainnya untuk sisa baris. 

## Cara kerja enkripsi saat istirahat untuk aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-encryption"></a>

Untuk mengenkripsi data saat istirahat di log yang dipesan CDC, Amazon Keyspaces menggunakan kunci enkripsi yang sama yang sudah digunakan untuk tabel. Untuk informasi selengkapnya tentang enkripsi diam, lihat [Enkripsi saat istirahat di Amazon Keyspaces](EncryptionAtRest.md).

## Cara kerja replikasi multi-wilayah untuk aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-mrr"></a>

Anda dapat mengaktifkan dan menonaktifkan aliran CDC untuk setiap replika tabel Multi-wilayah dengan menggunakan `update-table` API atau perintah CQL. `ALTER TABLE` Karena replikasi asinkron dan resolusi konflik, aliran CDC untuk tabel Multi-wilayah tidak konsisten. Wilayah AWS Oleh karena itu, catatan yang ditangkap Amazon Keyspaces dalam aliran mungkin muncul dalam urutan yang berbeda di Wilayah yang berbeda.

Untuk informasi selengkapnya tentang replikasi Multi-wilayah, lihat. [Replikasi Multi-Wilayah untuk Amazon Keyspaces (untuk Apache Cassandra)](multiRegion-replication.md)

## Streaming CDC dan integrasi dengan layanan AWS
<a name="howitworks_integration"></a>

### Cara bekerja dengan titik akhir VPC untuk aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-vpc"></a>

Anda dapat menggunakan titik akhir VPC untuk mengakses aliran CDC Amazon Keyspaces. Untuk informasi tentang cara membuat dan mengakses titik akhir VPC untuk aliran, lihat. [Menggunakan Amazon Keyspaces CDC stream dengan antarmuka VPC endpoint](vpc-endpoints-streams.md)

### Cara CloudWatch kerja pemantauan dengan streaming CDC di Amazon Keyspaces
<a name="CDC_how-it-works-monitoring"></a>

Anda dapat menggunakan Amazon CloudWatch untuk memantau panggilan API yang dilakukan ke titik akhir CDC Amazon Keyspaces. Untuk informasi selengkapnya tentang metrik yang tersedia, lihat[Metrik untuk Amazon Keyspaces mengubah pengambilan data (CDC)](metrics-dimensions.md#keyspaces-cdc-metrics).

### Cara CloudTrail kerja logging dengan streaming CDC di Amazon Keyspaces
<a name="CDC_how-it-works-logging"></a>

Amazon Keyspaces CDC terintegrasi dengan AWS CloudTrail, layanan yang menyediakan catatan tindakan yang diambil oleh pengguna, peran, atau layanan AWS di Amazon Keyspaces. CloudTrail menangkap panggilan API Data Definition Language (DDL) dan panggilan API Data Manipulation Language (DML/Data Manipulation Language) untuk Amazon Keyspaces sebagai event. Panggilan yang diambil termasuk panggilan dari konsol Amazon Keyspaces dan panggilan terprogram ke operasi Amazon Keyspaces API.

Untuk informasi lebih lanjut tentang peristiwa CDC yang ditangkap oleh CloudTrail, lihat[Mencatat panggilan API Amazon Keyspaces dengan AWS CloudTrail](logging-using-cloudtrail.md).

### Cara kerja penandaan untuk aliran CDC di Amazon Keyspaces
<a name="CDC_how-it-works-tagging"></a>

Amazon Keyspaces CDC stream adalah sumber daya yang dapat diberi tag. Anda dapat menandai aliran saat membuat tabel secara terprogram menggunakan CQL, AWS SDK, atau file. AWS CLI Anda juga dapat menandai aliran yang ada, menghapus tag, atau melihat tag aliran. Lihat informasi yang lebih lengkap di [Tandai ruang kunci, tabel, dan aliran di Amazon Keyspaces](Tagging.Operations.md).

# Cara menggunakan stream change data capture (CDC) di Amazon Keyspaces
<a name="cdc_how-to-use"></a>

**Topics**
+ [Mengatur Konfigurasi Izin](configure-cdc-permissions.md)
+ [Akses titik akhir aliran CDC](CDC_access-endpoints.md)
+ [Aktifkan aliran CDC untuk tabel baru](keyspaces-enable-cdc-new-table.md)
+ [Aktifkan aliran CDC untuk tabel yang ada](keyspaces-enable-cdc-alter-table.md)
+ [Nonaktifkan aliran CDC](keyspaces-delete-cdc.md)
+ [Lihat aliran CDC](keyspaces-view-cdc.md)
+ [Akses aliran CDC](keyspaces-records-cdc.md)
+ [Gunakan KCL untuk memproses aliran](cdc_how-to-use-kcl.md)

# Konfigurasikan izin untuk bekerja dengan aliran CDC di Amazon Keyspaces
<a name="configure-cdc-permissions"></a>

Untuk mengaktifkan aliran CDC, prinsipal, misalnya pengguna atau peran IAM, memerlukan izin berikut.

Untuk informasi lebih lanjut tentang AWS Identity and Access Management, lihat[AWS Identity and Access Management untuk Amazon Keyspaces](security-iam.md).

## Izin untuk mengaktifkan aliran CDC untuk tabel
<a name="cdc-permissions-enable"></a>

[Untuk mengaktifkan aliran CDC untuk tabel Amazon Keyspaces, prinsipal pertama-tama memerlukan izin untuk membuat atau mengubah tabel dan kedua izin untuk membuat CDC peran terkait layanan. AWSService RoleForAmazonKeyspaces](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams) Amazon Keyspaces menggunakan peran terkait layanan untuk mempublikasikan CloudWatch metrik ke akun Anda atas nama Anda

Kebijakan IAM berikut adalah contohnya.

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

Untuk menonaktifkan aliran, hanya `ALTER TABLE` izin yang diperlukan.

## Izin untuk melihat aliran CDC
<a name="cdc-permissions-view"></a>

Untuk melihat atau mencantumkan aliran CDC, prinsipal memerlukan izin baca untuk ruang kunci sistem. Untuk informasi selengkapnya, lihat [`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs).

Kebijakan IAM berikut adalah contohnya.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

Untuk melihat atau mencantumkan aliran CDC dengan AWS CLI atau Amazon Keyspaces API, prinsipal memerlukan izin tambahan untuk tindakan dan. `cassandra:ListStreams` `cassandra:GetStream`

Kebijakan IAM berikut adalah contohnya.

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## Izin untuk membaca aliran CDC
<a name="cdc-permissions-read"></a>

Untuk membaca aliran CDC, prinsipal memerlukan izin berikut.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## Izin untuk memproses stream CDC Amazon Keyspaces dengan Kinesis Client Library (KCL)
<a name="cdc-permissions-kcl"></a>

Untuk memproses aliran CDC Amazon Keyspaces dengan KCL, prinsipal IAM memerlukan izin berikut. 
+ `Amazon Keyspaces`— Akses hanya-baca ke aliran CDC Amazon Keyspaces tertentu.
+ `DynamoDB`— Izin untuk membuat `shard lease` tabel, membaca dan menulis akses ke tabel, dan akses baca ke indeks seperti yang diperlukan untuk pemrosesan aliran KCL.
+ `CloudWatch`— Izin untuk mempublikasikan data metrik dari Amazon Keyspaces CDC mengalirkan pemrosesan dengan KCL ke namespace aplikasi klien KCL Anda di akun Anda. CloudWatch Untuk informasi selengkapnya tentang pemantauan, lihat [Memantau Perpustakaan Klien Kinesis dengan Amazon](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html). CloudWatch

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# Cara mengakses titik akhir aliran CDC di Amazon Keyspaces
<a name="CDC_access-endpoints"></a>

Amazon Keyspaces mempertahankan [titik akhir](programmatic.endpoints.md#global_endpoints) terpisah untuk keyspaces/tables dan untuk aliran CDC di masing-masing tempat Wilayah AWS Amazon Keyspaces tersedia. Untuk mengakses aliran CDC, pilih Wilayah tabel dan ganti `cassandra` awalan dengan `cassandra-streams` nama titik akhir seperti yang ditunjukkan pada contoh berikut:

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/keyspaces/latest/devguide/CDC_access-endpoints.html)

Tabel berikut berisi daftar lengkap endpoint publik yang tersedia untuk Amazon Keyspaces change data capture streams. Amazon Keyspaces CDC streams mendukung keduanya IPv4 dan IPv6. Semua titik akhir publik, misalnya`cassandra-streams.us-east-1.api.aws`, adalah titik akhir tumpukan ganda yang dapat dikonfigurasi untuk IPv4 dan. IPv6 

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/keyspaces/latest/devguide/CDC_access-endpoints.html)

# Aktifkan aliran CDC saat membuat tabel baru di Amazon Keyspaces
<a name="keyspaces-enable-cdc-new-table"></a>

Untuk mengaktifkan aliran CDC saat Anda membuat tabel, Anda dapat menggunakan `CREATE TABLE` pernyataan di CQL atau `create-table` perintah dengan. AWS CLI

Untuk setiap baris yang diubah dalam tabel, Amazon Keyspaces dapat menangkap perubahan berikut berdasarkan pilihan `view_type` yang `cdc_specification` Anda pilih:
+ `NEW_AND_OLD_IMAGES`— kedua versi baris, sebelum dan sesudah perubahan. Ini adalah opsi default.
+ `NEW_IMAGE`— versi baris setelah perubahan.
+ `OLD_IMAGE`— versi baris sebelum perubahan.
+ `KEYS_ONLY`— kunci partisi dan pengelompokan baris yang diubah.

Untuk informasi tentang cara menandai aliran, lihat[Tambahkan tag ke aliran baru saat membuat tabel](Tagging.Operations.new.table.stream.md).

**catatan**  
Amazon Keyspaces CDC memerlukan keberadaan peran terkait layanan (`AWSServiceRoleForAmazonKeyspacesCDC`) yang menerbitkan data metrik dari aliran CDC Amazon Keyspaces ke dalam akun Anda atas nama Anda. `"cloudwatch:namespace": "AWS/Cassandra"` CloudWatch Peran ini dibuat secara otomatis untuk Anda. Untuk informasi selengkapnya, lihat [Menggunakan peran untuk aliran CDC Amazon Keyspaces](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Aktifkan aliran CDC saat Anda membuat tabel dengan CQL**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. Untuk mengonfirmasi pengaturan streaming, Anda dapat menggunakan pernyataan berikut.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   Output dari pernyataan itu akan terlihat mirip dengan ini.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |   mytable  | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}>
   ```

------
#### [ CLI ]

**Aktifkan aliran CDC saat Anda membuat tabel dengan AWS CLI**

1. Untuk membuat stream Anda dapat menggunakan sintaks berikut. 

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. Output dari perintah itu menunjukkan `create-table` respons standar dan terlihat mirip dengan contoh ini. 

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# Aktifkan aliran CDC untuk tabel yang ada di Amazon Keyspaces
<a name="keyspaces-enable-cdc-alter-table"></a>

Untuk mengaktifkan aliran CDC untuk tabel yang ada, Anda dapat menggunakan `ALTER TABLE` pernyataan di CQL, `update-table` perintah dengan AWS CLI, atau Anda dapat menggunakan konsol.

Untuk setiap baris yang diubah dalam tabel, Amazon Keyspaces dapat menangkap perubahan berikut berdasarkan pilihan `view_type` yang `cdc_specification` Anda pilih:
+ `NEW_AND_OLD_IMAGES`— kedua versi baris, sebelum dan sesudah perubahan. Ini adalah opsi default.
+ `NEW_IMAGE`— versi baris setelah perubahan.
+ `OLD_IMAGE`— versi baris sebelum perubahan.
+ `KEYS_ONLY`— kunci partisi dan pengelompokan baris yang diubah.

Untuk informasi tentang cara menandai aliran, lihat[Tambahkan tag baru ke aliran](Tagging.Operations.existing.stream.md).

**catatan**  
Amazon Keyspaces CDC memerlukan keberadaan peran terkait layanan (`AWSServiceRoleForAmazonKeyspacesCDC`) yang menerbitkan data metrik dari aliran CDC Amazon Keyspaces ke dalam akun Anda atas nama Anda. `"cloudwatch:namespace": "AWS/Cassandra"` CloudWatch Peran ini dibuat secara otomatis untuk Anda. Untuk informasi selengkapnya, lihat [Menggunakan peran untuk aliran CDC Amazon Keyspaces](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Aktifkan aliran (aliran CDC) dengan CQL**

Anda dapat menggunakan `ALTER TABLE` untuk mengaktifkan aliran untuk tabel yang ada.

1. Contoh berikut membuat aliran yang hanya menangkap perubahan pada partisi dan kunci pengelompokan dari baris yang diubah.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. Untuk memverifikasi pengaturan aliran, Anda dapat menggunakan pernyataan berikut.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   Output dari pernyataan terlihat mirip dengan ini.

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Buat aliran CDC dengan AWS CLI**

1. Untuk membuat aliran untuk tabel yang ada, Anda dapat menggunakan sintaks berikut.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. Output dari perintah itu menunjukkan `create-table` respons standar dan terlihat mirip dengan contoh ini.

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**Aktifkan aliran CDC dengan konsol Amazon Keyspaces**

1. [Masuk ke Konsol Manajemen AWS, dan buka konsol Amazon Keyspaces di https://console.aws.amazon.com/keyspaces/ rumah.](https://console.aws.amazon.com/keyspaces/home)

1. Di panel navigasi, pilih **Tabel**, lalu pilih tabel dari daftar.

1. Pilih tab **Streams**.

1. Pilih **Edit** untuk mengaktifkan aliran.

1. Pilih **Aktifkan aliran**.

1. Pilih **Lihat jenis** aliran. Pilihan berikut tersedia. Perhatikan bahwa Anda tidak dapat mengubah jenis tampilan aliran setelah dibuat.
   + **Gambar baru dan lama** — Amazon Keyspaces menangkap kedua versi baris, sebelum dan sesudah perubahan. Ini adalah opsi default.
   + **Gambar baru** - Amazon Keyspaces hanya menangkap versi baris setelah perubahan.
   + **Gambar lama** - Amazon Keyspaces hanya menangkap versi baris sebelum perubahan.
   + **Hanya kunci utama** — Amazon Keyspaces hanya menangkap partisi dan pengelompokan kolom kunci dari baris yang diubah.

1. Untuk menyelesaikannya, pilih **Simpan perubahan**.

------

# Nonaktifkan aliran CDC di Amazon Keyspaces
<a name="keyspaces-delete-cdc"></a>

Untuk menonaktifkan aliran CDC di ruang kunci, Anda dapat menggunakan `ALTER TABLE` pernyataan di CQL, `update-table` perintah dengan AWS CLI, atau konsol.

------
#### [ Cassandra Query Language (CQL) ]

**Nonaktifkan aliran (aliran CDC) dengan CQL**

1. Untuk menonaktifkan aliran, Anda dapat menggunakan pernyataan berikut.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. Untuk mengonfirmasi bahwa aliran dinonaktifkan, Anda dapat menggunakan pernyataan berikut.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   Output dari pernyataan itu terlihat mirip dengan ini.

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Nonaktifkan aliran (aliran CDC) dengan AWS CLI**

1. Untuk menonaktifkan aliran, Anda dapat menggunakan perintah berikut.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. Output dari perintah terlihat mirip dengan contoh ini.

   ```
   {
       "keyspaceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/",
       "streamName": "my_stream"
   }
   ```

------
#### [ Console ]

**Nonaktifkan aliran (aliran CDC) dengan konsol Amazon Keyspaces**

1. [Masuk ke Konsol Manajemen AWS, dan buka konsol Amazon Keyspaces di https://console.aws.amazon.com/keyspaces/ rumah.](https://console.aws.amazon.com/keyspaces/home)

1. Di panel navigasi, pilih **Tabel**, lalu pilih tabel dari daftar.

1. Pilih tab **Streams**.

1. Pilih **Edit**.

1. Batalkan pilihan **Aktifkan aliran**. 

1. Pilih **Simpan perubahan** untuk menonaktifkan aliran.

------

# Lihat aliran CDC di Amazon Keyspaces
<a name="keyspaces-view-cdc"></a>

Untuk melihat atau mencantumkan semua aliran di ruang kunci, Anda dapat melakukan kueri tabel `system_schema_mcs.streams` di ruang kunci sistem menggunakan pernyataan di CQL, atau menggunakan `list-stream` perintah `get-stream` and dengan, atau konsol. AWS CLI

Untuk izin yang diperlukan, lihat [Konfigurasikan izin untuk bekerja dengan aliran CDC di Amazon Keyspaces](configure-cdc-permissions.md).

------
#### [ Cassandra Query Language (CQL) ]

**Lihat aliran CDC dengan CQL**
+ Untuk memantau status CDC tabel Anda, Anda dapat menggunakan pernyataan berikut.

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  Output dari perintah terlihat mirip dengan ini.

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label''}}
  ...
  ```

------
#### [ CLI ]

**Lihat aliran CDC dengan AWS CLI**

1. Contoh ini menunjukkan cara melihat informasi aliran untuk tabel.

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   Output dari perintah terlihat seperti ini.

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. Anda dapat mencantumkan semua aliran di akun Anda dalam yang ditentukan Wilayah AWS. Perintah berikut adalah contoh dari ini.

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   Output dari perintah bisa terlihat mirip dengan ini.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"Create a keyspace with the name catalog. Note
                                   that streams are not supported in multi-Region keyspaces.
               "TableName": "t2",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_3"
               "TableName": "t1",
           }
       ]
   }
   ```

1. Anda juga dapat mencantumkan aliran CDC untuk ruang kunci tertentu menggunakan parameter berikut. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   Output dari perintah terlihat mirip dengan ini.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

1. Anda juga dapat mencantumkan aliran CDC untuk tabel tertentu menggunakan parameter berikut. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   Output dari perintah terlihat mirip dengan ini.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

------
#### [ Console ]

**Lihat aliran CDC di konsol Amazon Keyspaces**

1. [Masuk ke Konsol Manajemen AWS, dan buka konsol Amazon Keyspaces di https://console.aws.amazon.com/keyspaces/ rumah.](https://console.aws.amazon.com/keyspaces/home)

1. Di panel navigasi, pilih **Tabel**, lalu pilih tabel dari daftar.

1. Pilih tab **Streams** untuk meninjau detail streaming.

------

# Akses catatan dalam aliran CDC di Amazon Keyspaces
<a name="keyspaces-records-cdc"></a>

Untuk mengakses rekaman dalam aliran, Anda menggunakan [Amazon Keyspaces Streams](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html) API. Bagian berikut berisi contoh tentang cara mengakses catatan menggunakan file AWS CLI.

Untuk izin yang diperlukan, lihat [Konfigurasikan izin untuk bekerja dengan aliran CDC di Amazon Keyspaces](configure-cdc-permissions.md).

**Mengakses catatan dalam aliran menggunakan AWS CLI**

1. Anda dapat menggunakan Amazon Keyspaces Streams API untuk mengakses catatan perubahan aliran. Untuk informasi selengkapnya, lihat [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html). Untuk mengambil pecahan dalam aliran, Anda dapat menggunakan `get-stream` API seperti yang ditunjukkan pada contoh berikut.

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   Berikut adalah contoh output.

   ```
   {
      "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
      "StreamStatus": "ENABLED",
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "CreationRequestDateTime": "<CREATION_TIME>",
      "KeyspaceName": "mykeyspace",
      "TableName": "mytable",
      "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           },
       ]
   }
   ```

1. Untuk mengambil catatan dari aliran, Anda mulai dengan mendapatkan iterator yang memberi Anda titik awal untuk mengakses catatan. Untuk melakukan ini, Anda dapat menggunakan pecahan dalam aliran CDC yang dikembalikan oleh API pada langkah sebelumnya. Untuk mengumpulkan iterator, Anda dapat menggunakan `get-shard-iterator` API. Untuk contoh ini, Anda menggunakan iterator tipe `TRIM_HORIZON` yang mengambil dari titik atau awal yang dipangkas terakhir) dari pecahan.

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   Output dari perintah terlihat seperti pada contoh berikut.

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. Untuk mengambil catatan CDC menggunakan `get-records` API, Anda dapat menggunakan iterator yang dikembalikan pada langkah terakhir. Perintah berikut adalah contoh dari ini.

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```

# Gunakan Kinesis Client Library (KCL) untuk memproses stream Amazon Keyspaces
<a name="cdc_how-to-use-kcl"></a>

Topik ini menjelaskan cara menggunakan Kinesis Client Library (KCL) untuk mengkonsumsi dan memproses data dari Amazon Keyspaces change data capture (CDC) stream.

Alih-alih bekerja langsung dengan Amazon Keyspaces Streams API, bekerja dengan Kinesis Client Library (KCL) memberikan banyak manfaat, misalnya:
+ Dibangun pada pelacakan garis keturunan pecahan dan penanganan iterator. 
+ Penyeimbangan beban otomatis di seluruh pekerja.
+ Toleransi kesalahan dan pemulihan dari kegagalan pekerja.
+ Checkpointing untuk melacak kemajuan pemrosesan.
+ Adaptasi terhadap perubahan kapasitas aliran.
+ Komputasi terdistribusi yang disederhanakan untuk memproses catatan CDC.

Bagian berikut menguraikan mengapa dan bagaimana menggunakan Kinesis Client Library (KCL) untuk memproses stream dan memberikan contoh untuk memproses aliran CDC Amazon Keyspaces dengan KCL.

Untuk informasi tentang harga, lihat harga [Amazon Keyspaces (untuk Apache Cassandra](https://aws.amazon.com/keyspaces/pricing)).

## Apa itu Perpustakaan Klien Kinesis?
<a name="cdc-kcl-what-is"></a>

Kinesis Client Library (KCL) adalah pustaka perangkat lunak Java mandiri yang dirancang untuk menyederhanakan proses konsumsi dan pemrosesan data dari aliran. KCL menangani banyak tugas kompleks yang terkait dengan komputasi terdistribusi, memungkinkan Anda fokus pada penerapan logika bisnis Anda saat memproses data aliran. KCL mengelola aktivitas seperti load balancing di beberapa pekerja, menanggapi kegagalan pekerja, memeriksa catatan yang diproses, dan menanggapi perubahan jumlah pecahan dalam aliran.

Untuk memproses aliran CDC Amazon Keyspaces, Anda dapat menggunakan pola desain yang ditemukan di KCL untuk bekerja dengan pecahan aliran dan catatan aliran. KCL menyederhanakan pengodean dengan menyediakan abstraksi yang berguna di atas Kinesis Data Streams API tingkat rendah. Untuk informasi selengkapnya tentang KCL, lihat [Mengembangkan konsumen dengan KCL](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html) di Panduan Pengembang *Amazon Kinesis* Data Streams.

 Untuk menulis aplikasi menggunakan KCL, Anda menggunakan Adaptor Kinesis Streams Amazon Keyspaces. Adaptor Kinesis mengimplementasikan antarmuka Kinesis Data Streams sehingga Anda dapat menggunakan KCL untuk mengkonsumsi dan memproses catatan dari aliran Amazon Keyspaces. Untuk petunjuk tentang cara mengatur dan menginstal adaptor Kinesis stream Amazon Keyspaces, kunjungi repositori. [GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter)

Diagram berikut menunjukkan bagaimana pustaka ini berinteraksi satu sama lain.

![\[Interaksi antara aplikasi klien dan Kinesis Data Streams, KCL, Amazon Keyspaces Streams Kinesis Adapter, dan Amazon Keyspaces saat memproses rekaman aliran CDC APIs Amazon Keyspaces.\]](http://docs.aws.amazon.com/id_id/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL sering diperbarui untuk menggabungkan versi yang lebih baru dari pustaka yang mendasari, peningkatan keamanan, dan perbaikan bug. Kami menyarankan Anda menggunakan versi terbaru KCL untuk menghindari masalah yang diketahui dan mendapatkan manfaat dari semua peningkatan terbaru. Untuk menemukan versi KCL terbaru, lihat repositori [KCL GitHub ](https://github.com/awslabs/amazon-kinesis-client).

## Konsep KCL
<a name="cdc-kcl-concepts"></a>

Sebelum Anda menerapkan aplikasi konsumen menggunakan KCL, Anda harus memahami konsep-konsep berikut:

**Aplikasi konsumen KCL**  
Aplikasi konsumen KCL adalah program yang memproses data dari aliran CDC Amazon Keyspaces. KCL bertindak sebagai perantara antara kode aplikasi konsumen Anda dan aliran CDC Amazon Keyspaces.

**Pekerja**  
Pekerja adalah unit eksekusi aplikasi konsumen KCL Anda yang memproses data dari aliran CDC Amazon Keyspaces. Aplikasi Anda dapat menjalankan beberapa pekerja yang didistribusikan di beberapa instance.

**Rekam prosesor**  
Prosesor rekaman adalah logika dalam aplikasi Anda yang memproses data dari pecahan di aliran CDC Amazon Keyspaces. Prosesor rekaman dipakai oleh pekerja untuk setiap pecahan yang dikelolanya.

**Sewa**  
Sewa merupakan tanggung jawab pemrosesan untuk pecahan. Pekerja menggunakan sewa untuk mengoordinasikan pekerja mana yang memproses pecahan mana. KCL menyimpan data sewa dalam tabel di Amazon DynamoDB.

**Pos pemeriksaan**  
Pos pemeriksaan adalah catatan posisi dalam pecahan di mana prosesor rekaman telah berhasil memproses catatan. Checkpointing memungkinkan aplikasi Anda untuk melanjutkan pemrosesan dari tempat yang ditinggalkannya jika pekerja gagal.

Dengan adaptor Kinesis Amazon Keyspaces, Anda dapat mulai mengembangkan antarmuka KCL, dengan panggilan API diarahkan secara mulus ke titik akhir aliran Amazon Keyspaces. Untuk daftar titik akhir yang tersedia, lihat[Cara mengakses titik akhir aliran CDC di Amazon Keyspaces](CDC_access-endpoints.md).

Saat aplikasi Anda dimulai, aplikasi akan memanggil KCL untuk membuat instance pekerja. Anda harus memberi pekerja informasi konfigurasi untuk aplikasi, seperti deskriptor aliran dan AWS kredensil, dan nama kelas prosesor rekaman yang Anda berikan. Saat menjalankan kode di pemroses rekaman, pekerja melakukan tugas-tugas berikut:
+ Menghubungkan ke aliran
+ Menghitung pecahan dalam aliran
+ Mengkoordinasikan asosiasi serpihan dengan pekerja lain (jika ada)
+ Membuat instance pemroses rekaman untuk setiap pecahan yang dikelolanya
+ Menarik catatan dari aliran
+ Mendorong rekaman ke pemroses rekaman yang sesuai
+ Catatan yang diproses di pos pemeriksaan
+ Menyeimbangkan asosiasi pekerja pecahan ketika jumlah instans pekerja berubah
+ Menyeimbangkan asosiasi pekerja pecahan saat pecahan dipisahkan

# Menerapkan aplikasi konsumen KCL untuk aliran CDC Amazon Keyspaces
<a name="cdc-kcl-implementation"></a>

Topik ini memberikan step-by-step panduan untuk menerapkan aplikasi konsumen KCL untuk memproses aliran CDC Amazon Keyspaces.

1. Prasyarat: Sebelum Anda mulai, pastikan Anda memiliki:
   + Tabel Amazon Keyspaces dengan aliran CDC
   + Izin IAM yang diperlukan untuk prinsipal IAM untuk mengakses aliran CDC Amazon Keyspaces, membuat dan mengakses tabel DynamoDB untuk pemrosesan aliran KCL, dan izin untuk mempublikasikan metrik. CloudWatch Untuk informasi selengkapnya dan contoh kebijakan, lihat[Izin untuk memproses stream CDC Amazon Keyspaces dengan Kinesis Client Library (KCL)](configure-cdc-permissions.md#cdc-permissions-kcl).
   + Pastikan AWS kredenal yang valid diatur dalam konfigurasi lokal Anda. Untuk informasi selengkapnya, lihat [Simpan kunci akses untuk akses terprogram](aws.credentials.manage.md).
   + Java Development Kit (JDK) 8 atau lebih baru
   + Persyaratan yang tercantum dalam [Readme](https://github.com/aws/keyspaces-streams-kinesis-adapter) di Github.

1. <a name="cdc-kcl-add-dependencies"></a>Pada langkah ini, Anda menambahkan dependensi KCL ke proyek Anda. Untuk Maven, tambahkan yang berikut ini ke pom.xml Anda:

   ```
   <dependencies>
           <dependency>
               <groupId>software.amazon.kinesis</groupId>
               <artifactId>amazon-kinesis-client</artifactId>
               <version>3.1.0</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.keyspaces</groupId>
               <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
               <version>1.0.0</version>
           </dependency>
       </dependencies>
   ```
**catatan**  
Selalu periksa versi terbaru KCL di repositori [KCL GitHub ](https://github.com/awslabs/amazon-kinesis-client).

1. <a name="cdc-kcl-factory"></a>Buat kelas pabrik yang menghasilkan instance prosesor rekaman:

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>Buat pabrik rekaman seperti yang ditunjukkan pada contoh berikut.

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>Pada langkah ini Anda membuat kelas dasar untuk dikonfigurasi KCLv3 dan adaptor Amazon Keyspaces.

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>Pada langkah ini Anda menerapkan kelas prosesor rekaman untuk aplikasi Anda untuk mulai memproses peristiwa perubahan.

   ```
    import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
       private static final int SLEEP_INTERNAL_MS = 60*1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
           String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                   Thread.sleep(SLEEP_INTERNAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## Praktik terbaik
<a name="cdc-kcl-best-practices"></a>

Ikuti praktik terbaik ini saat menggunakan KCL dengan aliran CDC Amazon Keyspaces:

**Penanganan kesalahan**  
Terapkan penanganan kesalahan yang kuat di prosesor rekaman Anda untuk menangani pengecualian dengan anggun. Pertimbangkan untuk menerapkan logika coba lagi untuk kegagalan sementara.

**Frekuensi pos pemeriksaan**  
Seimbangkan frekuensi checkpointing untuk meminimalkan pemrosesan duplikat sekaligus memastikan pelacakan kemajuan yang wajar. Pemeriksaan yang terlalu sering dapat memengaruhi kinerja, sementara pos pemeriksaan yang terlalu jarang dapat menyebabkan pemrosesan ulang lebih banyak jika pekerja gagal.

**Penskalaan pekerja**  
Skala jumlah pekerja berdasarkan jumlah pecahan di aliran CDC Anda. Titik awal yang baik adalah memiliki satu pekerja per pecahan, tetapi Anda mungkin perlu menyesuaikan berdasarkan persyaratan pemrosesan Anda.

**Pemantauan**  
Gunakan CloudWatch metrik yang disediakan oleh KCL untuk memantau kesehatan dan kinerja aplikasi konsumen Anda. Metrik utama termasuk latensi pemrosesan, usia pos pemeriksaan, dan jumlah sewa.

**Pengujian**  
Uji aplikasi konsumen Anda secara menyeluruh, termasuk skenario seperti kegagalan pekerja, resharding aliran, dan berbagai kondisi beban.

## Menggunakan KCL dengan bahasa non-Java
<a name="cdc-kcl-non-java"></a>

Sementara KCL terutama perpustakaan Java, Anda dapat menggunakannya dengan bahasa pemrograman lain melalui. MultiLangDaemon MultiLangDaemon Ini adalah daemon berbasis Java yang mengelola interaksi antara prosesor rekaman non-Java Anda dan KCL.

KCL menyediakan dukungan untuk bahasa-bahasa berikut:
+ Python
+ Ruby
+ Node.js
+ .NET

Untuk informasi selengkapnya tentang penggunaan KCL dengan bahasa non-Java, lihat dokumentasi [ MultiLangDaemon KCL](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang).

## Pemecahan masalah
<a name="cdc-kcl-troubleshooting"></a>

Bagian ini memberikan solusi untuk masalah umum yang mungkin Anda temui saat menggunakan KCL dengan aliran CDC Amazon Keyspaces.

**Pemrosesan lambat**  
Jika aplikasi konsumen Anda memproses catatan secara perlahan, pertimbangkan:  
+ Meningkatkan jumlah instans pekerja
+ Mengoptimalkan logika pemrosesan rekaman Anda
+ Memeriksa kemacetan dalam sistem hilir

**Pemrosesan duplikat**  
Jika Anda melihat pemrosesan duplikat catatan, periksa logika pos pemeriksaan Anda. Pastikan Anda melakukan checkpointing setelah berhasil memproses catatan.

**Kegagalan pekerja**  
Jika pekerja sering gagal, periksa:  
+ Kendala sumber daya (CPU, memori)
+ Masalah konektivitas jaringan
+ Masalah izin

**Masalah tabel sewa**  
Jika Anda mengalami masalah dengan tabel sewa KCL:  
+ Periksa apakah aplikasi Anda memiliki izin yang sesuai untuk mengakses tabel Amazon Keyspaces
+ Verifikasi bahwa tabel memiliki throughput yang disediakan yang memadai