

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Überwachen von Datenbankaktivitäts-Streams
<a name="DBActivityStreams.Monitoring"></a>

Datenbankaktivitäts-Streams überwachen und melden Aktivitäten. Der Aktivitäts-Stream wird erfasst und an Amazon Kinesis übertragen. Von Kinesis aus können Sie den Aktivitäts-Stream überwachen, oder andere Dienste und Anwendungen können den Aktivitäts-Stream zur weiteren Analyse nutzen. Sie können den zugrunde liegenden Namen des Kinesis-Streams mit dem AWS CLI-Befehl `describe-db-clusters` oder der RDS-API-Operation `DescribeDBClusters` finden.

Aurora verwaltet den Kinesis Stream wie folgt:
+ Aurora erzeugt den Kinesis Stream automatisch mit einem Aufbewahrungszeitraum von 24 Stunden. 
+  Aurora skaliert den Kinesis-Stream bei Bedarf. 
+  Wenn Sie den Datenbankaktivitäts-Stream stoppen oder den DB-Cluster löschen, löscht Aurora den Kinesis-Stream. 

Die folgenden Kategorien von Aktivitäten werden überwacht und in das Prüfprotokoll des Aktivitäts-Streams aufgenommen:
+ **SQL-Befehle** – Alle SQL-Befehle werden geprüft, ebenso vorbereitete Anweisungen, integrierte Funktionen und Funktionen in PL/SQL. Aufrufe von gespeicherten Prozeduren werden überprüft. Alle SQL-Anweisungen, die in gespeicherten Prozeduren oder Funktionen ausgegeben werden, werden ebenfalls überprüft.
+ **Sonstige Datenbankinformationen** – Die überwachte Aktivität umfasst die vollständige SQL-Anweisung, die Zeilenzahl der betroffenen Zeilen aus DML-Befehlen, Objekte, auf die zugegriffen wurde, und den eindeutigen Datenbanknamen. Für Aurora PostgreSQL überwachen Datenbankaktivitäts-Streams auch die Bindevariablen und die Parameter der gespeicherten Prozedur. 
**Wichtig**  
Der vollständige SQL-Text jeder Anweisung ist im Prüfprotokoll des Aktivitäts-Streams sichtbar, inklusive aller sensiblen Daten. Datenbankbenutzerkennwörter werden jedoch redigiert, wenn Aurora sie wie in der folgenden SQL-Anweisung aus dem Kontext ermitteln kann.   

  ```
  ALTER ROLE role-name WITH password
  ```
+ **Verbindungsinformationen** – Die überwachte Aktivität umfasst Sitzungs- und Netzwerkinformationen, die Server-Prozess-ID und Beendigungscodes.

Wenn ein Aktivitätsstream während der Überwachung Ihrer DB-Instance fehlschlägt, werden Sie über RDS-Ereignisse benachrichtigt.

In den folgenden Abschnitten können Sie auf Datenbankaktivitäts-Streams zugreifen, diese prüfen und verarbeiten.

**Topics**
+ [Zugriff auf einen Aktivitäts-Stream von Amazon Kinesis aus](DBActivityStreams.KinesisAccess.md)
+ [Prüfungsprotokoll-Inhalte und Beispiele für Datenbankaktivitäts-Streams](DBActivityStreams.AuditLog.md)
+ [JSON-Array databaseActivityEventList für Datenbankaktivitäts-Streams](DBActivityStreams.AuditLog.databaseActivityEventList.md)
+ [Verarbeitung eines Datenbankaktivitäts-Streams mit dem AWS SDK](DBActivityStreams.CodeExample.md)

# Zugriff auf einen Aktivitäts-Stream von Amazon Kinesis aus
<a name="DBActivityStreams.KinesisAccess"></a>

Wenn Sie einen Aktivitäts-Stream für einen DB-Cluster aktivieren, wird ein Kinesis-Stream für Sie erstellt. Von Kinesis aus können Sie die Datenbankaktivität in Echtzeit überwachen. Zur weiteren Analyse der Datenbankaktivität können Sie Ihren Kinesis-Stream mit Consumer-Anwendungen verbinden. Sie können den Stream auch mit Compliance-Management-Anwendungen wie Security Guardium von IBM oder SecureSphere Database Audit and Protection von Imperva oder  verbinden.

Sie können entweder über die RDS- oder Kinesis-Konsole auf Ihren Kinesis-Stream zugreifen.

**So greifen Sie über die RDS-Konsole auf einen Aktivitätsstream zu**

1. Öffnen Sie die Amazon-RDS-Konsole unter [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/).

1. Wählen Sie im Navigationsbereich **Datenbanken** aus.

1. Wählen Sie den/die DB-Cluster aus, auf der Sie einen Aktivitätsstream gestartet haben.

1. Wählen Sie **Konfiguration**.

1. Wählen Sie unter **Database activity stream** (Datenbank-Aktivitätsstream) den Link unter **Kinesis stream** (Kinesis-Stream) aus.

1. Wählen Sie in der Kinesis-Konsole **Monitoring** (Überwachung) aus, um mit der Überwachung der Datenbankaktivität zu beginnen.

