Cree y ejecute un servicio gestionado para la aplicación Apache Flink - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Cree y ejecute un servicio gestionado para la aplicación Apache Flink

En este paso, creará una aplicación de Managed Service for Apache Flink con los flujos de datos de Kinesis como fuente y receptor.

Cree recursos dependientes

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de datos de Kinesis para entrada y salida

  • Un bucket de Amazon S3 para almacenar el código de la aplicación

    nota

    En este tutorial se supone que está desplegando la aplicación en la región us-east-1 US East (Virginia del Norte). Si utiliza otra región, adapte todos los pasos en consecuencia.

Cree dos transmisiones de datos de Amazon Kinesis

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Puede crear estas transmisiones mediante la consola de Amazon Kinesis o el siguiente AWS CLI comando. Para obtener instrucciones sobre la consola, consulte Creating and Updating Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams. Para crear las transmisiones mediante el AWS CLI, utilice los siguientes comandos, ajustándolos a la región que utilice para su aplicación.

Cómo crear flujos de datos (AWS CLI)
  1. Para crear la primera transmisión (ExampleInputStream), utilice el siguiente comando de Amazon Kinesis create-stream AWS CLI :

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. Para crear la segunda transmisión que la aplicación utiliza para escribir el resultado, ejecute el mismo comando y cambie el nombre de la transmisión aExampleOutputStream:

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

Cree un bucket de Amazon S3 para el código de la aplicación

Puede crear el bucket de Amazon S3 usando la consola. Para obtener información sobre cómo crear un bucket de Amazon S3 mediante la consola, consulte Creación de un bucket en la Guía del usuario de Amazon S3. Asigne un nombre al bucket de Amazon S3 con un nombre único a nivel mundial, por ejemplo, añadiendo su nombre de inicio de sesión.

nota

Asegúrese de crear el depósito en la región que utiliza para este tutorial (us-east-1).

Otros recursos

Al crear la aplicación, Managed Service for Apache Flink crea automáticamente los siguientes CloudWatch recursos de Amazon si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/<my-application>

  • Un flujo de registro llamado kinesis-analytics-log-stream

Configurar su entorno de desarrollo local

Para el desarrollo y la depuración, puede ejecutar la aplicación Apache Flink en su máquina directamente desde su IDE elección. Todas las dependencias de Apache Flink se gestionan como las dependencias normales de Java con Apache Maven.

nota

En su máquina de desarrollo, debe tener Java JDK 11, Maven y Git instalados. Le recomendamos que utilice un entorno de desarrollo como Eclipse, Java, Neon o IDEAIntelliJ. Para comprobar que cumple todos los requisitos previos, consulte. Cumpla con los requisitos previos para completar los ejercicios No necesita instalar un clúster de Apache Flink en su máquina.

Autentica tu sesión AWS

La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, debe tener una sesión AWS autenticada válida con permisos para escribir en la transmisión de datos de Kinesis. Siga los siguientes pasos para autenticar la sesión:

  1. Si no tiene configurado el perfil con un nombre AWS CLI y una credencial válida, consulte. Configure el AWS Command Line Interface (AWS CLI)

  2. Compruebe que AWS CLI está correctamente configurado y que sus usuarios tienen permisos para escribir en la transmisión de datos de Kinesis publicando el siguiente registro de prueba:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Si IDE tiene un complemento con el que integrarse AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en elIDE. Para obtener más información, consulte AWS Toolkit for IDEA IntelliJ y Toolkit for Eclipse AWS .

Descargue y examine el código Java de streaming de Apache Flink

El código de la aplicación Java para este ejemplo está disponible en. GitHub Para descargar el código de la aplicación, haga lo siguiente:

  1. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Vaya al directorio amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.

Revise los componentes de la aplicación

La aplicación está completamente implementada en la com.amazonaws.services.msf.BasicStreamingJob clase. El main() método define el flujo de datos para procesar los datos de transmisión y ejecutarlos.

nota

