Crea una libreta Studio con Amazon MSK - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Crea una libreta Studio con Amazon MSK

En este tutorial se describe cómo crear un bloc de notas de Studio que utilice un MSK clúster de Amazon como fuente.

Configurar un MSK clúster de Amazon

Para este tutorial, necesitas un MSK clúster de Amazon que permita el acceso a texto sin formato. Si aún no tienes un MSK clúster de Amazon configurado, sigue el MSK tutorial Cómo empezar a usar Amazon para crear un AmazonVPC, un MSK clúster de Amazon, un tema y una instancia de EC2 cliente de Amazon.

Al seguir el tutorial, haga lo siguiente:

Añada una NAT puerta de enlace a su VPC

Si has creado un MSK clúster de Amazon siguiendo el MSK tutorial Cómo empezar a usar Amazon, o si tu Amazon actual aún VPC no tiene una NAT puerta de enlace para sus subredes privadas, debes añadir una NAT puerta de enlace a tu AmazonVPC. En el siguiente diagrama se muestra la arquitectura.

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

Para crear una NAT puerta de enlace para tu AmazonVPC, haz lo siguiente:

  1. Abre la VPC consola de Amazon en https://console.aws.amazon.com/vpc/.

  2. Selecciona NATGateways en la barra de navegación izquierda.

  3. En la página NATPuertas de enlace, elija Crear NAT puerta de enlace.

  4. En la página Crear NAT puerta de enlace, proporcione los siguientes valores:

    Nombre: opcional ZeppelinGateway
    Subred AWS KafkaTutorialSubnet1
    ID de asignación de IP elástica Elija una IP elástica disponible. Si no hay ninguna elástica IPs disponible, elija Asignar IP elástica y, a continuación, elija la IP elástica que cree la consola.

    Elija Crear NAT puerta de enlace.

  5. En la de navegación izquierda, elija Tablas de ruteo.

  6. Elija Create Route Table (Crear tabla de ruteo).

  7. En la página Crear tabla de enrutamiento, proporcione la siguiente información:

    • Name tag: ZeppelinRouteTable

    • VPC: Elija su VPC (por ejemplo AWS KafkaTutorialVPC).

    Seleccione Crear.

  8. En la lista de tablas de rutas, elija ZeppelinRouteTable. Elija la pestaña Rutas y, a continuación, Editar rutas.

  9. En la pestaña Editar rutas, elija Añadir rutas.

  10. En Para Destino, escriba 0.0.0.0/0. Para Target, elija NATGateway, ZeppelinGateway. Elija Guardar rutas. Elija Close.

  11. En la página de tablas de rutas, con la ZeppelinRouteTableopción seleccionada, elija la pestaña Asociaciones de subredes. Elija Editar asociaciones de subredes.

  12. En la página Editar asociaciones de subredes, elija AWS KafkaTutorialSubnet2 y AWS KafkaTutorialSubnet 3. Seleccione Guardar.

Cree una AWS Glue conexión y una tabla

Tu bloc de notas Studio usa una AWS Gluebase de datos para los metadatos de tu fuente de MSK datos de Amazon. En esta sección, crea una AWS Glue conexión que describe cómo acceder a su MSK clúster de Amazon y una AWS Glue tabla que describe cómo presentar los datos de su fuente de datos a clientes como su bloc de notas Studio.

Creación de una conexión
  1. Inicia sesión en AWS Management Console y abre la AWS Glue consola en https://console.aws.amazon.com/glue/.

  2. Si aún no tiene una AWS Glue base de datos, elija Bases de datos en la barra de navegación izquierda. Elija Agregar una base de datos. En la ventana Añadir base de datos, introduzca default en el nombre de la base de datos. Seleccione Crear.

  3. En la barra de navegación de la izquierda, seleccione Conexiones. Elija Añadir conexión.

  4. En la ventana Añadir conexión, introduzca los siguientes valores:

    • En Nombre de conexión, ingrese ZeppelinConnection.

    • En Tipo de conexión, elija Kafka.

    • En el caso del servidor bootstrap de Kafka URLs, proporcione la cadena del agente de arranque de su clúster. Puede obtener los agentes de arranque desde la MSK consola o introduciendo el siguiente comando: CLI

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Desmarca la casilla de verificación Requerir SSL conexión.

    Elija Next (Siguiente).

  5. En la VPCpágina, introduzca los siguientes valores:

    • Para VPC, elige el nombre de tu VPC ( AWS KafkaTutorialVPCp. ej.)

    • Para Subred, elija AWS KafkaTutorialSubnet2.

    • Para los grupos de seguridad, elija todos los grupos disponibles.

    Elija Next (Siguiente).

  6. En la página Propiedades de la conexión o Acceso a la conexión, seleccione Finalizar.

Crear una tabla
nota

Puede crear la tabla manualmente tal y como se describe en los pasos siguientes, o bien puede usar el código conector de creación de tablas para Managed Service for Apache Flink en su bloc de notas de Apache Zeppelin para crear la tabla mediante una sentencia. DDL A continuación, puede comprobar si AWS Glue la tabla se ha creado correctamente.

  1. En la barra de navegación izquierda, seleccione Tablas. En la página Tablas, seleccione Añadir tablas y Añadir tabla manualmente.

  2. En la página Configurar las propiedades de la tabla, introduzca stock como Nombre de la tabla. Asegúrese de seleccionar la base de datos que creó anteriormente. Elija Next (Siguiente).

  3. En la página Añadir almacén de datos, elija Kafka. Para el nombre del tema, introduce el nombre del tema (por ejemplo AWS KafkaTutorialTopic). En Conexión, elija ZeppelinConnection.

  4. En la página de clasificación, elija JSON. Elija Next (Siguiente).

  5. En la página Definir un esquema, elija Añadir columna para añadir una. Añada columnas con las siguientes propiedades:

    Nombre de la columna Tipo de datos
    ticker string
    price double

    Elija Next (Siguiente).

  6. En la página siguiente, verifique su configuración y seleccione Finalizar.

  7. Elija la tabla recién creada de la lista de tablas.

  8. Elija Editar tabla y añada las siguientes propiedades:

    • clave:managed-flink.proctime, valor: proctime

    • clave:flink.properties.group.id, valor: test-consumer-group

    • clave:flink.properties.auto.offset.reset, valor: latest

    • clave:classification, valor: json

    Sin estos pares clave/valor, se produce un error en el cuaderno Flink.

  9. Seleccione Apply.

