

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Studio ノートブックでデータを分析するため、クエリの例を表示する
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Amazon MSK/Apache Kafka でテーブルを作成する](#how-zeppelin-examples-creating-tables)
+ [Kinesis でテーブルを作成する](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [タンブリングウィンドウをクエリする](#how-zeppelin-examples-tumbling)
+ [スライディングウィンドウをクエリする](#how-zeppelin-examples-sliding)
+ [インタラクティブ SQL を使用する](#how-zeppelin-examples-interactive-sql)
+ [BlackHole SQL コネクタを使用する](#how-zeppelin-examples-blackhole-connector-sql)
+ [Scala を使用してサンプルデータを生成する](#notebook-example-data-generator)
+ [インタラクティブ Scala を使用する](#notebook-example-interactive-scala)
+ [インタラクティブ Python を使用する](#notebook-example-interactive-python)
+ [インタラクティブ Python、SQL、Scala の組み合わせを使用する](#notebook-example-interactive-pythonsqlscala)
+ [クロスアカウント Kinesis データストリームを使用する](#notebook-example-crossaccount-kds)

Apache Flink SQL クエリ設定の情報については、「[インタラクティブなデータ分析のための Zeppelin ノートブック上の Flink](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)」を参照してください。

Apache Flink ダッシュボードでアプリケーションを表示するには、アプリケーションの「**Zeppelin Note**」ページで「**FLINK JOB**」を選択します。

ウィンドウクエリの詳細については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)」を参照してください。

Apache Flink Streaming SQL クエリの他の例については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。

## Amazon MSK/Apache Kafka でテーブルを作成する
<a name="how-zeppelin-examples-creating-tables"></a>

Apache Flink Studio 用 Managed Service を搭載した Amazon MSK Flink コネクタを使用して、Plaintext、SSL または IAM 認証で接続を認証できます。要件に応じて特定のプロパティを使用してテーブルを作成します。

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

これらのプロパティは、「[Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/)」の他のプロパティと組み合わせることができます。

## Kinesis でテーブルを作成する
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

次の例では、Kinesis を使用してテーブルを作成します。

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

使用可能な他のプロパティの詳細については、「[Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)」を参照してください。

## タンブリングウィンドウをクエリする
<a name="how-zeppelin-examples-tumbling"></a>

次の Flink Streaming SQL クエリは、 `ZeppelinTopic` テーブルから 5 秒ごとのタンブリングウィンドウの最大値を選択します。

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## スライディングウィンドウをクエリする
<a name="how-zeppelin-examples-sliding"></a>

次の Apache Flink Streaming SQL クエリは、 `ZeppelinTopic` テーブルから 5 秒ごとのスライディングウィンドウの最大値を選択します。

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## インタラクティブ SQL を使用する
<a name="how-zeppelin-examples-interactive-sql"></a>

この例では、イベント時間と処理時間の最大値、キー値テーブルの値の合計を出力します。[Scala を使用してサンプルデータを生成する](#notebook-example-data-generator) のサンプル・データ生成スクリプトが実行されていることを確認します。Studio ノートブックでフィルタリングや結合などの他の SQL クエリを試すには、Apache Flink ドキュメントのApache Flink ドキュメント:「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## BlackHole SQL コネクタを使用する
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

BlackHole SQL コネクタでは、クエリをテストするために Kinesis データストリームや Amazon MSK クラスターを作成する必要はありません。BlackHole SQL コネクタの詳細については、Apache Flink ドキュメントの「[BlackHole SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html)」を参照してください。この例では、デフォルトカタログはインメモリカタログです。

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Scala を使用してサンプルデータを生成する
<a name="notebook-example-data-generator"></a>

この例では Scala を使用してサンプルデータを生成します。このサンプルデータを使用して、さまざまなクエリをテストできます。テーブル作成ステートメントを使用して key-values テーブルを作成します。

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## インタラクティブ Scala を使用する
<a name="notebook-example-interactive-scala"></a>

これは [インタラクティブ SQL を使用する](#how-zeppelin-examples-interactive-sql) の Scala 翻訳です。Scala の他の例については、Apache Flink ドキュメントの「[Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)」を参照してください。

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## インタラクティブ Python を使用する
<a name="notebook-example-interactive-python"></a>

これは [インタラクティブ SQL を使用する](#how-zeppelin-examples-interactive-sql) の Python 翻訳です。Python の他のサンプルについては、Apache Flink ドキュメントの「[Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)」を参照してください。

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## インタラクティブ Python、SQL、Scala の組み合わせを使用する
<a name="notebook-example-interactive-pythonsqlscala"></a>

ノートブックでは、SQL、Python、Scala を自由に組み合わせてインタラクティブな分析を行うことができます。永続的な状態を持つアプリケーションとしてデプロイする予定の Studio ノートブックでは、SQL と Scala を組み合わせて使用できます。この例では、無視されるセクションと、永続的な状態でアプリケーションにデプロイされるセクションを示しています。

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## クロスアカウント Kinesis データストリームを使用する
<a name="notebook-example-crossaccount-kds"></a>

Studio ノートブックを所有するアカウント以外のアカウントにおける Kinesis データ・ストリームを使用するには、Studio ノートブックが実行されているアカウントにサービス実行ロールを作成し、データストリームを所有するアカウントにロール信頼ポリシーを作成します。Create table DDL ステートメントの Kinesis コネクタで `aws.credentials.provider`、`aws.credentials.role.arn`、`aws.credentials.role.sessionName` を使用して、データストリームに対してテーブルを作成します。

Studio ノートブックアカウントには、次のサービス実行ロールを使用します。

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

データストリームアカウントには、 `AmazonKinesisFullAccess` ポリシーと以下のロール信頼ポリシーを使用してください。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

create table ステートメントには以下の段落を使用してます。

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```