Procesamiento previo de registros con una función de Lambda - Guía para desarrolladores de Amazon Kinesis Data Analytics SQL para aplicaciones

Para proyectos nuevos, le recomendamos que utilice el nuevo servicio gestionado para Apache Flink Studio en lugar de Kinesis Data Analytics SQL for Applications. El servicio gestionado para Apache Flink Studio combina la facilidad de uso con capacidades analíticas avanzadas, lo que le permite crear aplicaciones sofisticadas de procesamiento de flujos en cuestión de minutos.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesamiento previo de registros con una función de Lambda

nota

Después del 12 de septiembre de 2023, no podrá crear nuevas aplicaciones con Kinesis Data Firehose como origen si aún no utiliza Kinesis Data Analytics para SQL. Para obtener más información, consulte Límites.

Si los datos de su transmisión necesitan conversión de formato, transformación, enriquecimiento o filtrado, puede preprocesar los datos mediante una función. AWS Lambda Puede hacerlo antes de que se ejecute el código SQL de la aplicación o antes de que la aplicación cree un esquema a partir deel flujo de datos.

El uso de una función de Lambda para el procesamiento previo de registros es útil en las siguientes situaciones:

  • Transformar registros a partir de otros formatos (como KPL o GZIP) en formatos que Kinesis Data Analytics puede analizar. Kinesis Data Analytics actualmente admite formatos de datos JSON o CSV.

  • Expandir datos en un formato que sea más asequible para operaciones como, por ejemplo, agregación o detección de anomalías. Por ejemplo, si se almacenan juntos en una cadena varios valores de datos, puede expandir los datos en columnas independientes.

  • El enriquecimiento de datos con otros servicios de Amazon, como la extrapolación o la corrección de errores.

  • Aplicación de transformación de cadenas complejas en campos de registros.

  • Filtrado de datos para limpieza de datos.

Uso de una función de Lambda para procesamiento previo de registros

Al crear su aplicación de Kinesis Data Analytics, habilita el procesamiento previo de en la página Conectar con una fuente.

Para utilizar una función de Lambda para procesamiento previo de registros en una aplicación de Kinesis Data Analytics
  1. Inicie sesión en la consola Managed Service for Apache Flink AWS Management Console y ábrala en https://console.aws.amazon.com/kinesisanalytics.

  2. En la página Conectar a un origen de la aplicación, elija Habilitado en la sección Procesamiento previo de registros con AWS Lambda.

  3. Para utilizar una función de Lambda que haya creado, elija la función en la lista desplegable Función de Lambda .

  4. Para crear una función de Lambda nueva a partir de una de las plantillas de procesamiento previo de Lambda, elija la plantilla en la lista desplegable. A continuación, elija View <template name> in Lambda (Ver <nombre de la plantilla> en Lambda) para editar la función.

  5. Para crear una nueva función de Lambda, elija Crear nueva. Para obtener información sobre la creación de una función Lambda, consulte Creación de una función HelloWorld Lambda y Explore la consola en la Guía para desarrolladores.AWS Lambda

  6. Elija la versión de la función de Lambda que desea utilizar. Para utilizar la versión más reciente, elija $LATEST.

Cuando se elige o se crea una función de Lambda para el procesamiento previo de los registros, estos se procesan antes de que se ejecute el código SQL de la aplicación o de que la aplicación genere un esquema a partir de ellos.

Permisos de procesamiento previo de Lambda

Para utilizar el procesamiento previo de Lambda, el rol de IAM de la aplicación requiere la política de permisos siguiente:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Métricas de procesamiento previo de Lambda

Puede utilizar Amazon CloudWatch para supervisar el número de invocaciones a Lambda, los bytes procesados, los éxitos y los errores, etc. Para obtener información sobre CloudWatch las métricas emitidas por el preprocesamiento de Kinesis Data Analytics Lambda, consulte Amazon Kinesis Analytics Metrics.

Uso AWS Lambda con la biblioteca de Kinesis Producer

Kinesis Producer Library (KPL) agrega pequeños registros formateados por el usuario en registros de mayor tamaño, hasta 1 MB, para utilizar mejor el rendimiento de Amazon Kinesis Data Streams. La Kinesis Client Library (KCL) para Java es compatible con la desagrupación de estos registros. Sin embargo, debe utilizar un módulo especial para desagregar los registros cuando los utilice AWS Lambda como consumidor de sus transmisiones.

Para obtener el código y las instrucciones del proyecto necesarios, consulte los módulos de desagregación de la biblioteca de productores de Kinesis para obtener más información. AWS LambdaGitHub Puede usar los componentes de este proyecto para procesar datos serializados de KPL AWS Lambda en Java, Node.js y Python. Estos componentes también se pueden utilizar como parte de una aplicación KCL multilenguaje.

Modelo de datos de entrada de eventos/modelo de respuesta de registros para el procesamiento previo de datos

Para realizar el procesamiento previo de los registros, la función de Lambda debe respetar los modelos necesarios de datos de entrada de eventos y de respuesta de registros.

Modelo de datos de entrada de eventos

Kinesis Data Analytics lee continuamente los datos de la transmisión de datos de Kinesis o la transmisión de entrega de Firehose. Para cada lote de registros que recupera, el servicio administra cómo se transfiere cada lote a su función de Lambda. Su función recibe una lista de registros como entrada. Dentro de su función, itera a través de la lista y aplica su lógica de negocio para llevar a cabo sus requisitos de procesamiento previo (tales como el enriquecimiento o la conversión de formato de datos).

