

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Keyspaces での変更データキャプチャ (CDC) ストリームの使用
<a name="cdc"></a>

Amazon Keyspaces 変更データキャプチャ (CDC) は、Amazon Keyspaces テーブルからの行レベルの変更イベントをほぼリアルタイムで記録します。

Amazon Keyspaces CDC を使用すると、産業用 IoT や不正検出などのイベント駆動型のユースケースや、全文検索やデータアーカイブなどのデータ処理のユースケースが可能になります。Amazon Keyspaces CDC がストリームでキャプチャする変更イベントは、データ分析、テキスト検索、ML トレーニング/推論、アーカイブのための継続的なデータバックアップなどのビジネスクリティカルな機能を実行するダウンストリームアプリケーションで消費できます。例えば、ストリームデータを Amazon OpenSearch Service、Amazon Redshift、Amazon S3 などの AWS 分析およびストレージサービスに転送して、さらに処理できます。

Amazon Keyspaces CDC は、テーブルの時系列および重複排除された変更レコードを提供し、データスループットと保持時間の自動スケーリングは最大 24 時間です。

Amazon Keyspaces CDC ストリームは完全にサーバーレスであり、変更イベントをキャプチャするためにデータインフラストラクチャを管理する必要はありません。さらに、Amazon Keyspaces CDC はコンピューティングまたはストレージのテーブル容量を消費しません。詳細については、「[Amazon Keyspaces での変更データキャプチャ (CDC) ストリームの仕組み](cdc_how-it-works.md)」を参照してください。

Amazon Keyspaces Streams API を使用して、Amazon Keyspaces CDC ストリームを消費し、コンテンツに基づいてアクションを実行するアプリケーションを構築できます。使用可能なエンドポイントについては、「」を参照してください[Amazon Keyspaces で CDC ストリームエンドポイントにアクセスする方法](CDC_access-endpoints.md)。

Streams API で Amazon Keyspaces で使用できるすべてのオペレーションの完全なリストについては、[https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)」を参照してください。

**Topics**
+ [Amazon Keyspaces での変更データキャプチャ (CDC) ストリームの仕組み](cdc_how-it-works.md)
+ [Amazon Keyspaces で変更データキャプチャ (CDC) ストリームを使用する方法](cdc_how-to-use.md)

# Amazon Keyspaces での変更データキャプチャ (CDC) ストリームの仕組み
<a name="cdc_how-it-works"></a>

このセクションでは、Amazon Keyspaces での変更データキャプチャ (CDC) ストリームの仕組みの概要を説明します。

Amazon Keyspaces 変更データキャプチャ (CDC) は、行レベルの変更の順序付けられたシーケンスを Amazon Keyspaces テーブルに記録し、この情報を*ストリーム*と呼ばれるログに最大 24 時間保存します。行レベルを変更するたびに、プライマリキー列情報と、すべての列を含む行の「前」状態と「後」状態を保持する新しい CDC レコードが生成されます。アプリケーションはストリームにアクセスし、ミューテーションをほぼリアルタイムで表示できます。

テーブルで CDC を有効にすると、Amazon Keyspaces は新しい CDC ストリームを作成し、テーブル内のすべての変更に関する情報をキャプチャし始めます。CDC ストリームには、次の形式の Amazon リソースネーム (ARN) があります。

```
arn:${Partition}:cassandra:{Region}:${Account}:/keyspace/${keyspaceName}/table/${tableName}/stream/${streamLabel}
```

最初に CDC ストリームを有効にするときに、CDC ストリームが各レコードに対して収集する情報のタイプまたは*ビュータイプ*を選択できます。その後、ストリームのビュータイプを変更することはできません。Amazon Keyspaces では、次のビュータイプがサポートされています。
+ `NEW_AND_OLD_IMAGES` – ミューテーションの前と後に行のバージョンをキャプチャします。これがデフォルトです。
+ `NEW_IMAGE` – ミューテーション後の行のバージョンをキャプチャします。
+ `OLD_IMAGE` – ミューテーションの前に行のバージョンをキャプチャします。
+ `KEYS_ONLY` – 変更された行のパーティションキーとクラスタリングキーをキャプチャします。

すべての CDC ストリームはレコードで構成されます。各レコードは、Amazon Keyspaces テーブルの 1 行の変更を表します。レコードは*、シャードと呼ばれるグループに論理的に整理されます*。これらのグループは、プライマリキーの範囲 (パーティションキーとクラスタリングキーの範囲の組み合わせ) によって論理的に整理され、Amazon Keyspaces の内部構造です。各シャードは複数のレコードのコンテナとして機能し、これらのレコードにアクセスして反復するために必要な情報が含まれています。

