

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Introducción a Amazon Managed Service para Apache Flink para Python
<a name="gs-python"></a>

Esta sección presenta una introducción a los conceptos fundamentales de un Managed Service para Apache Flink utilizando Python y la API de tabla. Describe las opciones disponibles para crear y probar sus aplicaciones. También proporciona instrucciones para instalar las herramientas necesarias para completar los tutoriales de esta guía y crear su primera aplicación. 

**Topics**
+ [

## Revisión de los componentes de una aplicación de Managed Service para Apache Flink
](#gs-python-table-components)
+ [

## Cumplimiento con los requisitos previos
](#gs-python-prerequisites)
+ [

# Creación y ejecución de una aplicación de Managed Service para Apache Flink para Python
](gs-python-createapp.md)
+ [

# Limpie los recursos AWS
](gs-python-cleanup.md)

## Revisión de los componentes de una aplicación de Managed Service para Apache Flink
<a name="gs-python-table-components"></a>

**nota**  
Amazon Managed Service for Apache Flink es compatible con todos los [Apache APIs Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/overview/#flinks-apis). Según la API que se elija, la estructura de la aplicación es ligeramente diferente. Un enfoque popular al desarrollar una aplicación Apache Flink en Python es definir el flujo de la aplicación mediante SQL integrado en el código Python. Este es el enfoque que seguimos en el siguiente tutorial de introducción.

Para procesar los datos, su aplicación de Managed Service para Apache Flink utiliza un script de Python que define el flujo de datos que procesa las entradas y produce las salidas mediante el tiempo de ejecución de Apache Flink. 

Una aplicación de Managed Service para Apache Flink típica tiene los siguientes componentes:
+ **Propiedades de tiempo de ejecución:** puede usar las *propiedades de tiempo de ejecución* para configurar su aplicación sin tener que volver a compilar el código de la aplicación. 
+ **Fuentes:** la aplicación consume datos de una o más *fuentes*. Una fuente utiliza un [conector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) para leer datos de un sistema externo, como un flujo de datos de Kinesis o un tema de Amazon MSK. También se pueden utilizar conectores especiales para generar datos desde la aplicación. Cuando se utiliza SQL, la aplicación define las fuentes como *tablas de fuentes*. 
+ **Transformaciones:** la aplicación procesa los datos mediante una o más *transformaciones* que pueden filtrar, enriquecer o agregar datos. Cuando se utiliza SQL, la aplicación define las transformaciones como consultas SQL. 
+ **Receptores:** la aplicación envía los datos a fuentes externas a través de los *receptores*. Un receptor utiliza un [conector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) para enviar datos a un sistema externo, como un flujo de datos de Kinesis, un flujo de datos de Kinesis, un bucket de Amazon S3 o una base de datos relacional. También se puede utilizar un conector especial para imprimir la salida con fines de desarrollo. Cuando se utiliza SQL, la aplicación define los receptores como *tablas de destino* en las que se insertan los resultados. Para obtener más información, consulte [Escritura de datos mediante receptores en Managed Service para Apache Flink](how-sinks.md).

Es posible que su aplicación Python también requiera dependencias externas, como bibliotecas de Python adicionales o cualquier conector Flink que utilice su aplicación. Al empaquetar la aplicación, se deben incluir todas las dependencias que esta requiera. En este tutorial se muestra cómo incluir las dependencias de los conectores y cómo empaquetar la aplicación para su implementación en Amazon Managed Service para Apache Flink.

## Cumplimiento con los requisitos previos
<a name="gs-python-prerequisites"></a>

Para completar este tutorial, necesita tener lo siguiente:
+ **Python 3.11**[, preferiblemente usando un entorno independiente como [VirtualEnv (venv)](https://docs.python.org/3.11/library/venv.html), [Conda o Miniconda](https://docs.conda.io/en/latest/).](https://docs.anaconda.com/miniconda/)
+  [Cliente Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git): si aún no lo ha hecho, instale el cliente Git.
+ [Kit de desarrollo de Java (JDK) versión 11](https://www.oracle.com/java/technologies/downloads/#java11): instale un Java JDK 11 y configure la variable de entorno `JAVA_HOME` para que apunte a su ubicación de instalación. Si no tiene un JDK 11, puede utilizar [Amazon Corretto](https://docs.aws.amazon.com/corretto) o cualquier JDK estándar de nuestra elección. 
  + Para verificar que el JDK se haya instalado correctamente, ejecute el siguiente comando. El resultado será diferente si utiliza un JDK que no sea Amazon Corretto 11. Asegúrese de que la versión sea 11.x.

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/): instale Apache Maven si aún no lo ha hecho. Para obtener más información, consulte [Instalación de Apache Maven](https://maven.apache.org/install.html).
  + Para probar la instalación de Apache Maven, use el siguiente comando:

    ```
    $ mvn -version
    ```

**nota**  
Aunque su aplicación está escrita en Python, Apache Flink se ejecuta en la máquina virtual de Java (JVM). Distribuye la mayoría de las dependencias, como el conector de Kinesis, como archivos JAR. Para administrar estas dependencias y empaquetar la aplicación en un archivo ZIP, utilice [Apache Maven](https://maven.apache.org/). En este tutorial se explica cómo hacerlo. 

**aviso**  
Se recomienda que se utilice Python 3.11 para el desarrollo local. Se trata de la misma versión de Python que utiliza Amazon Managed Service para Apache Flink con el tiempo de ejecución de Flink 1.19.   
La instalación de la biblioteca Python Flink 1.19 en Python 3.12 podría fallar.  
Si tiene otra versión de Python instalada de forma predeterminada en su máquina, le recomendamos que cree un entorno independiente, como VirtualEnv Python 3.11.

**IDE para desarrollo local**

Le recomendamos que utilice un entorno de desarrollo como [PyCharm](https://www.jetbrains.com/pycharm/)[Visual Studio Code](https://code.visualstudio.com/) para desarrollar y compilar la aplicación.

A continuación, complete los dos primeros pasos de [Comience a utilizar Amazon Managed Service para Apache Flink (DataStream API)](getting-started.md):
+ [Configure una AWS cuenta y cree un usuario administrador](setting-up.md)
+ [Configure el AWS Command Line Interface ()AWS CLI](setup-awscli.md)

Para empezar, consulte [Creación de una aplicación de ](gs-python-createapp.md).

# Creación y ejecución de una aplicación de Managed Service para Apache Flink para Python
<a name="gs-python-createapp"></a>

En esta sección, se creará una aplicación de Managed Service para Apache Flink para una aplicación Python con un flujo de Kinesis como origen y receptor.

**Topics**
+ [

## Creación de recursos dependientes
](#gs-python-resources)
+ [

## Configuración de su entorno de desarrollo local
](#gs-python-set-up)
+ [

## Descarga y análisis del código de Python de flujo de Apache Flink
](#gs-python-download)
+ [

## Administración de las dependencias JAR
](#gs-python-jar-dependencies)
+ [

## Escritura de registros de muestra en el flujo de entrada
](#gs-python-sample-records)
+ [

## Ejecución de la aplicación a nivel local
](#gs-python-run-locally)
+ [

## Observación de los datos de entrada y salida en las transmisiones de Kinesis
](#gs-python-observe-input-output)
+ [

## Detención de la ejecución de la aplicación de forma local
](#gs-python-stop)
+ [

## Empaquetado de su código de la aplicación
](#gs-python-package-code)
+ [

## Cargue el paquete de la aplicación en un bucket de Amazon S3.
](#gs-python-upload-bucket)
+ [

## Creación y ejecución de la aplicación de Managed Service para Apache Flink
](#gs-python-7)
+ [

## Siguiente paso
](#gs-python-next-step-4)

## Creación de recursos dependientes
<a name="gs-python-resources"></a>

Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes: 
+ Dos flujos de Kinesis para entrada y salida.
+ Un bucket de Amazon S3 para almacenar el código de la aplicación.

**nota**  
En este tutorial se supone que se está implementando su aplicación en la región us-east-1. Si se utiliza otra región, se deben adaptar todos los pasos en consecuencia.

### Creación de dos flujos de Kinesis
<a name="gs-python-resources-streams"></a>

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (`ExampleInputStream` y `ExampleOutputStream`) en la misma región que utilizará para implementar la aplicación (us-east-1 en este ejemplo). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Se puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones sobre la consola, consulte [Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*. 

**Cómo crear flujos de datos (AWS CLI)**

1. Para crear la primera transmisión (`ExampleInputStream`), utilice el siguiente comando de Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### Crear un bucket de Amazon S3
<a name="gs-python-resources-s3"></a>

Se puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:
+ [¿Cómo se puede crear un bucket de S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) en la *Guía de usuario de Amazon Simple Storage Service*. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión.
**nota**  
S3 Asegúrese de crear el bucket en la región que utilice para este tutorial (us-east-1).

### Otros recursos de
<a name="gs-python-resources-cw"></a>

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:
+ Un grupo de registro llamado `/AWS/KinesisAnalytics-java/<my-application>`.
+ Un flujo de registro llamado `kinesis-analytics-log-stream`.

## Configuración de su entorno de desarrollo local
<a name="gs-python-set-up"></a>

Para el desarrollo y la depuración, se puede ejecutar la aplicación Python Flink en su máquina. Se puede iniciar la aplicación desde la línea de comandos con `python main.py` o en un IDE de Python de su elección. 

**nota**  
En su máquina de desarrollo, debe tener instalados Python 3.10 o 3.11, Java 11, Apache Maven y Git. Le recomendamos que utilice un IDE como [PyCharm](https://www.jetbrains.com/pycharm/)[Visual Studio Code](https://code.visualstudio.com/). Para comprobar que cumple todos los requisitos previos, consulte [Cumplimiento de los requisitos previos para realizar los ejercicios](gs-python.md#gs-python-prerequisites) antes de continuar. 

### Instale la PyFlink biblioteca
<a name="gs-python-install-pyflink"></a>

Para desarrollar la aplicación y ejecutarla localmente, debe instalar la biblioteca Python de Flink.

1. Cree un entorno Python independiente con VirtualEnv Conda o cualquier herramienta similar de Python. 

1. Instale la PyFlink biblioteca en ese entorno. Utilice la misma versión de tiempo de ejecución de Apache Flink que utilizará en Amazon Managed Service para Apache Flink. Actualmente, el tiempo de ejecución recomendado es 1.19.1. 

   ```
   $ pip install apache-flink==1.19.1
   ```

1. Asegúrese de que el entorno esté activo cuando ejecute la aplicación. Si ejecuta la aplicación en el IDE, asegúrese de que este utilice el entorno como tiempo de ejecución. El procedimiento depende del IDE que esté utilizando. 
**nota**  
Solo necesita instalar la PyFlink biblioteca. **No** se necesita instalar un clúster de Apache Flink en su máquina. 

### Autentica tu sesión AWS
<a name="gs-python-authenticate"></a>

La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, debe tener una sesión AWS autenticada válida con permisos para escribir en la transmisión de datos de Kinesis. Siga los pasos siguientes para autenticar su sesión:

1. Si no tiene configurado el AWS CLI perfil con un nombre específico con una credencial válida, consulte. [Configure el AWS Command Line Interface ()AWS CLI](setup-awscli.md)

1. Compruebe que AWS CLI está correctamente configurado y que sus usuarios tienen permisos para escribir en la transmisión de datos de Kinesis publicando el siguiente registro de prueba:

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. Si su IDE tiene un complemento con el que integrarse AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en el IDE. Para obtener más información, consulte [AWS Toolkit for PyCharm](https://aws.amazon.com/pycharm/), [AWS Toolkit for Visual Studio](https://aws.amazon.com/visualstudiocode/) Code [AWS y Toolkit for](https://aws.amazon.com/intellij/) IntelliJ IDEA.

## Descarga y análisis del código de Python de flujo de Apache Flink
<a name="gs-python-download"></a>

El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

1. Clone el repositorio remoto con el siguiente comando:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Vaya al directorio `./python/GettingStarted`.

### Revisión de los componentes de la aplicación
<a name="gs-python-review"></a>

El código de la aplicación se encuentra en `main.py`. Usamos SQL incrustado en Python para definir el flujo de la aplicación. 

**nota**  
Para una experiencia de desarrollador optimizada, la aplicación está diseñada para ejecutarse sin cambios de código tanto en Amazon Managed Service para Apache Flink como de forma local, para el desarrollo en su máquina. La aplicación utiliza la variable de entorno `IS_LOCAL = true` para detectar si se está ejecutando de forma local. Se debe configurar la variable de entorno `IS_LOCAL = true` en el intérprete de comandos o en la configuración de ejecución del IDE.
+ La aplicación configura el entorno de ejecución y lee la configuración del tiempo de ejecución. Para funcionar tanto en Amazon Managed Service para Apache Flink como de forma local, la aplicación comprueba la variable `IS_LOCAL`. 
  + El siguiente es el comportamiento predeterminado cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink:

    1. Cargue las dependencias empaquetadas con la aplicación. Para más información, consulte (enlace)

    1. Cargue la configuración desde las propiedades de tiempo de ejecución que se definan en la aplicación Amazon Managed Service para Apache Flink. Para más información, consulte (enlace)
  + Cuando la aplicación detecte `IS_LOCAL = true` cuando se ejecuta la aplicación de forma local:

    1. Carga las dependencias externas del proyecto.

    1. Carga la configuración desde el archivo `application_properties.json` incluido en el proyecto. 

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ La aplicación define una tabla de origen con una declaración `CREATE TABLE` mediante el [conector de Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/). En esta tabla se leen los datos del flujo de Kinesis de entrada. La aplicación toma el nombre del flujo, la región y la posición inicial de la configuración del tiempo de ejecución. 

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ En este ejemplo, la aplicación también define una mesa de recepción mediante el [conector de Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/). Esta tabla envía los datos al flujo de Kinesis de salida.

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ Por último, la aplicación ejecuta un código SQL que `INSERT INTO...` la tabla de receptor de la tabla de origen. En una aplicación más compleja, es probable que tenga que realizar pasos adicionales para transformar los datos antes de escribirlos en el receptor. 

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ Debe añadir otro paso al final de la función `main()` para ejecutar la aplicación de manera local:

  ```
  if is_local:
      table_result.wait()
  ```

  Sin esta instrucción, la aplicación finaliza de inmediato cuando se ejecuta de manera local. No se debe ejecutar esta declaración cuando ejecute su aplicación en Amazon Managed Service para Apache Flink.

## Administración de las dependencias JAR
<a name="gs-python-jar-dependencies"></a>

Por lo general, una PyFlink aplicación requiere uno o más conectores. La aplicación de este tutorial usa el [conector de Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/). Como Apache Flink se ejecuta en la JVM de Java, los conectores se distribuyen como archivos JAR, independientemente de si implementa la aplicación en Python. Se deben empaquetar estas dependencias con la aplicación cuando se la implemente en Amazon Managed Service para Apache Flink. 

En este ejemplo, mostramos cómo usar Apache Maven para recuperar las dependencias y empaquetar la aplicación para que se ejecute en Managed Service para Apache Flink.

**nota**  
Existen formas alternativas de buscar y empaquetar las dependencias. En este ejemplo se muestra un método que funciona correctamente con uno o más conectores. También permite ejecutar la aplicación de forma local, para su desarrollo y en Managed Service para Apache Flink sin necesidad de realizar cambios en el código. 

### Uso del archivo pom.xml
<a name="gs-python-jar-pom"></a>

Apache Maven usa el archivo `pom.xml` para controlar las dependencias y el empaquetado de las aplicaciones. 

Todas las dependencias del JAR se especifican en el archivo `pom.xml` del bloque `<dependencies>...</dependencies>`. 

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

Para encontrar el artefacto y la versión del conector correctos que se van a utilizar, consulte [Uso de los conectores de Apache Flink con Managed Service para Apache Flink](how-flink-connectors.md). Asegúrese de consultar la versión de Apache Flink que está utilizando. En este ejemplo, usaremos el conector de Kinesis. Para Apache Flink 1.19, la versión del conector es `4.3.0-1.19`.

**nota**  
Si se utiliza Apache Flink 1.19, no hay ninguna versión del conector publicada específicamente para esta versión. Utilice los conectores publicados para la versión 1.18.

### Descarga y empaquetado de dependencias
<a name="gs-python-dependencies-download"></a>

Use Maven para descargar las dependencias definidas en el archivo `pom.xml` y empaquetarlas para la aplicación Python Flink. 

1. Vaya al directorio que contiene el proyecto de introducción a Python llamado `python/GettingStarted`.

1. Use el siguiente comando:

```
$ mvn package
```

Maven crea un nuevo archivo llamado `./target/pyflink-dependencies.jar`. Cuando desarrolla localmente en su máquina, la aplicación Python busca este archivo. 

**nota**  
Si se olvida de ejecutar este comando, cuando intentes ejecutar la aplicación, esta fallará con el siguiente error: **No se ha encontrado ninguna fábrica para el identificador “kinesis”**.

## Escritura de registros de muestra en el flujo de entrada
<a name="gs-python-sample-records"></a>

En esta sección, se envían registros de muestra al flujo para que los procese la aplicación. Tiene dos opciones para generar datos de muestra: mediante un script de Python o el [generador de datos de Kinesis](https://github.com/awslabs/amazon-kinesis-data-generator).

### Generación de datos de muestra mediante un script de Python
<a name="gs-python-sample-data"></a>

Se puede utilizar un script de Python para enviar registros de muestra al flujo.

**nota**  
Para ejecutar este script de Python, debe usar Python 3.x y tener instalada la biblioteca [AWS SDK para Python (Boto)](https://aws.amazon.com/developer/language/python/).

**Para empezar a enviar datos de prueba al flujo de entrada de Kinesis:**

1. Descargue el script de `stock.py` Python del generador de [datos del GitHub repositorio del generador](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator) de datos.

1. Ejecute el script `stock.py`:

   ```
   $ python stock.py
   ```

Mantenga el script en ejecución mientras completa el resto del tutorial. Ahora se puede ejecutar la aplicación Apache Flink.

### Genere datos de muestra con Kinesis Data Generator
<a name="gs-python-sample-kinesis"></a>

Como alternativa a la secuencia de comandos de Python, se puede utilizar el [Generador de datos de Kinesis](https://github.com/awslabs/amazon-kinesis-data-generator), también disponible en una [versión alojada](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html), para enviar datos de muestra al azar al flujo. Kinesis Data Generator se ejecuta en su navegador y no se necesita instalar nada en su máquina. 

**Para configurar y ejecutar Kinesis Data Generator:**

1. Siga las instrucciones de la [documentación de Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) para configurar el acceso a la herramienta. Ejecutará una CloudFormation plantilla que configurará un usuario y una contraseña. 

1. Acceda a Kinesis Data Generator a través de la URL generada por la CloudFormation plantilla. Encontrará la URL en la pestaña Resultados **una** vez que haya completado la CloudFormation plantilla. 

1. Configure el generador de datos:
   + **Región:** seleccione la región que está utilizando para este tutorial: us-east-1
   + **Flujo de flujo/entrega:** seleccione el flujo de entrada que utilizará la aplicación: `ExampleInputStream`
   + **Registros por segundo:** 100
   + **Plantilla de registro:** copie y pegue la siguiente plantilla:

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. Pruebe la plantilla: elija la **plantilla de prueba** y compruebe que el registro generado es semejante al siguiente:

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Inicie el generador de datos: elija **Seleccionar enviar datos**.

Kinesis Data Generator ahora envía datos al `ExampleInputStream`. 

## Ejecución de la aplicación a nivel local
<a name="gs-python-run-locally"></a>

Se puede probar la aplicación localmente, ejecutándola desde la línea de comandos con `python main.py` o desde su IDE.

Para ejecutar la aplicación de forma local, debe tener instalada la versión correcta de la PyFlink biblioteca, tal y como se describe en la sección anterior. Para más información, consulte (enlace)

**nota**  
Antes de continuar, compruebe que las secuencias de entrada y salida estén disponibles. Consulte [Crear dos Amazon Kinesis Data Streams](get-started-exercise.md#get-started-exercise-1). Además, compruebe que tiene permiso para leer y escribir en ambas secuencias. Consulte [Autentica tu sesión AWS](get-started-exercise.md#get-started-exercise-2-5). 

### Importación del proyecto de Python en su IDE
<a name="gs-python-import"></a>

Para empezar a trabajar en la aplicación en su IDE, debe importarla como proyecto de Python. 

El repositorio que ha clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, se importa el contenido del subdirectorio `./python/GettingStarted` a su IDE. 

Importe el código como un proyecto de Python existente.

**nota**  
El proceso exacto para importar un nuevo proyecto de Python varía según el IDE que se esté utilizando.

### Compruebe la configuración de la aplicación local
<a name="gs-python-check-configuration"></a>

Cuando se ejecuta localmente, la aplicación utiliza la configuración del archivo `application_properties.json` de la carpeta de recursos del proyecto en `./src/main/resources`. Se puede editar este archivo para usar diferentes regiones o nombres de flujo de Kinesis.

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### Ejecución de la aplicación Python de manera local
<a name="gs-python-run-locally"></a>

Se puede ejecutar la aplicación localmente, ya sea desde la línea de comandos como un script de Python normal o desde el IDE. 

**Ejecución de la aplicación en la línea de comandos**

1. Asegúrese de que el entorno Python independiente, como Conda o VirtualEnv donde instaló la biblioteca Python Flink, esté activo actualmente. 

1. Asegúrese de haber ejecutado `mvn package` al menos una vez. 

1. Establezca la variable de entorno `IS_LOCAL = true`:

   ```
   $ export IS_LOCAL=true
   ```

1. Ejecute la aplicación como un script de Python normal.

   ```
   $python main.py
   ```

**Ejecución de la aplicación desde el IDE**

1. Configure su IDE para ejecutar el script `main.py` con la siguiente configuración:

   1. Utilice el entorno Python independiente, como Conda o VirtualEnv donde instaló la PyFlink biblioteca. 

   1. Utilice las AWS credenciales para acceder a las transmisiones de datos de entrada y salida de Kinesis. 

   1. Configurar `IS_LOCAL = true`.

1. El proceso exacto para establecer la configuración de ejecución depende del IDE y varía. 

1. Una vez configurado el IDE, ejecute el script de Python y utilice las herramientas proporcionadas por el IDE mientras se ejecuta la aplicación. 

### Inspección de los registros de la aplicación localmente
<a name="gs-python-run-IDE"></a>

Cuando se ejecuta de forma local, la aplicación no muestra ningún registro en la consola, aparte de unas pocas líneas que se imprimen y se muestran cuando se inicia la aplicación. PyFlink escribe los registros en un archivo del directorio donde está instalada la biblioteca Python Flink. La aplicación imprime la ubicación de los registros cuando se inicia. También puede ejecutar el comando siguiente para encontrar los registros:

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. Enumere los archivos en el directorio de registro. Por lo general, encontrará un único archivo `.log`.

1. Siga el archivo mientras se ejecuta la aplicación: `tail -f <log-path>/<log-file>.log`.

## Observación de los datos de entrada y salida en las transmisiones de Kinesis
<a name="gs-python-observe-input-output"></a>

Se pueden observar los registros enviados al flujo de entrada por el (que genera un ejemplo de Python) o el generador de datos de Kinesis (enlace) mediante el **visor de datos** de la consola de Amazon Kinesis. 

**Observación de los registros:**  [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)  Compruebe que la región es la misma en la que está ejecutando este tutorial, que es us-east-1 Este de EE. UU. (Norte de Virginia) de forma predeterminada. Cambie la región si no coincide.    Elija **Flujos de datos**.    Selección del flujo que desee observar, `ExampleInputStream` o `ExampleOutputStream.`   Seleccione la pestaña **Visor de datos**.    Elija cualquier **partición**, mantenga el valor **Último** como **posición inicial** y, a continuación, seleccione **Obtener registros**. Es posible que aparezca el error “No se ha encontrado ningún registro para esta solicitud”. Si es así, seleccione **Volver a intentar obtener los registros**. Se muestran los registros más recientes publicados en el flujo.    Elija el valor de la columna Datos para inspeccionar el contenido del registro en formato JSON.   

## Detención de la ejecución de la aplicación de forma local
<a name="gs-python-stop"></a>

Detenga la aplicación que se está ejecutando en el IDE. El IDE normalmente ofrece una opción de “parada”. La ubicación y el método exactos dependen del IDE. 

## Empaquetado de su código de la aplicación
<a name="gs-python-package-code"></a>

En esta sección, se usa Apache Maven para empaquetar el código de la aplicación y todas las dependencias necesarias en un archivo .zip. 

Vuelva a ejecutar el comando del paquete Maven:

```
$ mvn package
```

Este comando genera el archivo `target/managed-flink-pyflink-getting-started-1.0.0.zip`.

## Cargue el paquete de la aplicación en un bucket de Amazon S3.
<a name="gs-python-upload-bucket"></a>

En esta sección, se cargará el archivo .zip que creó en la sección anterior en el bucket de Amazon Simple Storage Service (Amazon S3) que creó al principio de este tutorial. Si no se ha completado este paso, consulte (enlace).

**Cómo cargar el archivo JAR del código de la aplicación**

1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Elija el bucket que creó anteriormente para el código de la aplicación.

1. Seleccione **Cargar**.

1. Elija **Add files**.

1. Navegue hasta el archivo.zip generado en el paso anterior: `target/managed-flink-pyflink-getting-started-1.0.0.zip`. 

1. Elija **Cargar** sin cambiar ninguna otra configuración.

## Creación y ejecución de la aplicación de Managed Service para Apache Flink
<a name="gs-python-7"></a>

Se puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. Para este tutorial, usaremos la consola. 

### Creación de la aplicación
<a name="gs-python-7-console-create"></a>

1. Inicie sesión en y abra la Consola de administración de AWS consola de Amazon MSF en https://console.aws.amazon.com /flink.

1. Compruebe que ha seleccionado la región correcta: Este de EE. UU. (Norte de Virginia) us-east-1.

1. Abra el menú de la derecha y seleccione **Aplicaciones de Apache Flink** y luego **Crear aplicación de streaming.** Como alternativa, seleccione **Crear aplicación de flujo** en la sección **Introducción** de la página inicial. 

1. En la página **Crear aplicaciones de flujo**:
   + En **Elija un método para configurar la aplicación de procesamiento de transmisiones**, elija **Crear desde cero**.
   + En **Configuración de Apache Flink, versión Application Flink**, elija **Apache Flink 1.19**.
   + En **Configuración de aplicaciones**:
     + En **Nombre de la aplicación**, escriba **MyApplication**.
     + En **Descripción**, escriba **My Python test app**.
     + En **Acceso a los recursos de la aplicación**, selecciona **Crear o actualizar el rol kinesis-analytics-MyApplication-us de IAM -east-1** con las políticas requeridas.
   + En **Configuración de la plantilla de aplicaciones**:
     + En **Plantillas**, elija **Desarrollo**.
   + Elija **Crear aplicación de flujos**.

**nota**  
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:  
Política: `kinesis-analytics-service-MyApplication-us-west-2`
Rol: `kinesisanalytics-MyApplication-us-west-2`
Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente * Kinesis Data Analytics*. El nombre de los recursos que se generan automáticamente lleva el prefijo `kinesis-analytics` por motivos de compatibilidad con versiones anteriores.

### Modificar la política de IAM
<a name="gs-python-7-console-iam"></a>

Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.

**Cómo editar la política de IAM para añadir los permisos para el bucket de S3**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Elija **Políticas**. Elija la política **`kinesis-analytics-service-MyApplication-us-east-1`** que la consola creó en su nombre en la sección anterior. 

1. Elija **Editar política** y, a continuación, elija la pestaña **JSON**.

1. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya la cuenta de muestra IDs (*012345678901*) por su ID de cuenta.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. Elija **Guardar cambios** y después **Probar**.

### Configurar la aplicación
<a name="gs-python-7-console-configure"></a>

Edite la configuración de la aplicación para establecer el artefacto del código de la aplicación. 

**Cómo configurar la aplicación**

1. En la **MyApplication**página, selecciona **Configurar**.

1. En la sección **Ubicación del código de la aplicación**:
   + En **Bucket de Amazon S3**, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija **Examinar** y seleccione el bucket correcto y, a continuación, seleccione **Elegir**. No seleccione el nombre del bucket. 
   + En **Ruta al objeto de Amazon S3**, introduzca **managed-flink-pyflink-getting-started-1.0.0.zip**.

1. En **Permisos de acceso**, seleccione **Crear/actualizar `kinesis-analytics-MyApplication-us-east-1` del rol de IAM con las políticas requeridas**. 

1. Vaya a **Propiedades de tiempo de ejecución** y mantenga los valores predeterminados para todas las demás configuraciones.

1. Seleccione **Añadir nuevo elemento** y añada cada uno de los siguientes parámetros:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/gs-python-createapp.html)

1. No modifique ninguna de las demás secciones y elija **Guardar cambios**.

**nota**  
Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:   
Grupo de registro: `/aws/kinesis-analytics/MyApplication`
Flujo de registro: `kinesis-analytics-log-stream`

### Ejecución de la aplicación
<a name="gs-python-7-console-run"></a>

La aplicación ya está configurada y lista para ejecutarse.

**Cómo ejecutar la aplicación**

1. En la consola de Amazon Managed Service para Apache Flink, seleccione **Mi aplicación** y, a continuación, **Ejecutar**.

1. En la página siguiente, la página de configuración de restauración de la aplicación, seleccione **Ejecutar con la última instantánea** y, a continuación, seleccione **Ejecutar**. 

   El **estado** en **Detalles de la aplicación** cambia de `Ready` a `Starting` y luego a `Running` cuando se ha iniciado la aplicación.

Cuando la aplicación esté en el estado `Running`, se puede abrir el panel de control de Flink. 

**Para abrir el panel de**

1. Seleccione **Abrir el panel de control de Apache Flink**. El panel se abre en una nueva página.

1. En la lista **Trabajos en ejecución**, elija el único trabajo que pueda ver. 
**nota**  
Si se configuran las propiedades de Runtime o se editan las políticas de IAM de forma incorrecta, el estado de la solicitud podría cambiar a `Running`, pero el panel de control de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común si la aplicación está mal configurada o carece de permisos para acceder a los recursos externos.   
Cuando esto suceda, consulte la pestaña **Excepciones** en el panel de control de Flink para ver la causa del problema.

### Observación de las métricas de la aplicación en ejecución
<a name="gs-python-observe-metrics"></a>

En la **MyApplication**página, en la sección de ** CloudWatch métricas de Amazon**, puedes ver algunas de las métricas fundamentales de la aplicación en ejecución. 

**Visualización de las métricas**

1. Junto al botón **Actualizar**, seleccione **10 segundos** en la lista desplegable.

1. Cuando la aplicación está en ejecución y en buen estado, se puede ver que la métrica de **tiempo de actividad** aumenta continuamente.

1. La métrica **fullrestarts** debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Para investigar el problema, consulte la pestaña **Excepciones** del panel de control de Flink.

1. La métrica **Número de puntos de control fallidos** debe ser cero en una aplicación en buen estado. 
**nota**  
En este panel se muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Puedes crear un panel de aplicaciones personalizado con cualquier métrica del CloudWatch panel.

### Observación de los datos de salida en los flujos de Kinesis
<a name="gs-python-observe-output"></a>

Asegúrese de seguir publicando datos en la entrada, ya sea mediante el script de Python o el generador de datos de Kinesis. 

Ahora puede observar el resultado de la aplicación que se ejecuta en Managed Service for Apache Flink mediante el visor de datos del servidor [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/), de forma similar a como lo hacía anteriormente. 

**Visualización del resultado**

1. [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Compruebe que la región es la misma que la que está utilizando para ejecutar este tutorial. Por defecto, es us-east-1 Este de EE. UU. (Norte de Virginia). De ser necesario, cambie la región.

1. Elija **Flujos de datos**. 

1. Seleccione el flujo que desea observar. Para este tutorial, escriba `ExampleOutputStream`. 

1.  Seleccione la pestaña **Visor de datos**. 

1. Seleccione cualquier **partición**, mantenga el valor **Último** como **Posición inicial** y, a continuación, elija **Obtener registros**. Es posible que aparezca el error “no se ha encontrado ningún registro para esta solicitud”. Si es así, seleccione **Volver a intentar obtener los registros**. Se muestran los registros más recientes publicados en el flujo.

1. Seleccione el valor en la columna Datos para inspeccionar el contenido del registro en formato JSON.

### Detener la aplicación
<a name="gs-python-7-console-stop"></a>

Para detener la aplicación, vaya a la página de la consola de la aplicación de Managed Service para Apache Flink denominada `MyApplication`.

**Cómo detener la aplicación**

1. En la lista desplegable **Acciones**, seleccione **Detener**.

1. El **estado** en **Detalles de la aplicación** cambia de `Running` a `Stopping` y después a `Ready` cuando se ha detenido totalmente la aplicación. 
**nota**  
No olvide que también debe dejar de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.

## Siguiente paso
<a name="gs-python-next-step-4"></a>

[Limpie los recursos AWS](gs-python-cleanup.md)

# Limpie los recursos AWS
<a name="gs-python-cleanup"></a>

Esta sección incluye procedimientos para limpiar AWS los recursos creados en el tutorial de introducción (Python).

**Topics**
+ [

## Eliminación de su aplicación de Managed Service para Apache Flink
](#gs-python-cleanup-app)
+ [

## Eliminación de sus flujos de datos de Kinesis
](#gs-python-cleanup-msk)
+ [

## Eliminación de objetos y el bucket de Amazon S3
](#gs-python-cleanup-s3)
+ [

## Eliminación de sus recursos de IAM
](#gs-python-cleanup-iam)
+ [

## CloudWatch Elimine sus recursos
](#gs-python-cleanup-cw)

## Eliminación de su aplicación de Managed Service para Apache Flink
<a name="gs-python-cleanup-app"></a>

Utilice el siguiente procedimiento para eliminar la aplicación.

**Para eliminar la aplicación**

1. [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. En el panel Managed Service for Apache Flink, elija. **MyApplication**

1. En la lista desplegable **Acciones**, seleccione **Eliminar** y, a continuación, confirme la eliminación.

## Eliminación de sus flujos de datos de Kinesis
<a name="gs-python-cleanup-msk"></a>

1. Inicie sesión en y abra la Consola de administración de AWS consola de Amazon MSF en https://console.aws.amazon.com /flink.

1. Elija **Flujos de datos**.

1. Seleccione los flujos que creó, `ExampleInputStream` y `ExampleOutputStream`. 

1. En la lista desplegable **Acciones**, seleccione **Eliminar** y, a continuación, confirme la eliminación.

## Eliminación de objetos y el bucket de Amazon S3
<a name="gs-python-cleanup-s3"></a>

Siga el siguiente procedimiento para eliminar los objetos y el bucket de S3.

**Para eliminar objetos de un bucket de S3**

1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Elija el bucket de S3 que ha creado para el artefacto de la aplicación.

1. Seleccione el artefacto de la aplicación que ha cargado, denominado. `amazon-msf-java-stream-app-1.0.jar`

1. Para confirmar la eliminación, seleccione **Eliminar**.

**Para eliminar el bucket de S3**

1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Seleccione el bucket que ha creado para los artefactos.

1. Para confirmar la eliminación, seleccione **Eliminar**.
**nota**  
El bucket de S3 tiene que estar vacío para eliminarlo.

## Eliminación de sus recursos de IAM
<a name="gs-python-cleanup-iam"></a>

Utilice el siguiente procedimiento para eliminar sus recursos de IAM.

**Cómo eliminar sus recursos de IAM**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En la barra de navegación, seleccione **Políticas**.

1. En el control de filtros, introduzca **kinesis**.

1. Elija la política **kinesis-analytics-service- MyApplication -us-east-1**.

1. Seleccione **Acciones de política** y, a continuación, **Eliminar**.

1. En la barra de navegación, seleccione **Roles**.

1. Elija la función **kinesis-analytics- MyApplication** -us-east-1.

1. Elija **Eliminar rol** y, a continuación, confirme la eliminación.

## CloudWatch Elimine sus recursos
<a name="gs-python-cleanup-cw"></a>

Utilice el siguiente procedimiento para eliminar CloudWatch los recursos.

**Para eliminar sus CloudWatch recursos**

1. Abre la CloudWatch consola en [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. En la barra de navegación, elija **Registros**.

1. Elija el grupo de aws/kinesis-analytics/MyApplication registros**/**.

1. Elija **Eliminar grupo de registro** y, a continuación, confirme la eliminación.