

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Veja exemplos de consultas para analisar dados em um notebook do Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Crie tabelas com o Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Crie tabelas com o Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Consulte uma janela em cascata](#how-zeppelin-examples-tumbling)
+ [Consulte uma janela deslizante](#how-zeppelin-examples-sliding)
+ [Use o SQL interativo](#how-zeppelin-examples-interactive-sql)
+ [Use o conector BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Use o Scala para gerar dados de amostra](#notebook-example-data-generator)
+ [Use o Scala interativo](#notebook-example-interactive-scala)
+ [Use o Python interativo](#notebook-example-interactive-python)
+ [Use uma combinação interativa de Python, SQL e Scala](#notebook-example-interactive-pythonsqlscala)
+ [Use um fluxo de dados do Kinesis entre contas](#notebook-example-crossaccount-kds)

Para obter informações sobre as configurações de consulta SQL do Apache Flink, consulte [Flink on Zeppelin](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html) Notebooks for Interactive Data Analysis.

Para visualizar seu aplicativo no painel do Apache Flink, selecione **FLINK JOB na página **Zeppelin Note **** do seu aplicativo.

Para obter mais informações sobre consultas de janela, consulte [Windows na documentação do](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

[Para obter mais exemplos de consultas SQL do Apache Flink Streaming, consulte [Consultas](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) na documentação do Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

## Crie tabelas com o Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Você pode usar o conector Amazon MSK Flink com o Managed Service for Apache Flink Studio para autenticar sua conexão com autenticação de texto simples, SSL ou IAM. Crie suas tabelas usando as propriedades específicas de acordo com seus requisitos.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Você pode combiná-las com outras propriedades no [Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Crie tabelas com o Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

No exemplo a seguir, você cria uma tabela usando o Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Para obter mais informações sobre outras propriedades que você pode usar, consulte [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Consulte uma janela em cascata
<a name="how-zeppelin-examples-tumbling"></a>

A consulta SQL do Flink Streaming a seguir seleciona o preço mais alto em cada janela em cascata de cinco segundos da tabela `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Consulte uma janela deslizante
<a name="how-zeppelin-examples-sliding"></a>

A consulta SQL do Flink Streaming a seguir seleciona o preço mais alto em cada janela deslizante de cinco segundos da tabela `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Use o SQL interativo
<a name="how-zeppelin-examples-interactive-sql"></a>

Este exemplo imprime o tempo máximo do evento e o tempo de processamento e a soma dos valores da tabela de valores-chave. Certifique-se de ter o exemplo de script de geração de dados da execução [Use o Scala para gerar dados de amostra](#notebook-example-data-generator). Para experimentar outras consultas SQL, como filtragem e junções em seu notebook Studio, consulte a documentação do Apache Flink: [Consultas](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) na documentação do Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Use o conector BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

O conector BlackHole SQL não exige que você crie um stream de dados do Kinesis ou um cluster Amazon MSK para testar suas consultas. Para obter informações sobre o conector BlackHole SQL, consulte [Conector BlackHole SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) na documentação do Apache Flink. Neste exemplo, o catálogo padrão é um catálogo na memória.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Use o Scala para gerar dados de amostra
<a name="notebook-example-data-generator"></a>

Este exemplo usa o Scala para gerar dados de amostra. Você pode usar esses dados de exemplo para testar várias consultas. Use a instrução criar tabela para criar a tabela de valores-chave.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Use o Scala interativo
<a name="notebook-example-interactive-scala"></a>

Esta é a tradução em Scala do [Use o SQL interativo](#how-zeppelin-examples-interactive-sql). Para ver mais exemplos de Scala, consulte [API de tabela](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) na documentação do Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Use o Python interativo
<a name="notebook-example-interactive-python"></a>

Esta é a tradução em Python do [Use o SQL interativo](#how-zeppelin-examples-interactive-sql). Para ver mais exemplos de Python, consulte [API de tabela](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) na documentação do Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Use uma combinação interativa de Python, SQL e Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

Você pode usar qualquer combinação de SQL, Python e Scala em seu notebook para análise interativa. Em um notebook do Studio que você planeja implantar como um aplicativo com estado durável, você pode usar uma combinação de SQL e Scala. Este exemplo mostra as seções que são ignoradas e aquelas que são implantadas no aplicativo com estado durável.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Use um fluxo de dados do Kinesis entre contas
<a name="notebook-example-crossaccount-kds"></a>

Para usar um fluxo de dados do Kinesis que está em uma conta diferente da conta que tem seu notebook Studio, crie uma função de execução de serviço na conta em que seu notebook Studio está sendo executado e uma política de confiança de função na conta que tem o fluxo de dados. Use `aws.credentials.provider`, `aws.credentials.role.arn` e `aws.credentials.role.sessionName` no conector Kinesis em sua instrução criar tabela DDL para criar uma tabela em relação ao fluxo de dados.

Use a seguinte função de execução de serviço para a conta do notebook Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Use a política `AmazonKinesisFullAccess` e a seguinte política de confiança de função para a conta de fluxo de dados.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Use o parágrafo a seguir para a instrução de criação de tabela.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```