![\[Amazon Keyspaces CDC ストリームは、行ミューテーションの集合の CDC レコードを表すシャードで構成されます。\]](http://docs.aws.amazon.com/ja_jp/keyspaces/latest/devguide/images/keyspaces_cdc.png)


各 CDC レコードには、シャード内でレコードが発行された順序を反映したシーケンス番号が割り当てられます。シーケンス番号は、各シャード内で増加し、一意であることが保証されます。

Amazon Keyspaces はシャードを自動的に作成および削除します。トラフィックの負荷に基づいて、Amazon Keyspaces は時間の経過とともにシャードを分割またはマージすることもできます。例えば、Amazon Keyspaces は 1 つのシャードを複数の新しいシャードに分割したり、シャードを新しい 1 つのシャードにマージしたりできます。Amazon Keyspaces APIsシャードと CDC ストリーム情報を公開し、シャードの系統グラフ全体にアクセスして、消費するアプリケーションが正しい順序でレコードを処理できるようにします。

Amazon Keyspaces CDC は、アプリケーションを構築するときに信頼できる以下の原則に基づいています。
+ 各行レベルのミューテーションレコードは、CDC ストリームに 1 回だけ表示されます。
+ シャードを系統順に使用すると、各行レベルのミューテーションレコードは、プライマリキーの実際のミューテーション順序と同じ順序で表示されます。

**Topics**
+ [データ保持](#CDC_how-it-works-data-retention)
+ [TTL データの有効期限](#CDC_how-it-works-ttl)
+ [バッチオペレーション](#CDC_how-it-works-batch-operations)
+ [静的列](#CDC_how-it-works-static)
+ [保管中の暗号化](#CDC_how-it-works-encryption)
+ [マルチリージョンレプリケーション](#CDC_how-it-works-mrr)
+ [AWS サービスとの統合](#howitworks_integration)

## Amazon Keyspaces の CDC ストリームのデータ保持の仕組み
<a name="CDC_how-it-works-data-retention"></a>

Amazon Keyspaces は、CDC ストリームのレコードを 24 時間保持します。保持期間を変更することはできません。テーブルで CDC を無効にすると、ストリーム内のデータは 24 時間読み取られます。この時間が経過すると、データの有効期限が切れ、レコードは自動的に削除されます。

## Amazon Keyspaces の CDC ストリームで有効期限 (TTL) データの有効期限がどのように機能するか
<a name="CDC_how-it-works-ttl"></a>

Amazon Keyspaces は、CDC 変更レコード`expirationTime`の というメタデータフィールドで、列/セルレベルでの有効期限と行レベルを表示します。Amazon Keyspaces TTL がセルの有効期限を検出すると、CDC は TTL を変更のオリジンとして表示する新しい変更レコードを作成します。TTL の詳細については、「[Amazon Keyspaces (Apache Cassandra 向け) で有効期限 (TTL) を使用してデータを期限切れにする](TTL.md)」を参照してください。

## Amazon Keyspaces での CDC ストリームのバッチオペレーションの仕組み
<a name="CDC_how-it-works-batch-operations"></a>

バッチオペレーションは、内部的に個々の行レベルの変更に分割されます。Amazon Keyspaces は、変更がバッチオペレーションで発生した場合でも、CDC ストリーム内のすべてのレコードを行レベルで保持します。Amazon Keyspaces は、CDC ストリーム内のレコードの順序を、行レベルまたはプライマリキーで発生したミューテーション順序と同じ順序で維持します。

## Amazon Keyspaces の CDC ストリームでの静的列の仕組み
<a name="CDC_how-it-works-static"></a>

静的列値は、Cassandra のパーティション内のすべての行間で共有されます。この動作により、Amazon Keyspaces は静的列への更新を CDC ストリーム内の別のレコードとしてキャプチャします。次の例は、静的列ミューテーションの動作をまとめたものです。
+ 静的列のみが更新されると、CDC ストリームには、静的列の行変更が行内の唯一の列として含まれます。
+ 静的列を変更せずに行が更新されると、CDC ストリームには静的列を除くすべての列を含む行変更が含まれます。
+ 行が静的列とともに更新されると、CDC ストリームには 2 つの個別の行変更が含まれます。1 つは静的列用、もう 1 つは残りの行用です。

## Amazon Keyspaces の CDC ストリームの保管時の暗号化の仕組み
<a name="CDC_how-it-works-encryption"></a>

CDC 順序付きログに保管中のデータを暗号化するために、Amazon Keyspaces はテーブルに既に使用されているのと同じ暗号化キーを使用します。保管時の暗号化の詳細については、「[Amazon Keyspaces の保管データ暗号化](EncryptionAtRest.md)」を参照してください。

## Amazon Keyspaces の CDC ストリームでのマルチリージョンレプリケーションの仕組み
<a name="CDC_how-it-works-mrr"></a>

`update-table` API または CQL コマンドを使用して、マルチリージョンテーブルの個々のレプリカの CDC `ALTER TABLE` ストリームを有効または無効にできます。非同期レプリケーションと競合の解決により、マルチリージョンテーブルの CDC ストリームは全体で一貫性がありません AWS リージョン。したがって、Amazon Keyspaces がストリームでキャプチャするレコードは、異なるリージョンで異なる順序で表示される可能性があります。

マルチリージョンレプリケーションの詳細については、「」を参照してください[Amazon Keyspaces のマルチリージョンレプリケーション (Apache Cassandra 用）](multiRegion-replication.md)。

## CDC ストリームと AWS サービスとの統合
<a name="howitworks_integration"></a>

### Amazon Keyspaces で CDC ストリームの VPC エンドポイントを操作する方法
<a name="CDC_how-it-works-vpc"></a>

VPC エンドポイントを使用して Amazon Keyspaces CDC ストリームにアクセスできます。ストリームの VPC エンドポイントを作成してアクセスする方法については、「」を参照してください[インターフェイス VPC エンドポイントでの Amazon Keyspaces CDC ストリームの使用](vpc-endpoints-streams.md)。

### Amazon Keyspaces の CDC ストリームに対する CloudWatch によるモニタリングの仕組み
<a name="CDC_how-it-works-monitoring"></a>

Amazon CloudWatch を使用して、Amazon Keyspaces CDC エンドポイントに対して行われた API コールをモニタリングできます。使用可能なメトリクスの詳細については、「」を参照してください[Amazon Keyspaces 変更データキャプチャ (CDC) のメトリクス](metrics-dimensions.md#keyspaces-cdc-metrics)。

### Amazon Keyspaces の CDC ストリームでの CloudTrail を使用したログ記録の仕組み
<a name="CDC_how-it-works-logging"></a>

Amazon Keyspaces CDC は AWS CloudTrail、Amazon Keyspaces のユーザー、ロール、または のサービスによって実行されたアクションを記録する AWS サービスである と統合されています。CloudTrail では Amazon Keyspaces のデータ定義言語 (DDL) API コールとデータ操作言語 (DML) API コールがイベントとしてキャプチャされます。キャプチャされたコールには、Amazon Keyspaces コンソールからのコールと、Amazon Keyspaces API オペレーションへのプログラムによるコールが含まれます。

CloudTrail でキャプチャされた CDC イベントの詳細については、「」を参照してください[を使用した Amazon Keyspaces API コールのログ記録 AWS CloudTrail](logging-using-cloudtrail.md)。

### Amazon Keyspaces の CDC ストリームでのタグ付けの仕組み
<a name="CDC_how-it-works-tagging"></a>

Amazon Keyspaces CDC ストリームはタグ付け可能なリソースです。CQL、 AWS SDK、または を使用してプログラムでテーブルを作成するときに、ストリームにタグを付けることができます AWS CLI。既存のストリームにタグ付けしたり、タグを削除したり、ストリームのタグを表示したりすることもできます。詳細については、「[Amazon Keyspaces のキースペース、テーブル、ストリームにタグを付ける](Tagging.Operations.md)」を参照してください。

# Amazon Keyspaces で変更データキャプチャ (CDC) ストリームを使用する方法
<a name="cdc_how-to-use"></a>

**Topics**
+ [アクセス許可の設定](configure-cdc-permissions.md)
+ [CDC ストリームエンドポイントにアクセスする](CDC_access-endpoints.md)
+ [新しいテーブルの CDC ストリームを有効にする](keyspaces-enable-cdc-new-table.md)
+ [既存のテーブルの CDC ストリームを有効にする](keyspaces-enable-cdc-alter-table.md)
+ [CDC ストリームを無効にする](keyspaces-delete-cdc.md)
+ [CDC ストリームを表示する](keyspaces-view-cdc.md)
+ [CDC ストリームにアクセスする](keyspaces-records-cdc.md)
+ [ストリームの処理に KCL を使用する](cdc_how-to-use-kcl.md)

# Amazon Keyspaces で CDC ストリームを操作するアクセス許可を設定する
<a name="configure-cdc-permissions"></a>

CDC ストリームを有効にするには、IAM ユーザーやロールなどのプリンシパルに次のアクセス許可が必要です。

詳細については AWS Identity and Access Management、「」を参照してください[AWS Identity and Access Management Amazon Keyspaces 用](security-iam.md)。

## テーブルの CDC ストリームを有効にするアクセス許可
<a name="cdc-permissions-enable"></a>

Amazon Keyspaces テーブルの CDC ストリームを有効にするには、プリンシパルにまずテーブルを作成または変更するためのアクセス許可が必要であり、次にサービスにリンクされたロール [AWSServiceRoleForAmazonKeyspacesCDC](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams) を作成するためのアクセス許可が必要です。Amazon Keyspaces は、サービスにリンクされたロールを使用して、ユーザーに代わって CloudWatch メトリクスをアカウントに発行します。

次の IAM ポリシーがその例です。

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

ストリームを無効にするには、`ALTER TABLE`アクセス許可のみが必要です。

## CDC ストリームを表示するアクセス許可
<a name="cdc-permissions-view"></a>

CDC ストリームを表示または一覧表示するには、プリンシパルにシステムキースペースの読み取りアクセス許可が必要です。詳細については、「[`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs)」を参照してください。

次の IAM ポリシーがその例です。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

または Amazon Keyspaces API を使用して CDC ストリームを表示 AWS CLI または一覧表示するには、プリンシパルにアクション `cassandra:ListStreams`および に対する追加のアクセス許可が必要です`cassandra:GetStream`。

次の IAM ポリシーがその例です。

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## CDC ストリームを読み取るアクセス許可
<a name="cdc-permissions-read"></a>

CDC ストリームを読み取るには、プリンシパルに次のアクセス許可が必要です。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## Kinesis Client Library (KCL) を使用して Amazon Keyspaces CDC ストリームを処理するアクセス許可
<a name="cdc-permissions-kcl"></a>

KCL で Amazon Keyspaces CDC ストリームを処理するには、IAM プリンシパルに次のアクセス許可が必要です。
+ `Amazon Keyspaces` – 指定された Amazon Keyspaces CDC ストリームへの読み取り専用アクセス。
+ `DynamoDB` – KCL ストリーム処理に必要な`shard lease`テーブルの作成、テーブルへの読み取りおよび書き込みアクセス、インデックスへの読み取りアクセスを行うためのアクセス許可。
+ `CloudWatch` – KCL を使用した Amazon Keyspaces CDC ストリーム処理から CloudWatch アカウントの KCL クライアントアプリケーションの名前空間にメトリクスデータを発行するアクセス許可。モニタリングの詳細については、[Amazon CloudWatch で Kinesis Client Library をモニタリングする](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)」を参照してください。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# Amazon Keyspaces で CDC ストリームエンドポイントにアクセスする方法
<a name="CDC_access-endpoints"></a>

Amazon Keyspaces は、Amazon Keyspaces AWS リージョン が利用可能な各 で、キースペース/テーブルと CDC ストリーム用に個別の[エンドポイント](programmatic.endpoints.md#global_endpoints)を維持します。CDC ストリームにアクセスするには、次の例に示すように、テーブルのリージョンを選択し、エンドポイント名`cassandra-streams`で`cassandra`プレフィックスを に置き換えます。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/keyspaces/latest/devguide/CDC_access-endpoints.html)

次の表に、使用可能なパブリックエンドポイントの完全なリストを示します Amazon Keyspaces change data capture streams。 は IPv4 と IPv6 の両方 Amazon Keyspaces CDC streams をサポートしています。などのすべてのパブリックエンドポイントは`cassandra-streams.us-east-1.api.aws`、IPv4 および IPv6 用に設定できるデュアルスタックエンドポイントです。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/keyspaces/latest/devguide/CDC_access-endpoints.html)

# Amazon Keyspaces で新しいテーブルを作成するときに CDC ストリームを有効にする
<a name="keyspaces-enable-cdc-new-table"></a>

テーブルの作成時に CDC ストリームを有効にするには、CQL で `CREATE TABLE`ステートメントを使用するか、 で `create-table` コマンドを使用します AWS CLI。

テーブル内の変更された行ごとに、Amazon Keyspaces は、`cdc_specification`選択した `view_type`の に基づいて次の変更をキャプチャできます。
+ `NEW_AND_OLD_IMAGES` – 変更前と変更後の両方のバージョンの行。これがデフォルトです。
+ `NEW_IMAGE` – 変更後の行のバージョン。
+ `OLD_IMAGE` – 変更前の行のバージョン。
+ `KEYS_ONLY` – 変更された行のパーティションキーとクラスタリングキー。

ストリームにタグを付ける方法については、「」を参照してください[テーブルの作成時に新しいストリームにタグを追加する](Tagging.Operations.new.table.stream.md)。

**注記**  
Amazon Keyspaces CDC には、ユーザーに代わって Amazon Keyspaces CDC ストリームから CloudWatch アカウントの `"cloudwatch:namespace": "AWS/Cassandra"` にメトリクスデータを発行するサービスにリンクされたロール (`AWSServiceRoleForAmazonKeyspacesCDC`) が必要です。このロールは自動的に作成されます。詳細については、「[Amazon Keyspaces CDC ストリームのロールの使用](using-service-linked-roles-CDC-streams.md)」を参照してください。

------
#### [ Cassandra Query Language (CQL) ]

**CQL でテーブルを作成するときに CDC ストリームを有効にする**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. ストリーム設定を確認するには、次のステートメントを使用します。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   そのステートメントの出力は次のようになります。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |   mytable  | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}>
   ```

------
#### [ CLI ]

**でテーブルを作成するときに CDC ストリームを有効にする AWS CLI**

1. ストリームを作成するには、次の構文を使用できます。

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. そのコマンドの出力は標準`create-table`レスポンスを示し、この例のようになります。

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# Amazon Keyspaces で既存のテーブルの CDC ストリームを有効にする
<a name="keyspaces-enable-cdc-alter-table"></a>

既存のテーブルの CDC ストリームを有効にするには、CQL で `ALTER TABLE`ステートメントを使用するか、 で `update-table` コマンドを使用するか AWS CLI、 コンソールを使用できます。

テーブル内の変更された行ごとに、Amazon Keyspaces は、`cdc_specification`選択した `view_type`の に基づいて次の変更をキャプチャできます。
+ `NEW_AND_OLD_IMAGES` – 変更前と変更後の両方のバージョンの行。これがデフォルトです。
+ `NEW_IMAGE` – 変更後の行のバージョン。
+ `OLD_IMAGE` – 変更前の行のバージョン。
+ `KEYS_ONLY` – 変更された行のパーティションキーとクラスタリングキー。

ストリームにタグを付ける方法については、「」を参照してください[ストリームに新しいタグを追加する](Tagging.Operations.existing.stream.md)。

**注記**  
Amazon Keyspaces CDC には、ユーザーに代わって Amazon Keyspaces CDC ストリームから CloudWatch アカウントの `"cloudwatch:namespace": "AWS/Cassandra"` にメトリクスデータを発行するサービスにリンクされたロール (`AWSServiceRoleForAmazonKeyspacesCDC`) が必要です。このロールは自動的に作成されます。詳細については、「[Amazon Keyspaces CDC ストリームのロールの使用](using-service-linked-roles-CDC-streams.md)」を参照してください。

------
#### [ Cassandra Query Language (CQL) ]

**CQL でストリーム (CDC ストリーム) を有効にする**

を使用して`ALTER TABLE`、既存のテーブルのストリームを有効にできます。

1. 次の例では、変更された行のパーティションキーとクラスタリングキーの変更のみをキャプチャするストリームを作成します。

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. ストリーム設定を確認するには、次のステートメントを使用します。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   ステートメントの出力は次のようになります。

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**を使用して CDC ストリームを作成する AWS CLI**

1. 既存のテーブルのストリームを作成するには、次の構文を使用できます。

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. そのコマンドの出力は標準`create-table`レスポンスを示し、この例のようになります。

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**Amazon Keyspaces コンソールで CDC ストリームを有効にする**

1. にサインインし AWS マネジメントコンソール、[https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home) で Amazon Keyspaces コンソールを開きます。

1. ナビゲーションペインで、**テーブル**を選択し、リストからテーブルを選択します。

1. **ストリーム**タブを選択します。

1. **編集** を選択してストリームを有効にします。

1. **ストリームをオンにする** を選択します。

1. ストリームの表示**タイプ**を選択します。以下のオプションが利用できます。作成後にストリームのビュータイプを変更することはできません。
   + **新しいイメージと古いイメージ** – Amazon Keyspaces は、変更前と変更後の両方のバージョンの行をキャプチャします。これがデフォルトです。
   + **新しいイメージ** – Amazon Keyspaces は、変更後の行のバージョンのみをキャプチャします。
   + **古いイメージ** – Amazon Keyspaces は、変更前の行のバージョンのみをキャプチャします。
   + **プライマリキーのみ** – Amazon Keyspaces は、変更された行のパーティションとクラスタリングキー列のみをキャプチャします。

1. 終了するには、**変更の保存**を選択します。

------

# Amazon Keyspaces で CDC ストリームを無効にする
<a name="keyspaces-delete-cdc"></a>

キースペースで CDC ストリームを無効にするには、CQL で `ALTER TABLE`ステートメント、 で `update-table` コマンド AWS CLI、または コンソールを使用できます。

------
#### [ Cassandra Query Language (CQL) ]

**CQL でストリーム (CDC ストリーム) を無効にする**

1. ストリームを無効にするには、次のステートメントを使用できます。

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. ストリームが無効になっていることを確認するには、次のステートメントを使用できます。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   そのステートメントの出力は次のようになります。

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**でストリーム (CDC ストリーム) を無効にする AWS CLI**

1. ストリームを無効にするには、次のコマンドを使用します。

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. コマンドの出力は次の例のようになります。

   ```
   {
       "keyspaceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/",
       "streamName": "my_stream"
   }
   ```

------
#### [ Console ]

**Amazon Keyspaces コンソールでストリーム (CDC ストリーム) を無効にする**

1. にサインインし AWS マネジメントコンソール、[https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home) で Amazon Keyspaces コンソールを開きます。

1. ナビゲーションペインでテーブルを選択し****、リストからテーブルを選択します。

1. **ストリーム**タブを選択します。

1. **[編集]** を選択します。

1. 選択解除 **ストリームをオンにします**。

1. **変更を保存**を選択してストリームを無効にします。

------

# Amazon Keyspaces で CDC ストリームを表示する
<a name="keyspaces-view-cdc"></a>

キースペース内のすべてのストリームを表示または一覧表示するには、CQL `system_schema_mcs.streams`の ステートメントを使用してシステムキースペースのテーブルをクエリするか、 AWS CLIまたは コンソールで `get-stream`および `list-stream` コマンドを使用します。

必要なアクセス許可については、「[Amazon Keyspaces で CDC ストリームを操作するアクセス許可を設定する](configure-cdc-permissions.md)」を参照してください。

------
#### [ Cassandra Query Language (CQL) ]

**CQL で CDC ストリームを表示する**
+ テーブルの CDC ステータスをモニタリングするには、次のステートメントを使用できます。

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  コマンドの出力は次のようになります。

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label''}}
  ...
  ```

------
#### [ CLI ]

**で CDC ストリームを表示する AWS CLI**

1. この例では、テーブルのストリーム情報を表示する方法を示します。

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   コマンドの出力は次のようになります。

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. 指定した のアカウント内のすべてのストリームを一覧表示できます AWS リージョン。次のコマンドは、その一例です。

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   コマンドの出力は次のようになります。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"Create a keyspace with the name catalog. Note
                                   that streams are not supported in multi-Region keyspaces.
               "TableName": "t2",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_3"
               "TableName": "t1",
           }
       ]
   }
   ```

1. 次のパラメータを使用して、特定のキースペースの CDC ストリームを一覧表示することもできます。

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   コマンドの出力は次のようになります。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

1. 次のパラメータを使用して、特定のテーブルの CDC ストリームを一覧表示することもできます。

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   コマンドの出力は次のようになります。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

------
#### [ Console ]

**Amazon Keyspaces コンソールで CDC ストリームを表示する**

1. にサインインし AWS マネジメントコンソール、[https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home) で Amazon Keyspaces コンソールを開きます。

1. ナビゲーションペインでテーブルを選択し****、リストからテーブルを選択します。

1. **ストリーム**タブを選択して、ストリームの詳細を確認します。

------

# Amazon Keyspaces の CDC ストリームのレコードにアクセスする
<a name="keyspaces-records-cdc"></a>

ストリーム内のレコードにアクセスするには、[Amazon Keyspaces Streams API ](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)を使用します。次のセクションでは、 を使用してレコードにアクセスする方法の例を示します AWS CLI。

必要なアクセス許可については、「[Amazon Keyspaces で CDC ストリームを操作するアクセス許可を設定する](configure-cdc-permissions.md)」を参照してください。

**を使用してストリーム内のレコードにアクセスする AWS CLI**

1. Amazon Keyspaces Streams API を使用して、ストリームの変更レコードにアクセスできます。詳細については、[https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)」を参照してください。ストリーム内のシャードを取得するには、次の例に示すように `get-stream` API を使用できます。

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   次は出力の例です。

   ```
   {
      "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
      "StreamStatus": "ENABLED",
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "CreationRequestDateTime": "<CREATION_TIME>",
      "KeyspaceName": "mykeyspace",
      "TableName": "mytable",
      "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           },
       ]
   }
   ```

1. ストリームからレコードを取得するには、まずレコードにアクセスするための開始点を提供するイテレーターを取得します。これを行うには、前のステップで API によって返された CDC ストリーム内のシャードを使用できます。イテレーターを収集するには、 `get-shard-iterator` API を使用できます。この例では、シャードの最後のトリミングポイントまたは先頭から が取得`TRIM_HORIZON`するタイプのイテレーターを使用します）。

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   コマンドの出力は次の例のようになります。

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. `get-records` API を使用して CDC レコードを取得するには、最後のステップで返されたイテレーターを使用できます。次のコマンドは、その一例です。

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```

# Kinesis Client Library (KCL) を使用して Amazon Keyspaces ストリームを処理する
<a name="cdc_how-to-use-kcl"></a>

このトピックでは、Kinesis Client Library (KCL) を使用して Amazon Keyspaces 変更データキャプチャ (CDC) ストリームのデータを使用および処理する方法について説明します。

Amazon Keyspaces Streams API を直接使用する代わりに、Kinesis Client Library (KCL) を使用すると、次のような多くの利点があります。
+ シャード系統の追跡とイテレーター処理が組み込まれています。
+ ワーカー間の自動負荷分散。
+ ワーカーの障害に対する耐障害性と復旧。
+ 処理の進行状況を追跡するためのチェックポイント。
+ ストリーム容量の変化への適応。
+ CDC レコードを処理するための簡素化された分散コンピューティング。

次のセクションでは、Kinesis Client Library (KCL) を使用してストリームを処理する理由と方法について説明します。また、KCL を使用して Amazon Keyspaces CDC ストリームを処理する例を示します。

料金については、「[Amazon Keyspaces (for Apache Cassandra) pricing (Amazon Keyspaces (Apache Cassandra 向け) の料金)](https://aws.amazon.com/keyspaces/pricing)」を参照してください。

## Kinesis Client Library (KCL) とは何ですか?
<a name="cdc-kcl-what-is"></a>

Kinesis Client Library (KCL) は、ストリームからのデータの消費と処理のプロセスを簡素化するために設計されたスタンドアロンの Java ソフトウェアライブラリです。KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理し、ストリームデータを処理する際にビジネスロジックの実装に集中できます。KCL は、複数のワーカー間の負荷分散、ワーカーの障害への対応、処理されたレコードのチェックポイント作成、ストリーム内のシャード数の変化への対応などのアクティビティを管理します。

Amazon Keyspaces CDC ストリームを処理するには、KCL にある設計パターンを使用してストリームシャードとストリームレコードを操作できます。KCL は、低レベルの Kinesis Data Streams API の上で役に立つ抽象化を提供することによりコーディングを簡素化します。KCL の詳細については、*Amazon Kinesis Data Streams デベロッパーガイド*」の[「KCL を使用したコンシューマーの開発](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html)」を参照してください。

 KCL を使用してアプリケーションを記述するには、Amazon Keyspaces Streams Kinesis Adapter を使用します。Kinesis Adapter は Kinesis Data Streams インターフェイスを実装しているため、KCL を使用して Amazon Keyspaces ストリームからのレコードを消費および処理できます。Amazon Keyspaces ストリーム Kinesis Adapter をセットアップしてインストールする方法については、[GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter) リポジトリを参照してください。

次の図は、これらのライブラリが相互にどのように相互作用するかを示しています。

![\[Amazon Keyspaces CDC ストリームレコードを処理するときのクライアントアプリケーションと Kinesis Data Streams、KCL、Amazon Keyspaces Streams Kinesis Adapter、Amazon Keyspaces APIs とのやり取り。\]](http://docs.aws.amazon.com/ja_jp/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL は、基盤となるライブラリの新しいバージョンの取り込みや、セキュリティ強化、バグ修正を行うために頻繁に更新されています。既知の問題を回避し、最新の改善点を活用するためにも、常に最新バージョンの KCL を使用することをお勧めします。最新の KCL バージョンを確認するには、[「KCL GitHub リポジトリ](https://github.com/awslabs/amazon-kinesis-client)」を参照してください。

## KCL の概念
<a name="cdc-kcl-concepts"></a>

KCL を使用してコンシューマーアプリケーションを実装する前に、次の概念を理解しておく必要があります。

**KCL コンシューマーアプリケーション**  
KCL コンシューマーアプリケーションは、Amazon Keyspaces CDC ストリームからのデータを処理するプログラムです。KCL は、コンシューマーアプリケーションコードと Amazon Keyspaces CDC ストリームの間の仲介として機能します。

**ワーカー**  
ワーカーは、Amazon Keyspaces CDC ストリームからのデータを処理する KCL コンシューマーアプリケーションの実行ユニットです。アプリケーションは、複数のインスタンスに分散された複数のワーカーを実行できます。

**レコードプロセッサ**  
レコードプロセッサは、Amazon Keyspaces CDC ストリームのシャードからのデータを処理するアプリケーション内のロジックです。レコードプロセッサは、管理するシャードごとにワーカーによってインスタンス化されます。

**リース**  
リースは、シャードの処理責任を表します。ワーカーはリースを使用して、どのワーカーがどのシャードを処理しているかを調整します。KCL は、Amazon DynamoDB のテーブルにリースデータを保存します。

**チェックポイント**  
チェックポイントは、レコードプロセッサがレコードを正常に処理したシャード内の位置のレコードです。チェックポイントを使用すると、ワーカーが失敗した場合にアプリケーションが中断した場所から処理を再開できます。

Amazon Keyspaces Kinesis アダプターを使用すると、KCL インターフェイスに対して開発を開始できます。API コールは Amazon Keyspaces ストリームエンドポイントにシームレスに誘導されます。利用可能なエンドポイントのリストについては、「[Amazon Keyspaces で CDC ストリームエンドポイントにアクセスする方法](CDC_access-endpoints.md)」を参照してください。

アプリケーションは起動時に KCL をコールしてワーカーをインスタンス化します。ストリーム記述子やAWS認証情報、指定したレコードプロセッサクラスの名前など、アプリケーションの設定情報をワーカーに提供する必要があります。レコードプロセッサでコードを実行すると、ワーカーは次のタスクを実行します。
+ ストリームに接続する
+ ストリーミング内のシャードを列挙します。
+ シャードと他のワーカー (存在する場合) の関連付けを調整する
+ レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する
+ ストリーミングからレコードを取得します。
+ 対応するレコードプロセッサにレコードを送信する
+ 処理されたレコードのチェックポイントを作成する
+ ワーカーのインスタンス数が変化したときに、シャードとワーカーの関連付けを調整する
+ シャードが分割されたときに、シャードとワーカーの関連付けを調整します。

# Amazon Keyspaces CDC ストリーム用の KCL コンシューマーアプリケーションの実装
<a name="cdc-kcl-implementation"></a>

このトピックでは、Amazon Keyspaces CDC ストリームを処理する KCL コンシューマーアプリケーションを実装するためのstep-by-stepガイドを提供します。

1. 前提条件: 開始する前に、以下を確認してください。
   + CDC ストリームを含む Amazon Keyspaces テーブル
   + IAM プリンシパルが Amazon Keyspaces CDC ストリームにアクセスし、KCL ストリーム処理用の DynamoDB テーブルを作成してアクセスするために必要な IAM アクセス許可、および CloudWatch にメトリクスを発行するためのアクセス許可。詳細とポリシーの例については、「」を参照してください[Kinesis Client Library (KCL) を使用して Amazon Keyspaces CDC ストリームを処理するアクセス許可](configure-cdc-permissions.md#cdc-permissions-kcl)。
   + 有効なAWS認証情報がローカル設定で設定されていることを確認します。詳細については、「[プログラムによるアクセス用のアクセスキーを保存する](aws.credentials.manage.md)」を参照してください。
   + Java Development Kit (JDK) 8 以降
   + Github の [Readme](https://github.com/aws/keyspaces-streams-kinesis-adapter) に記載されている要件。

1. <a name="cdc-kcl-add-dependencies"></a>このステップでは、KCL 依存関係をプロジェクトに追加します。Maven の場合は、pom.xml に以下を追加します。

   ```
   <dependencies>
           <dependency>
               <groupId>software.amazon.kinesis</groupId>
               <artifactId>amazon-kinesis-client</artifactId>
               <version>3.1.0</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.keyspaces</groupId>
               <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
               <version>1.0.0</version>
           </dependency>
       </dependencies>
   ```
**注記**  
常に KCL の最新バージョンを [KCL GitHub リポジトリ](https://github.com/awslabs/amazon-kinesis-client)で確認してください。

1. <a name="cdc-kcl-factory"></a>レコードプロセッサインスタンスを生成するファクトリクラスを作成します。

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>次の例に示すように、レコードファクトリを作成します。

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>このステップでは、KCLv3 と Amazon Keyspaces アダプターを設定するベースクラスを作成します。

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>このステップでは、アプリケーションのレコードプロセッサクラスを実装して、変更イベントの処理を開始します。

   ```
    import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
       private static final int SLEEP_INTERNAL_MS = 60*1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
           String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                   Thread.sleep(SLEEP_INTERNAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## ベストプラクティス
<a name="cdc-kcl-best-practices"></a>

Amazon Keyspaces CDC ストリームで KCL を使用する場合は、次のベストプラクティスに従ってください。

**エラー処理**  
レコードプロセッサに堅牢なエラー処理を実装して、例外を適切に処理します。一時的な障害に対する再試行ロジックの実装を検討してください。

**チェックポイントの頻度**  
チェックポイントの頻度を調整して、妥当な進行状況の追跡を確保しながら、重複処理を最小限に抑えます。チェックポイントが頻繁すぎるとパフォーマンスに影響する可能性がありますが、チェックポイントが頻繁すぎると、ワーカーが失敗した場合に再処理が増える可能性があります。

**ワーカースケーリング**  
CDC ストリーム内のシャードの数に基づいてワーカーの数をスケールします。開始点として、シャードごとにワーカーを 1 人持つことをお勧めしますが、処理要件に基づいて調整する必要がある場合があります。

**モニタリング**  
KCL が提供する CloudWatch メトリクスを使用して、コンシューマーアプリケーションのヘルスとパフォーマンスをモニタリングします。主要なメトリクスには、処理レイテンシー、チェックポイント経過時間、リース数が含まれます。

**テスト**  
ワーカーの障害、ストリームのリシャーディング、さまざまな負荷条件などのシナリオを含め、コンシューマーアプリケーションを徹底的にテストします。

## Java 以外の言語で KCL を使用する
<a name="cdc-kcl-non-java"></a>

KCL は主に Java ライブラリですが、MultiLangDaemon を使用して他のプログラミング言語で使用できます。MultiLangDaemon は、非 Java レコードプロセッサと KCL 間のインタラクションを管理する Java ベースのデーモンです。

KCL では、次の言語がサポートされています。
+ Python
+ Ruby
+ Node.js
+ .NET

Java 以外の言語で KCL を使用する方法の詳細については、[KCL MultiLangDaemon ドキュメント](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang)を参照してください。

## トラブルシューティング
<a name="cdc-kcl-troubleshooting"></a>

このセクションでは、Amazon Keyspaces CDC ストリームで KCL を使用する際に発生する可能性がある一般的な問題の解決策を示します。

**処理が遅い**  
コンシューマーアプリケーションでレコードの処理が遅い場合は、次の点を考慮してください。  
+ ワーカーインスタンスの数を増やす
+ レコード処理ロジックの最適化
+ ダウンストリームシステムのボトルネックの確認

**重複処理**  
レコードの処理が重複している場合は、チェックポイントロジックを確認してください。レコードを正常に処理した後、チェックポイントを使用していることを確認します。

**ワーカーの障害**  
ワーカーが頻繁に失敗する場合は、以下を確認してください。  
+ リソース制約 (CPU、メモリ)
+ ネットワーク接続の問題
+ アクセス許可の問題

**リーステーブルの問題**  
KCL リーステーブルで問題が発生した場合:  
+ アプリケーションに Amazon Keyspaces テーブルにアクセスするための適切なアクセス許可があることを確認します。
+ テーブルに十分なプロビジョニングされたスループットがあることを確認する