Sehen Sie sich Beispielabfragen zur Analyse von Daten in einem Studio-Notizbuch an - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Sehen Sie sich Beispielabfragen zur Analyse von Daten in einem Studio-Notizbuch an

Informationen zu den Apache SQL Flink-Abfrageeinstellungen finden Sie unter Flink on Zeppelin Notebooks for Interactive Data Analysis.

Um Ihre Anwendung im Apache Flink-Dashboard anzuzeigen, wählen Sie auf der Zeppelin FLINKJOBNote-Seite Ihrer Anwendung.

Weitere Informationen zu Fensterabfragen finden Sie unter Windows in der Apache-Flink-Dokumentation.

Weitere Beispiele für Apache Flink SQL Streaming-Abfragen finden Sie unter Abfragen in der Apache Flink-Dokumentation.

Erstellen Sie Tabellen mit Amazon MSK /Apache Kafka

Sie können den Amazon MSK Flink-Konnektor mit Managed Service für Apache Flink Studio verwenden, um Ihre Verbindung mit Klartext oder Authentifizierung zu authentifizieren. SSL IAM Erstellen Sie Ihre Tabellen mit den spezifischen Eigenschaften gemäß Ihren Anforderungen.

-- Plaintext connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- SSL connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SSL', 'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts', 'properties.ssl.truststore.password' = 'changeit', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- IAM connection (or for MSK Serverless) CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'AWS_MSK_IAM', 'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;', 'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );

Sie können diese mit anderen Eigenschaften bei Apache Kafka Connector kombinieren. SQL

Erstellen Sie Tabellen mit Kinesis

Im folgenden Beispiel erstellen Sie eine Tabelle mit Kinesis:

CREATE TABLE KinesisTable ( `column1` BIGINT, `column2` BIGINT, `column3` BIGINT, `column4` STRING, `ts` TIMESTAMP(3) ) PARTITIONED BY (column1, column2) WITH ( 'connector' = 'kinesis', 'stream' = 'test_stream', 'aws.region' = '<region>', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );

Weitere Informationen zu anderen Eigenschaften, die Sie verwenden können, finden Sie unter Amazon Kinesis Data Streams SQL Connector.

Fragen Sie ein taumelndes Fenster ab

Die folgende Flink SQL Streaming-Abfrage wählt den höchsten Preis in jedem Fünf-Sekunden-Tummelfenster aus der Tabelle aus: ZeppelinTopic

%flink.ssql(type=update) SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker FROM ZeppelinTopic GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)

Fragen Sie ein Schiebefenster ab

Die folgende Apache Flink SQL Streaming-Abfrage wählt aus der Tabelle den höchsten Preis in jedem 5-Sekunden-Schiebefenster aus: ZeppelinTopic

%flink.ssql(type=update) SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max FROM ZeppelinTopic//or your table name in AWS Glue GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)

Interaktiv verwenden SQL

In diesem Beispiel wird der Höchstwert der Ereignis- und Verarbeitungszeit sowie die Summe der Werte aus der Schlüssel-Wert-Tabelle ausgegeben. Stellen Sie sicher, dass Sie das Beispielskript zur Datengenerierung aus dem laufenden Verwenden Sie Scala, um Beispieldaten zu generieren haben. Informationen zum Ausprobieren anderer SQL Abfragen wie Filtern und Joins in Ihrem Studio-Notizbuch finden Sie in der Apache Flink-Dokumentation: Abfragen in der Apache Flink-Dokumentation.

%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time. SELECT MAX(`et`) as `et`, MAX(`pt`) as `pt`, SUM(`value`) as `sum` FROM `key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive tumbling window query that displays the number of records observed per (event time) second. -- Browse through the chart views to see different visualizations of the streaming result. SELECT TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`, `key`, SUM(`value`) as `sum` FROM `key-values` GROUP BY TUMBLE(`et`, INTERVAL '1' SECONDS), `key`;

Verwenden Sie den Konnektor BlackHole SQL

Der BlackHole SQL Connector erfordert nicht, dass Sie einen Kinesis-Datenstream oder einen MSK Amazon-Cluster erstellen, um Ihre Abfragen zu testen. Informationen zum Connector finden Sie unter BlackHole SQL BlackHole SQLConnector in der Apache Flink-Dokumentation. In diesem Beispiel ist der Standardkatalog ein speicherinterner Katalog.

%flink.ssql CREATE TABLE default_catalog.default_database.blackhole_table ( `key` BIGINT, `value` BIGINT, `et` TIMESTAMP(3) ) WITH ( 'connector' = 'blackhole' )
%flink.ssql(parallelism=1) INSERT INTO `test-target` SELECT `key`, `value`, `et` FROM `test-source` WHERE `key` > 3
%flink.ssql(parallelism=2) INSERT INTO `default_catalog`.`default_database`.`blackhole_table` SELECT `key`, `value`, `et` FROM `test-target` WHERE `key` > 7

