Procesamiento previo de registros con una función de Lambda - Guía para desarrolladores de Amazon Kinesis Data Analytics SQL para aplicaciones

Tras considerarlo detenidamente, hemos decidido interrumpir Amazon Kinesis Data Analytics SQL para aplicaciones en dos pasos:

1. A partir del 15 de octubre de 2025, no podrá crear nuevos Kinesis Data Analytics SQL para aplicaciones.

2. Eliminaremos sus aplicaciones a partir del 27 de enero de 2026. No podrá iniciar ni utilizar Amazon Kinesis Data Analytics SQL para aplicaciones. A partir de ese momento, el soporte para Amazon Kinesis Data Analytics dejará SQL de estar disponible. Para obtener más información, consulte Suspensión de Amazon Kinesis Data Analytics SQL for Applications.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesamiento previo de registros con una función de Lambda

nota

Después del 12 de septiembre de 2023, no podrá crear nuevas aplicaciones con Kinesis Data Firehose como origen si aún no utiliza Kinesis Data Analytics para SQL. Para obtener más información, consulte Límites.

Si los datos de un flujo de datos requieren conversión de formato, transformación, enriquecimiento o filtrado, puede utilizar una función de AWS Lambda para su procesamiento previo. Puede hacerlo antes de que se ejecute el código SQL de la aplicación o antes de que la aplicación cree un esquema a partir deel flujo de datos.

El uso de una función de Lambda para el procesamiento previo de registros es útil en las siguientes situaciones:

  • Transformar registros a partir de otros formatos (como KPL o GZIP) en formatos que Kinesis Data Analytics puede analizar. Kinesis Data Analytics actualmente admite formatos de datos JSON o CSV.

  • Expandir datos en un formato que sea más asequible para operaciones como, por ejemplo, agregación o detección de anomalías. Por ejemplo, si se almacenan juntos en una cadena varios valores de datos, puede expandir los datos en columnas independientes.

  • El enriquecimiento de datos con otros servicios de Amazon, como la extrapolación o la corrección de errores.

  • Aplicación de transformación de cadenas complejas en campos de registros.

  • Filtrado de datos para limpieza de datos.

Uso de una función de Lambda para procesamiento previo de registros

Al crear su aplicación de Kinesis Data Analytics, habilita el procesamiento previo de en la página Conectar con una fuente.

Para utilizar una función de Lambda para procesamiento previo de registros en una aplicación de Kinesis Data Analytics
  1. Inicie sesión en la AWS Management Console y abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/kinesisanalytics.

  2. En la página Conectar a un origen de la aplicación, elija Habilitado en la sección Procesamiento previo de registros con AWS Lambda.

  3. Para utilizar una función de Lambda que haya creado, elija la función en la lista desplegable Función de Lambda .

  4. Para crear una función de Lambda nueva a partir de una de las plantillas de procesamiento previo de Lambda, elija la plantilla en la lista desplegable. A continuación, elija View <template name> in Lambda (Ver <nombre de la plantilla> en Lambda) para editar la función.

  5. Para crear una nueva función de Lambda, elija Crear nueva. Para obtener información sobre la creación de una función de Lambda, consulte Creación de una función Lambda sencilla y exploración de la consola en la Guía para desarrolladores de AWS Lambda.

  6. Elija la versión de la función de Lambda que desea utilizar. Para utilizar la versión más reciente, elija $LATEST.

Cuando se elige o se crea una función de Lambda para el procesamiento previo de los registros, estos se procesan antes de que se ejecute el código SQL de la aplicación o de que la aplicación genere un esquema a partir de ellos.

Permisos de procesamiento previo de Lambda

Para utilizar el procesamiento previo de Lambda, el rol de IAM de la aplicación requiere la política de permisos siguiente:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Métricas de procesamiento previo de Lambda

Puede usar Amazon CloudWatch para monitorizar el número de invocaciones de Lambda, los bytes procesados, los éxitos y errores, etc. Para obtener información acerca de las métricas de CloudWatch que emite el procesamiento previo de Lambda por Kinesis Analytics Metrics, consulte Amazon Kinesis Analytics Metrics.

Uso de AWS Lambda con Kinesis Producer Library

Kinesis Producer Library (KPL) agrega pequeños registros formateados por el usuario en registros de mayor tamaño, hasta 1 MB, para utilizar mejor el rendimiento de Amazon Kinesis Data Streams. La Kinesis Client Library (KCL) para Java es compatible con la desagrupación de estos registros. Sin embargo, debe utilizar un módulo especial para desagrupar los registros cuando utilice AWS Lambda como consumidor de las secuencias.

Para obtener las instrucciones y el código de proyecto necesarios, consulte Kinesis Producer Library Deaggregation Modules for AWS Lambda en GitHub. Puede utilizar los componentes de este proyecto para procesar datos serializados por KPL dentro de AWS Lambda en Java, Node.js y Python. Estos componentes también se pueden utilizar como parte de una aplicación KCL multilenguaje.

Modelo de datos de entrada de eventos/modelo de respuesta de registros para el procesamiento previo de datos

Para realizar el procesamiento previo de los registros, la función de Lambda debe respetar los modelos necesarios de datos de entrada de eventos y de respuesta de registros.

Modelo de datos de entrada de eventos

Kinesis Data Analytics lee continuamente los datos de su flujo de datos de Kinesis o de su flujo de entrega de Firehose. Para cada lote de registros que recupera, el servicio administra cómo se transfiere cada lote a su función de Lambda. Su función recibe una lista de registros como entrada. Dentro de su función, itera a través de la lista y aplica su lógica de negocio para llevar a cabo sus requisitos de procesamiento previo (tales como el enriquecimiento o la conversión de formato de datos).