Para una experiencia de desarrollador optimizada, la aplicación está diseñada para ejecutarse sin cambios en el código tanto en Amazon Managed Service para Apache Flink como de forma local, para su IDE desarrollo.

  • Para leer la configuración del tiempo de ejecución para que funcione cuando se ejecute en Amazon Managed Service para Apache Flink y en el suyoIDE, la aplicación detecta automáticamente si se ejecuta de forma independiente de forma local en el. IDE En ese caso, la aplicación carga la configuración del tiempo de ejecución de forma diferente:

    1. Cuando la aplicación detecte que se está ejecutando en modo independiente en su equipoIDE, cree el application_properties.json archivo incluido en la carpeta de recursos del proyecto. El contenido del archivo es el siguiente.

    2. Cuando la aplicación se ejecuta en Amazon Managed Service for Apache Flink, el comportamiento predeterminado carga la configuración de la aplicación desde las propiedades de tiempo de ejecución que defina en la aplicación Amazon Managed Service for Apache Flink. Consulte Cree y configure la aplicación Managed Service for Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • El main() método define el flujo de datos de la aplicación y lo ejecuta.

    • Inicializa los entornos de streaming predeterminados. En este ejemplo, mostramos cómo crear tanto el que se va StreamExecutionEnvironment a utilizar con la tabla como el DataSteam API StreamTableEnvironment que se va a utilizar con SQL la tablaAPI. Los dos objetos del entorno son dos referencias independientes al mismo entorno de ejecución, para utilizarlos de forma diferenteAPIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • Cargue los parámetros de configuración de la aplicación. Esto los cargará automáticamente desde el lugar correcto, en función de dónde se ejecute la aplicación:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • La aplicación define una fuente mediante el conector Kinesis Consumer para leer los datos del flujo de entrada. La configuración del flujo de entrada se define en PropertyGroupId =InputStream0. El nombre y la región del flujo se encuentran en las propiedades nombradas stream.name yaws.region, respectivamente. Para simplificar, esta fuente lee los registros como una cadena.

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • A continuación, la aplicación define un receptor mediante el conector del colector de Kinesis Streams para enviar datos al flujo de salida. El nombre y la región del flujo de salida se definen en PropertyGroupId =OutputStream0, de forma similar al flujo de entrada. El receptor está conectado directamente a la unidad interna DataStream que recibe los datos de la fuente. En una aplicación real, hay alguna transformación entre la fuente y el receptor.

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • Por último, ejecuta el flujo de datos que acaba de definir. Esta debe ser la última instrucción del main() método, después de definir todos los operadores que requiere el flujo de datos:

      env.execute("Flink streaming Java API skeleton");

Utilice el archivo pom.xml

El archivo pom.xml define todas las dependencias requeridas por la aplicación y configura el complemento Maven Shade para crear el fat-jar que contiene todas las dependencias requeridas por Flink.

  • Algunas dependencias tienen alcance. provided Estas dependencias están disponibles automáticamente cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink. Son necesarias para compilar la aplicación o para ejecutarla localmente en su ordenador. IDE Para obtener más información, consulte Ejecute la aplicación localmente. Asegúrese de utilizar la misma versión de Flink que el motor de ejecución que utilizará en Amazon Managed Service para Apache Flink.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Debe añadir dependencias adicionales de Apache Flink al pom con el ámbito predeterminado, como el conector Kinesis que utiliza esta aplicación. Para obtener más información, consulte Utilice los conectores de Apache Flink con el servicio gestionado para Apache Flink. También puede añadir cualquier dependencia de Java adicional que necesite su aplicación.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • El complemento Maven Java Compiler se asegura de que el código esté compilado en Java 11, la JDK versión actualmente compatible con Apache Flink.

  • El complemento Maven Shade empaqueta el fat-jar, excluyendo algunas bibliotecas que proporciona el motor de ejecución. También especifica dos transformadores: y. ServicesResourceTransformer ManifestResourceTransformer Este último configura la clase que contiene el main método para iniciar la aplicación. Si cambias el nombre de la clase principal, no olvides actualizar este transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Escribe registros de muestra en el flujo de entrada

