

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Monitoraggio di flussi di attività di database
<a name="DBActivityStreams.Monitoring"></a>

I flussi di attività di database monitorano e segnalano le attività. Il flusso di attività viene raccolto e trasmesso a Amazon Kinesis. Da Kinesis, è possibile monitorare il flusso di attività oppure altri servizi e applicazioni possono utilizzare il flusso di attività per ulteriori analisi. È possibile trovare il nome del flusso Kinesis sottostante utilizzando il comando della AWS CLI `describe-db-clusters` o l'operazione `DescribeDBClusters` dell'API RDS.

Aurora gestisce il flusso Kinesis per tuo conto come segue:
+ Aurora crea automaticamente il flusso Kinesis con un periodo di conservazione di 24 ore. 
+  Aurora dimensiona il flusso Kinesis, se necessario. 
+  Se si interrompe il flusso di attività del database o si elimina il cluster di database, Aurora elimina il flusso Kinesis. 

Le seguenti categorie di attività vengono monitorate e inserite nel log di controllo del flusso di attività:
+ **Comandi SQL**: tutti i comandi SQL sono controllati e anche le istruzioni preparate, le funzioni integrate e le funzioni in PL/SQL Le chiamate alle procedure archiviate vengono controllate. Vengono inoltre controllate tutte le istruzioni SQL rilasciate all'interno di procedure o funzioni archiviate.
+ **Altre informazioni di database** – L'attività monitorata include l'istruzione SQL completa, il conteggio righe delle righe interessate da comandi DML, gli oggetti ai quali si accede e il nome del database univoco. Per Aurora PostgreSQL, i flussi di attività del database monitorano anche le variabili di bind e i parametri della stored procedure. 
**Importante**  
Il testo SQL completo di ogni istruzione è visibile nel registro di controllo del flusso di attività, inclusi eventuali dati sensibili. Tuttavia, le password degli utenti del database vengono omesse se Aurora può stabilirle dal contesto, come nell'istruzione SQL seguente.   

  ```
  ALTER ROLE role-name WITH password
  ```
+ **Informazioni di connessione** – L'attività monitorata include informazioni di sessione e di rete, l'ID di processo del server e i codici di uscita.

Se un flusso di attività restituisce un errore durante il monitoraggio dell'istanza database, riceverai una notifica mediante eventi RDS.

Nelle sezioni seguenti puoi accedere ai flussi di attività del database, eseguirne l’audit ed elaborarli.

**Topics**
+ [Accesso a un flusso di attività da Amazon Kinesis](DBActivityStreams.KinesisAccess.md)
+ [Contenuto ed esempi di log di audit per i flussi di attività del database](DBActivityStreams.AuditLog.md)
+ [Array JSON databaseActivityEventList per flussi di attività del database](DBActivityStreams.AuditLog.databaseActivityEventList.md)
+ [Elaborazione di un flusso di attività del database utilizzando l'SDK AWS](DBActivityStreams.CodeExample.md)

# Accesso a un flusso di attività da Amazon Kinesis
<a name="DBActivityStreams.KinesisAccess"></a>

Quando abiliti un flusso di attività per un cluster database, viene creato automaticamente un flusso Kinesis. Da Kinesis, puoi monitorare l'attività del database in tempo reale. Per analizzare ulteriormente l'attività del database, puoi connettere il flusso Kinesis ad applicazioni consumer. È inoltre possibile connettere il flusso alle applicazioni di gestione della conformità, ad esempio Security Guardium IBM o SecureSphere Database Audit and Protection Imperva.

Puoi accedere al tuo flusso Kinesis dalla console RDS o dalla console Kinesis.

**Come accedere a un flusso di attività da Kinesis utilizzando la console RDS**

1. Apri la console Amazon RDS all'indirizzo [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/).

1. Nel pannello di navigazione, scegliere **Databases (Database)**.

1. Seleziona il cluster di database in cui hai avviato un flusso di attività.

1. Scegliere **Configuration (Configurazione)**.

1. In **Database activity stream** (Flusso di attività del database), scegli il collegamento sotto **Kinesis stream** (Flusso Kinesis).

1. Nella console Kinesis, scegli **Monitoring** (Monitoraggio) per iniziare l'osservazione dell'attività del database.

**Per accedere a un flusso di attività da Kinesis utilizzando la console Kinesis**

1. Aprire la console Kinesis all'indirizzo [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Scegliere il flusso di attività dall'elenco di flussi Kinesis.

   Il nome di un flusso di attività include il prefisso `aws-rds-das-cluster-` seguito dall'ID risorsa del cluster database. Di seguito è riportato un esempio. 

   ```
   aws-rds-das-cluster-NHVOV4PCLWHGF52NP
   ```

   Per utilizzare la console Amazon RDS per trovare l'ID risorsa per il cluster di database, scegli il cluster database dall'elenco di database, quindi seleziona la scheda **Configuration (Configurazione)**.

   Per utilizzare la AWS CLI per trovare il nome del flusso Kinesis completo per un flusso di attività, utilizzare una richiesta CLI [describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) e prendere nota del valore di `ActivityStreamKinesisStreamName` nella risposta.

1. Scegliere **Monitoring (Monitoraggio)** per iniziare l'osservazione dell'attività di database.

Per ulteriori informazioni sull'utilizzo di Amazon Kinesis, consulta [Che cos'è Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html).

