Studio ノートブックでデータを分析するためのクエリの例を表示する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Studio ノートブックでデータを分析するためのクエリの例を表示する

Apache Flink SQLクエリ設定の詳細については、「インタラクティブデータ分析用 Zeppelin Notebooks の Flink」を参照してください。

Apache Flink ダッシュボードでアプリケーションを表示するには、FLINKJOBアプリケーションの Zeppelin Note ページで を選択します。

ウィンドウクエリの詳細については、「Apache Flink ドキュメント」の「Windows」を参照してください。

Apache Flink ストリーミングSQLクエリのその他の例については、Apache Flink ドキュメント「クエリ」を参照してください。

Amazon MSK/Apache Kafka を使用してテーブルを作成する

Managed Service for Apache MSKFlink Studio で Amazon Flink コネクタを使用してSSL、プレーンテキスト、、またはIAM認証で接続を認証できます。要件に応じて特定のプロパティを使用してテーブルを作成します。

-- Plaintext connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- SSL connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SSL', 'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts', 'properties.ssl.truststore.password' = 'changeit', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- IAM connection (or for MSK Serverless) CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'AWS_MSK_IAM', 'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;', 'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );

これらを Apache Kafka SQL Connector の他のプロパティと組み合わせることができます。

Kinesis を使用してテーブルを作成する

次の例では、Kinesis を使用してテーブルを作成します。

CREATE TABLE KinesisTable ( `column1` BIGINT, `column2` BIGINT, `column3` BIGINT, `column4` STRING, `ts` TIMESTAMP(3) ) PARTITIONED BY (column1, column2) WITH ( 'connector' = 'kinesis', 'stream' = 'test_stream', 'aws.region' = '<region>', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );

使用できる他のプロパティの詳細については、Amazon Kinesis Data Streams SQL Connector」を参照してください。

タンブリングウィンドウをクエリする

次の Flink Streaming SQLクエリは、ZeppelinTopicテーブルから 5 秒のタンブリングウィンドウごとに最高価格を選択します。

%flink.ssql(type=update) SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker FROM ZeppelinTopic GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)

スライドウィンドウをクエリする

次の Apache Flink Streaming SQLクエリは、ZeppelinTopicテーブルから 5 秒のスライディングウィンドウごとに最高価格を選択します。

%flink.ssql(type=update) SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max FROM ZeppelinTopic//or your table name in AWS Glue GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)

インタラクティブを使用する SQL

この例では、イベント時間と処理時間の最大値、キー値テーブルの値の合計を出力します。Scala を使用してサンプルデータを生成する のサンプル・データ生成スクリプトが実行されていることを確認します。Studio ノートブックのフィルタリングや結合などの他のSQLクエリを試すには、Apache Flink ドキュメントの「Apache Flink ドキュメントのクエリ」を参照してください。

%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time. SELECT MAX(`et`) as `et`, MAX(`pt`) as `pt`, SUM(`value`) as `sum` FROM `key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive tumbling window query that displays the number of records observed per (event time) second. -- Browse through the chart views to see different visualizations of the streaming result. SELECT TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`, `key`, SUM(`value`) as `sum` FROM `key-values` GROUP BY TUMBLE(`et`, INTERVAL '1' SECONDS), `key`;

コネクタを使用する BlackHole SQL

BlackHole SQL コネクタでは、クエリをテストするために Kinesis データストリームまたは Amazon MSKクラスターを作成する必要はありません。SQL コネクタの詳細については、Apache Flink ドキュメントの BlackHoleBlackHole SQL「コネクタ」を参照してください。この例では、デフォルトカタログはインメモリカタログです。

%flink.ssql CREATE TABLE default_catalog.default_database.blackhole_table ( `key` BIGINT, `value` BIGINT, `et` TIMESTAMP(3) ) WITH ( 'connector' = 'blackhole' )
%flink.ssql(parallelism=1) INSERT INTO `test-target` SELECT `key`, `value`, `et` FROM `test-source` WHERE `key` > 3
%flink.ssql(parallelism=2) INSERT INTO `default_catalog`.`default_database`.`blackhole_table` SELECT `key`, `value`, `et` FROM `test-target` WHERE `key` > 7

Scala を使用してサンプルデータを生成する

この例では Scala を使用してサンプルデータを生成します。このサンプルデータを使用して、さまざまなクエリをテストできます。テーブル作成ステートメントを使用して key-values テーブルを作成します。

import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import org.apache.flink.streaming.api.scala.DataStream import java.sql.Timestamp // ad-hoc convenience methods to be defined on Table implicit class TableOps[T](table: DataStream[T]) { def asView(name: String): DataStream[T] = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView("`" + name + "`") } stenv.createTemporaryView("`" + name + "`", table) return table; } }
%flink(parallelism=4) val stream = senv .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000)) .map(key => (key, 1, new Timestamp(System.currentTimeMillis))) .asView("key-values-data-generator")
%flink.ssql(parallelism=4) -- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)") -- in this case the INSERT query will inherit the parallelism of the of the above paragraph INSERT INTO `key-values` SELECT `_1` as `key`, `_2` as `value`, `_3` as `et` FROM `key-values-data-generator`

インタラクティブ Scala を使用する

これは インタラクティブを使用する SQL の Scala 翻訳です。Scala のその他の例については、Apache Flink ドキュメントの表APIを参照してください。

%flink import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=4) // A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time. val query01 = stenv .from("`key-values`") .select( $"et".max().as("et"), $"pt".max().as("pt"), $"value".sum().as("sum") ).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink(parallelism=4) // An tumbling window view that displays the number of records observed per (event time) second. val query02 = stenv .from("`key-values`") .window(Tumble over 1.seconds on $"et" as $"w") .groupBy($"w", $"key") .select( $"w".start.as("window"), $"key", $"value".sum().as("sum") ).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

インタラクティブ Python を使用する

これは インタラクティブを使用する SQL の Python 翻訳です。Python のその他の例については、Apache Flink ドキュメントの「表API」を参照してください。

%flink.pyflink from pyflink.table.table import Table def as_view(table, name): if (name in st_env.list_temporary_views()): st_env.drop_temporary_view(name) st_env.create_temporary_view(name, table) return table Table.as_view = as_view
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`keyvalues`") \ .select(", ".join([ "max(et) as et", "max(pt) as pt", "sum(value) as sum" ])) \ .as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`key-values`") \ .window(Tumble.over("1.seconds").on("et").alias("w")) \ .group_by("w, key") \ .select(", ".join([ "w.start as window", "key", "sum(value) as sum" ])) \ .as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`

インタラクティブ Python、SQL、および Scala の組み合わせを使用する

ノートブックではSQL、、Python、および Scala の任意の組み合わせをインタラクティブ分析に使用できます。耐久性のある状態のアプリケーションとしてデプロイする予定の Studio ノートブックでは、 SQLと Scala の組み合わせを使用できます。この例では、無視されるセクションと、永続的な状態でアプリケーションにデプロイされるセクションを示しています。

%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-source` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-source-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-target` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-target-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink() // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=1) val table = stenv .from("`default_catalog`.`default_database`.`my-test-source`") .select($"key", $"value", $"et") .filter($"key" > 10) .asView("query01")
%flink.ssql(parallelism=1) -- forward data INSERT INTO `default_catalog`.`default_database`.`my-test-target` SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000) -- forward data to local stream (ignored when deployed as application) SELECT * FROM `query01`
%flink // tell me the meaning of life (ignored when deployed as application!) print("42!")

クロスアカウント Kinesis データストリームを使用する

Studio ノートブックを所有するアカウント以外のアカウントにおける Kinesis データ・ストリームを使用するには、Studio ノートブックが実行されているアカウントにサービス実行ロールを作成し、データストリームを所有するアカウントにロール信頼ポリシーを作成します。Create table DDLステートメントaws.credentials.role.sessionNameの Kinesis コネクタでaws.credentials.provideraws.credentials.role.arn、、および を使用して、データストリームに対してテーブルを作成します。

Studio ノートブックアカウントには、次のサービス実行ロールを使用します。

{ "Sid": "AllowNotebookToAssumeRole", "Effect": "Allow", "Action": "sts:AssumeRole" "Resource": "*" }

データストリームアカウントには、 AmazonKinesisFullAccess ポリシーと以下のロール信頼ポリシーを使用してください。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<accountID>:root" }, "Action": "sts:AssumeRole", "Condition": {} } ] }

create table ステートメントには以下の段落を使用してます。

%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )