Beispiel: Transformieren von DateTime Werten - Entwicklerhandbuch für Amazon Kinesis Data Analytics for SQL Applications

Für neue Projekte empfehlen wir, den neuen Managed Service für Apache Flink Studio anstelle von Kinesis Data Analytics for SQL Applications zu verwenden. Der Managed Service für Apache Flink Studio kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie in wenigen Minuten anspruchsvolle Anwendungen zur Stream-Verarbeitung erstellen können.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Beispiel: Transformieren von DateTime Werten

Amazon Kinesis Data Analytics unterstützt die Konvertierung von Spalten in Zeitstempel. Beispielsweise möchten Sie vielleicht Ihren eigenen Zeitstempel als Teil einer GROUP BY-Klausel in Form eines anderen zeitbasierten Fensters zusätzlich zur Spalte ROWTIME verwenden. Kinesis Data Analytics stellt Vorgänge und SQL-Funktionen für die Arbeit mit Datums- und Uhrzeitfeldern bereit.

  • Operatoren für Datum und Uhrzeit – Sie können Rechenoperationen mit Datums-, Uhrzeit- und Intervall-Datentypen ausführen. Weitere Informationen finden Sie unter Datums-, Zeitstempel- und Intervalloperatoren in der SQL-Referenz zu Amazon Managed Service für Apache Flink.

     

  • SQL-Funktionen – Diese umfassen u. a. das Folgende. Weitere Informationen finden Sie unter Datums- und Uhrzeitfunktionen in der SQL-Referenz zu Amazon Managed Service für Apache Flink.

    • EXTRACT() – Extrahiert ein Feld von einem Datums-, Uhrzeit-, Zeitstempel- oder Intervall Ausdruck.

    • CURRENT_TIME – Gibt die Zeit an, zu der die Abfrage ausgeführt wird (UTC).

    • CURRENT_DATE – Gibt das Datum an, an dem die Abfrage ausgeführt wird (UTC).

    • CURRENT_TIMESTAMP – Gibt den Zeitstempel an, an dem die Abfrage ausgeführt wird (UTC).

    • LOCALTIME – Gibt die aktuelle Uhrzeit an, wenn die Abfrage ausgeführt wird, entsprechend der Definition der Umgebung, in der Kinesis Data Analytics ausgeführt wird (UTC).

    • LOCALTIMESTAMP – Gibt den aktuellen Zeitstempel an, entsprechend der Definition der Umgebung, in der Kinesis Data Analytics ausgeführt wird (UTC).

       

  • SQL-Erweiterungen – Diese umfassen u. a. die Folgenden. Weitere Informationen finden Sie unter Datums- und Zeitfunktionen und Konvertierungsfunktionen für Datum/Uhrzeit in der SQL-Referenz zu Amazon Managed Service für Apache Flink.

    • CURRENT_ROW_TIMESTAMP – Gibt einen neuen Zeitstempel für alle Zeilen im Stream an.

    • TSDIFF – Gibt die Differenz zwischen zwei Zeitstempeln in Millisekunden an.

    • CHAR_TO_DATE – Wandelt eine Zeichenfolge in ein Datum um.

    • CHAR_TO_TIME – Wandelt eine Zeichenfolge in eine Uhrzeit um.

    • CHAR_TO_TIMESTAMP – Wandelt eine Zeichenfolge in einen Zeitstempel um.

    • DATE_TO_CHAR – Wandelt ein Datum in eine Zeichenfolge um.

    • TIME_TO_CHAR – Wandelt eine Uhrzeit in eine Zeichenfolge um.

    • TIMESTAMP_TO_CHAR – Wandelt eine Zeitstempel in einen Zeichenfolge um.

Die meisten der oben genannten SQL-Funktionen verwenden ein Format zum Umwandeln der Spalten. Das Format ist flexibel. Sie können beispielsweise das Format yyyy-MM-dd hh:mm:ss festlegen, damit die Eingabezeichenfolge 2009-09-16 03:15:24 in einen Zeitstempel umgewandelt wird. Weitere Informationen finden Sie unter Char To Timestamp (Sys) in der SQL-Referenz zu Amazon Managed Service für Apache Flink.

Beispiele: Umwandeln von Datumsangaben