El modelo de entrada de la función de preprocesamiento varía ligeramente en función de si los datos se recibieron de una transmisión de datos de Kinesis o de una transmisión de entrega de Firehose.

Si la fuente es un flujo de entrega de Firehose, el modelo de datos de entrada del evento es el siguiente:

Modelo de datos de solicitud de Kinesis Data Firehose

Campo Descripción
invocationId El ID de invocación de Lambda (GUID aleatorio).
applicationArn El nombre de recurso de Amazon (ARN) de la aplicación de Kinesis Data Analytics.
streamArn ARN de secuencia de entrega
registros
Campo Descripción
recordId ID de registro (GUID aleatorio)
kinesisFirehoseRecordMetadata
Campo Descripción
approximateArrivalTimestamp Hora de llegada aproximada del registro de secuencia de entrega
data Carga útil de registro de origen codificada en Base64

En el siguiente ejemplo, se muestra la entrada de una secuencia de entrega de Firehose:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Si el origen es un flujo de datos de Kinesis, el modelo de datos de entrada de eventos es el siguiente:

Modelo de datos de solicitud de secuencias de Kinesis.

Campo Descripción
invocationId El ID de invocación de Lambda (GUID aleatorio).
applicationArn ARN de la aplicación de Kinesis Data Analytics
streamArn ARN de secuencia de entrega
registros
Campo Descripción
recordId ID de registro en función del número de secuencia de registro de Kinesis
kinesisStreamRecordMetadata
Campo Descripción
sequenceNumber Número de secuencia del registro de secuencia de Kinesis
partitionKey Clave de partición del registro de secuencia de Kinesis
shardId ShardId del registro de secuencia de Kinesis
approximateArrivalTimestamp Hora de llegada aproximada del registro de secuencia de entrega
datos Carga útil de registro de origen codificada en Base64

En el siguiente ejemplo, se muestra la entrada de un flujo de datos de Kinesis:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Modelo de respuesta de registros

Todos los registros que devuelve su función de procesamiento previo de Lambda (con ID de registro) que se envían a la función de Lambda se deben devolver. Deben contener los siguientes parámetros, de lo contrario, Kinesis Data Analytics los rechaza y los trata como error de procesamiento previo de datos. La parte de carga de datos del registro se puede transformar para cumplir los requisitos de procesamiento previo.

Modelo de datos de respuesta

registros
Campo Descripción
recordId El ID de registro se transfiere desde Kinesis Data Analytics hacia Lambda durante la invocación. El registro transformado debe contener el mismo ID de registro. Cualquier discrepancia entre el ID del registro original y el ID del registro transformado se trata como un error de procesamiento previo de datos.
result Es el estado de la transformación de los datos del registro. Los valores posibles son:
  • Ok: el registro se ha transformado correctamente. Kinesis Data Analytics ingiere el registro para el procesamiento de SQL.

  • Dropped: el registro lo ha descartado de forma intencionada la lógica de procesamiento. Kinesis Data Analytics recibe el registro para el procesamiento de SQL. El campo de carga de datos es opcional para un registro Dropped.

  • ProcessingFailed: el registro no se ha podido transformar. Kinesis Data Analytics considera que lo ha procesado sin éxito la función de Lambda y escribe un error en la secuencia de errores. Para obtener más información acerca de la secuencia de errores, consulte Control de errores. El campo de carga de datos es opcional para un registro ProcessingFailed.

data Es la carga útil de datos transformados después codificarlos en base64. Cada carga de datos puede contener varios documentos JSON si el formato de datos de adquisición de la aplicación es JSON. O bien, cada una puede contener varias filas CSV (con un delimitador de filas especificado en cada fila) si el formato de datos de adquisición de la aplicación es CSV. El servicio de Kinesis Data Analytics analiza y procesa correctamente los datos con varios documentos JSON o filas CSV dentro de la misma carga de datos.

En el siguiente ejemplo, se muestra el resultado de una función Lambda:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Errores comunes de procesamiento previo de datos

A continuación se indican los motivos habituales por los que un procesamiento previo puede generar un error.

  • No se devuelven al servicio de Kinesis Data Analytics todos los registros (con ID de registro) de un lote enviado a la función de Lambda.

  • En la respuesta falta el campo de ID de registro, estado o carga de datos. El campo de carga de datos es opcional para un registro Dropped o ProcessingFailed.

  • Los tiempos de espera de la función de Lambda no son suficientes para el procesamiento previo de los datos.

  • La respuesta de la función de Lambda supera los límites de respuesta impuestos por el servicio de AWS Lambda .

Para errores de procesamiento previo de datos, Kinesis Data Analytics sigue reintentando las invocaciones de Lambda en el mismo conjunto de registros hasta que tiene éxito. Puede supervisar las siguientes CloudWatch métricas para obtener información sobre los fallos.

  • MillisBehindLatest de aplicación de Kinesis Data Analytics: indica el retraso que lleva la aplicación al leer desde el origen de streaming.

  • Métricas de la InputPreprocessing CloudWatch aplicación Kinesis Data Analytics: indican el número de éxitos y fracasos, entre otras estadísticas. Para obtener más información, consulte Amazon Kinesis Analytics Metrics.

  • AWS Lambda CloudWatch métricas y registros de funciones.