Crea una libreta Studio con Amazon MSK

Ahora que ha creado los recursos que utiliza su aplicación, cree su cuaderno de Studio.

Puede crear su aplicación utilizando el AWS Management Console o el AWS CLI.
nota

También puedes crear un bloc de notas de Studio desde la MSK consola de Amazon. Para ello, selecciona un clúster existente y, a continuación, selecciona Procesar datos en tiempo real.

Crea un bloc de notas de Studio con AWS Management Console

  1. ¿Abrir la consola Managed Service for Apache Flink en https://console.aws.amazon.com/managed-flink/casa? region=us-east-1#/applications/panel de control.

  2. En la página de Aplicaciones de Managed Service para Apache Flink, seleccione la pestaña Studio. Seleccione Crear cuaderno de Studio.

    nota

    Para crear un bloc de notas Studio desde las consolas Amazon MSK o Kinesis Data Streams, seleccione el clúster de MSK Amazon o el flujo de datos de Kinesis de entrada y, a continuación, elija Procesar datos en tiempo real.

  3. En la página Crear cuaderno de Studio, proporcione la siguiente información:

    • Introduzca MyNotebook como Nombre del cuaderno de Studio.

    • Elija el valor predeterminado para la Base de datos de Glue de AWS .

    Seleccione Crear cuaderno de Studio.

  4. En la MyNotebookpágina, seleccione la pestaña Configuración. En la sección Redes, elija Editar.

  5. En la MyNotebook página Editar red para, elige la VPCconfiguración basada en el MSK clúster de Amazon. Elige tu MSK clúster de Amazon para Amazon MSK Cluster. Elija Guardar cambios.

  6. En la MyNotebookpágina, selecciona Ejecutar. Espere a que el Estado muestre En ejecución.

Cree un bloc de notas de Studio con AWS CLI

Para crear tu bloc de notas de Studio mediante el AWS CLI, haz lo siguiente:

  1. Verifique que disponga de la siguiente información. Necesita estos valores para crear su aplicación.

    • Su ID de cuenta de .

    • El ID de subred IDs y grupo de seguridad de Amazon VPC que contiene tu MSK clúster de Amazon.

  2. Cree un archivo denominado create.json con el siguiente contenido. Reemplace los valores de marcador de posición con su información.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Para crear su aplicación, ejecute el siguiente comando:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Una vez completado el comando, debería ver un resultado similar al siguiente, con los detalles de su nuevo cuaderno de Studio:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Para iniciar su aplicación, ejecute el siguiente comando. Sustituya los valores de muestra por su ID de la cuenta.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Envía datos a tu MSK clúster de Amazon

En esta sección, ejecuta un script de Python en su EC2 cliente de Amazon para enviar datos a su fuente de MSK datos de Amazon.

  1. Conéctate con tu EC2 cliente de Amazon.

  2. Ejecute los siguientes comandos para instalar la versión 3 de Python, Pip y el paquete Kafka para Python, y confirme las acciones:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configure AWS CLI en su máquina cliente introduciendo el siguiente comando:

    aws configure

    Proporcione las credenciales de su cuenta y us-east-1 para region.

  4. Cree un archivo denominado stock.py con el siguiente contenido. Sustituya el valor de muestra por la cadena Bootstrap Brokers de su MSK clúster de Amazon y actualice el nombre del tema si su tema no AWS KafkaTutorialTopices:

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Ejecute el script con el siguiente comando:

    $ python3 stock.py
  6. Deje el script en ejecución mientras completa la siguiente sección.

Prueba de su cuaderno de Studio

En esta sección, utilizas tu bloc de notas de Studio para consultar datos de tu MSK clúster de Amazon.

  1. ¿Abrir la consola Managed Service for Apache Flink en https://console.aws.amazon.com/managed-flink/casa? region=us-east-1#/applications/panel de control.

  2. En la página de Aplicaciones de Managed Service para Apache Flink, seleccione la pestaña Cuaderno de Studio. Elige. MyNotebook

  3. En la MyNotebookpágina, elija Abrir en Apache Zeppelin.

    La interfaz de Apache Zeppelin se abre en una pestaña nueva.

  4. En la página ¡Bienvenido a Zeppelin!, elija la nueva nota de Zeppelin.

  5. En la página Zeppelin Note, introduzca la siguiente consulta en una nota nueva:

    %flink.ssql(type=update) select * from stock

    Seleccione el icono de reproducción.

    La aplicación muestra los datos del MSK clúster de Amazon.

Para abrir el panel de control de Apache Flink de su aplicación para ver los aspectos operativos, elija FLINKJOB. Para obtener más información sobre el Panel de control de Flink, consulte Apache Flink Dashboard en la Guía para desarrolladores de Managed Service para Apache Flink.

Para ver más ejemplos de SQL consultas de Flink Streaming, consulte Consultas en la documentación de Apache Flink.