En esta sección, enviará registros de muestra a la transmisión para que la aplicación los procese. Tiene dos opciones para generar datos de muestra: mediante un script de Python o el generador de datos de Kinesis.

Generar datos de muestra mediante un script de Python

Puede usar un script de Python para enviar registros de muestra a la transmisión.

nota

Para ejecutar este script de Python, debe usar Python 3.x y tener instalada la biblioteca AWS SDKpara Python (Boto).

Para empezar a enviar datos de prueba a la transmisión de entrada de Kinesis:

  1. Descargue el script de stock.py Python del generador de datos del GitHub repositorio del generador de datos.

  2. Ejecute el script stock.py:

    $ python stock.py

Mantenga el script en ejecución mientras completa el resto del tutorial. Ahora puede ejecutar la aplicación Apache Flink.

Genere datos de muestra con Kinesis Data Generator

Como alternativa a la secuencia de comandos de Python, puede utilizar el Generador de datos de Kinesis, también disponible en una versión alojada, para enviar datos de muestra aleatorios a la transmisión. Kinesis Data Generator se ejecuta en su navegador y no necesita instalar nada en su máquina.

Para configurar y ejecutar Kinesis Data Generator:

  1. Siga las instrucciones de la documentación de Kinesis Data Generator para configurar el acceso a la herramienta. Ejecutará una AWS CloudFormation plantilla que configurará un usuario y una contraseña.

  2. Acceda a Kinesis Data Generator a través del URL generado por la CloudFormation plantilla. Puede encontrarlo URL en la pestaña Resultados una vez que haya completado la CloudFormation plantilla.

  3. Configure el generador de datos:

    • Región: Seleccione la región que está utilizando para este tutorial: us-east-1

    • Flujo de transmisión/entrega: seleccione el flujo de entrada que utilizará la aplicación: ExampleInputStream

    • Registros por segundo: 100

    • Plantilla de registro: copia y pega la siguiente plantilla:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Pruebe la plantilla: elija la plantilla de prueba y compruebe que el registro generado es similar al siguiente:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Inicie el generador de datos: elija Seleccionar enviar datos.

Kinesis Data Generator ahora envía datos al. ExampleInputStream

Ejecute la aplicación localmente

Puede ejecutar y depurar su aplicación Flink localmente en su. IDE

nota

Antes de continuar, compruebe que los flujos de entrada y salida estén disponibles. Consulte Cree dos transmisiones de datos de Amazon Kinesis. Compruebe también que tiene permiso para leer y escribir en ambas transmisiones. Consulte Autentica tu sesión AWS.

La configuración del entorno de desarrollo local requiere Java 11JDK, Apache Maven y y IDE para el desarrollo de Java. Compruebe que cumple los requisitos previos requeridos. Consulte Cumpla con los requisitos previos para completar los ejercicios.

Importe el proyecto Java a su IDE

Para empezar a trabajar en la aplicación que tieneIDE, debe importarla como un proyecto Java.

El repositorio que ha clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, importe el contenido del ./java/GettingStarted subdirectorio a suIDE.

Inserte el código como un proyecto Java existente utilizando Maven.

nota

El proceso exacto para importar un nuevo proyecto de Java varía según el IDE que se utilice.

Compruebe la configuración de la aplicación local

Cuando se ejecuta localmente, la aplicación utiliza la configuración del application_properties.json archivo de la carpeta de recursos del proyecto./src/main/resources. Puede editar este archivo para usar diferentes regiones o nombres de transmisión de Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Configure su configuración de IDE ejecución