El modelo de entrada a su función de preprocesamiento varía ligeramente, en función de si los datos se han recibido desde un flujo de datos de Kinesis o un flujo de entrega de Firehose.

Si el origen es un flujo de entrega de Firehose, el modelo de datos de entrada de eventos es el siguiente:

Modelo de datos de solicitud de Kinesis Data Firehose

Campo Descripción
invocationId El ID de invocación de Lambda (GUID aleatorio).
applicationArn El nombre de recurso de Amazon (ARN) de la aplicación de Kinesis Data Analytics.
streamArn ARN de secuencia de entrega
registros
Campo Descripción
recordId ID de registro (GUID aleatorio)
kinesisFirehoseRecordMetadata
Campo Descripción
approximateArrivalTimestamp Hora de llegada aproximada del registro de secuencia de entrega
data Carga útil de registro de origen codificada en Base64

En el siguiente ejemplo, se muestra la entrada de una secuencia de entrega de Firehose:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Si el origen es un flujo de datos de Kinesis, el modelo de datos de entrada de eventos es el siguiente:

Modelo de datos de solicitud de secuencias de Kinesis.

Campo Descripción
invocationId El ID de invocación de Lambda (GUID aleatorio).
applicationArn ARN de la aplicación de Kinesis Data Analytics
streamArn ARN de secuencia de entrega
registros
Campo Descripción
recordId ID de registro en función del número de secuencia de registro de Kinesis
kinesisStreamRecordMetadata
Campo Descripción
sequenceNumber Número de secuencia del registro de secuencia de Kinesis
partitionKey Clave de partición del registro de secuencia de Kinesis
shardId ShardId del registro de secuencia de Kinesis
approximateArrivalTimestamp Hora de llegada aproximada del registro de secuencia de entrega
datos Carga útil de registro de origen codificada en Base64

En el siguiente ejemplo, se muestra la entrada de un flujo de datos de Kinesis:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Modelo de respuesta de registros

Todos los registros que devuelve su función de procesamiento previo de Lambda (con ID de registro) que se envían a la función de Lambda se deben devolver. Deben contener los siguientes parámetros, de lo contrario, Kinesis Data Analytics los rechaza y los trata como error de procesamiento previo de datos. La parte de carga de datos del registro se puede transformar para cumplir los requisitos de procesamiento previo.

Modelo de datos de respuesta

registros
Campo Descripción
recordId El ID de registro se transfiere desde Kinesis Data Analytics hacia Lambda durante la invocación. El registro transformado debe contener el mismo ID de registro. Cualquier discrepancia entre el ID del registro original y el ID del registro transformado se trata como un error de procesamiento previo de datos.
result Es el estado de la transformación de los datos del registro. Los valores posibles son:
  • Ok: el registro se ha transformado correctamente. Kinesis Data Analytics ingiere el registro para el procesamiento de SQL.

  • Dropped: el registro lo ha descartado de forma intencionada la lógica de procesamiento. Kinesis Data Analytics recibe el registro para el procesamiento de SQL. El campo de carga de datos es opcional para un registro Dropped.

  • ProcessingFailed: el registro no se ha podido transformar. Kinesis Data Analytics considera que lo ha procesado sin éxito la función de Lambda y escribe un error en la secuencia de errores. Para obtener más información acerca de la secuencia de errores, consulte Control de errores. El campo de carga de datos es opcional para un registro ProcessingFailed.

data Es la carga útil de datos transformados después codificarlos en base64. Cada carga de datos puede contener varios documentos JSON si el formato de datos de adquisición de la aplicación es JSON. O bien, cada una puede contener varias filas CSV (con un delimitador de filas especificado en cada fila) si el formato de datos de adquisición de la aplicación es CSV. El servicio de Kinesis Data Analytics analiza y procesa correctamente los datos con varios documentos JSON o filas CSV dentro de la misma carga de datos.

En el siguiente ejemplo, se muestra el resultado de una función Lambda:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Errores comunes de procesamiento previo de datos

A continuación se indican los motivos habituales por los que un procesamiento previo puede generar un error.

  • No se devuelven al servicio de Kinesis Data Analytics todos los registros (con ID de registro) de un lote enviado a la función de Lambda.

  • En la respuesta falta el campo de ID de registro, estado o carga de datos. El campo de carga de datos es opcional para un registro Dropped o ProcessingFailed.

  • Los tiempos de espera de la función de Lambda no son suficientes para el procesamiento previo de los datos.

  • La respuesta de la función de Lambda supera los límites de respuesta impuestos por el servicio de AWS Lambda.

Para errores de procesamiento previo de datos, Kinesis Data Analytics sigue reintentando las invocaciones de Lambda en el mismo conjunto de registros hasta que tiene éxito. Puede monitorear las siguientes métricas de CloudWatch para obtener información sobre los errores.

  • MillisBehindLatest de aplicación de Kinesis Data Analytics: indica el retraso que lleva la aplicación al leer desde el origen de streaming.

  • Métricas de CloudWatch de InputPreprocessing de aplicación de Kinesis Data Analytics: Indica el número de éxitos y errores, entre otras estadísticas. Para obtener más información, consulte Amazon Kinesis Analytics Metrics.

  • Métricas y registros de CloudWatch de función ‭AWS Lambda.