

# Uso de Lambda para procesar los registros de Amazon Kinesis Data Streams
<a name="with-kinesis"></a>

Puede utilizar una función de Lambda para procesar los registros de un [flujo de datos de Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/introduction.html). Puede asignar una función de Lambda a un consumidor de rendimiento compartido (iterador estándar) de Kinesis Data Streams o a un consumidor de rendimiento dedicado con [distribución ramificada mejorada](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html). Para iteradores estándar, Lambda sondea cada partición de la secuencia de Kinesis en busca de registros utilizando el protocolo HTTP. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.

 Para obtener información detallada sobre los flujos de datos de Kinesis, consulte [Lectura de datos de Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html).

**nota**  
Kinesis cobra por cada partición y, para una distribución ramificada mejorada, por los datos leídos desde el flujo. Para obtener más información sobre precios, consulte [Precios de Amazon Kinesis](https://aws.amazon.com/kinesis/data-streams/pricing).

## Flujos de sondeo y procesamiento por lotes
<a name="kinesis-polling-and-batching"></a>

Lambda lee los registros del flujo de datos e invoca la función [sincrónicamente](invocation-sync.md) con un evento que contiene registros de flujo. Lambda lee los registros por lotes e invoca la función para procesar los registros del lote. Cada lote contiene registros de una única partición o flujo de datos.

Su función de Lambda es una aplicación consumidora para su flujo de datos. Procesa un lote de registros a la vez desde cada partición. Puede asignar una función Lambda a un consumidor de rendimiento compartido (iterador estándar) o a un consumidor de rendimiento dedicado con distribución ramificada mejorada.
+ **Iterador estándar:** Lambda sondea cada partición del flujo de Kinesis y busca registros a una velocidad base de una vez por segundo. Cuando hay más registros disponibles, Lambda sigue procesando lotes hasta que la función se pone al día con el flujo. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.
+ **Distribución ramificada mejorada:** para minimizar la latencia y maximizar el rendimiento de lectura, cree un consumidor de flujo de datos con [distribución ramificada mejorada](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html). Los consumidores con distribución ramificada mejorada obtienen una conexión dedicada a cada partición que no afecta a las demás aplicaciones que leen el flujo. Los consumidores de flujos utilizan HTTP/2 para reducir la latencia enviando los registros a Lambda a través de una conexión de larga duración y mediante la compresión de los encabezados de las solicitudes. Es posible crear un consumidor de flujos con la API [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) de Kinesis.

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

Debería ver los siguientes datos de salida:

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

Para incrementar la velocidad con la que la función procesa los registros, [agregue particiones al flujo de datos](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards). Lambda procesa registros en cada partición en orden. Deja de procesar registros adicionales en una partición si la función devuelve un error. Al haber más particiones, se procesan más lotes simultáneamente, lo que reduce el impacto de los errores de simultaneidad.

Si la función no puede aumentar para administrar el número total de lotes simultáneos, [solicite un aumento de cuota](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html) o [reserve la simultaneidad](configuration-concurrency.md) para la función.

De forma predeterminada, Lambda invoca su función tan pronto como los registros estén disponibles. Si el lote que Lambda lee del origen de eventos solo tiene un registro, Lambda envía solo un registro a la función. Para evitar invocar la función con un número de registros pequeño, puede indicar al origen de eventos que almacene en búfer registros durante hasta 5 minutos configurando un *plazo de procesamiento por lotes*. Antes de invocar la función, Lambda continúa leyendo los registros del origen de eventos hasta que haya recopilado un lote completo, venza el plazo de procesamiento por lotes o el lote alcance el límite de carga de 6 MB. Para obtener más información, consulte [Comportamiento de procesamiento por lotes](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**aviso**  
Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez, y puede producirse un procesamiento duplicado de registros. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte [¿Cómo puedo hacer que mi función de Lambda sea idempotente?](https://repost.aws/knowledge-center/lambda-function-idempotent) en el Centro de conocimientos de AWS.

Lambda no espera a que se completen las [extensiones](lambda-extensions.md) configuradas antes de enviar el siguiente lote para su procesamiento. En otras palabras, las extensiones pueden seguir ejecutándose mientras Lambda procesa el siguiente lote de registros. Esto puede provocar problemas de limitación si infringe alguno de los ajustes o límites de [simultaneidad](lambda-concurrency.md) de la cuenta. Para detectar si se trata de un posible problema, supervise sus funciones y compruebe si ve [métricas de simultaneidad](monitoring-concurrency.md#general-concurrency-metrics) más elevadas de lo esperado para la asignación de orígenes de eventos. Debido a los tiempos cortos entre invocaciones, Lambda puede informar brevemente un uso de simultaneidad superior al número de particiones. Esto puede ser cierto incluso para las funciones de Lambda sin extensiones.

Defina la configuración [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) para procesar una partición de un flujo de datos de Kinesis con más de una invocación de Lambda simultáneamente. Puede especificar el número de lotes simultáneos que Lambda sondea desde una partición a través de un factor de paralelización de 1 (predeterminado) a 10. Por ejemplo, cuando establece `ParallelizationFactor` en 2, puede tener un máximo de 200 invocaciones de Lambda simultáneas para procesar 100 particiones de datos de Kinesis (aunque, en la práctica, es posible que observe diferentes valores para la métrica `ConcurrentExecutions`). Esto ayuda a escalar verticalmente el rendimiento de procesamiento cuando el volumen de datos es volátil y el `IteratorAge` es alto. Cuando aumenta el número de lotes simultáneos por partición, Lambda sigue garantizando el procesamiento en orden a nivel de clave de partición.

También puede utilizar `ParallelizationFactor` con la agregación de Kinesis. El comportamiento de la asignación de orígenes de eventos depende de si utiliza la [distribución ramificada mejorada](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html):
+ **Sin distribución ramificada mejorada**: todos los eventos incluidos en un evento agregado deben tener la misma clave de partición. La clave de partición también debe coincidir con la del evento agregado. Si los eventos incluidos en el evento agregado tienen claves de partición diferentes, Lambda no puede garantizar el procesamiento de los eventos ordenados por clave de partición.
+ **Con distribución ramificada mejorada**: en primer lugar, Lambda decodifica el evento agregado en sus eventos individuales. El evento agregado puede tener una clave de partición diferente a la de los eventos que contiene. Sin embargo, los eventos que no se corresponden con la clave de partición [se eliminan y se pierden](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md). Lambda no procesa estos eventos ni los envía a un destino de error configurado.

## Evento de ejemplo
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```

# Procesamiento de registros de Amazon Kinesis Data Streams
<a name="services-kinesis-create"></a>

Para procesar los registros de Amazon Kinesis Data Streams con Lambda, cree una asignación de orígenes de eventos de Lambda. Puede asignar una función de Lambda a un iterador estándar o a un consumidor de distribución ramificada mejorada. Para obtener más información, consulte [Flujos de sondeo y procesamiento por lotes](with-kinesis.md#kinesis-polling-and-batching).

## Crear la asignación de orígenes de eventos de Kinesis
<a name="services-kinesis-eventsourcemapping"></a>

Para invocar la función de Lambda con registros del flujo de datos, cree una [asignación de orígenes de eventos](invocation-eventsourcemapping.md). Puede crear varias asignaciones de orígenes de eventos para procesar los mismos datos con distintas funciones de Lambda o para procesar elementos de varios flujos de datos con una sola función. Al procesar elementos de múltiples flujos de datos, cada lote solo contendrá registros de una única partición o flujo.

Puede configurar las asignaciones de orígenes de eventos para procesar los registros de un flujo en una Cuenta de AWS diferente. Para obtener más información, consulte [Creación de asignaciones de orígenes de eventos entre cuentas](#services-kinesis-eventsourcemapping-cross-account).

Antes de crear una asignación de orígenes de eventos, debe dar permiso a la función de Lambda para leer desde un flujo de datos de Kinesis. Lambda necesita los siguientes permisos para administrar los recursos relacionados con el flujo de datos de Kinesis:
+ [kinesis:DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [kinesis:DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [kinesis:GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [kinesis:GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [kinesis:ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [kinesis:SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

La política administrada de AWS [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html) incluye estos permisos. Agregue esta política administrada a la función, tal como se describe en el siguiente procedimiento.

**nota**  
No necesita el permiso `kinesis:ListStreams` para crear y administrar las asignaciones de orígenes de eventos para Kinesis. Sin embargo, si crea una asignación de orígenes de eventos en la consola y no tiene este permiso, no podrá seleccionar una transmisión de Kinesis de la lista desplegable, y la consola mostrará un error. Para crear la asignación de orígenes de eventos, tendrá que introducir manualmente el Nombre de recurso de Amazon (ARN) de la transmisión.
Lambda realiza llamadas a las API `kinesis:GetRecords` y `kinesis:GetShardIterator` cuando vuelve a intentar las invocaciones fallidas.

------
#### [ Consola de administración de AWS ]

**Cómo añadir permisos de Kinesis a la función**

1. Abra la [página Funciones](https://console.aws.amazon.com/lambda/home#/functions) de la consola de Lambda y seleccione su función.

1. En la pestaña **Configuración**, elija **Permisos**.

1. En el panel de **Roles de ejecución**, en **Nombre del rol**, elija el enlace al rol de ejecución de la función. Este enlace abre la página para ese rol en la consola de IAM.

1. En el panel **Políticas de permisos**, elija **Agregar permisos** y, a continuación, elija **Adjuntar políticas**.

1. En el campo de búsqueda, escriba **AWSLambdaKinesisExecutionRole**.

1. Seleccione la casilla situada junto a la política y elija **Añadir permisos**.

------
#### [ AWS CLI ]

**Cómo añadir permisos de Kinesis a la función**
+ Ejecute el siguiente comando de la CLI para adjuntar la política de `AWSLambdaKinesisExecutionRole` al rol de ejecución de la función:

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**Cómo añadir permisos de Kinesis a la función**
+ En la definición de la función, agregue la propiedad `Policies`, tal como se muestra en el siguiente ejemplo:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

Una vez que haya configurado los permisos necesarios, cree la asignación de orígenes de eventos.

------
#### [ Consola de administración de AWS ]

**Cómo crear la asignación de orígenes de eventos para Kinesis**

1. Abra la [página Funciones](https://console.aws.amazon.com/lambda/home#/functions) de la consola de Lambda y seleccione su función.

1. En el panel **Información general de la función**, elija **Agregar desencadenador**.

1. En **Configuración del desencadenador**, para el origen, seleccione **Kinesis**.

1. Seleccione el flujo de Kinesis para el que quiere crear la asignación de orígenes de eventos y, si lo desea, un consumidor del flujo.

1. (Opcional) Edite el **tamaño del lote**, la **posición inicial** y la **ventana del lote** para la asignación de orígenes de eventos.

1. Elija **Agregar**.

Al crear la asignación de orígenes de eventos desde la consola, el rol de IAM debe tener los permisos [kinesis:ListStreams](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html) y [kinesis:ListStreamConsumers](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html).

------
#### [ AWS CLI ]

**Cómo crear la asignación de orígenes de eventos de Kinesis**
+ Ejecute el siguiente comando de la CLI para crear una asignación de orígenes de eventos de Kinesis. Elija su propio tamaño de lote y posición inicial según su caso de uso.

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

Para especificar una ventana de procesamiento por lotes, agregue la opción `--maximum-batching-window-in-seconds`. Para obtener más información sobre el uso de este u otros parámetros, consulte [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) en la *Referencia de comandos de la AWS CLI*.

------
#### [ AWS SAM ]

**Cómo crear la asignación de orígenes de eventos de Kinesis**
+ En la definición de la función, agregue la propiedad `KinesisEvent`, tal como se muestra en el siguiente ejemplo:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

Para obtener más información sobre cómo crear una asignación de orígenes de eventos para Kinesis Data Streams en AWS SAM, consulte [Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html) en la *Guía para desarrolladores de AWS Serverless Application Model*.

------

## Posición inicial de flujos y sondeo
<a name="services-kinesis-stream-start-pos"></a>

Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.
+ Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.
+ Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.

Este comportamiento significa que, si especifica `LATEST` como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como `TRIM_HORIZON` o `AT_TIMESTAMP`.

## Creación de asignaciones de orígenes de eventos entre cuentas
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

Amazon Kinesis Data Streams admite [políticas basadas en recursos](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html). En consecuencia, puede procesar los datos ingeridos en un flujo en una Cuenta de AWS con una función de Lambda en otra cuenta.

Para crear una asignación de orígenes de eventos para la función de Lambda mediante un flujo de Kinesis en otra Cuenta de AWS, debe configurar el flujo mediante una política basada en recursos para conceder a la función de Lambda permiso para leer elementos. Para obtener información sobre cómo configurar la transmisión para permitir el acceso entre cuentas, consulte [Acceso compartido con funciones de AWS Lambda entre cuentas](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda) en la *Guía para desarrolladores de Amazon Kinesis Streams*.

Una vez que haya configurado la transmisión con una política basada en recursos que otorgue a la función de Lambda los permisos necesarios, cree la asignación de orígenes de eventos mediante cualquiera de los métodos descritos en la sección anterior.

Si decide crear la asignación de orígenes de eventos mediante la consola Lambda, pegue el ARN de la transmisión directamente en el campo de entrada. Si quiere especificar un consumidor para la transmisión, al pegar el ARN del consumidor se rellena en forma automática el campo de transmisión.

# Configuración de la respuesta por lotes parcial con Kinesis Data Streams y Lambda
<a name="services-kinesis-batchfailurereporting"></a>

Al consumir y procesar datos de streaming desde un origen de eventos, de forma predeterminada, Lambda comprueba hasta el número de secuencia más alto de un lote solo cuando el lote se ha completado con éxito. Lambda trata todos los demás resultados como un error completo y vuelve a intentar procesar el lote hasta el límite de reintentos. Para permitir éxitos parciales al procesar lotes de una secuencia, active `ReportBatchItemFailures`. Permitir éxitos parciales puede ayudar a reducir el número de reintentos en un registro, aunque no impide por completo la posibilidad de reintentos en un registro exitoso.

Para activar `ReportBatchItemFailures`, incluya el valor enumerado **ReportBatchItemFailures** en la lista [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes). Esta lista indica qué tipos de respuesta están habilitados para su función. Puede configurar esta lista al [crear](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) o [actualizar](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) una asignación de orígenes de eventos.

**nota**  
Incluso cuando el código de función devuelva respuestas de fallos parciales por lotes, Lambda no procesará estas respuestas a menos que la característica `ReportBatchItemFailures` esté activada explícitamente para la asignación de orígenes de eventos.

## Sintaxis del informe
<a name="streams-batchfailurereporting-syntax"></a>

Al configurar los informes sobre errores de elementos por lotes, la clase `StreamsEventResponse` se devuelve con una lista de errores de elementos de lote. Puede utilizar un objeto `StreamsEventResponse` para devolver el número de secuencia del primer registro fallido del lote. También puede crear su propia clase personalizada usando la sintaxis de respuesta correcta. La siguiente estructura JSON muestra la sintaxis de respuesta requerida:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**nota**  
Si la matriz `batchItemFailures` contiene varios elementos, Lambda usa el registro con el número de secuencia más bajo como punto de control. Luego Lambda vuelve a probar todos los registros a partir de ese punto de control.

## Condiciones de éxito y fracaso
<a name="streams-batchfailurereporting-conditions"></a>

Lambda trata un lote como un éxito completo si devuelve cualquiera de los siguientes elementos:
+ Una lista `batchItemFailure` vacía
+ Una lista `batchItemFailure` nula
+ Una `EventResponse` vacía
+ Un nulo `EventResponse`

Lambda trata un lote como un error completo si devuelve cualquiera de los siguientes elementos:
+ Una cadena `itemIdentifier` vacía
+ Una nula `itemIdentifier`
+ Un `itemIdentifier` con un mal nombre de clave

Lambda reintentos fallidos basados en su estrategia de reintento.

## Bisecar un lote
<a name="streams-batchfailurereporting-bisect"></a>

Si su invocación falla y `BisectBatchOnFunctionError` está activada, el lote se divide en bisectos independientemente de su configuración `ReportBatchItemFailures`.

Cuando se recibe una respuesta de éxito parcial de lote y se activan tanto `BisectBatchOnFunctionError` como `ReportBatchItemFailures`, el lote se divide en el número de secuencia devuelto y Lambda vuelve a intentar solo los registros restantes.

Para simplificar la implementación de la lógica de respuesta parcial por lotes, considere la posibilidad de utilizar la [utilidad Batch Processor](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) de PowertoolsAWS Lambda, que gestiona automáticamente estas complejidades para usted.

Estos son algunos ejemplos de código de función que devuelven la lista de IDs de mensajes fallidos del lote:

------
#### [ .NET ]

**SDK para .NET**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }
    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}

	for _, record := range kinesisEvent.Records {
		curRecordSequenceNumber := ""

		// Process your record
		if /* Your record processing condition here */ {
			curRecordSequenceNumber = record.Kinesis.SequenceNumber
		}

		// Add a condition to check if the record processing failed
		if curRecordSequenceNumber != "" {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                //Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse(batchItemFailures);   
    }
}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante Javascript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante TypeScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK para PHP**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3)**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
```

------
#### [ Rust ]

**SDK para Rust**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Notificación de los errores de los elementos del lote de Kinesis con Lambda mediante Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Uso de Powertools para el procesador por lotes de AWS Lambda
<a name="services-kinesis-batchfailurereporting-powertools"></a>

La utilidad de procesamiento por lotes de Powertools para AWS Lambda gestiona de manera automática la lógica de respuesta parcial de los lotes, lo que reduce la complejidad de implementar la notificación de fallas en los lotes. Estos son algunos ejemplos del uso del procesador por lotes:

**Python**  
Para ver ejemplos completos e instrucciones de configuración, consulte la [documentación del procesador por lotes](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Procesamiento de registros de flujos de Kinesis Data Streams con el procesador por lotes AWS Lambda.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Para ver ejemplos completos e instrucciones de configuración, consulte la [documentación del procesador por lotes](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Procesamiento de registros de flujos de Kinesis Data Streams con un procesador por lotes AWS Lambda.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Para ver ejemplos completos e instrucciones de configuración, consulte la [documentación del procesador por lotes](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Procesamiento de registros de flujos de Kinesis Data Streams con un procesador por lotes AWS Lambda.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
```

**.NET**  
Para ver ejemplos completos e instrucciones de configuración, consulte la [documentación del procesador por lotes](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Procesamiento de registros de flujos de Kinesis Data Streams con un procesador por lotes AWS Lambda.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent> 
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId)) 
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Conserve los registros de lotes descartados para unorigen de eventos de Kinesis Data Streams en Lambda
<a name="kinesis-on-failure-destination"></a>

La gestión de errores en las asignaciones de orígenes de eventos de Kinesis depende de si el error se produce antes de que se invoque la función o durante la invocación de la función:
+ **Antes de la invocación:** si una asignación de orígenes de eventos de Lambda no puede invocar la función debido a una limitación u otros problemas, lo vuelve a intentar hasta que los registros caduquen o superen la antigüedad máxima configurada en la asignación de orígenes de eventos ([MaximumRecordageInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)).
+ **Durante invocación:** [si se invoca la función pero devuelve un error, Lambda vuelve a intentarlo hasta que los registros caduquen, superen la antigüedad [máxima (MaximumRecordageInSeconds) o alcancen la cuota de reintento configurada (MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)).](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts) En el caso de errores de función, también puede configurar [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), que divide un lote fallido en dos lotes más pequeños, aislando registros fallidos y evitando tiempos de espera. La división de lotes no consume la cuota de reintentos.

Si las medidas de administración de errores fallan, Lambda descarta los registros y continúa procesando lotes del flujo. Con la configuración predeterminada, esto significa que un registro incorrecto puede bloquear el procesamiento en la partición afectada durante un máximo de una semana. Para evitar esto, configure la asignación de orígenes de eventos de su función con un número razonable de reintentos y una antigüedad máxima de registro que se ajuste a su caso de uso.

## Configuración de destinos para invocaciones fallidas
<a name="kinesis-on-failure-destination-console"></a>

Para retener los registros de las invocaciones de asignación de orígenes de eventos fallidos, agregue un destino a la asignación de orígenes de eventos de su función. Cada registro enviado al destino es un documento JSON que contiene metadatos sobre la invocación fallida. Para los destinos de Amazon S3, Lambda envía también todo el registro de invocación junto con los metadatos. Puede configurar cualquier tema de Amazon SNS, cola de Amazon SQS, bucket de Amazon S3 o Kafka como destino.

Con los destinos de Amazon S3, puede utilizar la característica de [notificaciones de eventos de Amazon S3](https://docs.aws.amazon.com/) para recibir notificaciones cuando se carguen objetos en el bucket de S3 de destino. También puede configurar las notificaciones de eventos de S3 para que invoquen otra función de Lambda para realizar el procesamiento automatizado de los lotes fallidos.

Su rol de ejecución debe tener permisos para el destino:
+ **En el caso de un destino de SQS:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **En el caso de un destino de SNS:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **En el caso de un destino de S3:** [ s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) y [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Para un destino de Kafka:** [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

Puede configurar un tema de Kafka como destino en caso de error para las asignaciones de orígenes de eventos de Kafka. Cuando Lambda no puede procesar los registros tras agotar los reintentos, o cuando los registros superan la antigüedad máxima, Lambda envía los registros fallidos al tema de Kafka especificado para su posterior procesamiento. Consulte [Uso de un tema de Kafka como destino en caso de error](kafka-on-failure-destination.md).

Si tiene activado el cifrado con su propia clave de KMS para un destino de S3, el rol de ejecución de la función también debe tener permiso para llamar a [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html). Si la clave de KMS y el destino del bucket de S3 están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:GenerateDataKey.

Para configurar un destino en caso de error mediante la consola, siga estos pasos:

1. Abra la [página de Funciones](https://console.aws.amazon.com/lambda/home#/functions) en la consola de Lambda.

1. Elija una función.

1. En **Descripción general de la función**, elija **Agregar destino**.

1. En **Origen**, elija **Invocación de asignación de orígenes de eventos**.

1. Para la **Asignación de orígenes de eventos**, elija un origen de eventos que esté configurado para esta función.

1. En **Condición**, seleccione **En caso de error**. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.

1. En **Tipo de destino**, elija el tipo de destino al que Lambda envía los registros de invocación.

1. En **Destino**, elija un recurso.

1. Seleccione **Save**.

También puede configurar un destino en caso de error mediante la AWS Command Line Interface (AWS CLI). Por ejemplo, el siguiente comando [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) agrega una asignación de orígenes de eventos con un destino de SQS en caso de error a `MyFunction`:

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

El siguiente comando [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) actualiza una asignación de orígenes de eventos para enviar registros de invocación fallida a un destino de SNS después de dos intentos de reintento, o si los registros tienen más de una hora de antigüedad.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

La configuración actualizada se aplica de forma asincrónica y no se refleja en la salida hasta que se completa el proceso. Utilice el comando [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) para ver el estado actual.

Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro `destination-config`:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Prácticas recomendadas de seguridad para destinos de Amazon S3
<a name="kinesis-s3-destination-security"></a>

Eliminar un bucket de S3 que está configurado como destino sin eliminar el destino de la configuración de la función puede suponer un riesgo de seguridad. Si otro usuario conoce el nombre del bucket de destino, puede volver a crear el bucket en su Cuenta de AWS. Los registros de las invocaciones fallidas se enviarán a su bucket, lo que podría exponer los datos de su función.

**aviso**  
Para asegurarse de que los registros de invocación de su función no se puedan enviar a un bucket de S3 de otra Cuenta de AWS, agregue una condición al rol de ejecución de la función que limite los permisos `s3:PutObject` a los buckets de su cuenta. 

En el siguiente ejemplo, se muestra una política de IAM que limita los permisos `s3:PutObject` de la función a los buckets de la cuenta. Esta política también otorga a Lambda el permiso `s3:ListBucket` que necesita para usar un bucket de S3 como destino.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Para agregar una política de permisos al rol de ejecución de su función mediante la Consola de administración de AWS o la AWS CLI, consulte las instrucciones de los siguientes procedimientos:

------
#### [ Console ]

**Cómo agregar una política de permisos al rol de ejecución de la función (consola)**

1. Abra la página de [Funciones](https://console.aws.amazon.com/lambda/home#/functions) en la consola de Lambda.

1. Seleccione la función de Lambda cuyo rol de ejecución desee modificar.

1. En la pestaña **Configuración**, elija **Permisos**.

1. En la pestaña **Rol de ejecución**, seleccione el **nombre del rol** de la función para abrir la página de la consola de IAM del rol.

1. Agregue una política de permisos al rol de la siguiente manera:

   1. En el panel **Política de permisos**, elija **Agregar permisos** y seleccione **Crear política insertada**.

   1. En el **editor de políticas**, seleccione **JSON**.

   1. Pegue la política que desee agregar en el editor (sustituyendo el JSON existente) y, a continuación, seleccione **Siguiente**.

   1. En **Detalles de política**, ingrese un **Nombre de política**.

   1. Seleccione **Crear política**.

------
#### [ AWS CLI ]

**Cómo agregar una política de permisos al rol de ejecución de la función (CLI)**

1. Cree un documento de política de JSON con los permisos necesarios y guárdelo en un directorio local.

1. Utilice el comando de la CLI de IAM `put-role-policy` para agregar permisos al rol de ejecución de la función. Ejecute el siguiente comando desde el directorio en el que guardó el documento de política JSON y sustituya el nombre del rol, el nombre de la política y el documento de política por sus propios valores.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Ejemplo de registro de invocación de Amazon SNS y Amazon SQS
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

El siguiente ejemplo muestra lo que Lambda envía a un destino de tema de SNS o cola de SQS cuando se produce un error en la invocación de un origen de eventos de Kafka. Como Lambda envía solo los metadatos para estos tipos de destino, utilice los campos `streamArn`, `shardId`, `startSequenceNumber` y `endSequenceNumber` para obtener el registro original completo. Todos los campos que se muestran en la propiedad `KinesisBatchInfo` siempre estarán presentes.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}
```

Puede utilizar esta información para recuperar los registros afectados del flujo para solucionar problemas. Los registros reales no están incluidos, por lo que debe procesar este registro y recuperarlos del flujo antes de que caduquen y se pierdan.

### Ejemplo de registro de invocación de Amazon S3
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

En el siguiente ejemplo, se muestra lo que Lambda envía a un bucket de Amazon S3 cuando se produce un error en la invocación de un origen de eventos de Kinesis. Además de todos los campos del ejemplo anterior para los destinos de SQS y SNS, el campo `payload` contiene el registro de invocación original en forma de cadena JSON de escape.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

El objeto de S3 que contiene el registro de invocación utiliza la siguiente convención de nomenclatura:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Implementación del procesamiento con estado de Kinesis Data Streams en Lambda
<a name="services-kinesis-windows"></a>

Las funciones de Lambda pueden ejecutar aplicaciones de procesamiento de flujo continuo. Una secuencia representa datos ilimitados que fluyen de forma continua a través de su aplicación. Para analizar la información de esta entrada de actualización continua, puede enlazar los registros incluidos mediante una ventana definida en términos de tiempo.

Las ventanas de salto constante son ventanas de tiempo distintas que se abren y cierran a intervalos regulares. De forma predeterminada, las invocaciones de Lambda no tienen estado: no se pueden utilizar para procesar datos en múltiples invocaciones continuas sin una base de datos externa. Sin embargo, con las ventanas de salto constante, puede mantener su estado en todas las invocaciones. Este estado contiene el resultado agregado de los mensajes procesados previamente para la ventana actual. Su estado puede ser un máximo de 1 MB por partición. Si supera ese tamaño, Lambda finaliza la ventana antes de tiempo.

Cada registro de una secuencia pertenece a un periodo específico. Lambda procesará cada registro al menos una vez, pero no garantiza que cada registro se procese solo una vez. En casos excepcionales, como el manejo de errores, es posible que algunos registros se procesen más de una vez. Los registros siempre se procesan en orden la primera vez. Si los registros se procesan más de una vez, es posible que lo hagan de forma desordenada.

## Agregación y procesamiento
<a name="streams-tumbling-processing"></a>

Su función administrada por el usuario se invoca tanto para la agregación como para procesar los resultados finales de esa agregación. Lambda agrega todos los registros recibidos en la ventana. Puede recibir estos registros en varios lotes, cada uno como una invocación independiente. Cada invocación recibe un estado. Por lo tanto, al usar las ventanas de salto constante, su respuesta de la función de Lambda debe contener una propiedad de `state`. Si la respuesta no contiene una propiedad de `state`, Lambda considera que esto es una invocación fallida. Para satisfacer esta condición, la función puede devolver un objeto de `TimeWindowEventResponse`, que tiene la siguiente forma JSON:

**Example `TimeWindowEventResponse`Valores**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**nota**  
Para las funciones Java, se recomienda utilizar un `Map<String, String>` para representar el estado.

Al final de la ventana, el indicador `isFinalInvokeForWindow` está configurado en `true` para indicar que este es el estado final y que está listo para su procesamiento. Después del procesamiento, la ventana se completa y su invocación final se completa, y luego se elimina el estado.

Al final de la ventana, Lambda utiliza el procesamiento final para las acciones en los resultados de agregación. Su procesamiento final se invoca sincrónicamente. Después de la invocación exitosa, los puntos de control de la función, el número de secuencia y el procesamiento de flujo continúa. Si la invocación no tiene éxito, su función de Lambda suspende el procesamiento posterior hasta una invocación exitosa.

**Example KinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Configuración
<a name="streams-tumbling-config"></a>

Puede configurar ventanas de salto constante al crear o actualizar una asignación de orígenes de eventos. Para configurar una ventana de saltos de tamaño constante, especifique la ventana en segundos ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). El siguiente comando de ejemplo AWS Command Line Interface (AWS CLI) crea una asignación de origen de eventos de streaming que tiene una ventana de salto constante de 120 segundos. Se nombra la función de Lambda definida para la agregación y el procesamiento se llama `tumbling-window-example-function`.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda determina los límites de la ventana de salto constante en función de la hora en que se insertaron los registros en la secuencia. Todos los registros tienen una marca de hora aproximada disponible que Lambda utiliza en las determinaciones de límites.

Las agregaciones de ventanas de saltos constantes no admiten el reendurecimiento. Cuando una partición termina, Lambda considera la ventana actual como cerrada y las particiones secundarias comienzan su propia ventana en un estado renovado. Cuando no se agrega ningún registro nuevo a la ventana actual, Lambda espera hasta 2 minutos antes de considerar que la ventana ha terminado. Esto ayuda a garantizar que la función lea todos los registros de la ventana actual, incluso si los registros se agregan de forma intermitente.

Ventanas de saltos constantes son totalmente compatibles con las directivas de reintento existentes `maxRetryAttempts` y `maxRecordAge`.

**Example Handler.py: agregación y procesamiento**  
La siguiente función de Python muestra cómo agregar y luego procesar su estado final:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Parámetros de Lambda para las asignaciones de orígenes de eventos de Amazon Kinesis Data Streams
<a name="services-kinesis-parameters"></a>

Todos los tipos de asignación de orígenes de eventos de Lambda comparten las mismas operaciones [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) y [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) de la API. Sin embargo, solo algunos de los parámetros se aplican a Amazon Kinesis.


| Parámetro | Obligatoria | Predeterminado | Notas | 
| --- | --- | --- | --- | 
|  [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize)  |  N  |  100  |  Máximo: 10 000  | 
|  [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BisectBatchOnFunctionError)  |  N  |  false  |  Ninguno | 
|  [DestinationConfig](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-DestinationConfig)  |  N  | N/A |  Destino de tema de la cola de Amazon SQS o de Amazon SNS para registros descartados Para obtener más información, consulte [Configuración de destinos para invocaciones fallidas](kinesis-on-failure-destination.md#kinesis-on-failure-destination-console).  | 
|  [Enabled (Habilitado)](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-Enabled)  |  N  |  true  |  Ninguno | 
|  [EventSourceArn](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-EventSourceArn)  |  Y  | N/A |  ARN del flujo de datos o un consumidor de flujos  | 
|  [FunctionName](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionName)  |  Y  | N/A |  Ninguno | 
|  [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)  |  N  |  N/A |  Para permitir que la función informe de errores específicos de un lote, incluya el valor `ReportBatchItemFailures` en `FunctionResponseTypes`. Para obtener más información, consulte [Configuración de la respuesta por lotes parcial con Kinesis Data Streams y Lambda](services-kinesis-batchfailurereporting.md).  | 
|  [MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)  |  N  |  0  |  Ninguno | 
|  [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)  |  N  |  -1  |  -1 significa infinito: Lambda no descarta registros (se sigue aplicando la [configuración de retención de datos de Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html)). Mínimo: -1 Máximo: 604 800  | 
|  [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)  |  N  |  -1  |  -1 significa infinito: se vuelven a intentar los registros que han producido error hasta que caduque el registro Mínimo: -1 Máximo: 10 000  | 
|  [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)  |  N  |  1  |  Máximo: 10  | 
|  [StartingPosition](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPosition)  |  Y  |  N/A |  AT\$1TIMESTAMP, TRIM\$1HORIZON o LATEST  | 
|  [StartingPositionTimestamp](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPositionTimestamp)  |  N  |  N/A |  Sólo es válido si StartingPosition se establece en AT\$1TIMESTAMP. El tiempo a partir del cual comenzar la lectura, en segundos de tiempo Unix  | 
|  [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)  |  N  |  N/A |  Mínimo: 0 Máximo: 900  | 

# Uso del filtrado de eventos con una fuente de eventos de Kinesis
<a name="with-kinesis-filtering"></a>

Puede utilizar el filtrado de eventos para controlar qué registros de un flujo o una cola envía Lambda a su función. Para obtener información general sobre cómo funciona el filtrado de eventos, consulte [Controle qué eventos envía Lambda a la función](invocation-eventfiltering.md).

Esta sección se centra en el filtrado de eventos para las fuentes de eventos de Kinesis.

**nota**  
Las asignaciones de orígenes de eventos de Kinesis solo admiten el filtrado en la clave `data`.

**Topics**
+ [Conceptos básicos del filtrado de eventos de Kinesis](#filtering-kinesis)
+ [Filtrado de registros agregados de Kinesis](#filtering-kinesis-efo)

## Conceptos básicos del filtrado de eventos de Kinesis
<a name="filtering-kinesis"></a>

Supongamos que un productor incluye datos con formato JSON en su flujo de datos de Kinesis. Un registro de ejemplo tendría el siguiente aspecto, con los datos JSON convertidos en una cadena codificada en Base64 en el campo `data`.

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

Siempre que los datos que el productor incluya en el flujo sean JSON válidos, puede usar el filtrado de eventos para filtrar registros mediante la clave `data`. Supongamos que un productor incluye registros en su flujo de Kinesis en el siguiente formato JSON.

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

Para filtrar solo los registros en los que el tipo de pedido sea “comprar”, el objeto `FilterCriteria` sería el siguiente.

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

Para mayor claridad, este es el valor del `Pattern` del filtro ampliado en JSON no cifrado. 

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

Puede agregar el filtro mediante la consola, la AWS CLI o una plantilla de AWS SAM.

------
#### [ Console ]

Para agregar este filtro mediante la consola, siga las instrucciones que se indican en [Adjuntar criterios de filtro a una asignación de origen de eventos (consola)](invocation-eventfiltering.md#filtering-console) e ingrese la siguiente cadena para los **Criterios de filtro**.

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

Para crear una nueva asignación de orígenes de eventos con estos criterios de filtro mediante la AWS Command Line Interface (AWS CLI), ejecute el siguiente comando.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

Para agregar estos criterios de filtro a una asignación de orígenes de eventos existente, ejecute el siguiente comando.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

Para agregar este filtro mediante AWS SAM, agregue el siguiente fragmento a la plantilla YAML de su origen de eventos.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Para filtrar correctamente los eventos de orígenes de Kinesis, tanto el campo de datos como los criterios de filtro del campo de datos deben estar en un formato JSON válido. Si el formato JSON de alguno de los campos no es válido, Lambda elimina el mensaje o genera una excepción. En la siguiente tabla se resume el comportamiento específico: 


| Formato de los datos entrantes | Formato del patrón de filtro para las propiedades de datos | Acción resultante | 
| --- | --- | --- | 
|  JSON válido  |  JSON válido  |  Lambda filtra en función de los criterios de filtro.  | 
|  JSON válido  |  Sin patrón de filtro para las propiedades de datos  |  Lambda filtra (solo en las demás propiedades de metadatos) en función de los criterios de filtro.  | 
|  JSON válido  |  No JSON  |  Lambda genera una excepción al crear o actualizar la asignación de origen de eventos. El formato JSON del patrón de filtro de las propiedades de datos debe ser válido.  | 
|  No JSON  |  JSON válido  |  Lambda elimina el registro.  | 
|  No JSON  |  Sin patrón de filtro para las propiedades de datos  |  Lambda filtra (solo en las demás propiedades de metadatos) en función de los criterios de filtro.  | 
|  No JSON  |  No JSON  |  Lambda genera una excepción al crear o actualizar la asignación de origen de eventos. El formato JSON del patrón de filtro de las propiedades de datos debe ser válido.  | 

## Filtrado de registros agregados de Kinesis
<a name="filtering-kinesis-efo"></a>

Con Kinesis, puede agregar varios registros en un solo registro de Kinesis Data Streams para aumentar el rendimiento de sus datos. Lambda solo puede aplicar criterios de filtro a los registros agregados cuando se utiliza la [distribución mejorada](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) de Kinesis. El filtrado de registros agregados con Kinesis estándar no es compatible. Al utilizar la distribución mejorada, usted configura un consumidor de rendimiento dedicado de Kinesis para que actúe como desencadenador de la función de Lambda. A continuación, Lambda filtra los registros agregados y solo pasa los registros que cumplen los criterios de filtro.

Para obtener más información sobre la agregación de registros de Kinesis, consulte la sección [Agregación](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation) de la página de conceptos clave de la Kinesis Producer Library (KPL). Para obtener más información sobre el uso de Lambda con la expansión mejorada de Kinesis, consulte [Aumento del rendimiento del procesamiento de transmisiones en tiempo real con la distribución mejorada de Amazon Kinesis Data Streams y AWS Lambda](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/) en el blog de informática de AWS.

# Tutorial: Uso de Lambda con Kinesis Data Streams
<a name="with-kinesis-example"></a>

En este tutorial, creará una función de Lambda para consumir eventos de un flujo de datos de Amazon Kinesis. 

1. Una aplicación personalizada escribe los registros en el flujo.

1. AWS Lambda sondea el flujo y, cuando detecta registros nuevos en él, llama a la función de Lambda.

1. AWS Lambda ejecuta la función de Lambda asumiendo el rol de ejecución que se especificó en el momento de crear la función de Lambda.

## Requisitos previos
<a name="with-kinesis-prepare"></a>

### Instala la AWS Command Line Interface
<a name="install_aws_cli"></a>

Si aún no ha instalado AWS Command Line Interface, siga los pasos que se indican en [Instalación o actualización de la versión más reciente de AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) para instalarlo.

El tutorial requiere un intérprete de comandos o un terminal de línea de comando para ejecutar los comandos. En Linux y macOS, use su administrador de intérprete de comandos y paquetes preferido.

**nota**  
En Windows, algunos comandos de la CLI de Bash que se utilizan habitualmente con Lambda (por ejemplo, `zip`) no son compatibles con los terminales integrados del sistema operativo. Para obtener una versión de Ubuntu y Bash integrada con Windows, [instale el subsistema de Windows para Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Creación del rol de ejecución
<a name="with-kinesis-example-create-iam-role"></a>

Cree el [rol de ejecución](lambda-intro-execution-role.md) que concederá a su función permiso para obtener acceso a los recursos de AWS.

**Para crear un rol de ejecución**

1. Abra la [página Roles](https://console.aws.amazon.com/iam/home#/roles) en la consola de IAM.

1. Elija **Crear rol**.

1. Cree un rol con las propiedades siguientes.
   + **Trusted entity (Entidad de confianza** – **AWS Lambda**.
   + **Permisos**: **AWSLambdaKinesisExecutionRole**.
   + **Role name (Nombre de rol** – **lambda-kinesis-role**.

La política **AWSLambdaKinesisExecutionRole** tiene permisos que la función necesita para leer elementos de Kinesis y escribir registros a Registros de CloudWatch.

## Creación de la función
<a name="with-kinesis-example-create-function"></a>

Cree una función de Lambda: que procese los mensajes de Kinesis. El código de función registra el ID del evento y los datos del evento del registro de Kinesis en Registros de CloudWatch.

En este tutorial, se utiliza el tiempo de ejecución de Node.js 24, pero también hemos proporcionado archivos de código de ejemplo en otros lenguajes de tiempo de ejecución. Puede seleccionar la pestaña del siguiente cuadro para ver el código del tiempo de ejecución que le interesa. El código JavaScript que usará en este paso está en el primer ejemplo que se muestra en la pestaña **JavaScript**.

------
#### [ .NET ]

**SDK para .NET**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante JavaScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Uso de un evento de Kinesis con Lambda mediante TypeScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK para PHP**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3)**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Uso de un evento de Kinesis con Lambda mediante Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK para Rust**  
 Hay más en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de [ejemplos de tecnología sin servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir un evento de Kinesis con Lambda mediante Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**Cómo crear la función**

1. Cree un directorio para el proyecto y, a continuación, cambie a ese directorio.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Copie el código de muestra de JavaScript en un nuevo archivo con el nombre `index.js`.

1. Cree un paquete de implementación.

   ```
   zip function.zip index.js
   ```

1. Cree una función de Lambda con el comando `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Prueba de la función de Lambda
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Invoque la función de Lambda manualmente mediante el comando de la CLI de `invoke` AWS Lambda y un evento de Kinesis de muestra.

**Probar la función de Lambda**

1. Copie el siguiente JSON en un archivo y guárdelo como `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Utilice el comando `invoke` para enviar el evento a la función.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   La opción **cli-binary-format** es obligatoria si va a utilizar la versión 2 de la AWS CLI. Para que esta sea la configuración predeterminada, ejecute `aws configure set cli-binary-format raw-in-base64-out`. Para obtener más información, consulte [Opciones de la línea de comandos globales compatibles con AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) en la *Guía del usuario de la AWS Command Line Interface versión 2*.

   La respuesta se guardará en el archivo `out.txt`.

## Creación de un flujo de Kinesis
<a name="with-kinesis-example-configure-event-source-create"></a>

Utilice el comando `create-stream ` para crear un flujo.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Ejecute el siguiente comando `describe-stream` para obtener el ARN del flujo.

```
aws kinesis describe-stream --stream-name lambda-stream
```

Debería ver los siguientes datos de salida:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Utilice el ARN del flujo en el siguiente paso para asociar el flujo a la función de Lambda.

## Añadir un origen de eventos en AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Ejecute el siguiente comando `add-event-source` de la AWS CLI.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Tenga en cuenta el ID de mapeo para un uso posterior. Para obtener una lista de mapeos de orígenes de eventos, ejecute el comando `list-event-source-mappings`.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

En la respuesta, puede verificar que el valor de estado es `enabled`. Las asignaciones de orígenes de eventos se pueden deshabilitar para poner en pausa temporalmente el sondeo sin perder de registros.

## Prueba de la configuración
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Para probar la asignación de orígenes de eventos, agregue los registros de eventos a su flujo de Kinesis. El valor de `--data` es una cadena que la CLI codifica en base64 antes de enviarlo a Kinesis. Puede ejecutar el mismo comando más de una vez para añadir varios registros al flujo.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda utiliza el rol de ejecución para leer los registros desde el flujo. A continuación, se invoca la función de Lambda y se pasan lotes de registros. La función descodifica los datos de cada registro y los registra, enviando la salida a Registros de CloudWatch. Puede ver los registros en la [consola de CloudWatch](https://console.aws.amazon.com/cloudwatch).

## Eliminación de sus recursos
<a name="cleanup"></a>

A menos que desee conservar los recursos que creó para este tutorial, puede eliminarlos ahora. Si elimina los recursos de AWS que ya no utiliza, evitará gastos innecesarios en su Cuenta de AWS.

**Cómo eliminar el rol de ejecución**

1. Abra la página [Roles](https://console.aws.amazon.com/iam/home#/roles) en la consola de IAM.

1. Seleccione el rol de ejecución que creó.

1. Elija **Eliminar**.

1. Si desea continuar, escriba el nombre del rol en el campo de entrada de texto y elija **Delete** (Eliminar).

**Cómo eliminar la función de Lambda**

1. Abra la [página de Funciones](https://console.aws.amazon.com/lambda/home#/functions) en la consola de Lambda.

1. Seleccione la función que ha creado.

1. Elija **Acciones**, **Eliminar**.

1. Escriba **confirm** en el campo de entrada de texto y elija **Delete** (Eliminar).

**Para eliminar el flujo de Kinesis**

1. Inicie sesión en la Consola de administración de AWS y abra la consola de Kinesis en [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Seleccione el flujo que ha creado.

1. Elija **Actions** (Acciones), **Delete** (Eliminar).

1. Introduzca **delete** en el campo de entrada de texto.

1. Elija **Eliminar**.