Creación y ejecución de un servicio gestionado para la aplicación Apache Flink para Python - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Creación y ejecución de un servicio gestionado para la aplicación Apache Flink para Python

En esta sección, creará una aplicación Managed Service for Apache Flink para Python con una transmisión de Kinesis como fuente y receptor.

Cree recursos dependientes

Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de Kinesis para entrada y salida.

  • Un bucket de Amazon S3 para almacenar el código de la aplicación.

nota

En este tutorial se asume que está desplegando la aplicación en la región us-east-1. Si utiliza otra región, debe adaptar todos los pasos en consecuencia.

Cree dos transmisiones de Kinesis

Antes de crear una aplicación de Managed Service for Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis ExampleInputStream (ExampleOutputStreamy) en la misma región que utilizará para implementar la aplicación (us-east-1 en este ejemplo). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones sobre la consola, consulte Creating and Updating Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams.

Cómo crear flujos de datos (AWS CLI)
  1. Para crear la primera transmisión (ExampleInputStream), utilice el siguiente comando de Amazon Kinesis create-stream AWS CLI .

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

Crear un bucket de Amazon S3

Puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:

  • ¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único a nivel mundial, por ejemplo, añadiendo su nombre de inicio de sesión.

    nota

    Asegúrese de crear el bucket S3 en la región que utilice para este tutorial (us-east-1).

Otros recursos

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/<my-application>.

  • Un flujo de registro llamado kinesis-analytics-log-stream.

Configurar su entorno de desarrollo local

Para el desarrollo y la depuración, puede ejecutar la aplicación Python Flink en su máquina. Puede iniciar la aplicación desde la línea de comandos con python main.py o en un Python IDE de su elección.

nota

En tu máquina de desarrollo, debes tener Python 3.10 o 3.11, Java 11, Apache Maven y Git instalados. Le recomendamos que utilice un código IDE como PyCharmVisual Studio Code. Para comprobar que cumple todos los requisitos previos, consulte Cumpla con los requisitos previos para completar los ejercicios antes de continuar.

Para desarrollar la aplicación y ejecutarla localmente, debe instalar la biblioteca Python de Flink.

  1. Cree un entorno Python independiente con VirtualEnv Conda o cualquier herramienta similar de Python.

  2. Instale la PyFlink biblioteca en ese entorno. Utilice la misma versión de tiempo de ejecución de Apache Flink que utilizará en Amazon Managed Service para Apache Flink. Actualmente, el tiempo de ejecución recomendado es la 1.19.1.

    $ pip install apache-flink==1.19.1
  3. Asegúrese de que el entorno esté activo cuando ejecute la aplicación. Si ejecuta la aplicación enIDE, asegúrese de que IDE utiliza el entorno como tiempo de ejecución. El proceso depende del IDE que esté utilizando.

    nota

    Solo necesita instalar la PyFlink biblioteca. No necesita instalar un clúster de Apache Flink en su máquina.

Autentica tu sesión AWS

La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, debe tener una sesión AWS autenticada válida con permisos para escribir en la transmisión de datos de Kinesis. Siga los siguientes pasos para autenticar la sesión:

  1. Si no tiene configurado el perfil con un nombre AWS CLI y una credencial válida, consulte. Configure el AWS Command Line Interface (AWS CLI)

  2. Compruebe que AWS CLI está correctamente configurado y que sus usuarios tienen permisos para escribir en la transmisión de datos de Kinesis publicando el siguiente registro de prueba:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Si IDE tiene un complemento con el que integrarse AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en elIDE. Para obtener más información, consulte AWS Toolkit for PyCharm, AWS Toolkit for Visual Studio Code AWS y Toolkit for IntelliJ. IDEA

Descargue y examine el código Python de streaming de Apache Flink

El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Vaya al directorio ./python/GettingStarted.

Revise los componentes de la aplicación

El código de la aplicación se encuentra enmain.py. Usamos elementos SQL embebidos en Python para definir el flujo de la aplicación.

nota

