Operationen für Streaming-Daten: Zusammenführen von Streams - Entwicklerhandbuch für Amazon Kinesis Data Analytics for SQL Applications

Für neue Projekte empfehlen wir, den neuen Managed Service für Apache Flink Studio anstelle von Kinesis Data Analytics for SQL Applications zu verwenden. Der Managed Service für Apache Flink Studio kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie in wenigen Minuten anspruchsvolle Anwendungen zur Stream-Verarbeitung erstellen können.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Operationen für Streaming-Daten: Zusammenführen von Streams

In Ihrer Anwendung kann es mehrere In-Application-Streams geben. Sie können JOIN-Abfragen schreiben, um Daten zu korrelieren, die über diese Streams eintreffen. Angenommen, es liegen die folgenden In-Application-Streams vor:

  • OrderStream – empfängt Lageraufträge, die platziert werden.

    (orderId SqlType, ticker SqlType, amount SqlType, ROWTIME TimeStamp)
  • TradeStream – empfängt die resultierenden Wertpapiertransaktionen für diese Bestellungen.

    (tradeId SqlType, orderId SqlType, ticker SqlType, amount SqlType, ticker SqlType, amount SqlType, ROWTIME TimeStamp)

Im Folgenden finden Sie Beispiele für JOIN-Abfragen, die Daten aus diesen Streams korrelieren.

Beispiel 1: Bericht zu Bestellungen, bei denen es innerhalb einer Minute nach Platzierung der Bestellung eine Handelsaktion gibt

In diesem Beispiel führt die Abfrage OrderStream und TradeStream zusammen. Da jedoch nur Handelsaktionen ausgegeben werden sollen, die innerhalb einer Minute nach der Bestellung ausgeführt werden, definiert die Abfrage das 1-Minuten-Fenster für den TradeStream. Informationen zu Abfragen mit Fenstern finden Sie unter Gleitende Fenster.

SELECT STREAM ROWTIME, o.orderId, o.ticker, o.amount AS orderAmount, t.amount AS tradeAmount FROM OrderStream AS o JOIN TradeStream OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS t ON o.orderId = t.orderId;

Sie können die Fenster mittels der WINDOW-Klausel explizit definieren und die vorhergehende Abfrage wie folgt schreiben:

SELECT STREAM ROWTIME, o.orderId, o.ticker, o.amount AS orderAmount, t.amount AS tradeAmount FROM OrderStream AS o JOIN TradeStream OVER t ON o.orderId = t.orderId WINDOW t AS (RANGE INTERVAL '1' MINUTE PRECEDING)

Wenn Sie diese Abfrage in Ihren Anwendungscode einschließen, wird der Anwendungscode kontinuierlich ausgeführt. Für jeden im OrderStream eintreffenden Datensatz gibt die Anwendung eine Ausgabe aus, wenn es innerhalb des 1-Minuten-Fensters nach Platzierung der Bestellung eine Handelsaktion gibt.

Die Zusammenführung in der vorhergehenden Abfrage ist eine interne Zusammenführung, bei der die Abfrage Datensätze im OrderStream ausgibt, für die es einen übereinstimmenden Datensatz in TradeStream gibt (und umgekehrt). Mithilfe einer externen Zusammenführung können Sie ein weiteres interessantes Szenario erstellen. Stellen Sie sich folgendes Szenario vor: Es sollen Börsenaufträge erteilt werden, für die es innerhalb einer Minute nach Platzierung des Auftrags keine Handelsaktionen gibt. Ebenso gibt es Handelsaktionen innerhalb desselben Fensters, die jedoch für andere Börsenaufträge gelten. In diesem Beispiel wird eine externe Zusammenführung gezeigt.

SELECT STREAM ROWTIME, o.orderId, o.ticker, o.amount AS orderAmount, t.ticker, t.tradeId, t.amount AS tradeAmount, FROM OrderStream AS o LEFT OUTER JOIN TradeStream OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS t ON o.orderId = t.orderId;