Puedes ejecutar y depurar la aplicación Flink IDE directamente desde tu clase principalcom.amazonaws.services.msf.BasicStreamingJob, como lo harías con cualquier aplicación Java. Antes de ejecutar la aplicación, debe configurar la configuración de ejecución. La configuración depende de la IDE que esté utilizando. Por ejemplo, consulte Ejecutar/depurar configuraciones en la documentación de IntelliJ. IDEA En concreto, debe configurar lo siguiente:

  1. Añada las provided dependencias a la ruta de clases. Esto es necesario para garantizar que las dependencias con provided alcance se transfieran a la aplicación cuando se ejecuta localmente. Sin esta configuración, la aplicación muestra un class not found error inmediatamente.

  2. Pase las AWS credenciales para acceder a las transmisiones de Kinesis a la aplicación. La forma más rápida es utilizar el AWS kit de herramientas para IDEA IntelliJ. Con este IDE complemento en la configuración de ejecución, puede seleccionar un perfil específico. AWS AWS la autenticación se realiza con este perfil. No necesita pasar las AWS credenciales directamente.

  3. Compruebe que IDE ejecuta la aplicación mediante JDK11.

Ejecute la aplicación en su IDE

Tras configurar la configuración de ejecución para elBasicStreamingJob, puede ejecutarla o depurarla como una aplicación Java normal.

nota

No puedes ejecutar el fat-jar generado por Maven directamente java -jar ... desde la línea de comandos. Este contenedor no contiene las dependencias principales de Flink necesarias para ejecutar la aplicación de forma independiente.

Cuando la aplicación se inicia correctamente, registra cierta información sobre el minicluster independiente y la inicialización de los conectores. A esto le siguen varios INFO y algunos WARN registros que Flink normalmente emite cuando se inicia la aplicación.

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

Una vez completada la inicialización, la aplicación no emite más entradas de registro. Mientras los datos fluyen, no se emite ningún registro.

Para comprobar si la aplicación procesa los datos correctamente, puede inspeccionar las transmisiones de Kinesis de entrada y salida, tal y como se describe en la siguiente sección.

nota

No emitir registros sobre el flujo de datos es lo normal en una aplicación de Flink. Emitir registros en cada registro puede ser conveniente para la depuración, pero puede suponer una sobrecarga considerable cuando se ejecuta en producción.

Observe los datos de entrada y salida en las transmisiones de Kinesis

Puede observar los registros enviados al flujo de entrada por el generador de datos de Kinesis (que genera un ejemplo de Python) o el generador de datos de Kinesis (enlace) mediante el visor de datos de la consola de Amazon Kinesis.

Para observar los registros
  1. Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.

  2. Compruebe que la región es la misma en la que está ejecutando este tutorial, que es us-east-1 US East (Virginia del Norte) de forma predeterminada. Cambie la región si no coincide.

  3. Elija Data Streams.

  4. Seleccione la transmisión que desee observar, ExampleInputStream o ExampleOutputStream.

  5. Seleccione la pestaña Visor de datos.

  6. Elija cualquier fragmento, mantenga el valor Último como posición inicial y, a continuación, seleccione Obtener registros. Es posible que aparezca el error «No se ha encontrado ningún registro para esta solicitud». Si es así, selecciona Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en la transmisión.

  7. Elija el valor de la columna Datos para inspeccionar el contenido del registro en su JSON formato.

Detenga la ejecución local de la aplicación

Detenga la ejecución de la aplicación en suIDE. IDEPor lo general, proporciona una opción de «detener». La ubicación y el método exactos dependen del IDE que utilices.

Compila y empaqueta el código de tu aplicación

En esta sección, utilizará Apache Maven para compilar el código Java y empaquetarlo en un JAR archivo. Puede compilar y empaquetar el código utilizando la herramienta de línea de comandos de Maven o su. IDE

Para compilar y empaquetar usando la línea de comandos de Maven:

Vaya al directorio que contiene el GettingStarted proyecto Java y ejecute el siguiente comando:

$ mvn package

Para compilar y empaquetar usando suIDE:

Ejecute mvn package desde su integración con IDE Maven.

En ambos casos, se crea el siguiente JAR archivo:target/amazon-msf-java-stream-app-1.0.jar.

nota

IDEEs posible que ejecutar un «proyecto de compilación» desde usted no cree el JAR archivo.

Cargue el JAR archivo de código de la aplicación