Para una experiencia de desarrollador optimizada, la aplicación está diseñada para ejecutarse sin cambios de código tanto en Amazon Managed Service para Apache Flink como de forma local, para el desarrollo en su máquina. La aplicación utiliza la variable de entorno IS_LOCAL = true para detectar si se está ejecutando de forma local. Debe configurar la variable IS_LOCAL = true de entorno en su shell o en la configuración de ejecución de suIDE.

  • La aplicación configura el entorno de ejecución y lee la configuración del tiempo de ejecución. Para funcionar tanto en Amazon Managed Service for Apache Flink como de forma local, la aplicación comprueba la IS_LOCAL variable.

    • El siguiente es el comportamiento predeterminado cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink:

      1. Cargue las dependencias empaquetadas con la aplicación. Para obtener más información, consulte (enlace)

      2. Cargue la configuración desde las propiedades de tiempo de ejecución que defina en la aplicación Amazon Managed Service for Apache Flink. Para obtener más información, consulte (enlace)

    • Cuando la aplicación detecte IS_LOCAL = true cuándo se ejecuta la aplicación de forma local:

      1. Carga las dependencias externas del proyecto.

      2. Carga la configuración desde el application_properties.json archivo incluido en el proyecto.

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • La aplicación define una tabla de origen con una CREATE TABLE declaración mediante el Kinesis Connector. En esta tabla se leen los datos de la transmisión de Kinesis de entrada. La aplicación toma el nombre de la transmisión, la región y la posición inicial de la configuración del tiempo de ejecución.

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • En este ejemplo, la aplicación también define una mesa de recepción mediante el conector Kinesis. Esta tabla envía los datos a la transmisión de Kinesis de salida.

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • Por último, la aplicación ejecuta una tabla de SQL destino a partir de INSERT INTO... la tabla de origen. En una aplicación más compleja, es probable que tenga que realizar pasos adicionales para transformar los datos antes de escribirlos en el sumidero.

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • Debe añadir otro paso al final de la main() función para ejecutar la aplicación de forma local:

    if is_local: table_result.wait()

    Sin esta instrucción, la aplicación finaliza inmediatamente cuando se ejecuta localmente. No debes ejecutar esta declaración cuando ejecutes tu aplicación en Amazon Managed Service for Apache Flink.

Administre las dependencias JAR

Por lo general, una PyFlink aplicación requiere uno o más conectores. La aplicación de este tutorial usa el Kinesis Connector. Como Apache Flink se ejecuta en JavaJVM, los conectores se distribuyen como JAR archivos, independientemente de si implementa la aplicación en Python. Debes empaquetar estas dependencias con la aplicación cuando la implementes en Amazon Managed Service para Apache Flink.

En este ejemplo, mostramos cómo usar Apache Maven para recuperar las dependencias y empaquetar la aplicación para que se ejecute en Managed Service for Apache Flink.

nota

Existen formas alternativas de buscar y empaquetar las dependencias. En este ejemplo se muestra un método que funciona correctamente con uno o más conectores. También permite ejecutar la aplicación de forma local, para su desarrollo y en Managed Service for Apache Flink sin necesidad de realizar cambios en el código.

Utilice el archivo pom.xml

Apache Maven usa el pom.xml archivo para controlar las dependencias y el empaquetado de las aplicaciones.

Todas JAR las dependencias se especifican en el pom.xml archivo del bloque. <dependencies>...</dependencies>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

Para encontrar el artefacto y la versión del conector correctos que debe utilizar, consulte. Utilice los conectores Apache Flink con el servicio gestionado para Apache Flink Asegúrese de consultar la versión de Apache Flink que está utilizando. Para este ejemplo, utilizamos el conector Kinesis. Para Apache Flink 1.19, la versión del conector es. 4.3.0-1.19

nota

Si utiliza Apache Flink 1.19, no hay ninguna versión del conector publicada específicamente para esta versión. Utilice los conectores publicados para la versión 1.18.

Dependencias de descarga y paquete

Use Maven para descargar las dependencias definidas en el pom.xml archivo y empaquetarlas para la aplicación Python Flink.

  1. Navegue hasta el directorio que contiene el proyecto Python Getting Started llamadopython/GettingStarted.

  2. Ejecute el siguiente comando:

$ mvn package

Maven crea un nuevo archivo llamado./target/pyflink-dependencies.jar. Cuando desarrolla localmente en su máquina, la aplicación Python busca este archivo.

nota

Si olvidas ejecutar este comando, cuando intentes ejecutar la aplicación, esta fallará con el siguiente error: No se ha encontrado ninguna fábrica para el identificador «kinesis».

Escribe ejemplos de registros en el flujo de entrada

En esta sección, enviará registros de muestra a la transmisión para que la aplicación los procese. Tiene dos opciones para generar datos de muestra: mediante un script de Python o el generador de datos de Kinesis.

Generar datos de muestra mediante un script de Python

Puede usar un script de Python para enviar registros de muestra a la transmisión.

nota

Para ejecutar este script de Python, debe usar Python 3.x y tener instalada la biblioteca AWS SDKpara Python (Boto).

