

Tras considerarlo detenidamente, hemos decidido dejar de utilizar Amazon Kinesis Data Analytics para aplicaciones SQL:

1. A partir del **1 de septiembre de 2025,** no proporcionaremos ninguna corrección de errores para las aplicaciones de Amazon Kinesis Data Analytics for SQL porque tendremos un soporte limitado debido a la próxima discontinuación.

2. A partir del **15 de octubre de 2025,** no podrá crear nuevas aplicaciones de Kinesis Data Analytics for SQL.

3. Eliminaremos sus aplicaciones a partir del **27 de enero de 2026**. No podrá iniciar ni utilizar sus aplicaciones de Amazon Kinesis Data Analytics para SQL. A partir de ese momento, el servicio de soporte de Amazon Kinesis Data Analytics para SQL dejará de estar disponible. Para obtener más información, consulte [Retirada de las aplicaciones de Amazon Kinesis Data Analytics para SQL](discontinuation.md).

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Procesamiento previo de registros con una función de Lambda
<a name="lambda-preprocessing"></a>

**nota**  
Después del 12 de septiembre de 2023, no podrá crear nuevas aplicaciones con Kinesis Data Firehose como origen si aún no utiliza Kinesis Data Analytics para SQL. Para obtener más información, consulte [Límites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Si los datos de su transmisión necesitan conversión de formato, transformación, enriquecimiento o filtrado, puede preprocesar los datos mediante una función. AWS Lambda Puede hacerlo antes de que se ejecute el código SQL de la aplicación o antes de que la aplicación cree un esquema a partir deel flujo de datos. 

El uso de una función de Lambda para el procesamiento previo de registros es útil en las siguientes situaciones:
+ Transformar registros a partir de otros formatos (como KPL o GZIP) en formatos que Kinesis Data Analytics puede analizar. Kinesis Data Analytics actualmente admite formatos de datos JSON o CSV.
+ Expandir datos en un formato que sea más asequible para operaciones como, por ejemplo, agregación o detección de anomalías. Por ejemplo, si se almacenan juntos en una cadena varios valores de datos, puede expandir los datos en columnas independientes.
+ El enriquecimiento de datos con otros servicios de Amazon, como la extrapolación o la corrección de errores.
+ Aplicación de transformación de cadenas complejas en campos de registros.
+ Filtrado de datos para limpieza de datos.

## Uso de una función de Lambda para procesamiento previo de registros
<a name="lambda-preprocessing-use"></a>

Al crear su aplicación de Kinesis Data Analytics, habilita el procesamiento previo de en la página **Conectar con una fuente**.

**Para utilizar una función de Lambda para procesamiento previo de registros en una aplicación de Kinesis Data Analytics**

1. [Inicie sesión en la consola Managed Service for Apache Flink Consola de administración de AWS y ábrala en /kinesisanalytics. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. En la página **Conectar a un origen** de la aplicación, elija **Habilitado** en la sección **Procesamiento previo de registros con AWS Lambda**.

1. Para utilizar una función de Lambda que haya creado, elija la función en la lista desplegable **Función de Lambda** .

1. Para crear una función de Lambda nueva a partir de una de las plantillas de procesamiento previo de Lambda, elija la plantilla en la lista desplegable. A continuación, elija **View <template name> in Lambda (Ver <nombre de la plantilla> en Lambda)** para editar la función.

1. Para crear una nueva función de Lambda, elija **Crear nueva**. *Para obtener información sobre la creación de una función Lambda, consulte [Creación de una función HelloWorld Lambda y Explore la consola en la Guía para](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html) desarrolladores.AWS Lambda *

1. Elija la versión de la función de Lambda que desea utilizar. Para utilizar la versión más reciente, elija **\$1LATEST**.

Cuando se elige o se crea una función de Lambda para el procesamiento previo de los registros, estos se procesan antes de que se ejecute el código SQL de la aplicación o de que la aplicación genere un esquema a partir de ellos.

## Permisos de procesamiento previo de Lambda
<a name="lambda-preprocessing-policy"></a>

Para utilizar el procesamiento previo de Lambda, el rol de IAM de la aplicación requiere la política de permisos siguiente:

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Métricas de procesamiento previo de Lambda
<a name="lambda-preprocessing-metrics"></a>

Puede utilizar Amazon CloudWatch para supervisar el número de invocaciones a Lambda, los bytes procesados, los éxitos y los errores, etc. [Para obtener información sobre CloudWatch las métricas emitidas por el preprocesamiento de Kinesis Data Analytics Lambda, consulte Amazon Kinesis Analytics Metrics.](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)

## Uso AWS Lambda con la biblioteca de Kinesis Producer
<a name="lambda-preprocessing-deaggregation"></a>

[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) agrega pequeños registros formateados por el usuario en registros de mayor tamaño, hasta 1 MB, para utilizar mejor el rendimiento de Amazon Kinesis Data Streams. La Kinesis Client Library (KCL) para Java es compatible con la desagrupación de estos registros. Sin embargo, debe utilizar un módulo especial para desagregar los registros cuando los utilice AWS Lambda como consumidor de sus transmisiones. 

Para obtener el código y las instrucciones del proyecto necesarios, consulte los [módulos de desagregación de la biblioteca de productores de Kinesis](https://github.com/awslabs/kinesis-deaggregation) para obtener más información. AWS Lambda GitHub Puede usar los componentes de este proyecto para procesar datos serializados de KPL AWS Lambda en Java, Node.js y Python. Estos componentes también se pueden utilizar como parte de una [aplicación KCL multilenguaje](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java).

## Modelo de respuesta de datos de entrada de eventos de preprocesamiento de datos Model/Record
<a name="lambda-preprocessing-data-model"></a>

Para realizar el procesamiento previo de los registros, la función de Lambda debe respetar los modelos necesarios de datos de entrada de eventos y de respuesta de registros. 

### Modelo de datos de entrada de eventos
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics lee continuamente los datos de su flujo de datos de Kinesis o de su flujo de entrega de Firehose. Para cada lote de registros que recupera, el servicio administra cómo se transfiere cada lote a su función de Lambda. Su función recibe una lista de registros como entrada. Dentro de su función, itera a través de la lista y aplica su lógica de negocio para llevar a cabo sus requisitos de procesamiento previo (tales como el enriquecimiento o la conversión de formato de datos). 

El modelo de entrada a su función de preprocesamiento varía ligeramente, en función de si los datos se han recibido desde un flujo de datos de Kinesis o un flujo de entrega de Firehose. 

Si el origen es un flujo de entrega de Firehose, el modelo de datos de entrada de eventos es el siguiente:

**Modelo de datos de solicitud de Kinesis Data Firehose**


| Campo | Description (Descripción) | 
| --- | --- | 
| Campo | Description (Descripción) | 
| --- | --- | 
| Campo | Description (Descripción) | 
| --- | --- | 
| invocationId | El ID de invocación de Lambda (GUID aleatorio). | 
| applicationArn | El nombre de recurso de Amazon (ARN) de la aplicación de Kinesis Data Analytics. | 
| streamArn | ARN de secuencia de entrega | 
| registros [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | ID de registro (GUID aleatorio) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Carga útil de registro de origen codificada en Base64 | 
| approximateArrivalTimestamp | Hora de llegada aproximada del registro de secuencia de entrega | 

En el siguiente ejemplo, se muestra la entrada de una secuencia de entrega de Firehose:

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

Si el origen es un flujo de datos de Kinesis, el modelo de datos de entrada de eventos es el siguiente:

**Modelo de datos de solicitud de secuencias de Kinesis**.


| Campo | Description (Descripción) | 
| --- | --- | 
| Campo | Description (Descripción) | 
| --- | --- | 
| Campo | Description (Descripción) | 
| --- | --- | 
| invocationId | El ID de invocación de Lambda (GUID aleatorio). | 
| applicationArn | ARN de la aplicación de Kinesis Data Analytics | 
| streamArn | ARN de secuencia de entrega | 
| registros [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | ID de registro en función del número de secuencia de registro de Kinesis | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Carga útil de registro de origen codificada en Base64 | 
| sequenceNumber | Número de secuencia del registro de secuencia de Kinesis | 
| partitionKey | Clave de partición del registro de secuencia de Kinesis | 
| shardId | ShardId del registro de secuencia de Kinesis | 
| approximateArrivalTimestamp | Hora de llegada aproximada del registro de secuencia de entrega | 

En el siguiente ejemplo, se muestra la entrada de un flujo de datos de Kinesis:

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### Modelo de respuesta de registros
<a name="lambda-preprocessing-response-model"></a>

Deben devolverse todos los registros devueltos por la función de preprocesamiento de Lambda (con registro IDs) que se envíen a la función de Lambda. Deben contener los siguientes parámetros, de lo contrario, Kinesis Data Analytics los rechaza y los trata como error de procesamiento previo de datos. La parte de carga de datos del registro se puede transformar para cumplir los requisitos de procesamiento previo.

**Modelo de datos de respuesta**


| Campo | Description (Descripción) | 
| --- | --- | 
| registros [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | El ID de registro se transfiere desde Kinesis Data Analytics hacia Lambda durante la invocación. El registro transformado debe contener el mismo ID de registro. Cualquier discrepancia entre el ID del registro original y el ID del registro transformado se trata como un error de procesamiento previo de datos. | 
| result | Es el estado de la transformación de los datos del registro. Los valores posibles son: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Es la carga útil de datos transformados después codificarlos en base64. Cada carga de datos puede contener varios documentos JSON si el formato de datos de adquisición de la aplicación es JSON. O bien, cada una puede contener varias filas CSV (con un delimitador de filas especificado en cada fila) si el formato de datos de adquisición de la aplicación es CSV. El servicio de Kinesis Data Analytics analiza y procesa correctamente los datos con varios documentos JSON o filas CSV dentro de la misma carga de datos.  | 

En el siguiente ejemplo, se muestra el resultado de una función Lambda:

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## Errores comunes de procesamiento previo de datos
<a name="lambda-preprocessing-failures"></a>

A continuación se indican los motivos habituales por los que un procesamiento previo puede generar un error.
+ No todos los registros (con registro IDs) de un lote que se envían a la función Lambda se devuelven al servicio Kinesis Data Analytics. 
+ En la respuesta falta el campo de ID de registro, estado o carga de datos. El campo de carga de datos es opcional para un registro `Dropped` o `ProcessingFailed`.
+ Los tiempos de espera de la función de Lambda no son suficientes para el procesamiento previo de los datos.
+ La respuesta de la función de Lambda supera los límites de respuesta impuestos por el servicio de AWS Lambda .

Para errores de procesamiento previo de datos, Kinesis Data Analytics sigue reintentando las invocaciones de Lambda en el mismo conjunto de registros hasta que tiene éxito. Puede supervisar las siguientes CloudWatch métricas para obtener información sobre los errores.
+ `MillisBehindLatest` de aplicación de Kinesis Data Analytics: indica el retraso que lleva la aplicación al leer desde el origen de streaming. 
+ Métricas de la `InputPreprocessing` CloudWatch aplicación Kinesis Data Analytics: indican el número de éxitos y fracasos, entre otras estadísticas. Para obtener más información, consulte [Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html).
+ AWS Lambda CloudWatch métricas y registros de funciones.