

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Come utilizzare i flussi di acquisizione dei dati di modifica (CDC) in Amazon Keyspaces
<a name="cdc_how-to-use"></a>

**Topics**
+ [Configurazione delle autorizzazioni](configure-cdc-permissions.md)
+ [Accedi agli endpoint di flusso CDC](CDC_access-endpoints.md)
+ [Abilita uno stream CDC per una nuova tabella](keyspaces-enable-cdc-new-table.md)
+ [Abilita uno stream CDC per una tabella esistente](keyspaces-enable-cdc-alter-table.md)
+ [Disattiva uno stream CDC](keyspaces-delete-cdc.md)
+ [Visualizza gli stream CDC](keyspaces-view-cdc.md)
+ [Accedi agli stream CDC](keyspaces-records-cdc.md)
+ [Usa KCL per elaborare i flussi](cdc_how-to-use-kcl.md)

# Configura le autorizzazioni per lavorare con i flussi CDC in Amazon Keyspaces
<a name="configure-cdc-permissions"></a>

Per abilitare gli stream CDC, il principale, ad esempio un utente o un ruolo IAM, necessita delle seguenti autorizzazioni.

Per ulteriori informazioni su AWS Identity and Access Management, vedere. [AWS Identity and Access Management per Amazon Keyspaces](security-iam.md)

## Autorizzazioni per abilitare uno stream CDC per una tabella
<a name="cdc-permissions-enable"></a>

[Per abilitare uno stream CDC per una tabella Amazon Keyspaces, il principale necessita innanzitutto delle autorizzazioni per creare o modificare una tabella e in secondo luogo delle autorizzazioni per creare il ruolo CDC collegato al servizio. AWSService RoleForAmazonKeyspaces](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams) Amazon Keyspaces utilizza il ruolo collegato al servizio per pubblicare i CloudWatch parametri nel tuo account per tuo conto.

La seguente policy IAM ne è un esempio.

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

Per disabilitare uno stream, sono necessarie solo `ALTER TABLE` le autorizzazioni.

## Autorizzazioni per visualizzare uno stream CDC
<a name="cdc-permissions-view"></a>