Para empezar a enviar datos de prueba a la transmisión de entrada de Kinesis:

  1. Descargue el script stock.py Python del generador de datos del GitHub repositorio del generador de datos.

  2. Ejecute el script stock.py:

    $ python stock.py

Mantenga el script en ejecución mientras completa el resto del tutorial. Ahora puede ejecutar la aplicación Apache Flink.

Genere datos de muestra con Kinesis Data Generator

Como alternativa a la secuencia de comandos de Python, puede utilizar el Generador de datos de Kinesis, también disponible en una versión alojada, para enviar datos de muestra aleatorios a la transmisión. Kinesis Data Generator se ejecuta en su navegador y no necesita instalar nada en su máquina.

Para configurar y ejecutar Kinesis Data Generator:

  1. Siga las instrucciones de la documentación de Kinesis Data Generator para configurar el acceso a la herramienta. Ejecutará una AWS CloudFormation plantilla que configurará un usuario y una contraseña.

  2. Acceda a Kinesis Data Generator a través del URL generado por la CloudFormation plantilla. Puede encontrarlo URL en la pestaña Resultados una vez que haya completado la CloudFormation plantilla.

  3. Configure el generador de datos:

    • Región: Seleccione la región que está utilizando para este tutorial: us-east-1

    • Flujo de transmisión/entrega: seleccione el flujo de entrada que utilizará la aplicación: ExampleInputStream

    • Registros por segundo: 100

    • Plantilla de registro: copia y pega la siguiente plantilla:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Probar la plantilla: elija la plantilla de prueba y compruebe que el registro generado es similar al siguiente:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Inicie el generador de datos: elija Seleccionar enviar datos.

Kinesis Data Generator ahora envía datos al. ExampleInputStream

Ejecute la aplicación localmente

Puede probar la aplicación localmente, ejecutándola desde la línea de comandos python main.py o desde suIDE.

Para ejecutar la aplicación de forma local, debe tener instalada la versión correcta de la PyFlink biblioteca, tal y como se describe en la sección anterior. Para obtener más información, consulte (enlace)

nota

Antes de continuar, compruebe que los flujos de entrada y salida estén disponibles. Consulte Cree dos transmisiones de datos de Amazon Kinesis. Además, compruebe que tiene permiso para leer y escribir en ambas transmisiones. Consulte Autentica tu sesión AWS.

Importa el proyecto de Python a tu IDE

Para empezar a trabajar en la aplicación que tienesIDE, debes importarla como un proyecto de Python.

El repositorio que has clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, importe el contenido del ./python/GettingStarted subdirectorio a suIDE.

Importa el código como un proyecto de Python existente.

nota

El proceso exacto para importar un nuevo proyecto de Python varía según el IDE que esté utilizando.

Compruebe la configuración de la aplicación local

Cuando se ejecuta de forma local, la aplicación utiliza la configuración del application_properties.json archivo de la carpeta de recursos del proyecto correspondiente./src/main/resources. Puede editar este archivo para usar diferentes regiones o nombres de transmisión de Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Ejecuta tu aplicación Python localmente

Puede ejecutar la aplicación localmente, ya sea desde la línea de comandos como un script de Python normal o desdeIDE.

Para ejecutar la aplicación desde la línea de comandos
  1. Asegúrese de que el entorno Python independiente, como Conda o VirtualEnv donde instaló la biblioteca Python Flink, esté activo actualmente.

  2. Asegúrese de haber corrido al mvn package menos una vez.

  3. Establezca la variable de entorno IS_LOCAL = true:

    $ export IS_LOCAL=true
  4. Ejecute la aplicación como un script de Python normal.

    $python main.py
Para ejecutar la aplicación desde el IDE
  1. Configure su IDE modo de ejecutar el main.py script con la siguiente configuración:

    1. Utilice el entorno Python independiente, como Conda o VirtualEnv donde instaló la PyFlink biblioteca.

    2. Utilice las AWS credenciales para acceder a las transmisiones de datos de entrada y salida de Kinesis.

    3. Configurar IS_LOCAL = true.

  2. El proceso exacto para establecer la configuración de ejecución depende de usted IDE y varía.

  3. Cuando haya configurado suIDE, ejecute el script de Python y utilice las herramientas proporcionadas por usted IDE mientras la aplicación está en ejecución.

Inspeccione los registros de la aplicación localmente