# Contenuto ed esempi di log di audit per i flussi di attività del database
<a name="DBActivityStreams.AuditLog"></a>

Gli eventi monitorati sono rappresentati nel flusso di attività del database come stringhe JSON. La struttura è costituita da un oggetto JSON contenente un `DatabaseActivityMonitoringRecord`, che a sua volta contiene un array `databaseActivityEventList` di eventi attività. 

**Nota**  
Per i flussi di attività del database, l’array JSON `paramList` non include valori null dalle applicazioni Hibernate.

**Topics**
+ [Esempi di log di verifica per un flusso di attività](#DBActivityStreams.AuditLog.Examples)
+ [DatabaseActivityMonitoringRecords Oggetto JSON](#DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords)
+ [databaseActivityEvents Oggetto JSON](#DBActivityStreams.AuditLog.databaseActivityEvents)

## Esempi di log di verifica per un flusso di attività
<a name="DBActivityStreams.AuditLog.Examples"></a>

Di seguito sono riportati registri di controllo JSON decrittografati di esempio di record di eventi attività.

**Example**  
Il seguente record di evento di attività mostra un accesso con l'utilizzo di un'istruzione SQL `CONNECT` (`command`) mediante un client psql (`clientApplication`).  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-10-30 00:39:49.940668+00",
          "logTime": "2019-10-30 00:39:49.990579+00",
          "statementId": 1,
          "substatementId": 1,
          "objectType": null,
          "command": "CONNECT",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "49804",
          "sessionId": "5ce5f7f0.474b",
          "rowCount": null,
          "commandText": null,
          "paramList": [],
          "pid": 18251,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "MISC",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Record di evento attività di un'istruzione SQL CONNECT Aurora MySQL**  
Il seguente record di evento di attività mostra un accesso con l'utilizzo di un'istruzione SQL `CONNECT` (`command`) mediante un client mysql (`clientApplication`).   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:13.267214+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"rdsadmin",
      "databaseName":"",
      "remoteHost":"localhost",
      "remotePort":"11053",
      "command":"CONNECT",
      "commandText":"",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"",
      "statementId":0,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725121",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:13.267207+00",
      "endTime":"2020-05-22 18:07:13.267213+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```

**Example Record di evento attività di Aurora PostgreSQL**  
Il seguente esempio mostra un evento `CREATE TABLE` per Aurora PostgreSQL.  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:36:54.403455+00",
          "logTime": "2019-05-24 00:36:54.494235+00",
          "statementId": 2,
          "substatementId": 1,
          "objectType": null,
          "command": "CREATE TABLE",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": null,
          "commandText": "create table my_table (id serial primary key, name varchar(32));",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "DDL",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Record di evento attività di un'istruzione CREATE TABLE Aurora MySQL**  
Il seguente esempio mostra un'istruzione `CREATE TABLE` per Aurora MySQL. L'operazione è rappresentata come due record di eventi separati. Un evento ha `"class":"MAIN"`. L'altro evento ha `"class":"AUX"`. I messaggi potrebbero arrivare in qualsiasi ordine. Il campo `logTime` dell'evento `MAIN` è sempre precedente ai campi `logTime` di qualsiasi evento `AUX` corrispondente.  
Nell'esempio seguente viene illustrato l'evento con un valore `class` pari a `MAIN`.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.250221+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"CREATE TABLE test1 (id INT)",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.250222+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 Nell'esempio seguente viene illustrato l'evento corrispondente con un valore `class` pari a `AUX`.  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.247182+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"CREATE",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.247182+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

**Example Record di evento attività di un'istruzione Aurora PostgreSQL SELECT**  
Il seguente esempio mostra un evento `SELECT` .  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:39:49.920564+00",
          "logTime": "2019-05-24 00:39:49.940668+00",
          "statementId": 6,
          "substatementId": 1,
          "objectType": "TABLE",
          "command": "SELECT",
          "objectName": "public.my_table",
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": 10,
          "commandText": "select * from my_table;",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "READ",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

```
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "",
    "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q",
    "databaseActivityEventList": [
        {
            "class": "TABLE",
            "clientApplication": "Microsoft SQL Server Management Studio - Query",
            "command": "SELECT",
            "commandText": "select * from [testDB].[dbo].[TestTable]",
            "databaseName": "testDB",
            "dbProtocol": "SQLSERVER",
            "dbUserName": "test",
            "endTime": null,
            "errorMessage": null,
            "exitCode": 1,
            "logTime": "2022-10-06 21:24:59.9422268+00",
            "netProtocol": null,
            "objectName": "TestTable",
            "objectType": "TABLE",
            "paramList": null,
            "pid": null,
            "remoteHost": "local machine",
            "remotePort": null,
            "rowCount": 0,
            "serverHost": "172.31.30.159",
            "serverType": "SQLSERVER",
            "serverVersion": "15.00.4073.23.v1.R1",
            "serviceName": "sqlserver-ee",
            "sessionId": 62,
            "startTime": null,
            "statementId": "0x03baed90412f564fad640ebe51f89b99",
            "substatementId": 1,
            "transactionId": "4532935",
            "type": "record",
            "engineNativeAuditFields": {
                "target_database_principal_id": 0,
                "target_server_principal_id": 0,
                "target_database_principal_name": "",
                "server_principal_id": 2,
                "user_defined_information": "",
                "response_rows": 0,
                "database_principal_name": "dbo",
                "target_server_principal_name": "",
                "schema_name": "dbo",
                "is_column_permission": true,
                "object_id": 581577110,
                "server_instance_name": "EC2AMAZ-NFUJJNO",
                "target_server_principal_sid": null,
                "additional_information": "",
                "duration_milliseconds": 0,
                "permission_bitmask": "0x00000000000000000000000000000001",
                "data_sensitivity_information": "",
                "session_server_principal_name": "test",
                "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109",
                "audit_schema_version": 1,
                "database_principal_id": 1,
                "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000",
                "user_defined_event_id": 0,
                "host_name": "EC2AMAZ-NFUJJNO"
            }
        }
    ]
}
```

**Example Record di evento attività di un'istruzione SELECT Aurora MySQL**  
Il seguente esempio mostra un evento `SELECT`.  
 Nell'esempio seguente viene illustrato l'evento con un valore `class` pari a `MAIN`.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986467+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"SELECT * FROM test1 WHERE id < 28",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"726571",
      "rowCount":2,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986467+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 Nell'esempio seguente viene illustrato l'evento corrispondente con un valore `class` pari a `AUX`.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986399+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"READ",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"726571",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986399+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

## DatabaseActivityMonitoringRecords Oggetto JSON
<a name="DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords"></a>

I record di eventi dell'attività del database si trovano in un oggetto JSON che contiene le seguenti informazioni.


****  

| Campo JSON | Tipo di dati | Descrizione | 
| --- | --- | --- | 
|  `type`  | stringa |  Il tipo di record JSON. Il valore è `DatabaseActivityMonitoringRecords`.  | 
| version | stringa |  La versione dei record di monitoraggio delle attività del database. La versione dei record di attività del database generati dipende dalla versione del motore del cluster database. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.html)Tutti i seguenti campi sono nella versione 1.0 e nella versione 1.1, tranne dove espressamente indicato. | 
|  [databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)  | stringa |  Un oggetto JSON contenente gli eventi di attività.  | 
| key | stringa | Una chiave di crittografia utilizzata per decrittare [Array JSON databaseActivityEventList](DBActivityStreams.AuditLog.databaseActivityEventList.md)  | 

## databaseActivityEvents Oggetto JSON
<a name="DBActivityStreams.AuditLog.databaseActivityEvents"></a>

L'oggetto JSON `databaseActivityEvents` contiene le seguenti informazioni.

### Campi di primo livello nel record JSON
<a name="DBActivityStreams.AuditLog.topLevel"></a>

 Ogni evento nel registro di controllo viene racchiuso in un record in formato JSON. Questo record contiene i seguenti campi. 

**type**  
 Questo campo ha sempre il valore `DatabaseActivityMonitoringRecords`. 

**versione**  
 Questo campo rappresenta la versione del protocollo o del contratto di dati del flusso di attività del database. Definisce quali campi sono disponibili.  
La versione 1.0 rappresenta il supporto dei flussi di attività dati originali per le versioni Aurora PostgreSQL 10.7 e 11.4. La versione 1.1 rappresenta il supporto dei flussi di attività dati per le versioni Aurora PostgreSQL 10.10 e successive e Aurora PostgreSQL 11.5 e successive. La versione 1.1 include i campi aggiuntivi `errorMessage` e `startTime`. La versione 1.2 rappresenta il supporto dei flussi di attività dati per Aurora MySQL 2.08 e versioni successive. La versione 1.2 include i campi aggiuntivi `endTime` e `transactionId`.

**databaseActivityEvents**  
 Stringa crittografata che rappresenta uno o più eventi di attività. È rappresentato come un array di byte base64. Quando si decrittografa la stringa, il risultato è un record in formato JSON con campi come illustrato negli esempi di questa sezione.

**key**  
 Chiave dati crittografata utilizzata per crittografare la stringa `databaseActivityEvents`. È lo stesso AWS KMS key che hai fornito quando hai avviato il flusso di attività del database.

 Nell'esempio seguente viene illustrato il formato di questo record.

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}
```

Per decrittografare il contenuto del campo `databaseActivityEvents`, procedere come segue:

1.  Decrittare il valore nel campo `key` JSON utilizzando la chiave KMS fornita all'avvio del flusso di attività del database. In questo modo viene restituita la chiave di crittografia dei dati in testo non crittografato. 

1.  Decodificare il valore nel campo `databaseActivityEvents` JSON con base64 per ottenere il testo cifrato, in formato binario, del payload di controllo. 

1.  Decifrare il testo cifrato binario con la chiave di crittografia dei dati decodificata nel primo passaggio. 

1.  Decomprimere il payload decrittografato. 
   +  Il payload crittografato è nel campo `databaseActivityEvents`. 
   +  Il campo `databaseActivityEventList` contiene una matrice di record di controllo. I campi `type` nella matrice possono essere `record` o `heartbeat`. 

Il record dell'evento attività registro di controllo è un oggetto JSON che contiene le seguenti informazioni.


****  

| Campo JSON | Tipo di dati | Descrizione | 
| --- | --- | --- | 
|  `type`  | stringa |  Il tipo di record JSON. Il valore è `DatabaseActivityMonitoringRecord`.  | 
| clusterId | stringa | Identificatore di risorsa cluster di database. Corrisponde all'attributo cluster database DbClusterResourceId. | 
| instanceId | stringa | Identificatore della risorsa istanza database. Corrisponde all'attributo di istanza database DbiResourceId. | 
|  [Array JSON databaseActivityEventList](DBActivityStreams.AuditLog.databaseActivityEventList.md)   | stringa |  Matrice di record di controllo delle attività o messaggi heartbeat.  | 

# Array JSON databaseActivityEventList per flussi di attività del database
<a name="DBActivityStreams.AuditLog.databaseActivityEventList"></a>

Il payload del registro di controllo è un array JSON `databaseActivityEventList` crittografato. Le tabelle riportano in ordine alfabetico i campi per ogni evento di attività nella matrice `DatabaseActivityEventList` decrittata di un log di verifica. I campi sono diversi a seconda che si utilizzi Aurora PostgreSQL o Aurora MySQL. Consultare la tabella che si applica al modulo di gestione di database.

**Importante**  
Tale struttura di eventi è soggetta a modifiche. Aurora potrebbe aggiungere nuovi campi agli eventi di attività in futuro. Nelle applicazioni che analizzano i dati JSON, assicurarsi che il codice possa ignorare o eseguire le azioni appropriate per i nomi di campo sconosciuti. 

## Campi databaseActivityEventList per Aurora PostgreSQL
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.apg"></a>

Di seguito sono riportati i campi di `databaseActivityEventList` per Aurora PostgreSQL.


| Campo | Tipo di dati | Descrizione | 
| --- | --- | --- | 
| class | stringa |  La classe dell'evento attività. I valori validi per Aurora PostgreSQL sono i seguenti. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | stringa | L'applicazione utilizzata dal client per eseguire la connessione come segnalato dal client. Il client non deve fornire queste informazioni, pertanto il valore può essere nullo. | 
| command | stringa | Il nome del comando SQL senza alcun dettaglio comando. | 
| commandText | stringa |  L'istruzione SQL effettiva passata dall'utente. Per Aurora PostgreSQL, il valore è identico all'istruzione SQL originale. Questo campo viene utilizzato per tutti i tipi di record tranne che i record di connessione e disconnessione, nel quale caso il valore è nullo.  Il testo SQL completo di ogni istruzione è visibile nel registro di controllo del flusso di attività, inclusi eventuali dati sensibili. Tuttavia, le password degli utenti del database vengono omesse se Aurora può stabilirle dal contesto, come nell'istruzione SQL seguente.  <pre>ALTER ROLE role-name WITH password</pre>   | 
| databaseName | stringa | Il database a cui è connesso l'utente. | 
| dbProtocol | stringa | Il protocollo del database, ad esempio Postgres 3.0. | 
| dbUserName | stringa | L'utente del database con cui il client è autenticato. | 
| errorMessage(solo record di attività del database versione 1.1) | stringa |  Se si verifica una errore qualsiasi, questo campo viene compilato con il messaggio di errore che verrebbe generato dal server di database. Il valore `errorMessage` è nullo per le istruzioni normali che non hanno generato un errore.  Un errore è definito come qualsiasi attività che produce un evento del log degli errori PostgreSQL visibile al client in corrispondenza di un livello di gravità pari a `ERROR` o superiore. Per ulteriori informazioni, consulta la tabella dei [livelli di gravità dei messaggi di PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS). Ad esempio, errori di sintassi e annullamenti delle query generano un messaggio di errore.  Gli errori del server PostgreSQL interno, come gli errori del processo checkpointer in background non generano un messaggio di errore. Tuttavia, i record per tali eventi vengono comunque generati a prescindere dall'impostazione del livello di gravità del registro. Ciò impedisce agli aggressori di disattivare la registrazione per tentare di evitare il rilevamento. Vedere anche il campo `exitCode`.  | 
| exitCode | int | Un valore usato per un record di chiusura sessione. In una clean exit, contiene il codice di chiusura. Un codice di chiusura può sempre essere ottenuto in alcuni scenari di errore. Esempi sono se PostgreSQL esegue un exit() o se un operatore esegue un comando quale kill -9.Se si verifica un errore qualsiasi, il campo `exitCode` mostra il codice di errore SQL, `SQLSTATE`, come elencato in [ PostgreSQL Error Codes](https://www.postgresql.org/docs/current/errcodes-appendix.html). Vedere anche il campo `errorMessage`. | 
| logTime | stringa | Una timestamp come registrato nel percorso del codice di controllo. Rappresenta l'ora di fine dell'esecuzione dell'istruzione SQL. Vedere anche il campo startTime. | 
| netProtocol | stringa | Il protocollo di comunicazione di rete. | 
| objectName | stringa | Il nome dell'oggetto di database se l'istruzione SQL viene eseguita su uno. Questo campo viene utilizzato solo dove l'istruzione SQL è eseguita su un oggetto di database. Se l'istruzione SQL non è in esecuzione su un oggetto, questo valore è nullo. | 
| objectType | stringa | Il tipo di oggetto di database, ad esempio tabella, indice, vista e così via. Questo campo viene utilizzato solo dove l'istruzione SQL è eseguita su un oggetto di database. Se l'istruzione SQL non è in esecuzione su un oggetto, questo valore è nullo. I valori validi includono i seguenti:[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) | 
| paramList | stringa | Un array di parametri separati da virgole passati all'istruzione SQL. Se l'istruzione SQL non dispone di parametri, questo valore è un array vuoto. | 
| pid | int | L'ID di processo del processo back-end allocato per servire la connessione client. | 
| remoteHost | stringa | L'indirizzo IP del client o il nome host. Per Aurora PostgreSQL, quale viene utilizzato dipende dall'impostazione dei parametri log\$1hostname del database. Il valore di remoteHost include anche [local] e localhost, che indicano l’attività dell’utente rdsadmin. | 
| remotePort | stringa | Il numero di porta del client. | 
| rowCount | int | Il numero di righe della tabella influenzate o recuperate dall'istruzione SQL. Questo campo viene utilizzato solo per istruzioni SQL che sono istruzioni DML (Data Manipulation Language). Se l'istruzione SQL non è un'istruzione DML, questo valore è nullo. | 
| serverHost | stringa | L'indirizzo IP host del server di database. Il valore di serverHost include anche [local] e localhost, che indicano l’attività dell’utente rdsadmin. | 
| serverType | stringa | Il tipo di server di database, ad esempio PostgreSQL. | 
| serverVersion | stringa | La versione del server di database, ad esempio 2.3.1 per Aurora PostgreSQL. | 
| serviceName | stringa | Il nome del servizio, ad esempi Amazon Aurora PostgreSQL-Compatible edition.  | 
| sessionId | int | Un identificatore di sessione pseudo-univoco. | 
| sessionId | int | Un identificatore di sessione pseudo-univoco. | 
| startTime(solo record di attività del database versione 1.1) | stringa |  L'ora di inizio dell'esecuzione dell'istruzione SQL.  Per calcolare il tempo di esecuzione approssimativo dell'istruzione SQL, utilizzar `logTime - startTime`. Vedere anche il campo `logTime`.  | 
| statementId | int | Identificatore per l'istruzione SQL del client. Il contatore è a livello di sessione e si incrementa con ogni istruzione SQL immessa dal client.  | 
| substatementId | int | Identificatore di una subistruzione SQL. Questo valore conteggia le istruzioni secondarie contenute per ogni istruzione SQL identificata dal campo statementId. | 
| type | stringa | Il tipo di evento, I valori validi sono record e heartbeat. | 

## Campi databaseActivityEventList per Aurora MySQL
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.ams"></a>

Di seguito sono riportati i campi di `databaseActivityEventList` per Aurora MySQL.


| Campo | Tipo di dati | Descrizione | 
| --- | --- | --- | 
| class | stringa |  La classe dell'evento attività. I valori validi per Aurora MySQL sono i seguenti: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | stringa | L'applicazione utilizzata dal client per eseguire la connessione come segnalato dal client. Il client non deve fornire queste informazioni, pertanto il valore può essere nullo. | 
| command | stringa |  Categoria generale dell'istruzione SQL. I valori di questo campo dipendono dal valore di `class`. I valori quando `class` è `MAIN` includono quanto segue: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) I valori quando `class` è `AUX` includono quanto segue: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| commandText | stringa |  Per gli eventi con un valore `class` pari a `MAIN`, questo campo rappresenta l'istruzione SQL effettiva passata dall'utente. Questo campo viene utilizzato per tutti i tipi di record tranne che i record di connessione e disconnessione, nel quale caso il valore è nullo.  Per gli eventi con un valore `class` pari a `AUX`, questo campo contiene informazioni supplementari sugli oggetti coinvolti nell'evento.  Per Aurora MySQL, i caratteri quali le virgolette sono preceduti da una barra rovesciata che rappresenta un carattere di escape.  Il testo SQL completo di ogni istruzione è visibile nel registro di controllo, inclusi eventuali dati sensibili. Tuttavia, le password degli utenti del database vengono omesse se Aurora può stabilirle dal contesto, come nell'istruzione SQL seguente.  <pre>mysql> SET PASSWORD = 'my-password';</pre> Specifica una password diversa dal prompt mostrato qui come best practice per la sicurezza.   | 
| databaseName | stringa | Il database a cui è connesso l'utente. | 
| dbProtocol | stringa | Il protocollo di database. Attualmente, questo valore è sempre MySQL per Aurora MySQL. | 
| dbUserName | stringa | L'utente del database con cui il client è autenticato. | 
| endTime(solo record di attività del database versione 1.2) | stringa |  L'ora di fine dell'esecuzione dell'istruzione SQL. È rappresentato in formato UTC (Coordinated Universal Time). Per calcolare il tempo di esecuzione dell'istruzione SQL, utilizzare `endTime - startTime`. Vedere anche il campo `startTime`.  | 
| errorMessage(solo record di attività del database versione 1.1) | stringa |  Se si verifica una errore qualsiasi, questo campo viene compilato con il messaggio di errore che verrebbe generato dal server di database. Il valore `errorMessage` è nullo per le istruzioni normali che non hanno generato un errore.  Un errore è definito come qualsiasi attività che produce un evento del log degli errori MySQL visibile al client in corrispondenza di un livello di gravità pari a `ERROR` o superiore. Per ulteriori informazioni, consulta il [Log degli errori](https://dev.mysql.com/doc/refman/5.7/en/error-log.html) nel *Manuale di riferimento di MySQL*. Ad esempio, errori di sintassi e annullamenti delle query generano un messaggio di errore.  Gli errori del server MySQL interno, come gli errori del processo checkpointer in background non generano un messaggio di errore. Tuttavia, i record per tali eventi vengono comunque generati a prescindere dall'impostazione del livello di gravità del registro. Ciò impedisce agli aggressori di disattivare la registrazione per tentare di evitare il rilevamento. Vedere anche il campo `exitCode`.  | 
| exitCode | int | Un valore usato per un record di chiusura sessione. In una clean exit, contiene il codice di chiusura. Un codice di chiusura può sempre essere ottenuto in alcuni scenari di errore. In questi casi, questo valore potrebbe essere zero o potrebbe essere vuoto. | 
| logTime | stringa | Una timestamp come registrato nel percorso del codice di controllo. È rappresentato in formato UTC (Coordinated Universal Time). Per il modo più accurato per calcolare la durata dell'istruzione, vedere i campi startTime e endTime. | 
| netProtocol | stringa | Il protocollo di comunicazione di rete. Attualmente, questo valore è sempre TCP per Aurora MySQL. | 
| objectName | stringa | Il nome dell'oggetto di database se l'istruzione SQL viene eseguita su uno. Questo campo viene utilizzato solo dove l'istruzione SQL è eseguita su un oggetto di database. Se l'istruzione SQL non è in esecuzione su un oggetto, questo valore è nullo. Per costruire il nome completo dell'oggetto, combinare databaseName e objectName. Se la query coinvolge più oggetti, questo campo può essere un elenco di nomi separati da virgole. | 
| objectType | stringa |  Il tipo di oggetto di database, ad esempio tabella, indice e così via. Questo campo viene utilizzato solo dove l'istruzione SQL è eseguita su un oggetto di database. Se l'istruzione SQL non è in esecuzione su un oggetto, questo valore è nullo. I valori validi per Aurora MySQL includono i seguenti: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| paramList | stringa | Questo campo non viene utilizzato per Aurora MySQL ed è sempre nullo. | 
| pid | int | L'ID di processo del processo back-end allocato per servire la connessione client. Quando il server di database viene riavviato, le modifiche pid e il contatore per il campo statementId ricomincia. | 
| remoteHost | stringa | L'indirizzo IP o il nome host del client che ha emesso l'istruzione SQL. Per Aurora MySQL, quale viene utilizzato dipende dall'impostazione dei parametri skip\$1name\$1resolve del database. Il valore localhost indica l'attività dell'utente rdsadmin speciale.  | 
| remotePort | stringa | Il numero di porta del client. | 
| rowCount | int | Numero di righe restituite dall'istruzione SQL. Ad esempio, se un'istruzione SELECT restituisce 10 righe, RowCount è 10. Per le istruzioni INSERT o UPDATE, RowCount è 0. | 
| serverHost | stringa | Identificatore dell'istanza del server di database. | 
| serverType | stringa | Il tipo di server di database, ad esempio MySQL. | 
| serverVersion | stringa | La versione del server di database. Attualmente, questo valore è sempre MySQL 5.7.12 per Aurora MySQL. | 
| serviceName | stringa | Il nome del servizio Attualmente, questo valore è sempre Amazon Aurora MySQL per Aurora MySQL. | 
| sessionId | int | Un identificatore di sessione pseudo-univoco. | 
| startTime(solo record di attività del database versione 1.1) | stringa |  L'ora di inizio dell'esecuzione dell'istruzione SQL. È rappresentato in formato UTC (Coordinated Universal Time). Per calcolare il tempo di esecuzione dell'istruzione SQL, utilizzare `endTime - startTime`. Vedere anche il campo `endTime`.  | 
| statementId | int | Identificatore per l'istruzione SQL del client. Il contatore aumenta con ogni istruzione SQL immessa dal client. Il contatore viene reimpostato quando viene riavviata l'istanza database. | 
| substatementId | int | Identificatore di una subistruzione SQL. Questo valore è 1 per gli eventi con classe MAIN e 2 per gli eventi con classe AUX. Utilizzare il campo statementId per identificare tutti gli eventi generati dalla stessa istruzione. | 
| transactionId(solo record di attività del database versione 1.2) | int | Identificatore di una transazione. | 
| type | stringa | Il tipo di evento, I valori validi sono record e heartbeat. | 

# Elaborazione di un flusso di attività del database utilizzando l'SDK AWS
<a name="DBActivityStreams.CodeExample"></a>

Puoi elaborare un flusso di attività a livello di programmazione utilizzando l'SDK AWS. Di seguito sono riportati esempi Java e Python completamente funzionanti dell'elaborazione del flusso di dati Kinesis.

------
#### [ Java ]

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class DemoConsumer {

    private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]";
    private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
    private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
    private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
    private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]";
    private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
    private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
    private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);

    private static final AwsCrypto CRYPTO = new AwsCrypto();
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
            .withRegion(REGION_NAME)
            .withCredentials(CREDENTIALS_PROVIDER).build();

    class Activity {
        String type;
        String version;
        String databaseActivityEvents;
        String key;
    }

    class ActivityEvent {
        @SerializedName("class") String _class;
        String clientApplication;
        String command;
        String commandText;
        String databaseName;
        String dbProtocol;
        String dbUserName;
        String endTime;
        String errorMessage;
        String exitCode;
        String logTime;
        String netProtocol;
        String objectName;
        String objectType;
        List<String> paramList;
        String pid;
        String remoteHost;
        String remotePort;
        String rowCount;
        String serverHost;
        String serverType;
        String serverVersion;
        String serviceName;
        String sessionId;
        String startTime;
        String statementId;
        String substatementId;
        String transactionId;
        String type;
    }

    class ActivityRecords {
        String type;
        String clusterId;
        String instanceId;
        List<ActivityEvent> databaseActivityEventList;
    }

    static class RecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    static class RecordProcessor implements IRecordProcessor {

        private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
        private static final int PROCESSING_RETRIES_MAX = 10;
        private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
        private static final Gson GSON = new GsonBuilder().serializeNulls().create();

        private static final Cipher CIPHER;
        static {
            Security.insertProviderAt(new BouncyCastleProvider(), 1);
            try {
                CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
            } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private long nextCheckpointTimeInMillis;

        @Override
        public void initialize(String shardId) {
        }

        @Override
        public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
            for (final Record record : records) {
                processSingleBlob(record.getData());
            }

            if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                checkpoint(checkpointer);
                nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (reason == ShutdownReason.TERMINATE) {
                checkpoint(checkpointer);
            }
        }

        private void processSingleBlob(final ByteBuffer bytes) {
            try {
                // JSON $Activity
                final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);

                // Base64.Decode
                final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
                final byte[] decodedDataKey = Base64.decode(activity.key);

                Map<String, String> context = new HashMap<>();
                context.put("aws:rds:dbc-id", DBC_RESOURCE_ID);

                // Decrypt
                final DecryptRequest decryptRequest = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context);
                final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
                final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));

                // GZip Decompress
                final byte[] decompressed = decompress(decrypted);
                // JSON $ActivityRecords
                final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);

                // Iterate throught $ActivityEvents
                for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
                    System.out.println(GSON.toJson(event));
                }
            } catch (Exception e) {
                // Handle error.
                e.printStackTrace();
            }
        }

        private static byte[] decompress(final byte[] src) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            return IOUtils.toByteArray(gzipInputStream);
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
                try {
                    checkpointer.checkpoint();
                    break;
                } catch (ShutdownException se) {
                    // Ignore checkpoint if the processor instance has been shutdown (fail over).
                    System.out.println("Caught shutdown exception, skipping checkpoint." + se);
                    break;
                } catch (ThrottlingException e) {
                    // Backoff and re-attempt checkpoint upon transient failures
                    if (i >= (PROCESSING_RETRIES_MAX - 1)) {
                        System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
                        break;
                    } else {
                        System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
                    }
                } catch (InvalidStateException e) {
                    // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                    System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
                    break;
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted sleep" + e);
                }
            }
        }
    }

    private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
        // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
        final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
                "BC", "DataKey", "AES/GCM/NoPadding");
        try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
             final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copy(decryptingStream, out);
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        final KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId);
        kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST);
        kinesisClientLibConfiguration.withRegionName(REGION_NAME);
        final Worker worker = new Builder()
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kinesisClientLibConfiguration)
                .build();

        System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);

        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static byte[] getByteArray(final ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return byteArray;
    }
}
```

------
#### [ Python ]

```
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

REGION_NAME = '<region>'                    # us-east-1
RESOURCE_ID = '<external-resource-id>'      # cluster-ABCD123456
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID  # aws-rds-das-cluster-ABCD123456

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def main():
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = []
    for shard in response['StreamDescription']['Shards']:
        shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'],
                                                         ShardIteratorType='LATEST')
        shard_iters.append(shard_iter_response['ShardIterator'])

    while len(shard_iters) > 0:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                                      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))
            if 'NextShardIterator' in response:
                next_shard_iters.append(response['NextShardIterator'])
        shard_iters = next_shard_iters


if __name__ == '__main__':
    main()
```

------