Beispiel: Aggregieren von Teilergebnissen aus einer Abfrage - Entwicklerhandbuch für Amazon Kinesis Data Analytics for SQL Applications

Für neue Projekte empfehlen wir, den neuen Managed Service für Apache Flink Studio anstelle von Kinesis Data Analytics for SQL Applications zu verwenden. Der Managed Service für Apache Flink Studio kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie in wenigen Minuten anspruchsvolle Anwendungen zur Stream-Verarbeitung erstellen können.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Beispiel: Aggregieren von Teilergebnissen aus einer Abfrage

Wenn ein Amazon Kinesis-Datenstrom Datensätze mit einer Ereigniszeit enthält, die nicht genau dem Zeitpunkt der Datenübernahme entspricht, enthalten bestimmte Ergebnisse in einem rollierenden Fenster Datensätze, die innerhalb des Fensters eingetroffen sind, aber nicht unbedingt eingetreten sein müssen. In diesem Fall enthält das rollierende Fenster nur eine Teilmenge der gewünschten Ergebnisse. Es gibt mehrere Möglichkeiten, wie Sie dieses Problem beheben können:

  • Verwenden Sie nur ein rollierendes Fenster und aggregieren Sie Teilergebnisse in der Nachverarbeitung über eine Datenbank oder ein Data Warehouse mithilfe von upsert-Operationen. Dieser Ansatz ist bei der Verarbeitung einer Anwendung sehr effizient. Verspätete Daten für Aggregat-Operatoren (sum, min, max usw.) werden ohne jegliche Einschränkungen verarbeitet. Der Nachteil dieser Methode besteht darin, dass Sie eine zusätzliche Anwendungslogik im Datenbank-Layer entwickeln und pflegen müssen.

  • Verwenden Sie ein rollierendes und ein gleitendes Fenster, das früh zu Teilergebnissen führt, aber innerhalb des Zeitraums des gleitenden Fensters zudem vollständige Ergebnisse erzielen wird. Dieser Ansatz verarbeitet verspätete Daten durch Überschreiben anstatt mit einer upsert-Operation. Daher muss keine zusätzliche Anwendungslogik im Datenbank-Layer hinzugefügt werden. Der Nachteil dieser Methode besteht darin, dass sie mehr Kinesis Processing Units (KPUs) verbraucht und nach wie vor zwei Ergebnisse erzeugt, was bei manchen Anwendungsfällen möglicherweise nicht funktioniert.

Weitere Informationen zu rollierenden und gleitenden Fenstern finden Sie unter Abfragen mit Fenstern.

Im folgenden Verfahren erzielt die Aggregation über ein rollierendes Fenster zwei (an den CALC_COUNT_SQL_STREAM-In-Application-Stream übermittelte) Teilergebnisse, die zusammen das Endergebnis ergeben. Die Anwendung erstellt dann eine zweite (an den DESTINATION_SQL_STREAM-In-Application-Stream übermittelte) Aggregation, um die beiden Teilergebnisse zu vereinen.

So erstellen Sie eine Anwendung, die Teilergebnisse mittels einer Ereigniszeit aggregiert
  1. Melden Sie sich bei AWS Management Console an und öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.

  2. Klicken Sie im Navigationsbereich auf Data Analytics (Datenanalyse). Erstellen Sie eine Kinesis Data Analytics-Anwendung gemäß der Beschreibung im Erste Schritte mit Amazon-Kinesis-Data-Analytics for SQL-Anwendungen-Tutorial.

  3. Ersetzen Sie im SQL-Editor den Anwendungscode durch Folgendes:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    Die SELECT-Anweisung im Anwendungscode filtert Zeilen im SOURCE_SQL_STREAM_001 nach Aktien mit Preisänderungen von mehr als 1 % und fügt diese Zeilen mittels eines Pump in einen anderen CHANGE_STREAM-In-Application-Stream ein.

  4. Klicken Sie auf Save and run SQL (SQL speichern und ausführen).

Die erste Pumpe gibt einen Stream ähnlich dem Folgenden an CALC_COUNT_SQL_STREAM aus. Beachten Sie, dass der Ergebnissatz unvollständig ist:

Screenshot der Konsole mit Teilergebnissen.

Die zweite Pumpe gibt dann einen Stream mit dem vollständigen Ergebnissatz an DESTINATION_SQL_STREAM aus:

Screenshot der Konsole mit vollständigen Ergebnissen.