Cuando se ejecuta localmente, la aplicación no muestra ningún registro en la consola, aparte de unas pocas líneas que se imprimen y se muestran cuando se inicia la aplicación. PyFlink escribe los registros en un archivo del directorio donde está instalada la biblioteca Python Flink. La aplicación imprime la ubicación de los registros cuando se inicia. También puede ejecutar el siguiente comando para buscar los registros:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. Enumere los archivos del directorio de registro. Por lo general, encontrará un único .log archivo.

  2. Siga el archivo mientras se ejecuta la aplicación:tail -f <log-path>/<log-file>.log.

Observe los datos de entrada y salida en las transmisiones de Kinesis

Puede observar los registros enviados al flujo de entrada por el generador de datos de Kinesis (que genera un ejemplo de Python) o el generador de datos de Kinesis (enlace) mediante el visor de datos de la consola de Amazon Kinesis.

Para observar los registros:

Detenga la ejecución local de la aplicación

Detenga la ejecución de la aplicación en suIDE. IDEPor lo general, proporciona una opción de «detener». La ubicación y el método exactos dependen delIDE.

Package el código de su aplicación

En esta sección, utilizará Apache Maven para empaquetar el código de la aplicación y todas las dependencias necesarias en un archivo.zip.

Vuelva a ejecutar el comando del paquete Maven:

$ mvn package

Este comando genera el archivotarget/managed-flink-pyflink-getting-started-1.0.0.zip.

Cargue el paquete de la aplicación en un bucket de Amazon S3

En esta sección, debes subir el archivo.zip que creaste en la sección anterior al bucket de Amazon Simple Storage Service (Amazon S3) que creaste al principio de este tutorial. Si no ha completado este paso, consulte (enlace).

Para cargar el JAR archivo de códigos de la aplicación
  1. Abra la consola Amazon S3 en https://console.aws.amazon.com/s3/.

  2. Elija el bucket que creó anteriormente para el código de la aplicación.

  3. Seleccione Cargar.

  4. Elija Add files.

  5. Navegue hasta el archivo.zip generado en el paso anterior:target/managed-flink-pyflink-getting-started-1.0.0.zip.

  6. Elija Cargar sin cambiar ninguna otra configuración.

Cree y configure la aplicación Managed Service for Apache Flink

Puede crear y configurar una aplicación de servicio gestionado para Apache Flink mediante la consola o el. AWS CLI Para este tutorial, utilizaremos la consola.

Creación de la aplicación

  1. Abra la consola de Managed Service for Apache Flink en https://console.aws.amazon.com /flink

  2. Compruebe que se ha seleccionado la región correcta: US East (North Virginia) us-east-1.

  3. Abre el menú de la derecha y selecciona Aplicaciones de Apache Flink y, a continuación, Crear aplicación de streaming. Como alternativa, selecciona Crear aplicación de streaming en la sección Cómo empezar de la página inicial.

  4. En la página Crear aplicaciones de streaming:

    • En Elija un método para configurar la aplicación de procesamiento de transmisiones, elija Crear desde cero.

    • Para la configuración de Apache Flink, versión Application Flink, elija Apache Flink 1.19.

    • Para la configuración de la aplicación:

      • En Nombre de la aplicación, escriba MyApplication.

      • En Descripción, escriba My Python test app.

      • En Acceso a los recursos de la aplicación, elija Crear o actualizar el IAM rol kinesis-analytics- MyApplication -us-east-1 con las políticas requeridas.

    • Para la configuración de la plantilla de aplicaciones:

      • En el caso de las plantillas, elija Desarrollo.

    • Selecciona Crear aplicación de streaming.

nota

Al crear una aplicación de servicio gestionado para Apache Flink mediante la consola, tiene la opción de crear un IAM rol y una política para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos IAM recursos se nombran con el nombre de la aplicación y la región de la siguiente manera:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Rol: kinesisanalytics-MyApplication-us-west-2

Amazon Managed Service para Apache Flink se conocía anteriormente como Kinesis Data Analytics. El nombre de los recursos que se generan automáticamente lleva un prefijo kinesis-analytics por motivos de compatibilidad con versiones anteriores.

Edite la política IAM

Edite la IAM política para añadir permisos de acceso al bucket de Amazon S3.

Para editar la IAM política de añadir permisos al bucket de S3
  1. Abra la IAM consola en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-east-1 que la consola creó en su nombre en la sección anterior.

  3. Seleccione Editar y, a continuación, elija la JSONpestaña.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya la cuenta de muestra IDs (012345678901) por tu ID de cuenta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Elija Guardar cambios y después Probar.

Configura la aplicación

Edite la configuración de la aplicación para establecer el artefacto del código de la aplicación.