Verwenden Sie Scala, um Beispieldaten zu generieren

In diesem Beispiel wird Scala verwendet, um Beispieldaten zu generieren. Sie können diese Beispieldaten verwenden, um verschiedene Abfragen zu testen. Verwenden Sie die Anweisung create table, um die Schlüssel-Wert-Tabelle zu erstellen.

import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import org.apache.flink.streaming.api.scala.DataStream import java.sql.Timestamp // ad-hoc convenience methods to be defined on Table implicit class TableOps[T](table: DataStream[T]) { def asView(name: String): DataStream[T] = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView("`" + name + "`") } stenv.createTemporaryView("`" + name + "`", table) return table; } }
%flink(parallelism=4) val stream = senv .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000)) .map(key => (key, 1, new Timestamp(System.currentTimeMillis))) .asView("key-values-data-generator")
%flink.ssql(parallelism=4) -- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)") -- in this case the INSERT query will inherit the parallelism of the of the above paragraph INSERT INTO `key-values` SELECT `_1` as `key`, `_2` as `value`, `_3` as `et` FROM `key-values-data-generator`

Verwenden Sie interaktives Scala

Dies ist die Scala-Übersetzung von Interaktiv verwenden SQL. Weitere Scala-Beispiele finden Sie in der Tabelle API in der Apache Flink-Dokumentation.

%flink import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=4) // A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time. val query01 = stenv .from("`key-values`") .select( $"et".max().as("et"), $"pt".max().as("pt"), $"value".sum().as("sum") ).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink(parallelism=4) // An tumbling window view that displays the number of records observed per (event time) second. val query02 = stenv .from("`key-values`") .window(Tumble over 1.seconds on $"et" as $"w") .groupBy($"w", $"key") .select( $"w".start.as("window"), $"key", $"value".sum().as("sum") ).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

Interaktives Python verwenden

Dies ist die Python-Übersetzung von Interaktiv verwenden SQL. Weitere Python-Beispiele finden Sie in der Tabelle API in der Apache Flink-Dokumentation.

%flink.pyflink from pyflink.table.table import Table def as_view(table, name): if (name in st_env.list_temporary_views()): st_env.drop_temporary_view(name) st_env.create_temporary_view(name, table) return table Table.as_view = as_view
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`keyvalues`") \ .select(", ".join([ "max(et) as et", "max(pt) as pt", "sum(value) as sum" ])) \ .as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`key-values`") \ .window(Tumble.over("1.seconds").on("et").alias("w")) \ .group_by("w, key") \ .select(", ".join([ "w.start as window", "key", "sum(value) as sum" ])) \ .as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

Verwenden Sie eine Kombination aus interaktivem PythonSQL, und Scala

Sie können eine beliebige Kombination von SQL Python und Scala in Ihrem Notizbuch für interaktive Analysen verwenden. In einem Studio-Notizbuch, das Sie als Anwendung mit einem dauerhaften Status bereitstellen möchten, können Sie eine Kombination aus SQL und Scala verwenden. Dieses Beispiel zeigt Ihnen die Abschnitte, die ignoriert werden, und diejenigen, die in der Anwendung mit dem dauerhaften Zustand bereitgestellt werden.

%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-source` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-source-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-target` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-target-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink() // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=1) val table = stenv .from("`default_catalog`.`default_database`.`my-test-source`") .select($"key", $"value", $"et") .filter($"key" > 10) .asView("query01")
%flink.ssql(parallelism=1) -- forward data INSERT INTO `default_catalog`.`default_database`.`my-test-target` SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000) -- forward data to local stream (ignored when deployed as application) SELECT * FROM `query01`
%flink // tell me the meaning of life (ignored when deployed as application!) print("42!")

Verwenden Sie einen kontoübergreifenden Kinesis-Datenstream

Um einen Kinesis-Datenstrom zu verwenden, der sich in einem anderen Konto als dem Konto befindet, das Ihr Studio-Notebook enthält, erstellen Sie eine Serviceausführungsrolle in dem Konto, in dem Ihr Studio-Notebook ausgeführt wird, und eine Rollenvertrauensrichtlinie für das Konto, das den Datenstrom enthält. Verwenden Sieaws.credentials.provider,aws.credentials.role.arn, und aws.credentials.role.sessionName im Kinesis-Konnektor in Ihrer DDL Anweisung create table, um eine Tabelle anhand des Datenstroms zu erstellen.

Verwenden Sie die folgende Serviceausführungsrolle für das Studio-Notebook-Konto.

{ "Sid": "AllowNotebookToAssumeRole", "Effect": "Allow", "Action": "sts:AssumeRole" "Resource": "*" }

Verwenden Sie die AmazonKinesisFullAccess-Richtlinie und die folgende Rollenvertrauensrichtlinie für das Datenstrom-Konto.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<accountID>:root" }, "Action": "sts:AssumeRole", "Condition": {} } ] }

Verwenden Sie den folgenden Absatz für die create-table-Anweisung.

%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )