Migración de Kinesis v1 a v2 - Amazon Monitron

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Migración de Kinesis v1 a v2

Si utiliza actualmente el esquema de datos v1, es posible que ya envíe datos a Amazon S3 o que procese a posteriori la carga útil del flujo de datos con Lambda.

Actualización del esquema de datos a v2

Si ya ha configurado un flujo de datos con el esquema v1, puede actualizar su proceso de exportación de datos mediante el siguiente procedimiento:

  1. Abra su consola de Amazon Monitron.

  2. Vaya a su proyecto.

  3. Detenga la exportación de datos en directo actual.

  4. Inicie la exportación de datos en directo para crear un nuevo flujo de datos.

  5. Seleccione el flujo de datos recién creado.

  6. Elija iniciar exportación de datos en directo. En este punto, el nuevo esquema enviará su carga a través del flujo de datos.

  7. (Opcional) Vaya a la consola de Kinesis y elimine su antiguo flujo de datos.

  8. Configure un nuevo método de entrega para su flujo de datos recién creado con el esquema v2.

Su nuevo flujo entrega ahora cargas útiles conformes con el esquema v2 a su nuevo bucket. Le recomendamos que utilice dos buckets distintos para tener un formato coherente en caso de que desee procesar todos los datos en estos buckets. Por ejemplo, al utilizar otros servicios como Athena y AWS Glue.

nota

Si ya entregaba sus datos a Amazon S3, obtenga información sobre cómo almacenar los datos exportados en Amazon S3 para obtener detalles sobre cómo entregar sus datos a Amazon S3 con el esquema v2.

nota

Si utilizaba una función de Lambda para procesar sus cargas, obtenga información sobre cómo procesar datos con Lambda. También puede consultar la sección actualización con Lambda para obtener más información.

Actualización del procesamiento de datos con Lambda

La actualización del procesamiento de datos con Lambda requiere que tenga en cuenta que el flujo de datos de la v2 se basa ahora en eventos. Su código inicial de Lambda v1 podría haber sido similar al siguiente:

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

Dado que el esquema de datos de v1 está en vías de desaparición, el código Lambda anterior no funcionará con todos los nuevos flujos de datos.

El siguiente código Python de ejemplo procesa los eventos del flujo de Kinesis con el esquema de datos v2. Este código utiliza el nuevo parámetro eventType para orientar el procesamiento al controlador apropiado:

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions