

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Conceitos básicos do Amazon Managed Service for Apache Flink para Python
<a name="gs-python"></a>

Esta seção apresenta os conceitos fundamentais do Managed Service for Apache Flink usando Python e a API de tabela. Ela descreve as opções disponíveis para criar e testar seus aplicativos. Ela também fornece instruções para instalar as ferramentas necessárias para concluir os tutoriais deste guia e criar seu primeiro aplicativo. 

**Topics**
+ [Analise os componentes de um aplicativo do Managed Service for Apache Flink](#gs-python-table-components)
+ [Atenda os pré-requisitos](#gs-python-prerequisites)
+ [Crie e execute um aplicativo Managed Service for Apache Flink para o aplicativo Python](gs-python-createapp.md)
+ [Limpe AWS os recursos](gs-python-cleanup.md)

## Analise os componentes de um aplicativo do Managed Service for Apache Flink
<a name="gs-python-table-components"></a>

**nota**  
O Amazon Managed Service para Apache Flink oferece suporte a todos os [Apache](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/overview/#flinks-apis) Flink. APIs Dependendo da API escolhida, a estrutura do aplicativo é um pouco diferente. Uma abordagem popular ao desenvolver um aplicativo Apache Flink em Python é definir o fluxo do aplicativo usando SQL incorporado no código Python. Essa é a abordagem que seguimos no tutorial de introdução a seguir.

Para processar dados, seu aplicativo do Managed Service for Apache Flink usa um script Python para definir o fluxo de dados que processa a entrada e produz a saída usando o runtime do Apache Flink. 

Um aplicativo típico do Managed Service for Apache Flink tem os seguintes componentes:
+ **Propriedades de runtime:** você pode usar as *propriedades de runtime* para configurar seu aplicativo sem recompilar o código do aplicativo. 
+ **Fontes:** o aplicativo consome dados de uma ou mais *fontes*. Uma fonte usa um [conector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) para ler dados de um sistema externo, como um fluxo de dados do Kinesis ou um tópico do Amazon MSK. Também é possível usar conectores especiais para gerar dados de dentro do aplicativo. Quando você usa SQL, o aplicativo define fontes como *tabelas de origem*. 
+ **Transformações:** o aplicativo processa dados usando uma ou mais *transformações* que podem filtrar, enriquecer ou agregar dados. Quando você usa SQL, o aplicativo define transformações como consultas SQL. 
+ **Coletores:** o aplicativo envia dados para fontes externas por meio de *coletores*. Um coletor usa um [conector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) para enviar dados para um sistema externo, como um fluxo de dados do Kinesis, um tópico do Amazon MSK, um bucket do Amazon S3 ou um banco de dados relacional. Você também pode usar um conector especial para imprimir a saída para fins de desenvolvimento. Quando você usa SQL, o aplicativo define coletores como *tabelas de coletores* nas quais você insere resultados. Para obter mais informações, consulte [Grave dados com coletores no Managed Service for Apache Flink](how-sinks.md).

Seu aplicativo Python também pode exigir dependências externas, como bibliotecas Python adicionais ou qualquer conector Flink que o aplicativo use. Ao empacotar o aplicativo, você deve incluir todas as dependências que ele exige. Este tutorial demonstra como incluir dependências de conectores e como empacotar o aplicativo para implantação no Amazon Managed Service for Apache Flink.

## Atenda os pré-requisitos
<a name="gs-python-prerequisites"></a>

Para concluir este tutorial, você precisa atender aos seguintes pré-requisitos:
+ **Python 3.11** [https://docs.conda.io/en/latest/](https://docs.conda.io/en/latest/)
+  [Cliente do Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) - Instale o cliente do Git, se isso ainda não foi feito.
+ [Java Development Kit (JDK) versão 11](https://www.oracle.com/java/technologies/downloads/#java11) - instale um Java JDK 11 e defina a variável de ambiente `JAVA_HOME` para apontar para seu local de instalação. Se você não tem um JDK 11, pode usar [Amazon Corretto](https://docs.aws.amazon.com/corretto) ou qualquer JDK padrão de sua escolha. 
  + Para verificar se o JDK está instalado corretamente, execute o seguinte comando. A saída será diferente se estiver usando um JDK diferente do Amazon Corretto 11. Verifique se a versão é a 11.x.

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/) - instale o Apache Maven se ainda não tiver feito isso. Para obter mais informações, consulte [Como instalar o Apache Maven](https://maven.apache.org/install.html).
  + Para testar a instalação do Apache Maven, use o seguinte comando:

    ```
    $ mvn -version
    ```

**nota**  
Embora o aplicativo seja escrito em Python, o Apache Flink é executado na Java Virtual Machine (JVM). Ela distribui a maioria das dependências, como o conector Kinesis, em arquivos JAR. Para gerenciar essas dependências e empacotar o aplicativo em um arquivo ZIP, use o [Apache Maven](https://maven.apache.org/). Este tutorial explica como fazer isso. 

**Atenção**  
Recomendamos usar Python 3.11 para desenvolvimento local. Essa é a mesma versão do Python usada pelo Amazon Managed Service for Apache Flink com o runtime do Flink 1.19.   
A instalação da biblioteca Python Flink 1.19 no Python 3.12 pode falhar.  
Se você tiver outra versão do Python instalada por padrão em sua máquina, recomendamos criar um ambiente independente, como o uso do VirtualEnv Python 3.11.

**IDE para desenvolvimento local**

Recomendamos que você use um ambiente de desenvolvimento como [PyCharm](https://www.jetbrains.com/pycharm/)o [Visual Studio Code](https://code.visualstudio.com/) para desenvolver e compilar seu aplicativo.

Em seguida, conclua duas primeiras etapas de [Comece a usar o Amazon Managed Service para Apache Flink (DataStream API)](getting-started.md):
+ [Configurar uma AWS conta e criar um usuário administrador](setting-up.md)
+ [Configure o AWS Command Line Interface (AWS CLI)](setup-awscli.md)

Para começar, consulte o [Criar uma aplicação do ](gs-python-createapp.md).

# Crie e execute um aplicativo Managed Service for Apache Flink para o aplicativo Python
<a name="gs-python-createapp"></a>

Nesta seção, você cria um aplicativo Managed Service for Apache Flink para aplicativo Python com um fluxo do Kinesis como fonte e coletor.

**Topics**
+ [Crie recursos dependentes](#gs-python-resources)
+ [Configurar seu ambiente de desenvolvimento local](#gs-python-set-up)
+ [Baixe e examine o código Python da transmissão Apache Flink](#gs-python-download)
+ [Gerencie dependências do JAR](#gs-python-jar-dependencies)
+ [Gravação de registros de amostra no fluxo de entrada](#gs-python-sample-records)
+ [Execute o aplicativo localmente](#gs-python-run-locally)
+ [Observe os dados de entrada e saída nos fluxos do Kinesis](#gs-python-observe-input-output)
+ [Interrompa seu aplicativo em execução localmente](#gs-python-stop)
+ [Empacote o código do seu aplicativo](#gs-python-package-code)
+ [Faça upload do pacote do aplicativo para um bucket do Amazon S3](#gs-python-upload-bucket)
+ [Crie e configure o aplicativo do Managed Service for Apache Flink](#gs-python-7)
+ [Próxima etapa](#gs-python-next-step-4)

## Crie recursos dependentes
<a name="gs-python-resources"></a>

Antes de criar um Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes: 
+ Duas transmissões do Kinesis para entrada e saída.
+ Um bucket do Amazon S3 para armazenar o código do aplicativo.

**nota**  
Este tutorial pressupõe que você está implantando seu aplicativo na região us-east-1. Se você usa outra região, deve adaptar todas as etapas corretamente.

### Criar dois fluxos do Kinesis
<a name="gs-python-resources-streams"></a>

Antes de criar um aplicativo do Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis (`ExampleInputStream` e `ExampleOutputStream`) na mesma região que você usará para implantar seu aplicativo (us-east-1 neste exemplo). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.

É possível criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para obter instruções sobre o console, consulte [Criar e atualizar fluxos de dados](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*. 

**Como criar os fluxos de dados (AWS CLI)**

1. Para criar o primeiro stream (`ExampleInputStream`), use o seguinte comando do Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### Crie um bucket Amazon S3
<a name="gs-python-resources-s3"></a>

Você pode criar um bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esse recurso, consulte os tópicos a seguir:
+ Para obter instruções, consulte [Como criar um bucket do S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) no *Guia do usuário do Amazon Simple Storage Service*. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login.
**nota**  
Crie o bucket do S3 na região que você usa para este tutorial (us-east-1).

### Outros recursos da
<a name="gs-python-resources-cw"></a>

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:
+ Um grupo de logs chamado `/AWS/KinesisAnalytics-java/<my-application>`.
+ Um fluxo de logs chamado `kinesis-analytics-log-stream`.

## Configurar seu ambiente de desenvolvimento local
<a name="gs-python-set-up"></a>

Para desenvolvimento e depuração, você pode executar o aplicativo Python Flink na máquina. Você pode iniciar o aplicativo a partir da linha de comando com `python main.py` ou em um IDE Python de sua preferência. 

**nota**  
Na máquina de desenvolvimento, você deve ter o Python 3.10 ou 3.11, o Java 11, o Apache Maven e o Git instalados. Recomendamos que você use um IDE como o [PyCharm](https://www.jetbrains.com/pycharm/)[Visual Studio Code](https://code.visualstudio.com/). Para verificar se você atende a todos os pré-requisitos, consulte [Atenda os pré-requisitos para concluir os exercícios](gs-python.md#gs-python-prerequisites) antes de continuar. 

### Instale a PyFlink biblioteca
<a name="gs-python-install-pyflink"></a>

Para desenvolver o aplicativo e executá-lo localmente, você deve instalar a biblioteca Flink Python.

1. Crie um ambiente Python autônomo VirtualEnv usando o Conda ou qualquer ferramenta Python similar. 

1. Instale a PyFlink biblioteca nesse ambiente. Use a mesma versão de runtime do Apache Flink que você usará no Amazon Managed Service for Apache Flink. Atualmente, o runtime recomendado é 1.19.1. 

   ```
   $ pip install apache-flink==1.19.1
   ```

1. Certifique-se de que o ambiente esteja ativo ao executar o aplicativo. Se você executar o aplicativo no IDE, certifique-se de que ele esteja usando o ambiente como runtime. O processo depende do IDE que você está usando: 
**nota**  
Você só precisa instalar a PyFlink biblioteca. Você **não** precisa instalar um cluster Apache Flink na máquina. 

### Autentique sua sessão AWS
<a name="gs-python-authenticate"></a>

O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no stream de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:

1. Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulte[Configure o AWS Command Line Interface (AWS CLI)](setup-awscli.md).

1. Verifique se o seu AWS CLI está configurado corretamente e se seus usuários têm permissões para gravar no stream de dados do Kinesis publicando o seguinte registro de teste:

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. Se o seu IDE tiver um plug-in com o qual se integrar AWS, você poderá usá-lo para passar as credenciais para o aplicativo em execução no IDE. Para obter mais informações, consulte [AWS Toolkit for PyCharm](https://aws.amazon.com/pycharm/), [AWS Toolkit for Visual Studio](https://aws.amazon.com/visualstudiocode/) Code [AWS e Toolkit for](https://aws.amazon.com/intellij/) IntelliJ IDEA.

## Baixe e examine o código Python da transmissão Apache Flink
<a name="gs-python-download"></a>

O código do aplicativo Python para este exemplo está disponível em. GitHub Para fazer download do código do aplicativo, faça o seguinte:

1. Duplique o repositório remoto usando o seguinte comando:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Navegue até o diretório `./python/GettingStarted`.

### Analise os componentes do aplicativo
<a name="gs-python-review"></a>

O código do aplicativo está localizado em `main.py`. Usamos SQL incorporado em Python para definir o fluxo do aplicativo. 

**nota**  
Para uma experiência de desenvolvedor otimizada, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service for Apache Flink e localmente, para desenvolvimento em sua máquina. O aplicativo usa a variável de ambiente `IS_LOCAL = true` para detectar quando está sendo executado localmente. Você deve definir a variável de ambiente `IS_LOCAL = true` no seu shell ou na configuração de execução do seu IDE.
+ O aplicativo configura o ambiente de execução e lê a configuração do runtime. Para funcionar no Amazon Managed Service for Apache Flink e localmente, o aplicativo verifica a variável `IS_LOCAL`. 
  + O comportamento padrão a seguir é quando o aplicativo é executado no Amazon Managed Service for Apache Flink: 

    1. Carregue as dependências empacotadas com o aplicativo. Para obter mais informações, consulte o (link)

    1. Carregue a configuração das propriedades do Runtime que você define no aplicativo Amazon Managed Service for Apache Flink. Para obter mais informações, consulte o (link)
  + Quando o aplicativo detecta `IS_LOCAL = true` quando você o executa localmente:

    1. Carrega dependências externas do projeto.

    1. Carrega a configuração do arquivo `application_properties.json` incluído no projeto. 

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ O aplicativo define uma tabela de origem com uma declaração `CREATE TABLE`, usando o [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/). Essa tabela lê dados do fluxo de entrada do Kinesis. O aplicativo usa o nome do fluxo, a região e a posição inicial da configuração do runtime. 

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ O aplicativo também define uma tabela de coletor usando o [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) neste exemplo. Essa tabela envia dados para o fluxo de saída do Kinesis.

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ Por fim, o aplicativo executa um SQL que `INSERT INTO...` a tabela de coletor a partir da tabela de origem. Em um aplicativo mais complexo, é provável que você tenha etapas extras para transformar os dados antes de gravar no coletor. 

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ Você deve adicionar outra etapa no final da função `main()` para executar o aplicativo localmente:

  ```
  if is_local:
      table_result.wait()
  ```

  Sem essa instrução, o aplicativo é encerrado imediatamente quando é executado localmente. Você não deve executar essa declaração ao executar seu aplicativo no Amazon Managed Service for Apache Flink.

## Gerencie dependências do JAR
<a name="gs-python-jar-dependencies"></a>

Um PyFlink aplicativo geralmente requer um ou mais conectores. O aplicativo neste tutorial usa o [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/). Como o Apache Flink é executado em Java JVM, os conectores são distribuídos como arquivos JAR, independentemente de você implementar seu aplicativo em Python. Você deve empacotar essas dependências com o aplicativo ao implantá-lo no Amazon Managed Service for Apache Flink. 

Neste exemplo, mostramos como usar o Apache Maven para buscar as dependências e empacotar o aplicativo para ser executado no Managed Service para Apache Flink.

**nota**  
Existem formas alternativas de buscar e empacotar dependências. Este exemplo demonstra um método que funciona corretamente com um ou mais conectores. Ele também permite que executar o aplicativo localmente, para desenvolvimento e no Managed Service for Apache Flink sem alterações no código. 

### Usar o arquivo pom.xml
<a name="gs-python-jar-pom"></a>

O Apache Maven usa o arquivo `pom.xml` para controlar dependências e pacotes de aplicativos. 

Todas as dependências do JAR são especificadas no arquivo `pom.xml` no bloco `<dependencies>...</dependencies>`. 

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

Para encontrar o artefato e a versão corretos do conector a serem usados, consulte [Use os conectores do Apache Flink com o Managed Service for Apache Flink](how-flink-connectors.md). Certifique-se de consultar a versão do Apache Flink que está usando. Neste exemplo, usamos o conector do Kinesis. Para o Apache Flink 1.19, a versão do conector é `4.3.0-1.19`.

**nota**  
Se estiver usando o Apache Flink 1.19, não há uma versão de conector lançada especificamente para essa versão. Use os conectores lançados para a 1.18.

### Baixe e empacote dependências
<a name="gs-python-dependencies-download"></a>

Use o Maven para baixar as dependências definidas no arquivo `pom.xml` e empacotá-las para o aplicativo Python Flink. 

1. Navegue até o diretório que contém o projeto Python Getting Started chamado `python/GettingStarted`.

1. Execute este comando: .

```
$ mvn package
```

O Maven cria um novo arquivo chamado `./target/pyflink-dependencies.jar`. Quando está desenvolvendo localmente em sua máquina, o aplicativo Python procura esse arquivo. 

**nota**  
Se você esquecer de executar este comando, ao tentar executar seu aplicativo, ele falhará com o erro: **Não foi possível encontrar nenhuma fábrica para o identificador “kinesis”**.

## Gravação de registros de amostra no fluxo de entrada
<a name="gs-python-sample-records"></a>

Nesta seção, você enviará amostras de registros para o fluxo a serem processados pelo aplicativo. Você tem duas opções para gerar dados de amostra, usando um script Python ou o [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator).

### Gerar dados de amostra usando um script Python
<a name="gs-python-sample-data"></a>

É possível usar um script Python para enviar registros de amostra para o fluxo.

**nota**  
Para executar esse script do Python, você deve usar o Python 3.x e ter a biblioteca [AWS SDK para Python (Boto)](https://aws.amazon.com/developer/language/python/) instalada.

**Para começar a enviar dados de teste para o fluxo de entrada do Kinesis:**

1. Baixe o script `stock.py` Python do gerador de dados no repositório do [gerador GitHub de dados](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator).

1. Execute o script `stock.py`:

   ```
   $ python stock.py
   ```

Mantenha o script em execução enquanto você conclui o restante do tutorial. Agora você pode executar o aplicativo Apache Flink.

### Gerar dados de amostra usando o Kinesis Data Generator
<a name="gs-python-sample-kinesis"></a>

Como alternativa ao script Python, você pode usar o [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator), também disponível em uma [versão hospedada](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html), para enviar dados de amostra aleatórios para o fluxo. O Kinesis Data Generator é executado no seu navegador e você não precisa instalar nada na máquina. 

**Para configurar e executar o Kinesis Data Generator:** 

1. Siga as instruções na [documentação do Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) para configurar o acesso à ferramenta. Você executará um CloudFormation modelo que configura um usuário e uma senha. 

1. Acesse o Kinesis Data Generator por meio da URL gerada pelo CloudFormation modelo. Você pode encontrar o URL na guia **Saída** após a conclusão do CloudFormation modelo. 

1. Configure o gerador de dados:
   + **Região:** selecione a região que utilizada neste tutorial: us-east-1
   + **Fluxo/fluxo de entrega:** selecione o fluxo de entrada que o aplicativo usará: `ExampleInputStream`
   + **Registros por segundo:** 100
   + **Modelo de registro:** copie e cole o modelo a seguir:

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. Teste o modelo: selecione **Modelo de teste** e verifique se o registro gerado é semelhante ao seguinte:

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Inicie o gerador de dados: escolha **Selecionar dados a enviar**.

O Kinesis Data Generator agora está enviando dados para o `ExampleInputStream`. 

## Execute o aplicativo localmente
<a name="gs-python-run-locally"></a>

Você pode testar o aplicativo localmente, executando a partir da linha de comando com `python main.py` ou a partir do IDE.

Para executar seu aplicativo localmente, você deve ter a versão correta da PyFlink biblioteca instalada conforme descrito na seção anterior. Para obter mais informações, consulte o (link)

**nota**  
Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte [Criar dois fluxos de dados do Amazon Kinesis](get-started-exercise.md#get-started-exercise-1). Além disso, verifique se você tem permissão para ler e gravar em ambos os fluxos. Consulte [Autentique sua sessão AWS](get-started-exercise.md#get-started-exercise-2-5). 

### Importe o projeto Python para o IDE
<a name="gs-python-import"></a>

Para começar a trabalhar no aplicativo em seu IDE, você deve importá-lo como um projeto Python. 

O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do subdiretório `./python/GettingStarted` para o seu IDE. 

Importe o código como um projeto Python existente.

**nota**  
O processo exato para importar um novo projeto Python varia de acordo com o IDE que você está usando.

### Verifique a configuração do aplicativo local
<a name="gs-python-check-configuration"></a>

Ao ser executado localmente, o aplicativo usa a configuração no arquivo `application_properties.json` na pasta de recursos do projeto sob `./src/main/resources`. Você pode editar esse arquivo para usar diferentes nomes ou regiões de fluxos do Kinesis.

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### Execute o aplicativo Python localmente
<a name="gs-python-run-locally"></a>

Você pode executar o aplicativo localmente, a partir da linha de comando, como um script Python normal, ou a partir do IDE. 

**Para executar seu aplicativo a partir da linha de comando**

1. Certifique-se de que o ambiente autônomo do Python, como o Conda VirtualEnv ou onde você instalou a biblioteca Python Flink, esteja ativo no momento. 

1. Verifique se você executou o `mvn package` pelo menos uma vez. 

1. Defina a `IS_LOCAL = true` variável de ambiente:

   ```
   $ export IS_LOCAL=true
   ```

1. Execute o aplicativo como um script Python normal.

   ```
   $python main.py
   ```

**Para executar o aplicativo de dentro do IDE**

1. Configure seu IDE para executar o script `main.py` com a seguinte configuração:

   1. Use o ambiente Python independente, como o Conda VirtualEnv ou onde você instalou a biblioteca. PyFlink 

   1. Use as AWS credenciais para acessar os streams de dados de entrada e saída do Kinesis. 

   1. Defina `IS_LOCAL = true`.

1. O processo exato para definir a configuração de execução depende do IDE e varia. 

1. Depois de configurar o IDE, execute o script Python e use as ferramentas fornecidas pelo seu IDE enquanto o aplicativo estiver em execução. 

### Inspecione os logs do aplicativo localmente
<a name="gs-python-run-IDE"></a>

Quando executado localmente, o aplicativo não mostra nenhum registro no console, exceto algumas linhas impressas e exibidas quando o aplicativo é iniciado. PyFlink grava registros em um arquivo no diretório em que a biblioteca Python Flink está instalada. O aplicativo imprime a localização dos logs quando é iniciado. Você também pode executar o seguinte comando para encontrar os logs:

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. Liste os arquivos no diretório de registro de logs. Normalmente, você encontra um único arquivo `.log`.

1. Acompanhe o arquivo enquanto o aplicativo estiver em execução: `tail -f <log-path>/<log-file>.log`.

## Observe os dados de entrada e saída nos fluxos do Kinesis
<a name="gs-python-observe-input-output"></a>

Você pode observar os registros enviados ao fluxo de entrada pelo Python (geração de amostra) ou pelo Kinesis Data Generator (link) usando o **Visualizador de dados** no console do Amazon Kinesis. 

**Para observar registros:**  [Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)  Verifique se a região é a mesma executada este tutorial, que é us-east-1 Leste dos EUA (Norte da Virgínia) por padrão. Altere a região se ela não corresponde.    Escolha **Fluxos de dados**.    Selecione o fluxo que deseja observar, `ExampleInputStream` ou `ExampleOutputStream.`   Selecione a guia **Visualizador de dados**.    Escolha qualquer **Fragmento**, mantenha **Último** como **Posição inicial** e, em seguida, escolha **Obter registros**. Talvez você veja o erro “Nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha **Tentar obter registros novamente**. Os registros mais recentes publicados no fluxo são exibidos.    Selecione o valor na coluna Dados para inspecionar o conteúdo do registro no formato JSON.   

## Interrompa seu aplicativo em execução localmente
<a name="gs-python-stop"></a>

Interrompa o aplicativo em execução no IDE. O IDE geralmente fornece uma opção de “interromper”. A localização e o método exatos dependem do IDE. 

## Empacote o código do seu aplicativo
<a name="gs-python-package-code"></a>

Nesta seção, você usa o Apache Maven para empacotar o código do aplicativo e todas as dependências necessárias em um arquivo .zip. 

Execute o comando do pacote Maven mais uma vez:

```
$ mvn package
```

Este comando gera o arquivo `target/managed-flink-pyflink-getting-started-1.0.0.zip`.

## Faça upload do pacote do aplicativo para um bucket do Amazon S3
<a name="gs-python-upload-bucket"></a>

Nesta seção, você faz o upload do arquivo .zip criado na seção anterior no bucket do Amazon Simple Storage Service (Amazon S3) criado no início deste tutorial. Se você não concluiu essa etapa, consulte o (link).

**Para fazer upload do arquivo JAR do código do aplicativo**

1. Abra o console do Amazon S3 em [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Selecione o bucket que você criou anteriormente para o código do aplicativo.

1. Escolha **Carregar**.

1. Escolha **Adicionar arquivos**.

1. Navegue até o arquivo .zip gerado na etapa anterior: `target/managed-flink-pyflink-getting-started-1.0.0.zip`. 

1. Escolha **Fazer upload** sem alterar nenhuma outra configuração.

## Crie e configure o aplicativo do Managed Service for Apache Flink
<a name="gs-python-7"></a>

É possível criar e configurar um aplicativo Managed Service for Apache Flink usando o console ou a AWS CLI. Para este tutorial, você usará o console. 

### Criar a aplicação
<a name="gs-python-7-console-create"></a>

1. Faça login no e abra Console de gerenciamento da AWS o console Amazon MSF em https://console.aws.amazon.com /flink.

1. Verifique se a região correta está selecionada: Leste dos EUA (Norte da Virgínia) us-east-1.

1. Abra o menu do lado direito e escolha **Aplicativos Apache Flink** e, em seguida, **Criar aplicativo de transmissão**. Como alternativa, escolha **Criar aplicativo de transmissão** na seção **Introdução** da página inicial. 

1. Na página **Criar aplicativos de transmissão**:
   + Em **Escolha um método para configurar o aplicativo de processamento de fluxo**, escolha **Criar do zero**.
   + Para **Configuração do Apache Flink, versão do aplicativo Flink**, escolha **Apache Flink 1.19**.
   + Para **Configuração do aplicativo**:
     + Em **Nome do aplicativo**, insira **MyApplication**.
     + Em **Descrição**, insira **My Python test app**.
     + Em **Acesso aos recursos do aplicativo**, escolha **Create/update IAM role kinesis-analytics-MyApplication-us -east-1 com as políticas necessárias**.
   + Para **Modelo para configurações de aplicativos**:
     + Para **Modelos**, escolha **Desenvolvimento**.
   + Escolha **Criar aplicativo de transmissão**.

**nota**  
Ao criar um aplicativo Managed Service for Apache Flink usando o console, você tem a opção de ter um perfil do IAM e uma política criada para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:  
Política: `kinesis-analytics-service-MyApplication-us-west-2`
Função: `kinesisanalytics-MyApplication-us-west-2`
Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como *Kinesis Data Analytics*. O nome dos recursos gerados automaticamente recebe o prefixo `kinesis-analytics` para fins de compatibilidade com versões anteriores.

### Editar a política do IAM
<a name="gs-python-7-console-iam"></a>

Edite a política do IAM para adicionar permissões para acessar o bucket do Amazon S3.

**Editar a política do IAM para adicionar permissões do bucket do S3**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Selecione **Políticas**. Selecione a política **`kinesis-analytics-service-MyApplication-us-east-1`** que o console criou na seção anterior. 

1. Escolha a guia **Edit (Editar)** e escolha a guia **JSON**.

1. Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (*012345678901*) pelo ID da sua conta.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. Escolha **Próximo** e, em seguida, escolha **Salvar alterações**.

### Configure o aplicativo
<a name="gs-python-7-console-configure"></a>

Edite a configuração do aplicativo para definir o artefato do código do aplicativo. 

**Configurar o aplicativo**

1. Na **MyApplication**página, escolha **Configurar**.

1. Na seção **Localização do código do aplicativo**: 
   + Para o **bucket do Amazon S3**, selecione o bucket criado anteriormente para o código do aplicativo. Escolha **Procurar**, selecione o bucket correto e, em seguida, selecione **Escolher**. Não selecione o nome do bucket. 
   + Em **Caminho do objeto do Amazon S3**, insira **managed-flink-pyflink-getting-started-1.0.0.zip**.

1. Em **Permissões de acesso**, selecione **Criar/atualizar o perfil do IAM `kinesis-analytics-MyApplication-us-east-1` com as políticas necessárias**.

1. Navegue até as **Propriedades do Runtime** e mantenha os valores padrão para todas as outras configurações.

1. Selecione **Adicionar novo item** e adicione cada um dos seguintes parâmetros:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/gs-python-createapp.html)

1. Não modifique nenhuma das outras seções e escolha **Salvar alterações**.

**nota**  
Quando você opta por ativar o CloudWatch registro na Amazon, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:   
Grupo de logs: `/aws/kinesis-analytics/MyApplication`
Fluxo de logs: `kinesis-analytics-log-stream`

### Execute o aplicativo
<a name="gs-python-7-console-run"></a>

Agora o aplicativo está configurado e pronto para execução.

**Executar o aplicativo**

1. No console do Amazon Managed Service for Apache Flink, escolha **Meu aplicativo** e escolha **Executar**.

1. Na próxima página, que é a página de configuração de restauração do aplicativo, escolha **Executar com o snapshot mais recente** e, em seguida, escolha **Executar**. 

   O **Status** nos **Detalhes do aplicativo** muda de `Ready` para `Starting` e depois para `Running` quando o aplicativo é iniciado.

Quando o aplicativo está no status `Running`, aí é possível abrir o painel do Flink. 

**Para abrir o painel do**

1. Selecione **Abrir painel do Apache Flink**. O painel é aberto em uma nova página.

1. Na lista **Trabalhos em execução**, escolha o único trabalho exibido. 
**nota**  
Se você definiu as propriedades do Runtime ou editou as políticas do IAM incorretamente, o status do aplicativo pode se transformar em `Running`, mas o painel do Flink mostra que o trabalho está sendo reiniciado continuamente. Esse é um cenário de falha comum se o aplicativo estiver configurado incorretamente ou não tiver permissões para acessar recursos externos.   
Quando isso acontecer, verifique a guia **Exceções** no painel do Flink para verificar a causa do problema.

### Observe as métricas do aplicativo em execução
<a name="gs-python-observe-metrics"></a>

Na **MyApplication**página, na seção de ** CloudWatch métricas da Amazon**, você pode ver algumas das métricas fundamentais do aplicativo em execução. 

**Para visualizar as métricas**

1. Ao lado do botão **Atualizar**, selecione **10 segundos** na lista suspensa.

1. Quando o aplicativo está em execução e íntegro, é possível ver a métrica de **tempo de atividade** aumentando continuamente.

1. A métrica **fullrestarts** deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Para investigar o problema, revise a guia **Exceções** no painel do Flink.

1. A métrica **Número de pontos de verificação com falha** deve ser zero em um aplicativo íntegro. 
**nota**  
Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no CloudWatch painel.

### Observe os dados de saída nos fluxos do Kinesis
<a name="gs-python-observe-output"></a>

Verifique se você ainda está publicando dados na entrada, usando o script Python ou o Kinesis Data Generator. 

Agora você pode observar a saída do aplicativo em execução no Managed Service for Apache Flink usando o Visualizador de Dados no [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/), da mesma forma que você já fez anteriormente. 

**Para visualizar a saída**

1. [Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Verifique se a região é a mesma utilizada para executar este tutorial. Por padrão, é us-east-1 Leste dos EUA (Norte da Virgínia). Altere a região se necessário.

1. Escolha **Fluxos de dados**. 

1. Selecione o fluxo que deseja observar. Para este tutorial, use `ExampleOutputStream`. 

1.  Selecione a guia **Visualizador de dados**. 

1. Selecione qualquer **Fragmento**, mantenha **Último** como **Posição inicial** e, em seguida, escolha **Obter registros**. Talvez você veja o erro “Nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha **Tentar obter registros novamente**. Os registros mais recentes publicados no fluxo são exibidos.

1. Selecione o valor na coluna Dados para inspecionar o conteúdo do registro no formato JSON.

### Interromper o aplicativo
<a name="gs-python-7-console-stop"></a>

Para interromper o aplicativo, acesse a página do console do aplicativo do Managed Service for Apache Flink, `MyApplication`.

**Como interromper o aplicativo**

1. Na lista suspensa **Ação**, escolha **Interromper**.

1. O **Status** nos **Detalhes do aplicativo** muda de `Running` para `Stopping` e depois para `Ready` quando o aplicativo é completamente interrompido. 
**nota**  
Não se esqueça também de parar de enviar dados para o fluxo de entrada a partir do script Python ou do Kinesis Data Generator.

## Próxima etapa
<a name="gs-python-next-step-4"></a>

[Limpe AWS os recursos](gs-python-cleanup.md)

# Limpe AWS os recursos
<a name="gs-python-cleanup"></a>

Esta seção inclui procedimentos para limpar AWS recursos criados no tutorial Getting Started (Python).

**Topics**
+ [Exclua o seu aplicativo Managed Service for Apache Flink](#gs-python-cleanup-app)
+ [Exclua seus fluxos de dados do Kinesis](#gs-python-cleanup-msk)
+ [Exclua seus objetos e bucket do Amazon S3](#gs-python-cleanup-s3)
+ [Exclua seus recursos do IAM](#gs-python-cleanup-iam)
+ [Exclua seus CloudWatch recursos](#gs-python-cleanup-cw)

## Exclua o seu aplicativo Managed Service for Apache Flink
<a name="gs-python-cleanup-app"></a>

Siga o procedimento a seguir para excluir o aplicativo.

**Para excluir o aplicativo**

1. [Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. No painel Managed Service for Apache Flink, escolha. **MyApplication**

1. Na lista suspensa **Ações**, selecione **Excluir** e confirme a exclusão.

## Exclua seus fluxos de dados do Kinesis
<a name="gs-python-cleanup-msk"></a>

1. Faça login no e abra Console de gerenciamento da AWS o console Amazon MSF em https://console.aws.amazon.com /flink.

1. Escolha **Fluxos de dados**.

1. Selecione os dois fluxos criados, `ExampleInputStream` e `ExampleOutputStream`. 

1. Na lista suspensa **Ações**, selecione **Excluir** e confirme a exclusão.

## Exclua seus objetos e bucket do Amazon S3
<a name="gs-python-cleanup-s3"></a>

Siga o procedimento a seguir para excluir objetos e bucket do S3.

**Para excluir o objeto de um bucket do S3**

1. Abra o console do Amazon S3 em [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Selecione o bucket do S3 que criado para o artefato do aplicativo.

1. Selecione o artefato do aplicativo que você carregou, chamado `amazon-msf-java-stream-app-1.0.jar`.

1. Selecione **Excluir** para confirmar a exclusão.

**Para excluir o bucket do S3**

1. Abra o console do Amazon S3 em [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Selecione o bucket criado para os artefatos.

1. Selecione **Excluir** para confirmar a exclusão.
**nota**  
Para ser excluído, o bucket do S3 deve estar vazio.

## Exclua seus recursos do IAM
<a name="gs-python-cleanup-iam"></a>

Siga o procedimento a seguir para excluir seus recursos do IAM.

**Excluir seus recursos do IAM**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Na barra de navegação, selecione **Políticas**.

1. No controle do filtro, insira **kinesis**.

1. Escolha a política **kinesis-analytics-service- MyApplication -us-east-1**.

1. Selecione **Ações da política** e, em seguida, **Excluir**.

1. Na barra de navegação, selecione **Roles (Funções)**.

1. Escolha a função **kinesis-analytics- MyApplication** -us-east-1.

1. Selecione **Excluir função** e, em seguida, confirme a exclusão.

## Exclua seus CloudWatch recursos
<a name="gs-python-cleanup-cw"></a>

Use o procedimento a seguir para excluir seus CloudWatch recursos.

**Para excluir seus CloudWatch recursos**

1. Abra o CloudWatch console em [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. No painel de navegação, selecione **Logs**.

1. Escolha o grupo**/aws/kinesis-analytics/MyApplication**log.

1. Selecione **Excluir grupo de logs** e, em seguida, confirme a exclusão.