In diesem Beispiel schreiben Sie die folgenden Datensätze in einen Amazon Kinesis-Datenstrom.

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

Anschließend erstellen Sie eine Kinesis Data Analytics-Anwendung in der Konsole mit dem Kinesis-Stream als Streaming-Quelle. Der Erkennungsvorgang liest Beispieldatensätze auf der Streaming-Quelle und erschließt ein In-Application-Schema mit zwei Spalten (EVENT_TIME und TICKER), wie hier gezeigt.

Screenshot der Konsole mit In-Application-Schema mit Ereigniszeit- und Ticker-Spalten.

Anschließend verwenden Sie den Anwendungscode mit SQL-Funktionen, um das EVENT_TIME-Zeitstempelfeld auf verschiedene Weise zu konvertieren. Fügen Sie die resultierenden Daten dann wie im folgenden Screenshot dargestellt in einen anderen In-Application-Stream ein:

Screenshot der Konsole mit den resultierenden Daten in einem In-Application-Stream.

Schritt 1: Erstellen Sie einen Kinesis-Datenstrom

Erstellen Sie einen Amazon Kinesis-Datenstrom und füllen Sie ihn folgendermaßen mit Ereigniszeit- und Ticker-Datensätzen:

  1. Melden Sie sich bei der an AWS Management Console und öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.

  2. Klicken Sie im Navigationsbereich auf Data Streams (Daten-Streams).

  3. Klicken Sie auf Create Kinesis stream (Kinesis-Stream erstellen) und erstellen Sie einen Stream mit einer Shard.

  4. Führen Sie den folgenden Python-Code aus, um den Stream mit Beispieldaten zu füllen. Dieser einfache Code schreibt kontinuierlich einen Datensatz mit einem zufälligen Tickersymbol und dem aktuellen Zeitstempel in den Stream.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Schritt 4: Erstellen Sie die Amazon Kinesis Data Analytics-Anwendung

Erstellen Sie eine Anwendung wie folgt:

  1. Öffnen Sie die Flink-Konsole Managed Service für Apache Flink unter https://console.aws.amazon.com/kinesisanalytics.

  2. Klicken Sie auf Create application (Anwendung erstellen), geben Sie einen Anwendungsnamen ein und klicken Sie erneut auf Create application (Anwendung erstellen).

  3. Wählen Sie auf der Detailseite der Anwendung Connect streaming data (Streaming-Daten verbinden), um eine Verbindung mit der Quelle herzustellen.

  4. Gehen Sie auf der Seite Connect to source (Mit Quelle verbinden) wie folgt vor:

    1. Wählen Sie den Stream aus, den Sie im vorherigen Abschnitt erstellt haben.

    2. Wählen Sie die Option zum Erstellen einer IAM-Rolle.

    3. Klicken Sie auf Discover schema (Schema erkennen). Warten Sie, bis die Konsole das abgeleitete Schema und die Beispieldatensätze anzeigt, die zum Ableiten des Schemas für den erstellten In-Application-Stream verwendet werden. Das abgeleitete Schema verfügt über zwei Spalten.

    4. Klicken Sie auf Edit schema (Schema bearbeiten). Ändern Sie den Wert für Column type (Spaltentyp) der Spalte EVENT_TIME in TIMESTAMP.

    5. Wählen Sie Save schema and update stream samples (Schema speichern und Stream-Beispiel aktualiseren). Nachdem die Konsole das Schema gespeichert hat, klicken Sie auf Exit (Beenden).

    6. Wählen Sie Save and continue aus.

  5. Klicken Sie auf der Detailseite der Anwendung auf Go to SQL editor (Gehe zu SQL-Editor). Um die Anwendung zu starten, wählen Sie im angezeigten Dialogfeld Yes, start application (Ja, Anwendung starten) aus.

  6. Schreiben Sie im SQL-Editor den Anwendungscode und überprüfen Sie die Ergebnisse wie folgt:

    1. Kopieren Sie den folgenden Anwendungscode und fügen Sie diesen in den Editor ein.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. Klicken Sie auf Save and run SQL (SQL speichern und ausführen). Auf der Registerkarte Real-time analytics (Echtzeitanalyse) können Sie alle In-Application-Streams sehen, die von der Anwendung erstellt wurden, und die Daten überprüfen.