Tras considerarlo detenidamente, hemos decidido interrumpir Amazon Kinesis Data Analytics SQL para aplicaciones en dos pasos:
1. A partir del 15 de octubre de 2025, no podrá crear nuevos Kinesis Data Analytics SQL para aplicaciones.
2. Eliminaremos sus aplicaciones a partir del 27 de enero de 2026. No podrá iniciar ni utilizar Amazon Kinesis Data Analytics SQL para aplicaciones. A partir de ese momento, el soporte para Amazon Kinesis Data Analytics dejará SQL de estar disponible. Para obtener más información, consulte Suspensión de Amazon Kinesis Data Analytics SQL for Applications.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Procesamiento previo de registros con una función de Lambda
nota
Después del 12 de septiembre de 2023, no podrá crear nuevas aplicaciones con Kinesis Data Firehose como origen si aún no utiliza Kinesis Data Analytics para SQL. Para obtener más información, consulte Límites.
Si los datos de un flujo de datos requieren conversión de formato, transformación, enriquecimiento o filtrado, puede utilizar una función de AWS Lambda para su procesamiento previo. Puede hacerlo antes de que se ejecute el código SQL de la aplicación o antes de que la aplicación cree un esquema a partir deel flujo de datos.
El uso de una función de Lambda para el procesamiento previo de registros es útil en las siguientes situaciones:
-
Transformar registros a partir de otros formatos (como KPL o GZIP) en formatos que Kinesis Data Analytics puede analizar. Kinesis Data Analytics actualmente admite formatos de datos JSON o CSV.
-
Expandir datos en un formato que sea más asequible para operaciones como, por ejemplo, agregación o detección de anomalías. Por ejemplo, si se almacenan juntos en una cadena varios valores de datos, puede expandir los datos en columnas independientes.
-
El enriquecimiento de datos con otros servicios de Amazon, como la extrapolación o la corrección de errores.
-
Aplicación de transformación de cadenas complejas en campos de registros.
-
Filtrado de datos para limpieza de datos.
Uso de una función de Lambda para procesamiento previo de registros
Al crear su aplicación de Kinesis Data Analytics, habilita el procesamiento previo de en la página Conectar con una fuente.
Para utilizar una función de Lambda para procesamiento previo de registros en una aplicación de Kinesis Data Analytics
Inicie sesión en la AWS Management Console y abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/kinesisanalytics.
-
En la página Conectar a un origen de la aplicación, elija Habilitado en la sección Procesamiento previo de registros con AWS Lambda.
-
Para utilizar una función de Lambda que haya creado, elija la función en la lista desplegable Función de Lambda .
-
Para crear una función de Lambda nueva a partir de una de las plantillas de procesamiento previo de Lambda, elija la plantilla en la lista desplegable. A continuación, elija View <template name> in Lambda (Ver <nombre de la plantilla> en Lambda) para editar la función.
-
Para crear una nueva función de Lambda, elija Crear nueva. Para obtener información sobre la creación de una función de Lambda, consulte Creación de una función Lambda sencilla y exploración de la consola en la Guía para desarrolladores de AWS Lambda.
-
Elija la versión de la función de Lambda que desea utilizar. Para utilizar la versión más reciente, elija $LATEST.
Cuando se elige o se crea una función de Lambda para el procesamiento previo de los registros, estos se procesan antes de que se ejecute el código SQL de la aplicación o de que la aplicación genere un esquema a partir de ellos.
Permisos de procesamiento previo de Lambda
Para utilizar el procesamiento previo de Lambda, el rol de IAM de la aplicación requiere la política de permisos siguiente:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
Métricas de procesamiento previo de Lambda
Puede usar Amazon CloudWatch para monitorizar el número de invocaciones de Lambda, los bytes procesados, los éxitos y errores, etc. Para obtener información acerca de las métricas de CloudWatch que emite el procesamiento previo de Lambda por Kinesis Analytics Metrics, consulte Amazon Kinesis Analytics Metrics.
Uso de AWS Lambda con Kinesis Producer Library
Kinesis Producer Library (KPL) agrega pequeños registros formateados por el usuario en registros de mayor tamaño, hasta 1 MB, para utilizar mejor el rendimiento de Amazon Kinesis Data Streams. La Kinesis Client Library (KCL) para Java es compatible con la desagrupación de estos registros. Sin embargo, debe utilizar un módulo especial para desagrupar los registros cuando utilice AWS Lambda como consumidor de las secuencias.
Para obtener las instrucciones y el código de proyecto necesarios, consulte Kinesis Producer Library Deaggregation Modules for AWS Lambda
Modelo de datos de entrada de eventos/modelo de respuesta de registros para el procesamiento previo de datos
Para realizar el procesamiento previo de los registros, la función de Lambda debe respetar los modelos necesarios de datos de entrada de eventos y de respuesta de registros.
Modelo de datos de entrada de eventos
Kinesis Data Analytics lee continuamente los datos de su flujo de datos de Kinesis o de su flujo de entrega de Firehose. Para cada lote de registros que recupera, el servicio administra cómo se transfiere cada lote a su función de Lambda. Su función recibe una lista de registros como entrada. Dentro de su función, itera a través de la lista y aplica su lógica de negocio para llevar a cabo sus requisitos de procesamiento previo (tales como el enriquecimiento o la conversión de formato de datos).
El modelo de entrada a su función de preprocesamiento varía ligeramente, en función de si los datos se han recibido desde un flujo de datos de Kinesis o un flujo de entrega de Firehose.
Si el origen es un flujo de entrega de Firehose, el modelo de datos de entrada de eventos es el siguiente:
Modelo de datos de solicitud de Kinesis Data Firehose
Campo | Descripción | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
El ID de invocación de Lambda (GUID aleatorio). | ||||||||||||
applicationArn |
El nombre de recurso de Amazon (ARN) de la aplicación de Kinesis Data Analytics. | ||||||||||||
streamArn |
ARN de secuencia de entrega | ||||||||||||
registros
|
En el siguiente ejemplo, se muestra la entrada de una secuencia de entrega de Firehose:
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
Si el origen es un flujo de datos de Kinesis, el modelo de datos de entrada de eventos es el siguiente:
Modelo de datos de solicitud de secuencias de Kinesis.
Campo | Descripción | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
El ID de invocación de Lambda (GUID aleatorio). | ||||||||||||||||||
applicationArn |
ARN de la aplicación de Kinesis Data Analytics | ||||||||||||||||||
streamArn |
ARN de secuencia de entrega | ||||||||||||||||||
registros
|
En el siguiente ejemplo, se muestra la entrada de un flujo de datos de Kinesis:
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
Modelo de respuesta de registros
Todos los registros que devuelve su función de procesamiento previo de Lambda (con ID de registro) que se envían a la función de Lambda se deben devolver. Deben contener los siguientes parámetros, de lo contrario, Kinesis Data Analytics los rechaza y los trata como error de procesamiento previo de datos. La parte de carga de datos del registro se puede transformar para cumplir los requisitos de procesamiento previo.
Modelo de datos de respuesta
registros
|
En el siguiente ejemplo, se muestra el resultado de una función Lambda:
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
Errores comunes de procesamiento previo de datos
A continuación se indican los motivos habituales por los que un procesamiento previo puede generar un error.
-
No se devuelven al servicio de Kinesis Data Analytics todos los registros (con ID de registro) de un lote enviado a la función de Lambda.
-
En la respuesta falta el campo de ID de registro, estado o carga de datos. El campo de carga de datos es opcional para un registro
Dropped
oProcessingFailed
. -
Los tiempos de espera de la función de Lambda no son suficientes para el procesamiento previo de los datos.
-
La respuesta de la función de Lambda supera los límites de respuesta impuestos por el servicio de AWS Lambda.
Para errores de procesamiento previo de datos, Kinesis Data Analytics sigue reintentando las invocaciones de Lambda en el mismo conjunto de registros hasta que tiene éxito. Puede monitorear las siguientes métricas de CloudWatch para obtener información sobre los errores.
-
MillisBehindLatest
de aplicación de Kinesis Data Analytics: indica el retraso que lleva la aplicación al leer desde el origen de streaming. -
Métricas de CloudWatch de
InputPreprocessing
de aplicación de Kinesis Data Analytics: Indica el número de éxitos y errores, entre otras estadísticas. Para obtener más información, consulte Amazon Kinesis Analytics Metrics. -
Métricas y registros de CloudWatch de función AWS Lambda.