En esta sección, debes subir el JAR archivo que creaste en la sección anterior al bucket de Amazon Simple Storage Service (Amazon S3) que creaste al principio de este tutorial. Si no ha completado este paso, consulte (enlace).

Para cargar el JAR archivo de códigos de la aplicación
  1. Abra la consola Amazon S3 en https://console.aws.amazon.com/s3/.

  2. Elija el bucket que creó anteriormente para el código de la aplicación.

  3. Seleccione Cargar.

  4. Elija Add files.

  5. Navegue hasta el JAR archivo generado en el paso anterior:target/amazon-msf-java-stream-app-1.0.jar.

  6. Elija Cargar sin cambiar ninguna otra configuración.

aviso

Asegúrese de seleccionar el JAR archivo correcto en<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.

El target directorio también contiene otros JAR archivos que no necesita cargar.

Cree y configure la aplicación Managed Service for Apache Flink

Puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. Para este tutorial, utilizará la consola.

nota

Cuando crea la aplicación mediante la consola, sus recursos AWS Identity and Access Management (IAM) y Amazon CloudWatch Logs se crean automáticamente. Cuando crea la aplicación con AWS CLI, crea estos recursos por separado.

Creación de la aplicación

Para crear la aplicación
  1. Abra la consola de Managed Service for Apache Flink en https://console.aws.amazon.com /flink

  2. Compruebe que ha seleccionado la región correcta: us-east-1 US East (Norte de Virginia)

  3. Abre el menú de la derecha y selecciona Aplicaciones Apache Flink y luego Crear aplicación de streaming. También puede elegir Crear aplicación de streaming en el contenedor Comenzar de la página inicial.

  4. En la página Crear aplicación de streaming:

    • Elija un método para configurar la aplicación de procesamiento de transmisiones: elija Crear desde cero.

    • Configuración de Apache Flink, versión de Application Flink: elija Apache Flink 1.19.

  5. Configure su aplicación

    • Nombre de la aplicación: introduzcaMyApplication.

    • Descripción: introduzcaMy java test app.

    • Acceso a los recursos de la aplicación: elija Crear o actualizar IAM el rol kinesis-analytics-MyApplication-us-east-1 con las políticas requeridas.

  6. Configure su plantilla para los ajustes de la aplicación

    • Plantillas: elija Desarrollo.

  7. Selecciona Crear aplicación de streaming en la parte inferior de la página.

nota

Al crear una aplicación de servicio gestionado para Apache Flink mediante la consola, tiene la opción de crear una IAM función y una política para la aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos IAM recursos se nombran con el nombre de la aplicación y la región de la siguiente manera:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • Rol: kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service para Apache Flink se conocía anteriormente como Kinesis Data Analytics. El nombre de los recursos que se crean automáticamente lleva un prefijo por motivos de compatibilidad con kinesis-analytics- versiones anteriores.

Edite la política IAM

Edite la IAM política para añadir permisos de acceso a las transmisiones de datos de Kinesis.

Para editar la política
  1. Abre la IAM consola en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-east-1 que la consola creó en su nombre en la sección anterior.

  3. Seleccione Editar y, a continuación, elija la JSONpestaña.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya la cuenta de muestra IDs (012345678901) por tu ID de cuenta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Selecciona Siguiente en la parte inferior de la página y, a continuación, selecciona Guardar cambios.

Configure la aplicación

Edite la configuración de la aplicación para establecer el artefacto del código de la aplicación.

Para editar la configuración
  1. En la MyApplicationpágina, elija Configurar.

  2. En la sección de ubicación del código de la aplicación:

    • Para el bucket de Amazon S3, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija Examinar y seleccione el bucket correcto y, a continuación, seleccione Elegir. No haga clic en el nombre del depósito.

    • En Ruta al objeto de Amazon S3, introduzca amazon-msf-java-stream-app-1.0.jar.

  3. Para los permisos de acceso, selecciona Crear o actualizar IAM el rol kinesis-analytics-MyApplication-us-east-1 con las políticas requeridas.

  4. En la sección Propiedades del tiempo de ejecución, añada las siguientes propiedades.

  5. Seleccione Añadir nuevo elemento y añada cada uno de los siguientes parámetros:

    ID de grupo Clave Valor
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. No modifique ninguna de las demás secciones.

  7. Elija Guardar cambios.

