

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Afficher des exemples de requêtes pour analyser des données dans un bloc-notes Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Création de tableaux avec Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Création de tables avec Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Interrogez une fenêtre qui clignote](#how-zeppelin-examples-tumbling)
+ [Interroger une fenêtre coulissante](#how-zeppelin-examples-sliding)
+ [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql)
+ [Utiliser le connecteur BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Utiliser Scala pour générer des exemples de données](#notebook-example-data-generator)
+ [Utilisez Scala interactif](#notebook-example-interactive-scala)
+ [Utiliser du Python interactif](#notebook-example-interactive-python)
+ [Utilisez une combinaison de Python, SQL et Scala interactifs](#notebook-example-interactive-pythonsqlscala)
+ [Utiliser un flux de données Kinesis entre comptes](#notebook-example-crossaccount-kds)

Pour obtenir des informations sur les paramètres de requête SQL d’Apache Flink, consultez [Flink on Zeppelin Notebooks for Interactive Data Analysis](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Pour afficher votre application dans le tableau de bord Apache Flink, choisissez **FLINK JOB** sur la page **Note Zeppelin** de votre application.

Pour plus d’informations sur les requêtes de fenêtre, consultez [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) dans la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Pour d’autres exemples de requêtes SQL Apache Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Création de tableaux avec Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Vous pouvez utiliser le connecteur Amazon MSK Flink avec le service géré pour Apache Flink Studio afin d’authentifier votre connexion à l’aide de l’authentification en texte brut, SSL ou IAM. Créez vos tables en utilisant les propriétés spécifiques selon vos besoins.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Vous pouvez les combiner avec d’autres propriétés sur le [connecteur SQL Apache Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Création de tables avec Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Dans l’exemple suivant, vous créez une table à l’aide de Kinesis :

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Pour plus d’informations sur les autres propriétés que vous pouvez utiliser, consultez [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Interrogez une fenêtre qui clignote
<a name="how-zeppelin-examples-tumbling"></a>

La requête SQL Flink Streaming suivante sélectionne le prix le plus élevé pour chaque fenêtre bascule de cinq secondes dans la table `ZeppelinTopic` :

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Interroger une fenêtre coulissante
<a name="how-zeppelin-examples-sliding"></a>

La requête SQL Apache Flink Streaming suivante sélectionne le prix le plus élevé pour chaque fenêtre défilante de cinq secondes dans la table `ZeppelinTopic` :

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Utiliser du SQL interactif
<a name="how-zeppelin-examples-interactive-sql"></a>

Cet exemple imprime la durée maximale de l’événement et le temps de traitement, ainsi que la somme des valeurs de la table des valeurs clés. Assurez-vous que vous disposez de l’exemple de script de génération de données de l’exécution [Utiliser Scala pour générer des exemples de données](#notebook-example-data-generator). Pour essayer d’autres requêtes SQL telles que le filtrage et les jointures dans votre bloc-notes Studio, consultez [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) dans la documentation Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Utiliser le connecteur BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Le connecteur BlackHole SQL ne vous oblige pas à créer un flux de données Kinesis ou un cluster Amazon MSK pour tester vos requêtes. Pour plus d'informations sur le connecteur BlackHole SQL, consultez la section [Connecteur BlackHole SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) dans la documentation d'Apache Flink. Dans cet exemple, le catalogue par défaut est un catalogue en mémoire.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Utiliser Scala pour générer des exemples de données
<a name="notebook-example-data-generator"></a>

Cet exemple utilise Scala pour générer des exemples de données. Vous pouvez utiliser ces exemples de données pour tester différentes requêtes. Utilisez l’instruction create table pour créer la table des valeurs clés.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Utilisez Scala interactif
<a name="notebook-example-interactive-scala"></a>

Il s’agit de la traduction Scala de [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql). Pour d’autres exemples de Scala, consultez [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) dans la documentation d’Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Utiliser du Python interactif
<a name="notebook-example-interactive-python"></a>

Il s’agit de la traduction Python de [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql). Pour d’autres exemples de Python, consultez [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) dans la documentation d’Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Utilisez une combinaison de Python, SQL et Scala interactifs
<a name="notebook-example-interactive-pythonsqlscala"></a>

Vous pouvez utiliser n’importe quelle combinaison de SQL, Python et Scala dans votre bloc-notes pour une analyse interactive. Dans un bloc-notes Studio que vous envisagez de déployer en tant qu’application à état durable, vous pouvez utiliser une combinaison de SQL et de Scala. Cet exemple montre les sections qui sont ignorées et celles qui sont déployées dans l’application à état durable.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Utiliser un flux de données Kinesis entre comptes
<a name="notebook-example-crossaccount-kds"></a>

Pour utiliser un flux de données Kinesis se trouvant dans un compte autre que le compte associé à votre bloc-notes Studio, créez un rôle d’exécution de service dans le compte sur lequel votre bloc-notes Studio est exécuté et une politique d’approbation des rôles dans le compte contenant le flux de données. Utilisez `aws.credentials.provider`, `aws.credentials.role.arn` et `aws.credentials.role.sessionName` dans le connecteur Kinesis dans votre instruction DDL de création de table pour créer une table par rapport au flux de données.

Utilisez le rôle d’exécution de service suivant pour le compte de bloc-notes Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Utilisez la politique `AmazonKinesisFullAccess` et la politique d’approbation des rôles suivante pour le compte de flux de données.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Utilisez le paragraphe suivant pour l’instruction de création de table.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```