Per visualizzare o elencare gli stream CDC, il principale necessita delle autorizzazioni di lettura per lo spazio delle chiavi di sistema. Per ulteriori informazioni, consulta [`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs).

La seguente politica IAM ne è un esempio.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

Per visualizzare o elencare gli stream CDC con l' AWS CLI API Amazon Keyspaces, il principale necessita di autorizzazioni aggiuntive per le azioni e. `cassandra:ListStreams` `cassandra:GetStream`

La seguente policy IAM ne è un esempio.

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## Autorizzazioni per leggere uno stream CDC
<a name="cdc-permissions-read"></a>

Per leggere gli stream CDC, il principale necessita delle seguenti autorizzazioni.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## Autorizzazioni per elaborare gli stream CDC di Amazon Keyspaces con la Kinesis Client Library (KCL)
<a name="cdc-permissions-kcl"></a>

Per elaborare i flussi CDC di Amazon Keyspaces con KCL, il responsabile IAM necessita delle seguenti autorizzazioni. 
+ `Amazon Keyspaces`— Accesso in sola lettura a uno stream Amazon Keyspaces CDC specificato.
+ `DynamoDB`— Autorizzazioni per creare `shard lease` tabelle, accesso in lettura e scrittura alle tabelle e accesso in lettura all'indice, come richiesto per l'elaborazione del flusso KCL.
+ `CloudWatch`— Autorizzazioni a pubblicare dati metrici da flussi Amazon Keyspaces CDC elaborati con KCL nello spazio dei nomi dell'applicazione client KCL del tuo account. CloudWatch Per ulteriori informazioni sul monitoraggio, consulta [Monitorare la Kinesis Client Library con Amazon](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html). CloudWatch

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# Come accedere agli endpoint di streaming CDC in Amazon Keyspaces
<a name="CDC_access-endpoints"></a>

Amazon Keyspaces mantiene [endpoint separati per keyspaces/tables e per i](programmatic.endpoints.md#global_endpoints) flussi CDC in ognuno dei quali è disponibile Regione AWS Amazon Keyspaces. Per accedere a uno stream CDC, seleziona la regione della tabella e sostituisci il `cassandra` prefisso con `cassandra-streams` nel nome dell'endpoint, come mostrato nell'esempio seguente:

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/keyspaces/latest/devguide/CDC_access-endpoints.html)

La tabella seguente contiene un elenco completo degli endpoint pubblici disponibili per. Amazon Keyspaces change data capture streams Amazon Keyspaces CDC streams supporta entrambi IPv4 e. IPv6 Tutti gli endpoint pubblici, ad esempio`cassandra-streams.us-east-1.api.aws`, sono endpoint dual stack che possono essere configurati per e. IPv4 IPv6 

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/keyspaces/latest/devguide/CDC_access-endpoints.html)

# Abilita uno stream CDC durante la creazione di una nuova tabella in Amazon Keyspaces
<a name="keyspaces-enable-cdc-new-table"></a>

Per abilitare uno stream CDC quando crei una tabella, puoi usare l'`CREATE TABLE`istruzione in CQL o il `create-table` comando con. AWS CLI

Per ogni riga modificata nella tabella, Amazon Keyspaces può acquisire le seguenti modifiche in base a `view_type` quella `cdc_specification` selezionata:
+ `NEW_AND_OLD_IMAGES`— entrambe le versioni della riga, prima e dopo la modifica. Questa è l’impostazione predefinita.
+ `NEW_IMAGE`— la versione della riga dopo la modifica.
+ `OLD_IMAGE`— la versione della riga prima della modifica.
+ `KEYS_ONLY`— le chiavi di partizione e di clustering della riga che è stata modificata.

Per informazioni su come etichettare uno stream, consulta. [Aggiungi tag a un nuovo stream durante la creazione di una tabella](Tagging.Operations.new.table.stream.md)

**Nota**  
Amazon Keyspaces CDC richiede la presenza di un ruolo collegato al servizio (`AWSServiceRoleForAmazonKeyspacesCDC`) che pubblica i dati metrici provenienti dai flussi di Amazon Keyspaces CDC nel tuo account per tuo conto. `"cloudwatch:namespace": "AWS/Cassandra"` CloudWatch Questo ruolo viene creato automaticamente per te. Per ulteriori informazioni, consulta [Utilizzo dei ruoli per gli stream CDC di Amazon Keyspaces](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Abilita uno stream CDC quando crei una tabella con CQL**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. Per confermare le impostazioni dello stream, puoi usare la seguente dichiarazione.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   L'output di tale dichiarazione dovrebbe essere simile a questo.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |   mytable  | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}>
   ```

------
#### [ CLI ]

**Abilita uno stream CDC quando crei una tabella con AWS CLI**

1. Per creare uno stream puoi usare la seguente sintassi. 

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. L'output di quel comando mostra la `create-table` risposta standard ed è simile a questo esempio. 

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# Abilita uno stream CDC per una tabella esistente in Amazon Keyspaces
<a name="keyspaces-enable-cdc-alter-table"></a>

Per abilitare uno stream CDC per una tabella esistente, puoi usare l'`ALTER TABLE`istruzione in CQL, il `update-table` comando con o puoi usare la AWS CLI console.

Per ogni riga modificata nella tabella, Amazon Keyspaces può acquisire le seguenti modifiche in base a `view_type` quella `cdc_specification` selezionata:
+ `NEW_AND_OLD_IMAGES`— entrambe le versioni della riga, prima e dopo la modifica. Questa è l’impostazione predefinita.
+ `NEW_IMAGE`— la versione della riga dopo la modifica.
+ `OLD_IMAGE`— la versione della riga prima della modifica.
+ `KEYS_ONLY`— le chiavi di partizione e di clustering della riga che è stata modificata.

Per informazioni su come etichettare uno stream, consulta. [Aggiungere nuovi tag a uno stream](Tagging.Operations.existing.stream.md)

**Nota**  
Amazon Keyspaces CDC richiede la presenza di un ruolo collegato al servizio (`AWSServiceRoleForAmazonKeyspacesCDC`) che pubblica i dati metrici provenienti dai flussi di Amazon Keyspaces CDC nel tuo account per tuo conto. `"cloudwatch:namespace": "AWS/Cassandra"` CloudWatch Questo ruolo viene creato automaticamente per te. Per ulteriori informazioni, consulta [Utilizzo dei ruoli per gli stream CDC di Amazon Keyspaces](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Abilita uno stream (flusso CDC) con CQL**

È possibile utilizzare `ALTER TABLE` per abilitare uno stream per una tabella esistente.

1. L'esempio seguente crea uno stream che acquisisce solo le modifiche alle chiavi di partizione e di clustering di una riga modificata.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. Per verificare le impostazioni dello stream, è possibile utilizzare la seguente dichiarazione.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   L'output dell'istruzione è simile a questo.

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Crea uno stream CDC con il AWS CLI**

1. Per creare uno stream per una tabella esistente è possibile utilizzare la seguente sintassi.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. L'output di quel comando mostra la `create-table` risposta standard ed è simile a questo esempio.

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**Abilita uno stream CDC con la console Amazon Keyspaces**

1. [Accedi a e apri Console di gestione AWS la console Amazon Keyspaces a casahttps://console.aws.amazon.com/keyspaces/.](https://console.aws.amazon.com/keyspaces/home)

1. Nel pannello di navigazione, scegli **Tabelle**, quindi scegli una tabella dall'elenco.

1. Scegli la scheda **Streams**.

1. Scegli **Modifica** per abilitare uno streaming.

1. Seleziona **Attiva gli stream.**

1. Scegli il **tipo di visualizzazione** dello stream. Sono disponibili le seguenti opzioni. Tieni presente che non puoi modificare il tipo di visualizzazione di uno stream dopo che è stato creato.
   + **Immagini nuove e vecchie**: Amazon Keyspaces acquisisce entrambe le versioni della riga, prima e dopo la modifica. Questa è l’impostazione predefinita.
   + **Nuova immagine**: Amazon Keyspaces acquisisce solo la versione della riga dopo la modifica.
   + **Vecchia immagine**: Amazon Keyspaces acquisisce solo la versione della riga prima della modifica.
   + **Solo chiave primaria**: Amazon Keyspaces acquisisce solo le colonne chiave di partizione e clustering della riga che è stata modificata.

1. **Per finire, scegli Salva modifiche.**

------

# Disattiva uno stream CDC in Amazon Keyspaces
<a name="keyspaces-delete-cdc"></a>

Per disabilitare uno stream CDC in un keyspace, puoi usare l'`ALTER TABLE`istruzione in CQL, il `update-table` comando con o la console. AWS CLI

------
#### [ Cassandra Query Language (CQL) ]

**Disabilita uno stream (flusso CDC) con CQL**

1. Per disabilitare uno stream, puoi usare la seguente dichiarazione.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. Per confermare che lo stream è disabilitato, puoi usare la seguente dichiarazione.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   L'output di questa dichiarazione è simile a questo.

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Disabilita uno stream (flusso CDC) con il AWS CLI**

1. Per disabilitare uno stream, puoi usare il seguente comando.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. L'output del comando è simile a quello di questo esempio.

   ```
   {
       "keyspaceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/",
       "streamName": "my_stream"
   }
   ```

------
#### [ Console ]

**Disattiva uno stream (stream CDC) con la console Amazon Keyspaces**

1. [Accedi a e apri Console di gestione AWS la console Amazon Keyspaces a casahttps://console.aws.amazon.com/keyspaces/.](https://console.aws.amazon.com/keyspaces/home)

1. Nel pannello di navigazione, scegli **Tabelle**, quindi scegli una tabella dall'elenco.

1. Scegli la scheda **Streams**.

1. Scegli **Modifica**.

1. **Deseleziona Attiva gli stream**. 

1. Scegli **Salva modifiche** per disabilitare lo streaming.

------

# Visualizza gli stream CDC in Amazon Keyspaces
<a name="keyspaces-view-cdc"></a>

Per visualizzare o elencare tutti i flussi in keyspace, puoi interrogare la tabella `system_schema_mcs.streams` nello spazio delle chiavi di sistema utilizzando un'istruzione in CQL o utilizzare `list-stream` i comandi `get-stream` and con o la console. AWS CLI

Per le autorizzazioni richieste, consulta [Configura le autorizzazioni per lavorare con i flussi CDC in Amazon Keyspaces](configure-cdc-permissions.md).

------
#### [ Cassandra Query Language (CQL) ]

**Visualizza gli stream CDC con CQL**
+ Per monitorare lo stato CDC della tabella, è possibile utilizzare la seguente dichiarazione.

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  L'output del comando è simile a questo.

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label''}}
  ...
  ```

------
#### [ CLI ]

**Visualizza gli stream CDC con il AWS CLI**

1. Questo esempio mostra come visualizzare le informazioni sullo stream per una tabella.

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   L'output del comando è simile al seguente.

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. Puoi elencare tutti gli stream del tuo account in un determinato Regione AWS modo. Il comando seguente ne è un esempio.

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   L'output del comando potrebbe essere simile a questo.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"Create a keyspace with the name catalog. Note
                                   that streams are not supported in multi-Region keyspaces.
               "TableName": "t2",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_3"
               "TableName": "t1",
           }
       ]
   }
   ```

1. È inoltre possibile elencare gli stream CDC per un determinato keyspace utilizzando i seguenti parametri. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   L'output del comando è simile a questo.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

1. È inoltre possibile elencare gli stream CDC per una determinata tabella utilizzando i seguenti parametri. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   L'output del comando è simile a questo.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

------
#### [ Console ]

**Visualizza gli stream CDC nella console Amazon Keyspaces**

1. [Accedi a e apri Console di gestione AWS la console Amazon Keyspaces a casahttps://console.aws.amazon.com/keyspaces/.](https://console.aws.amazon.com/keyspaces/home)

1. Nel pannello di navigazione, scegli **Tabelle**, quindi scegli una tabella dall'elenco.

1. Scegli la scheda **Streams** per rivedere i dettagli dello stream.

------

# Accedi ai record negli stream CDC in Amazon Keyspaces
<a name="keyspaces-records-cdc"></a>

Per accedere ai record di uno stream, utilizzi l'API [Amazon Keyspaces Streams](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html). La sezione seguente contiene esempi su come accedere ai record utilizzando. AWS CLI

Per le autorizzazioni richieste, consulta [Configura le autorizzazioni per lavorare con i flussi CDC in Amazon Keyspaces](configure-cdc-permissions.md).

**Accedere ai record di uno stream utilizzando il AWS CLI**

1. Puoi utilizzare l'API Amazon Keyspaces Streams per accedere ai record delle modifiche dello stream. Per ulteriori informazioni, consulta [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html) API Reference. Per recuperare gli shard all'interno dello stream, puoi utilizzare l'`get-stream`API come mostrato nell'esempio seguente.

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   Di seguito è riportato un esempio di output.

   ```
   {
      "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
      "StreamStatus": "ENABLED",
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "CreationRequestDateTime": "<CREATION_TIME>",
      "KeyspaceName": "mykeyspace",
      "TableName": "mytable",
      "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           },
       ]
   }
   ```

1. Per recuperare i record dallo stream, iniziate con un iteratore che vi fornisca il punto di partenza per l'accesso ai record. A tale scopo, puoi utilizzare gli shard all'interno del flusso CDC restituito dall'API nel passaggio precedente. Per raccogliere l'iteratore, puoi utilizzare l'API. `get-shard-iterator` Per questo esempio, si utilizza un iteratore di tipo `TRIM_HORIZON` che recupera dall'ultimo punto (o inizio) tagliato dello shard.

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   L'output del comando è simile a quello dell'esempio seguente.

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. Per recuperare i record CDC utilizzando l'`get-records`API, puoi utilizzare l'iteratore restituito nell'ultimo passaggio. Il comando seguente ne è un esempio.

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```

# Usa la Kinesis Client Library (KCL) per elaborare i flussi di Amazon Keyspaces
<a name="cdc_how-to-use-kcl"></a>

Questo argomento descrive come utilizzare la Kinesis Client Library (KCL) per consumare ed elaborare i dati dai flussi di Amazon Keyspaces change data capture (CDC).

Invece di lavorare direttamente con l'API Amazon Keyspaces Streams, lavorare con la Kinesis Client Library (KCL) offre molti vantaggi, ad esempio:
+ Monitoraggio del lignaggio degli shard e gestione degli iteratori integrati. 
+ Bilanciamento automatico del carico tra i lavoratori.
+ Tolleranza agli errori e ripristino in caso di guasti dei lavoratori.
+ Checkpointing per tenere traccia dell'avanzamento dell'elaborazione.
+ Adattamento alle variazioni della capacità del flusso.
+ Elaborazione distribuita semplificata per l'elaborazione dei record CDC.

La sezione seguente descrive perché e come utilizzare la Kinesis Client Library (KCL) per elaborare i flussi e fornisce un esempio per l'elaborazione di un flusso CDC Amazon Keyspaces con KCL.

Per informazioni sui prezzi, consulta i prezzi di [Amazon Keyspaces (per Apache Cassandra](https://aws.amazon.com/keyspaces/pricing)).

## Cos'è la Kinesis Client Library?
<a name="cdc-kcl-what-is"></a>

La Kinesis Client Library (KCL) è una libreria software Java autonoma progettata per semplificare il processo di consumo ed elaborazione dei dati dai flussi. KCL gestisce molte delle attività complesse associate all'elaborazione distribuita, consentendoti di concentrarti sull'implementazione della logica aziendale durante l'elaborazione dei dati di flusso. KCL gestisce attività come il bilanciamento del carico tra più lavoratori, la risposta agli errori dei lavoratori, il checkpoint dei record elaborati e la risposta alle variazioni del numero di shard nel flusso.

Per elaborare gli stream CDC di Amazon Keyspaces, puoi utilizzare i modelli di progettazione disponibili in KCL per lavorare con shard di stream e record di stream. KCL semplifica la codifica fornendo astrazioni utili sull'API Kinesis Data Streams di basso livello. Per ulteriori informazioni su KCL, consulta [Development consumer with KCL](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html) nella *Amazon Kinesis* Data Streams Developer Guide.

 Per scrivere applicazioni utilizzando KCL, si utilizza l'adattatore Kinesis di Amazon Keyspaces Streams. Il Kinesis Adapter implementa l'interfaccia Kinesis Data Streams in modo da poter utilizzare KCL per consumare ed elaborare i record dai flussi di Amazon Keyspaces. Per istruzioni su come configurare e installare l'adattatore Kinesis di Amazon Keyspaces Streams, visita il repository. [GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter)

Il diagramma seguente mostra come queste librerie interagiscono tra loro.

![\[Interazione tra un'applicazione client e Kinesis Data Streams, KCL, Amazon Keyspaces Streams Kinesis Adapter e Amazon Keyspaces durante l'elaborazione dei record di flusso CDC di APIs Amazon Keyspaces.\]](http://docs.aws.amazon.com/it_it/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL viene aggiornato frequentemente per incorporare versioni più recenti delle librerie sottostanti, miglioramenti della sicurezza e correzioni di bug. Ti consigliamo di utilizzare l'ultima versione di KCL per evitare problemi noti e beneficiare di tutti gli ultimi miglioramenti. Per trovare l'ultima versione di KCL, consulta l'archivio [ GitHub KCL](https://github.com/awslabs/amazon-kinesis-client).

## Concetti KCL
<a name="cdc-kcl-concepts"></a>

Prima di implementare un'applicazione consumer utilizzando KCL, è necessario comprendere i seguenti concetti:

**Applicazione consumer KCL**  
Un'applicazione consumer KCL è un programma che elabora i dati da un flusso CDC di Amazon Keyspaces. Il KCL funge da intermediario tra il codice dell'applicazione consumer e lo stream CDC di Amazon Keyspaces.

**Lavoratore**  
Un worker è un'unità di esecuzione della tua applicazione consumer KCL che elabora i dati dal flusso CDC di Amazon Keyspaces. La tua applicazione può eseguire più worker distribuiti su più istanze.

**Processore di registrazione**  
Un record processor è la logica dell'applicazione che elabora i dati da uno shard nel flusso CDC di Amazon Keyspaces. Un processore di registrazione viene istanziato da un worker per ogni shard che gestisce.

**Leasing**  
Un leasing rappresenta la responsabilità del trattamento di uno shard. I lavoratori utilizzano i contratti di locazione per coordinare quale lavoratore sta elaborando quale frammento. KCL archivia i dati di leasing in una tabella in Amazon DynamoDB.

**Checkpoint**  
Un checkpoint è una registrazione della posizione nello shard fino alla quale l'elaboratore di dischi ha elaborato con successo i record. Il checkpointing consente all'applicazione di riprendere l'elaborazione dal punto in cui era stata interrotta se un lavoratore fallisce.

Con l'adattatore Amazon Keyspaces Kinesis, puoi iniziare a sviluppare utilizzando l'interfaccia KCL, con le chiamate API dirette senza problemi all'endpoint di streaming Amazon Keyspaces. Per un elenco degli endpoint disponibili, consulta. [Come accedere agli endpoint di streaming CDC in Amazon Keyspaces](CDC_access-endpoints.md)

Quando si avvia l'applicazione , quest'ultima richiama la libreria KCL per creare un'istanza di un worker. È necessario fornire al lavoratore le informazioni di configurazione per l'applicazione, come il descrittore di flusso e AWS le credenziali, e il nome di una classe di processore di record fornita. Durante l'esecuzione del codice nel processore di record, il worker completa le seguenti attività:
+ Si collega al flusso
+ Enumera le partizioni all'interno del flusso
+ Coordina le associazioni di shard con altri processi di lavoro (se presenti)
+ Crea istanze di un elaboratore di record per ogni shard che gestisce
+ Estrae i record di dati dal flusso
+ Inserisce i record nell'elaboratore di record corrispondente
+ Controlla i record elaborati
+ Bilancia le associazioni tra shard e processi di lavoro quando il conteggio delle istanze del lavoro cambia
+ Bilancia le associazioni tra partizioni e worker quando le partizioni sono suddivise

# Implementa un'applicazione consumer KCL per gli stream CDC di Amazon Keyspaces
<a name="cdc-kcl-implementation"></a>

Questo argomento fornisce una step-by-step guida all'implementazione di un'applicazione consumer KCL per elaborare i flussi CDC di Amazon Keyspaces.

1. Prerequisiti: prima di iniziare, assicurati di avere:
   + Una tabella Amazon Keyspaces con un flusso CDC
   + Autorizzazioni IAM necessarie per consentire al principale IAM di accedere al flusso CDC di Amazon Keyspaces, creare e accedere a tabelle DynamoDB per l'elaborazione dei flussi KCL e autorizzazioni su cui pubblicare le metriche. CloudWatch Per ulteriori informazioni e un esempio di policy, consulta. [Autorizzazioni per elaborare gli stream CDC di Amazon Keyspaces con la Kinesis Client Library (KCL)](configure-cdc-permissions.md#cdc-permissions-kcl)
   + Assicuratevi che nella configurazione locale siano impostate AWS credenziali valide. Per ulteriori informazioni, consulta [Memorizza le chiavi di accesso per l'accesso programmatico](aws.credentials.manage.md).
   + Java Development Kit (JDK) 8 o versione successiva
   + Requisiti elencati nel [Readme](https://github.com/aws/keyspaces-streams-kinesis-adapter) su Github.

1. <a name="cdc-kcl-add-dependencies"></a>In questo passaggio, aggiungi la dipendenza KCL al tuo progetto. Per Maven, aggiungi quanto segue al tuo pom.xml:

   ```
   <dependencies>
           <dependency>
               <groupId>software.amazon.kinesis</groupId>
               <artifactId>amazon-kinesis-client</artifactId>
               <version>3.1.0</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.keyspaces</groupId>
               <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
               <version>1.0.0</version>
           </dependency>
       </dependencies>
   ```
**Nota**  
[Controlla sempre l'ultima versione di KCL nel repository KCL. GitHub ](https://github.com/awslabs/amazon-kinesis-client)

1. <a name="cdc-kcl-factory"></a>Crea una classe factory che produca istanze di processori di registrazione:

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>Create un record factory come illustrato nell'esempio seguente.

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>In questo passaggio crei la classe base da configurare KCLv3 e l'adattatore Amazon Keyspaces.

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>In questa fase implementerai la classe del processore di record per l'applicazione per avviare l'elaborazione degli eventi di modifica.

   ```
    import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
       private static final int SLEEP_INTERNAL_MS = 60*1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
           String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                   Thread.sleep(SLEEP_INTERNAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## Best practice
<a name="cdc-kcl-best-practices"></a>

Segui queste best practice quando usi KCL con gli stream CDC di Amazon Keyspaces:

**Gestione errori**  
Implementa una solida gestione degli errori nel tuo processore di dischi per gestire le eccezioni con garbo. Prendi in considerazione l'implementazione della logica di ripetizione dei tentativi per i guasti transitori.

**Frequenza dei checkpoint**  
Bilancia la frequenza dei checkpoint per ridurre al minimo l'elaborazione duplicata garantendo al contempo un ragionevole monitoraggio dei progressi. Un checkpoint troppo frequente può influire sulle prestazioni, mentre un checkpoint troppo raro può portare a un maggior numero di ritrattamenti in caso di fallimento di un lavoratore.

**Scalabilità dei lavoratori**  
Ridimensiona il numero di lavoratori in base al numero di shard presenti nel tuo stream CDC. Un buon punto di partenza è avere un lavoratore per shard, ma potrebbe essere necessario adattarlo in base ai requisiti di elaborazione.

**Monitoraggio**  
Utilizzate CloudWatch le metriche fornite da KCL per monitorare lo stato e le prestazioni della vostra applicazione consumer. Le metriche chiave includono la latenza di elaborazione, l'età dei checkpoint e il numero di leasing.

**Test in corso**  
Testa a fondo la tua applicazione consumer, includendo scenari come guasti dei lavoratori, resharding degli stream e condizioni di carico variabili.

## Utilizzo di KCL con linguaggi non Java
<a name="cdc-kcl-non-java"></a>

Sebbene KCL sia principalmente una libreria Java, puoi utilizzarla con altri linguaggi di programmazione tramite. MultiLangDaemon Il MultiLangDaemon è un demone basato su Java che gestisce l'interazione tra il processore di record non Java e KCL.

KCL fornisce supporto per le seguenti lingue:
+ Python
+ Ruby
+ Node.js
+ .NET

[Per ulteriori informazioni sull'uso di KCL con linguaggi non Java, consulta la documentazione di KCL. MultiLangDaemon ](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang)

## risoluzione dei problemi
<a name="cdc-kcl-troubleshooting"></a>

Questa sezione fornisce soluzioni ai problemi più comuni che potresti riscontrare quando usi KCL con i flussi CDC di Amazon Keyspaces.

**Elaborazione lenta**  
Se la tua applicazione consumer elabora i record lentamente, considera:  
+ Aumentare il numero di istanze di lavoro
+ Ottimizzazione della logica di elaborazione dei record
+ Verifica della presenza di strozzature nei sistemi a valle

**Elaborazione duplicata**  
Se riscontri un'elaborazione duplicata dei record, controlla la logica del checkpoint. Assicurati di effettuare il checkpoint dopo aver elaborato correttamente i record.

**Fallimenti dei lavoratori**  
Se i lavoratori falliscono frequentemente, controlla:  
+ Limiti di risorse (CPU, memoria)
+ Eventi di connettività di rete
+ Problemi a livello di autorizzazioni

**Problemi relativi ai tavoli di leasing**  
Se riscontri problemi con la tabella di leasing KCL:  
+ Verifica che la tua applicazione disponga delle autorizzazioni appropriate per accedere alla tabella Amazon Keyspaces
+ Verifica che la tabella abbia un throughput assegnato sufficiente