nota

Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Ejecución de la aplicación

La aplicación ya está configurada y lista para ejecutarse.

Cómo ejecutar la aplicación
  1. En la consola de Amazon Managed Service for Apache Flink, seleccione Mi aplicación y, a continuación, Ejecutar.

  2. En la página siguiente, la página de configuración de restauración de la aplicación, seleccione Ejecutar con la última instantánea y, a continuación, seleccione Ejecutar.

    El estado de la aplicación detalla las transiciones desde Starting y Ready hasta Running cuando se ha iniciado la aplicación.

Cuando la aplicación esté en Running estado, ahora puede abrir el panel de control de Flink.

Para abrir el panel de
  1. Seleccione Abrir el panel de control de Apache Flink. El panel se abre en una página nueva.

  2. En la lista de trabajos en ejecución, elige el único trabajo que puedas ver.

    nota

    Si configuras las propiedades de Runtime o editas las IAM políticas de forma incorrecta, el estado de la solicitud podría cambiar aRunning, pero el panel de control de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común si la aplicación está mal configurada o carece de permisos para acceder a los recursos externos.

    Cuando esto suceda, consulte la pestaña Excepciones en el panel de control de Flink para ver la causa del problema.

Observe las métricas de la aplicación en ejecución

En la MyApplicationpágina, en la sección de CloudWatch métricas de Amazon, puedes ver algunas de las métricas fundamentales de la aplicación en ejecución.

Para ver las métricas
  1. Junto al botón Actualizar, selecciona 10 segundos en la lista desplegable.

  2. Cuando la aplicación está en ejecución y en buen estado, puede ver que la métrica de tiempo de actividad aumenta continuamente.

  3. La métrica de reinicios completos debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Para investigar el problema, consulta la pestaña Excepciones del panel de control de Flink.

  4. La métrica Número de puntos de control fallidos debe ser cero en una aplicación en buen estado.

    nota

    Este panel muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Puede crear un panel de aplicaciones personalizado con cualquier métrica del CloudWatch panel.

Observe los datos de salida en las transmisiones de Kinesis

Asegúrese de seguir publicando datos en la entrada, ya sea mediante el script de Python o el generador de datos de Kinesis.

Ahora puede observar el resultado de la aplicación que se ejecuta en Managed Service for Apache Flink mediante el visor de datos del servidor https://console.aws.amazon.com/kinesis/, de forma similar a como lo hacía anteriormente.

Para ver el resultado
  1. Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.

  2. Compruebe que la región es la misma que la que está utilizando para ejecutar este tutorial. De forma predeterminada, es US-East-1US East (Virginia del Norte). Cambie la región si es necesario.

  3. Elija Flujos de datos.

  4. Seleccione la transmisión que desee observar. Para este tutorial, escriba ExampleOutputStream.

  5. Seleccione la pestaña Visor de datos.

  6. Seleccione cualquier fragmento, mantenga el valor Último como posición inicial y, a continuación, elija Obtener registros. Es posible que aparezca el error «no se ha encontrado ningún registro para esta solicitud». Si es así, selecciona Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en la transmisión.

  7. Seleccione el valor en la columna Datos para inspeccionar el contenido del registro en JSON formato.

Detenga la aplicación

Para detener la aplicación, vaya a la página de la consola de la aplicación Managed Service for Apache Flink denominada. MyApplication

Cómo detener la aplicación
  1. En la lista desplegable Acción, seleccione Detener.

  2. El estado en los detalles de la aplicación pasa de Running a yStopping, después, a Ready cuando la aplicación se detiene por completo.

    nota

    No olvide dejar también de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.

Siguiente paso

Limpie los recursos AWS