Cómo configurar la aplicación
  1. En la MyApplicationpágina, elija Configurar.

  2. En la sección de ubicación del código de la aplicación:

    • Para el bucket de Amazon S3, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija Browse y seleccione el bucket correcto y, a continuación, elija Choose. No selecciones el nombre del depósito.

    • En Ruta al objeto de Amazon S3, introduzca managed-flink-pyflink-getting-started-1.0.0.zip.

  3. Para los permisos de acceso, selecciona Crear o actualizar IAM el rol kinesis-analytics-MyApplication-us-east-1 con las políticas requeridas.

  4. Vaya a las propiedades de Runtime y mantenga los valores predeterminados para todas las demás configuraciones.

  5. Seleccione Añadir nuevo elemento y añada cada uno de los siguientes parámetros:

    ID de grupo Clave Valor
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. No modifique ninguna de las demás secciones y elija Guardar cambios.

nota

Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Ejecución de la aplicación

La aplicación ya está configurada y lista para ejecutarse.

Cómo ejecutar la aplicación
  1. En la consola de Amazon Managed Service for Apache Flink, seleccione Mi aplicación y, a continuación, Ejecutar.

  2. En la página siguiente, la página de configuración de restauración de la aplicación, seleccione Ejecutar con la última instantánea y, a continuación, seleccione Ejecutar.

    El estado de la aplicación detalla las transiciones desde Starting y Ready hasta Running cuando se ha iniciado la aplicación.

Cuando la aplicación esté en Running estado, ahora puede abrir el panel de control de Flink.

Para abrir el panel de
  1. Seleccione Abrir el panel de control de Apache Flink. El panel se abre en una página nueva.

  2. En la lista de trabajos en ejecución, elige el único trabajo que puedas ver.

    nota

    Si configuras las propiedades de Runtime o editas las IAM políticas de forma incorrecta, el estado de la solicitud podría cambiar aRunning, pero el panel de control de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común si la aplicación está mal configurada o carece de permisos para acceder a los recursos externos.

    Cuando esto suceda, consulte la pestaña Excepciones en el panel de control de Flink para ver la causa del problema.

Observe las métricas de la aplicación en ejecución

En la MyApplicationpágina, en la sección de CloudWatch métricas de Amazon, puedes ver algunas de las métricas fundamentales de la aplicación en ejecución.

Para ver las métricas
  1. Junto al botón Actualizar, selecciona 10 segundos en la lista desplegable.

  2. Cuando la aplicación está en ejecución y en buen estado, puede ver que la métrica de tiempo de actividad aumenta continuamente.

  3. La métrica de reinicios completos debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Para investigar el problema, consulta la pestaña Excepciones del panel de control de Flink.

  4. La métrica Número de puntos de control fallidos debe ser cero en una aplicación en buen estado.

    nota

    Este panel muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Puede crear un panel de aplicaciones personalizado con cualquier métrica del CloudWatch panel.

Observe los datos de salida en las transmisiones de Kinesis

Asegúrese de seguir publicando datos en la entrada, ya sea mediante el script de Python o el generador de datos de Kinesis.

Ahora puede observar el resultado de la aplicación que se ejecuta en Managed Service for Apache Flink mediante el visor de datos del servidor https://console.aws.amazon.com/kinesis/, de forma similar a como lo hacía anteriormente.

Para ver el resultado
  1. Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.

  2. Compruebe que la región es la misma que la que está utilizando para ejecutar este tutorial. De forma predeterminada, es US-East-1US East (Virginia del Norte). Cambie la región si es necesario.

  3. Elija Flujos de datos.

  4. Seleccione la transmisión que desee observar. Para este tutorial, escriba ExampleOutputStream.

  5. Seleccione la pestaña Visor de datos.

  6. Seleccione cualquier fragmento, mantenga el valor Último como posición inicial y, a continuación, elija Obtener registros. Es posible que aparezca el error «no se ha encontrado ningún registro para esta solicitud». Si es así, selecciona Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en la transmisión.

  7. Seleccione el valor en la columna Datos para inspeccionar el contenido del registro en JSON formato.

Detenga la aplicación

Para detener la aplicación, vaya a la página de la consola de la aplicación Managed Service for Apache Flink denominada. MyApplication

Cómo detener la aplicación
  1. En la lista desplegable Acción, seleccione Detener.

  2. El estado en los detalles de la aplicación pasa de Running a yStopping, después, a Ready cuando la aplicación se detiene por completo.

    nota

    No olvide dejar también de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.

Siguiente paso

Limpie los recursos AWS