**So greifen Sie über die Kinesis-Konsole auf einen Aktivitätsstream von Kinesis zu**

1. Öffnen Sie die Kinesis-Konsole unter.[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Wählen Sie Ihren Aktivitäts-Stream aus der Liste der Kinesis-Streams aus.

   Der Name eines Aktivitäts-Streams besteht aus dem Präfix `aws-rds-das-cluster-` gefolgt von der Ressourcen-ID des DB-Clusters. Im Folgenden wird ein Beispiel gezeigt. 

   ```
   aws-rds-das-cluster-NHVOV4PCLWHGF52NP
   ```

   Um die Amazon-RDS-Konsole zum Ermitteln der Ressourcen-ID für den DB-Cluster zu verwenden, wählen Sie Ihren DB-Cluster aus der Liste der Datenbanken aus und wählen dann die Registerkarte **Konfiguration** aus.

   Um den vollständigen Kinesis-Stream-Namen für einen Aktivitäts-Stream mit AWS CLI zu finden, verwenden Sie eine [describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) -CLI-Anfrage und notieren den Wert von `ActivityStreamKinesisStreamName` in der Antwort.

1. Wählen Sie **Monitoring (Überwachung)** aus, um mit der Überwachung der Datenbankaktivität zu beginnen.

Weitere Informationen zur Verwendung von Amazon Kinesis finden Sie unter [Was sind Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html).

# Prüfungsprotokoll-Inhalte und Beispiele für Datenbankaktivitäts-Streams
<a name="DBActivityStreams.AuditLog"></a>

Überwachte Ereignisse werden im Datenbankaktivitäts-Stream als JSON-Zeichenfolgen dargestellt. Die Struktur besteht aus einem JSON-Objekt mit einem `DatabaseActivityMonitoringRecord`, der wiederum ein Array von Aktivitätsereignissen `databaseActivityEventList` enthält. 

**Anmerkung**  
Für Datenbankaktivitäts-Streams enthält das JSON-Array `paramList` keine Nullwerte aus Anwendungen im Ruhezustand.

**Topics**
+ [Prüfungsprotokollbeispiele für Aktivitäts-Streams](#DBActivityStreams.AuditLog.Examples)
+ [DatabaseActivityMonitoringRecords JSON-Objekt](#DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords)
+ [databaseActivityEvents JSON-Objekt](#DBActivityStreams.AuditLog.databaseActivityEvents)

## Prüfungsprotokollbeispiele für Aktivitäts-Streams
<a name="DBActivityStreams.AuditLog.Examples"></a>

Im Folgenden sehen Sie Beispiele für entschlüsselte JSON-Prüfprotokolle von Aktivitätsereignisdatensätzen.

**Example Aktivitätsereignisdatensatz einer Aurora PostgreSQL - CONNECT-SQL-Anweisung**  
Im Folgenden sehen Sie einen Aktivitätsereignisdatensatz einer Anmeldung unter Verwendung einer `CONNECT`-SQL-Anweisung (`command`) durch einen psql-Client (`clientApplication`).  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-10-30 00:39:49.940668+00",
          "logTime": "2019-10-30 00:39:49.990579+00",
          "statementId": 1,
          "substatementId": 1,
          "objectType": null,
          "command": "CONNECT",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "49804",
          "sessionId": "5ce5f7f0.474b",
          "rowCount": null,
          "commandText": null,
          "paramList": [],
          "pid": 18251,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "MISC",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aktivitätsereignisdatensatz einer Aurora MySQL-CONNECT SQL-Anweisung**  
Im Folgenden sehen Sie einen Aktivitätsereignisdatensatz einer Anmeldung unter Verwendung einer `CONNECT`-SQL-Anweisung (`command`) durch einen mysql-Client (`clientApplication`).   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:13.267214+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"rdsadmin",
      "databaseName":"",
      "remoteHost":"localhost",
      "remotePort":"11053",
      "command":"CONNECT",
      "commandText":"",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"",
      "statementId":0,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725121",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:13.267207+00",
      "endTime":"2020-05-22 18:07:13.267213+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```

**Example Aktivitätsereignisdatensatz einer Aurora PostgreSQL CREATE TABLE-Anweisung**  
Im Folgenden sehen Sie ein Beispiel eines `CREATE TABLE`-Ereignisses für Aurora PostgreSQL.  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:36:54.403455+00",
          "logTime": "2019-05-24 00:36:54.494235+00",
          "statementId": 2,
          "substatementId": 1,
          "objectType": null,
          "command": "CREATE TABLE",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": null,
          "commandText": "create table my_table (id serial primary key, name varchar(32));",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "DDL",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aktivitätsereignisdatensatz einer Aurora-MySQL-CREATE TABLE-Anweisung**  
Das folgende Beispiel zeigt eine `CREATE TABLE`-Anweisung für Aurora MySQL. Die Operation wird als zwei separate Ereignisdatensätze dargestellt. Das eine Ereignis verfügt über einen Wert `"class":"MAIN"`. Das andere über einen Wert `"class":"AUX"`. Die Nachrichten können in beliebiger Reihenfolge eintreffen. Das `logTime`-Feld des `MAIN`-Ereignisses ist immer früher als die `logTime`-Felder der entsprechenden `AUX`-Ereignisse.  
Im folgenden Beispiel wird das Ereignis mit einem `class`-Wert von `MAIN` gezeigt.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.250221+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"CREATE TABLE test1 (id INT)",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.250222+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 Im folgenden Beispiel wird das entsprechende Ereignis mit einem `class`-Wert von `AUX` gezeigt.  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.247182+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"CREATE",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.247182+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

**Example Aktivitätsereignisdatensatz einer Aurora PostgreSQL SELECT-Anweisung**  
Das folgende Beispiel zeigt ein `SELECT`-Ereignis .  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:39:49.920564+00",
          "logTime": "2019-05-24 00:39:49.940668+00",
          "statementId": 6,
          "substatementId": 1,
          "objectType": "TABLE",
          "command": "SELECT",
          "objectName": "public.my_table",
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": 10,
          "commandText": "select * from my_table;",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "READ",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

```
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "",
    "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q",
    "databaseActivityEventList": [
        {
            "class": "TABLE",
            "clientApplication": "Microsoft SQL Server Management Studio - Query",
            "command": "SELECT",
            "commandText": "select * from [testDB].[dbo].[TestTable]",
            "databaseName": "testDB",
            "dbProtocol": "SQLSERVER",
            "dbUserName": "test",
            "endTime": null,
            "errorMessage": null,
            "exitCode": 1,
            "logTime": "2022-10-06 21:24:59.9422268+00",
            "netProtocol": null,
            "objectName": "TestTable",
            "objectType": "TABLE",
            "paramList": null,
            "pid": null,
            "remoteHost": "local machine",
            "remotePort": null,
            "rowCount": 0,
            "serverHost": "172.31.30.159",
            "serverType": "SQLSERVER",
            "serverVersion": "15.00.4073.23.v1.R1",
            "serviceName": "sqlserver-ee",
            "sessionId": 62,
            "startTime": null,
            "statementId": "0x03baed90412f564fad640ebe51f89b99",
            "substatementId": 1,
            "transactionId": "4532935",
            "type": "record",
            "engineNativeAuditFields": {
                "target_database_principal_id": 0,
                "target_server_principal_id": 0,
                "target_database_principal_name": "",
                "server_principal_id": 2,
                "user_defined_information": "",
                "response_rows": 0,
                "database_principal_name": "dbo",
                "target_server_principal_name": "",
                "schema_name": "dbo",
                "is_column_permission": true,
                "object_id": 581577110,
                "server_instance_name": "EC2AMAZ-NFUJJNO",
                "target_server_principal_sid": null,
                "additional_information": "",
                "duration_milliseconds": 0,
                "permission_bitmask": "0x00000000000000000000000000000001",
                "data_sensitivity_information": "",
                "session_server_principal_name": "test",
                "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109",
                "audit_schema_version": 1,
                "database_principal_id": 1,
                "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000",
                "user_defined_event_id": 0,
                "host_name": "EC2AMAZ-NFUJJNO"
            }
        }
    ]
}
```

**Example Aktivitätsereignisdatensatz einer Aurora MySQL-SELECT-Anweisung**  
Das folgende Beispiel zeigt ein `SELECT`-Ereignis.  
 Im folgenden Beispiel wird das Ereignis mit einem `class`-Wert von `MAIN` gezeigt.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986467+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"SELECT * FROM test1 WHERE id < 28",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"726571",
      "rowCount":2,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986467+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 Im folgenden Beispiel wird das entsprechende Ereignis mit einem `class`-Wert von `AUX` gezeigt.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986399+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"READ",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"726571",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986399+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

## DatabaseActivityMonitoringRecords JSON-Objekt
<a name="DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords"></a>

Die Datenbank-Aktivitätsereignisdatensätze befinden sich in einem JSON-Objekt, das die folgenden Informationen enthält.


****  

| JSON-Feld | Datentyp | Beschreibung | 
| --- | --- | --- | 
|  `type`  | string |  Der Typ des JSON-Datensatzes. Der Wert ist `DatabaseActivityMonitoringRecords`.  | 
| version | string |  Die Version der Datenbank-Aktivitätsüberwachungsdatensätze. Die Version der generierten Datenbank-Aktivitätsdatensätze hängt von der Engine-Version des DB-Clusters ab: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.html)Alle folgenden Felder befinden sich sowohl in Version 1.0 als auch in Version 1.1, sofern nicht ausdrücklich angegeben. | 
|  [databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)  | Zeichenfolge |  Ein JSON-Objekt, das die Aktivitätsereignisse enthält.  | 
| Schlüssel | Zeichenfolge | Ein Verschlüsselungsschlüssel, den Sie zum Entschlüsseln des [JSON-Array databaseActivityEventList](DBActivityStreams.AuditLog.databaseActivityEventList.md) verwenden  | 

## databaseActivityEvents JSON-Objekt
<a name="DBActivityStreams.AuditLog.databaseActivityEvents"></a>

Das `databaseActivityEvents`-JSON-Objekt enthält die folgenden Informationen.

### Felder der obersten Ebene im JSON-Datensatz
<a name="DBActivityStreams.AuditLog.topLevel"></a>

 Jedes Ereignis im Prüfprotokoll wird in einen Datensatz im JSON-Format verpackt. Dieser Datensatz enthält die folgenden Felder. 

**type**  
 Dieses Feld hat immer den Wert `DatabaseActivityMonitoringRecords`. 

**Version**  
 Dieses Feld stellt die Version des Datenprotokolls oder des Vertrags für die Datenbankaktivität dar. Es definiert, welche Felder verfügbar sind.  
Version 1.0 stellt die Unterstützung der ursprünglichen Datenaktivitäts-Streams für die Aurora PostgreSQL-Versionen 10.7 und 11.4 dar. Version 1.1 stellt die Unterstützung der Datenaktivitäts-Streams für die Aurora PostgreSQL-Versionen ab 10.10 und ab Aurora PostgreSQL-Version 11.5 dar. Version 1.1 enthält die zusätzlichen Felder `errorMessage` und `startTime`. Version 1.2 stellt die Unterstützung der Datenaktivitäts-Streams für Aurora MySQL 2.08 und höher dar. Version 1.2 enthält die zusätzlichen Felder `endTime` und `transactionId`.

**databaseActivityEvents**  
 Eine verschlüsselte Zeichenfolge, die ein oder mehrere Aktivitätsereignisse darstellt. Sie wird als Base64-Byte-Array dargestellt. Wenn Sie die Zeichenfolge entschlüsseln, ist das Ergebnis ein Datensatz im JSON-Format mit Feldern, wie in den Beispielen in diesem Abschnitt gezeigt.

**Schlüssel**  
 Der verschlüsselte Datenschlüssel, der zum Verschlüsseln der `databaseActivityEvents`-Zeichenfolge verwendet wird. Dies ist dasselbe AWS KMS key , das Sie beim Start des Datenbank-Aktivitätsstreams angegeben haben.

 Im folgenden Beispiel wird das Format dieses Datensatzes gezeigt.

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}
```

Führen Sie die folgenden Schritte aus, um den Inhalt des `databaseActivityEvents`-Feldes zu entschlüsseln:

1.  Entschlüsseln Sie den Wert im JSON-Feld `key` mit dem KMS-Schlüssel, den Sie beim Starten des Datenbankaktivitätsstroms angegeben haben. Dadurch wird der Datenverschlüsselungsschlüssel im Klartext zurückgegeben. 

1.  Base64-dekodieren Sie den Wert im `databaseActivityEvents`-JSON-Feld, um den Verschlüsselungstext der Prüfungsnutzlast im Binärformat zu erhalten. 

1.  Entschlüsseln Sie den binären Verschlüsselungstext mit dem Datenverschlüsselungsschlüssel, den Sie im ersten Schritt dekodiert haben. 

1.  Dekomprimieren Sie die entschlüsselte Nutzlast. 
   +  Die verschlüsselte Nutzlast befindet sich im `databaseActivityEvents`-Feld. 
   +  Das `databaseActivityEventList`-Feld enthält ein Array von Prüfdatensätzen. Die `type`-Felder im Array können `record` oder sein `heartbeat`. 

Der Prüfprotokoll-Aktivitätsereignisdatensatz ist ein JSON-Objekt mit folgenden Informationen.


****  

| JSON-Feld | Datentyp | Beschreibung | 
| --- | --- | --- | 
|  `type`  | string |  Der Typ des JSON-Datensatzes. Der Wert ist `DatabaseActivityMonitoringRecord`.  | 
| clusterId | string | Die Ressourcen-ID des DB-Clusters. Sie entspricht dem DB-Clusterattribut DbClusterResourceId. | 
| instanceId | string | Die Ressourcen-ID der DB-Instance. Sie dem DB-Instance-Attribut DbiResourceId. | 
|  [JSON-Array databaseActivityEventList](DBActivityStreams.AuditLog.databaseActivityEventList.md)   | string |  Ein Array von Aktivitätsprüfdatensätzen oder Heartbeat-Nachrichten.  | 

# JSON-Array databaseActivityEventList für Datenbankaktivitäts-Streams
<a name="DBActivityStreams.AuditLog.databaseActivityEventList"></a>

Die Prüfprotokollnutzlast ist ein verschlüsseltes JSON-Array `databaseActivityEventList`. In der folgenden Tabelle sind die Felder für jedes Aktivitätsereignis im entschlüsselten Array `DatabaseActivityEventList` eines Prüfprotokolls alphabetisch aufgelistet. Die Felder unterscheiden sich je nachdem, ob Sie Aurora PostgreSQL oder Aurora MySQL verwenden. Näheres entnehmen Sie bitte der Tabelle, die für Ihre Datenbank-Engine gilt.

**Wichtig**  
Die Ereignisstruktur kann sich ändern. Aurora könnte in Zukunft neue Felder zu Aktivitätsereignissen hinzufügen. Stellen Sie bei Anwendungen, welche die JSON-Daten analysieren, sicher, dass Ihr Code unbekannte Feldnamen ignorieren oder entsprechende Aktionen durchführen kann. 

## databaseActivityEventList-Felder für Aurora PostgreSQL
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.apg"></a>

Im Folgenden sind `databaseActivityEventList`-Felder für Aurora PostgreSQL aufgeführt.


| Feld | Datentyp | Beschreibung | 
| --- | --- | --- | 
| class | string |  Die Aktivitätsereignisklasse. Gültige Werte für Aurora PostgreSQL sind die folgenden: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | string | Die Anwendung, die der Client laut Meldung für die Verbindung verwendet hat. Der Client muss diese Informationen nicht angeben, der Wert kann daher Null sein. | 
| command | string | Der Name des SQL-Befehls ohne Befehlsdetails. | 
| commandText | string |  Die vom Benutzer übergebene eigentliche SQL-Anweisung. Bei Aurora PostgreSQL ist der Wert identisch mit der ursprünglichen SQL-Anweisung. Dieses Feld wird für alle Arten von Datensätzen verwendet, mit Ausnahme von Verbindungs- oder Verbindungstrennungsdatensätzen, bei denen der Wert Null ist.  Der vollständige SQL-Text jeder Anweisung ist im Prüfprotokoll des Aktivitäts-Streams sichtbar, inklusive aller sensiblen Daten. Datenbankbenutzerkennwörter werden jedoch redigiert, wenn Aurora sie wie in der folgenden SQL-Anweisung aus dem Kontext ermitteln kann.  <pre>ALTER ROLE role-name WITH password</pre>   | 
| databaseName | string | Die Datenbank, zu der der Benutzer eine Verbindung hergestellt hat. | 
| dbProtocol | string | Das Datenbankprotokoll, z. B. Postgres 3.0. | 
| dbUserName | string | Der Datenbankbenutzer, mit dem sich der Client authentifiziert hat. | 
| errorMessage(nur Datenbank-Aktivitätsdatensätze der Version 1.1) | string |  Wenn ein Fehler aufgetreten ist, wird dieses Feld mit der Fehlermeldung gefüllt, die vom DB-Server generiert worden wäre. Der `errorMessage`-Wert ist null für normale Anweisungen, die nicht zu einem Fehler geführt haben.  Ein Fehler wird als jede Aktivität definiert, die ein vom Client sichtbares PostgreSQL-Fehlerprotokollereignis mit einem Schweregrad von `ERROR` oder höher erzeugen würde. Weitere Informationen finden Sie unter [PostgreSQL-Nachrichtenschweregrade](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS). Beispielsweise erzeugen Syntaxfehler und Abfrageabbrüche eine Fehlermeldung.  Interne PostgreSQL-Serverfehler wie Hintergrund-Checkpointer-Prozessfehler erzeugen keine Fehlermeldung. Datensätze für solche Ereignisse werden jedoch weiterhin ausgegeben, unabhängig von der Einstellung des Schweregrads des Protokolls. Dadurch wird verhindert, dass Angreifer die Protokollierung deaktivieren, um eine Erkennung zu vermeiden. Siehe auch das Feld `exitCode`.  | 
| exitCode | int | Ein Wert, der für einen Sitzungsbeendigungs-Datensatz verwendet wird. Bei einer sauberen Beendigung ist hier der Beendigungscode enthalten. In manchen Fehlersituationen kann nicht immer ein Beendigungscode erhalten werden. Beispiele: exit() von PostgreSQL oder Ausführung eines Befehls wie kill -9 durch einen Operator.Wenn ein Fehler aufgetreten ist, zeigt das `exitCode`-Feld den SQL-Fehlercode `SQLSTATE` an, wie in [PostgreSQL-Fehlercodes](https://www.postgresql.org/docs/current/errcodes-appendix.html) aufgeführt. Siehe auch das Feld `errorMessage`. | 
| logTime | string | Ein Zeitstempel wie im Prüfcodepfad aufgezeichnet. Dies stellt die Endzeit der SQL-Anweisungsausführung dar. Siehe auch das Feld startTime. | 
| netProtocol | string | Das Netzwerkkommunikationsprotokoll. | 
| objectName | string | Der Name des Datenbankobjekts, wenn die SQL-Anweisung für eines ausgeführt wird. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, lautet dieser Wert Null. | 
| objectType | string | Der Datenbankobjekttyp wie z. B. Tabelle, Index, Ansicht usw. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, lautet dieser Wert Null. Gültige Werte sind unter anderem:[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) | 
| paramList | string | Ein Array durch Kommas getrennter Parameter, die an die SQL-Anweisung übergeben werden. Wenn die SQL-Anweisung keine Parameter beinhaltet, ist dieser Wert ein leeres Array. | 
| pid | int | Die Prozess-ID des Back-End-Prozesses, der für die Client-Verbindung zugewiesen wird. | 
| remoteHost | string | Entweder die Client-IP-Adresse oder der Hostname. Was davon verwendet wird, ist bei Aurora PostgreSQL von der Parametereinstellung log\$1hostname der Datenbank abhängig. Der Wert remoteHost beinhaltet auch die Werte [local] und localhost, die auf Aktivität des rdsadmin-Benutzers hinweisen. | 
| remotePort | Zeichenfolge | Die Portnummer des Clients. | 
| rowCount | int | Die Anzahl der Tabellenzeilen, die von der SQL-Anweisung betroffen sind bzw. abgerufen werden. Dieses Feld wird nur für SQL-Anweisungen verwendet, bei denen es sich um DML-Anweisungen (DML = Data Manipulation Language) handelt. Falls die SQL-Anweisung keine DML-Anweisung ist, lautet dieser Wert Null. | 
| serverHost | Zeichenfolge | Die Host-IP-Adresse des Datenbankservers. Der Wert serverHost beinhaltet auch die Werte [local] und localhost, die auf Aktivität des rdsadmin-Benutzers hinweisen. | 
| serverType | Zeichenfolge | Der Datenbankservertyp, z. B PostgreSQL. | 
| serverVersion | string | Die Datenbankserver-Version, z. B. 2.3.1 für Aurora PostgreSQL. | 
| serviceName | string | Der Name des Service, beispielsweise Amazon Aurora PostgreSQL-Compatible edition.  | 
| sessionId | int | Eine pseudoeindeutige Sitzungskennung. | 
| sessionId | int | Eine pseudoeindeutige Sitzungskennung. | 
| startTime(nur Datenbank-Aktivitätsdatensätze der Version 1.1) | string |  Die Zeit, zu der die Ausführung für die SQL-Anweisung begann.  Um die ungefähre Ausführungszeit der SQL-Anweisung zu berechnen, verwenden Sie `logTime - startTime`. Siehe auch das Feld `logTime`.  | 
| statementId | int | Eine ID für die SQL-Anweisung des Clients. Dieser Zähler auf Sitzungsebene erhöht sich mit jeder vom Client eingegebenen SQL-Anweisung.  | 
| substatementId | int | Eine ID für eine SQL-Unteranweisung. Dieser Wert zählt die enthaltenen Unteranweisungen für jede über das Feld statementId angegebene SQL-Anweisung. | 
| type | string | Der Ereignistyp. Gültige Werte sind record oder heartbeat. | 

## databaseActivityEventList-Felder für Aurora MySQL
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.ams"></a>

Im Folgenden sind `databaseActivityEventList`-Felder für Aurora MySQL aufgeführt.


| Feld | Datentyp | Beschreibung | 
| --- | --- | --- | 
| class | string |  Die Aktivitätsereignisklasse. Gültige Werte für Aurora MySQL sind die folgenden: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | string | Die Anwendung, die der Client laut Meldung für die Verbindung verwendet hat. Der Client muss diese Informationen nicht angeben, der Wert kann daher Null sein. | 
| command | string |  Die allgemeine Kategorie der SQL-Anweisung. Die Werte für dieses Feld hängen vom Wert von a `class`. Wenn `class` `MAIN` ist, enthalten die Werte Folgendes: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) Wenn `class` `AUX` ist, enthalten die Werte Folgendes: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| commandText | string |  Bei Ereignissen mit dem `class`-Wert von `MAIN` stellt dieses Feld die tatsächliche, vom Benutzer eingegebene SQL-Anweisung dar. Dieses Feld wird für alle Arten von Datensätzen verwendet, mit Ausnahme von Verbindungs- oder Verbindungstrennungsdatensätzen, bei denen der Wert Null ist.  Bei Ereignissen mit einem `class`-Wert von `AUX` enthält dieses Feld zusätzliche Informationen über die am Ereignis beteiligten Objekte.  Bei Aurora MySQL wird Zeichen, z. B. Anführungszeichen, ein Backslash vorangestellt, der ein Escape-Zeichen darstellt.  Der vollständige SQL-Text jeder Anweisung ist im Prüfprotokoll sichtbar, inklusive aller sensiblen Daten. Datenbankbenutzerkennwörter werden jedoch redigiert, wenn Aurora sie wie in der folgenden SQL-Anweisung aus dem Kontext ermitteln kann.  <pre>mysql> SET PASSWORD = 'my-password';</pre> Geben Sie aus Sicherheitsgründen ein anderes Passwort als hier angegeben an.   | 
| databaseName | Zeichenfolge | Die Datenbank, zu der der Benutzer eine Verbindung hergestellt hat. | 
| dbProtocol | string | Das Datenbankprotokoll. Derzeit ist dieser Wert bei Aurora MySQL immer MySQL. | 
| dbUserName | string | Der Datenbankbenutzer, mit dem sich der Client authentifiziert hat. | 
| endTime(nur Datenbank-Aktivitätsdatensätze der Version 1.2) | string |  Die Zeit, zu der die Ausführung für die SQL-Anweisung endete. Sie wird im UTC-Format (Coordinated Universal Time) dargestellt. Um die Ausführungszeit der SQL-Anweisung zu berechnen, verwenden Sie `endTime - startTime`. Siehe auch das Feld `startTime`.  | 
| errorMessage(nur Datenbank-Aktivitätsdatensätze der Version 1.1) | string |  Wenn ein Fehler aufgetreten ist, wird dieses Feld mit der Fehlermeldung gefüllt, die vom DB-Server generiert worden wäre. Der `errorMessage`-Wert ist null für normale Anweisungen, die nicht zu einem Fehler geführt haben.  Ein Fehler wird als jede Aktivität definiert, die ein vom Client sichtbares MySQL-Fehlerprotokollereignis mit einem Schweregrad von `ERROR` oder höher erzeugen würde. Weitere Informationen finden Sie unter [Fehlerprotokoll](https://dev.mysql.com/doc/refman/5.7/en/error-log.html) im *MySQL-Referenzhandbuch*. Beispielsweise erzeugen Syntaxfehler und Abfrageabbrüche eine Fehlermeldung.  Interne MySQL-Serverfehler wie Hintergrund-Checkpointer-Prozessfehler erzeugen keine Fehlermeldung. Datensätze für solche Ereignisse werden jedoch weiterhin ausgegeben, unabhängig von der Einstellung des Schweregrads des Protokolls. Dadurch wird verhindert, dass Angreifer die Protokollierung deaktivieren, um eine Erkennung zu vermeiden. Siehe auch das Feld `exitCode`.  | 
| exitCode | int | Ein Wert, der für einen Sitzungsbeendigungs-Datensatz verwendet wird. Bei einer sauberen Beendigung ist hier der Beendigungscode enthalten. In manchen Fehlersituationen kann nicht immer ein Beendigungscode erhalten werden. In solchen Fällen kann dieser Wert Null oder leer sein. | 
| logTime | string | Ein Zeitstempel wie im Prüfcodepfad aufgezeichnet. Sie wird im UTC-Format (Coordinated Universal Time) dargestellt. Die genaueste Methode zum Berechnen der Anweisungsdauer finden Sie in den Feldern startTime und endTime. | 
| netProtocol | string | Das Netzwerkkommunikationsprotokoll. Derzeit ist dieser Wert bei Aurora MySQL immer TCP. | 
| objectName | string | Der Name des Datenbankobjekts, wenn die SQL-Anweisung für eines ausgeführt wird. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, ist dieser Wert leer. Um den vollständig qualifizierten Namen des Objekts zu erstellen, kombinieren Sie databaseName und objectName. Wenn die Abfrage mehrere Objekte umfasst, kann dieses Feld eine durch Komma getrennte Liste von Namen sein. | 
| objectType | string |  Der Datenbankobjekttyp, z. B. Tabelle, Index usw. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, lautet dieser Wert Null. Gültige Werte für Aurora MySQL sind unter anderem: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| paramList | string | Dieses Feld wird für Aurora MySQL nicht verwendet und ist immer Null. | 
| pid | int | Die Prozess-ID des Back-End-Prozesses, der für die Client-Verbindung zugewiesen wird. Wenn der Datenbankserver neu gestartet wird, ändert sich die pid und der Zähler für das Feld statementId beginnt von vorn. | 
| remoteHost | string | Entweder die IP-Adresse oder der Hostname des Clients, der die SQL-Anweisung ausgegeben hat. Was davon verwendet wird, ist bei Aurora MySQL von der Parametereinstellung skip\$1name\$1resolve der Datenbank abhängig. Der Wert localhost gibt die Aktivität des speziellen Benutzers rdsadmin an.  | 
| remotePort | string | Die Portnummer des Clients. | 
| rowCount | int | Die Anzahl der Zeilen, die von der SQL-Anweisung zurückgegeben werden. Wenn eine SELECT-Anweisung beispielsweise 10 Zeilen zurückgibt, beträgt rowCount 10. Für INSERT- oder UPDATE-Anweisungen ist der RowCount 0. | 
| serverHost | Zeichenfolge | Die Datenbankserver-Instance-ID. | 
| serverType | Zeichenfolge | Der Datenbankservertyp, z. B MySQL. | 
| serverVersion | string | Die Version des Datenbankservers. Derzeit ist dieser Wert bei Aurora MySQL immer MySQL 5.7.12. | 
| serviceName | string | Name des Service. Derzeit ist dieser Wert bei Aurora MySQL immer Amazon Aurora MySQL. | 
| sessionId | int | Eine pseudoeindeutige Sitzungskennung. | 
| startTime(nur Datenbank-Aktivitätsdatensätze der Version 1.1) | string |  Die Zeit, zu der die Ausführung für die SQL-Anweisung begann. Sie wird im UTC-Format (Coordinated Universal Time) dargestellt. Um die Ausführungszeit der SQL-Anweisung zu berechnen, verwenden Sie `endTime - startTime`. Siehe auch das Feld `endTime`.  | 
| statementId | int | Eine ID für die SQL-Anweisung des Clients. Der Zähler erhöht sich mit jeder vom Client eingegebenen SQL-Anweisung. Der Zähler wird zurückgesetzt, wenn die DB-Instance neu gestartet wird. | 
| substatementId | int | Eine ID für eine SQL-Unteranweisung. Dieser Wert ist 1 für Ereignisse mit der Klasse MAIN und 2 für Ereignisse mit der Klasse AUX. Verwenden Sie das statementId-Feld, um alle Ereignisse zu identifizieren, die von derselben Anweisung generiert werden. | 
| transactionId(nur Datenbank-Aktivitätsdatensätze der Version 1.2) | int | Eine ID für eine Transaktion. | 
| type | string | Der Ereignistyp. Gültige Werte sind record oder heartbeat. | 

# Verarbeitung eines Datenbankaktivitäts-Streams mit dem AWS SDK
<a name="DBActivityStreams.CodeExample"></a>

Sie können einen Aktivitäts-Stream mit Hilfe des AWS SDK programmgesteuert verarbeiten. Im Folgenden sehen Sie vollständig funktionsfähige Java- und Python-Beispiele für eine mögliche Verarbeitung des Kinesis-Datenstroms. 

------
#### [ Java ]

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class DemoConsumer {

    private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]";
    private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
    private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
    private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
    private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]";
    private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
    private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
    private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);

    private static final AwsCrypto CRYPTO = new AwsCrypto();
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
            .withRegion(REGION_NAME)
            .withCredentials(CREDENTIALS_PROVIDER).build();

    class Activity {
        String type;
        String version;
        String databaseActivityEvents;
        String key;
    }

    class ActivityEvent {
        @SerializedName("class") String _class;
        String clientApplication;
        String command;
        String commandText;
        String databaseName;
        String dbProtocol;
        String dbUserName;
        String endTime;
        String errorMessage;
        String exitCode;
        String logTime;
        String netProtocol;
        String objectName;
        String objectType;
        List<String> paramList;
        String pid;
        String remoteHost;
        String remotePort;
        String rowCount;
        String serverHost;
        String serverType;
        String serverVersion;
        String serviceName;
        String sessionId;
        String startTime;
        String statementId;
        String substatementId;
        String transactionId;
        String type;
    }

    class ActivityRecords {
        String type;
        String clusterId;
        String instanceId;
        List<ActivityEvent> databaseActivityEventList;
    }

    static class RecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    static class RecordProcessor implements IRecordProcessor {

        private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
        private static final int PROCESSING_RETRIES_MAX = 10;
        private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
        private static final Gson GSON = new GsonBuilder().serializeNulls().create();

        private static final Cipher CIPHER;
        static {
            Security.insertProviderAt(new BouncyCastleProvider(), 1);
            try {
                CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
            } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private long nextCheckpointTimeInMillis;

        @Override
        public void initialize(String shardId) {
        }

        @Override
        public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
            for (final Record record : records) {
                processSingleBlob(record.getData());
            }

            if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                checkpoint(checkpointer);
                nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (reason == ShutdownReason.TERMINATE) {
                checkpoint(checkpointer);
            }
        }

        private void processSingleBlob(final ByteBuffer bytes) {
            try {
                // JSON $Activity
                final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);

                // Base64.Decode
                final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
                final byte[] decodedDataKey = Base64.decode(activity.key);

                Map<String, String> context = new HashMap<>();
                context.put("aws:rds:dbc-id", DBC_RESOURCE_ID);

                // Decrypt
                final DecryptRequest decryptRequest = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context);
                final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
                final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));

                // GZip Decompress
                final byte[] decompressed = decompress(decrypted);
                // JSON $ActivityRecords
                final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);

                // Iterate throught $ActivityEvents
                for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
                    System.out.println(GSON.toJson(event));
                }
            } catch (Exception e) {
                // Handle error.
                e.printStackTrace();
            }
        }

        private static byte[] decompress(final byte[] src) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            return IOUtils.toByteArray(gzipInputStream);
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
                try {
                    checkpointer.checkpoint();
                    break;
                } catch (ShutdownException se) {
                    // Ignore checkpoint if the processor instance has been shutdown (fail over).
                    System.out.println("Caught shutdown exception, skipping checkpoint." + se);
                    break;
                } catch (ThrottlingException e) {
                    // Backoff and re-attempt checkpoint upon transient failures
                    if (i >= (PROCESSING_RETRIES_MAX - 1)) {
                        System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
                        break;
                    } else {
                        System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
                    }
                } catch (InvalidStateException e) {
                    // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                    System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
                    break;
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted sleep" + e);
                }
            }
        }
    }

    private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
        // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
        final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
                "BC", "DataKey", "AES/GCM/NoPadding");
        try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
             final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copy(decryptingStream, out);
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        final KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId);
        kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST);
        kinesisClientLibConfiguration.withRegionName(REGION_NAME);
        final Worker worker = new Builder()
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kinesisClientLibConfiguration)
                .build();

        System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);

        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static byte[] getByteArray(final ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return byteArray;
    }
}
```

------
#### [ Python ]

```
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

REGION_NAME = '<region>'                    # us-east-1
RESOURCE_ID = '<external-resource-id>'      # cluster-ABCD123456
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID  # aws-rds-das-cluster-ABCD123456

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def main():
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = []
    for shard in response['StreamDescription']['Shards']:
        shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'],
                                                         ShardIteratorType='LATEST')
        shard_iters.append(shard_iter_response['ShardIterator'])

    while len(shard_iters) > 0:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                                      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))
            if 'NextShardIterator' in response:
                next_shard_iters.append(response['NextShardIterator'])
        shard_iters = next_shard_iters


if __name__ == '__main__':
    main()
```

------