Procesamiento de registros de Amazon Kinesis Data Streams - AWS Lambda

Procesamiento de registros de Amazon Kinesis Data Streams

Para procesar los registros de Amazon Kinesis Data Streams con Lambda, cree un consumidor para el flujo y, a continuación, cree una asignación de orígenes de eventos de Lambda.

Configurar su flujo de datos y función.

Su función de Lambda es una aplicación consumidora para su flujo de datos. Procesa un lote de registros a la vez desde cada partición. Puede asignar una función Lambda a un consumidor de rendimiento compartido (iterador estándar) o a un consumidor de rendimiento dedicado con distribución ramificada mejorada.

  • Iterador estándar: Lambda sondea cada partición del flujo de Kinesis y busca registros a una velocidad base de una vez por segundo. Cuando hay más registros disponibles, Lambda sigue procesando lotes hasta que la función se pone al día con el flujo. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.

  • Distribución ramificada mejorada: para minimizar la latencia y maximizar el rendimiento de lectura, cree un consumidor de flujo de datos con distribución ramificada mejorada. Los consumidores con distribución ramificada mejorada obtienen una conexión dedicada a cada partición que no afecta a las demás aplicaciones que leen el flujo. Los consumidores de flujos utilizan HTTP/2 para reducir la latencia enviando los registros a Lambda a través de una conexión de larga duración y mediante la compresión de los encabezados de las solicitudes. Es posible crear un consumidor de flujos con la API RegisterStreamConsumer de Kinesis.

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Debería ver los siguientes datos de salida:

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}

Para incrementar la velocidad con la que la función procesa los registros, agregue particiones al flujo de datos. Lambda procesa registros en cada partición en orden. Deja de procesar registros adicionales en una partición si la función devuelve un error. Al haber más particiones, se procesan más lotes simultáneamente, lo que reduce el impacto de los errores de simultaneidad.

Si la función no puede aumentar para administrar el número total de lotes simultáneos, solicite un aumento de cuota o reserve la simultaneidad para la función.

Creación de una asignación de orígenes de eventos para invocar la función de Lambda

Para invocar la función de Lambda con registros del flujo de datos, cree una asignación de orígenes de eventos. Puede crear varias asignaciones de orígenes de eventos para procesar los mismos datos con distintas funciones de Lambda o para procesar elementos de varios flujos de datos con una sola función. Al procesar elementos de múltiples flujos de datos, cada lote solo contendrá registros de una única partición o flujo.

Puede configurar las asignaciones de orígenes de eventos para procesar los registros de un flujo en una Cuenta de AWS diferente. Para obtener más información, consulte Creación de asignaciones de orígenes de eventos entre cuentas.

Antes de crear una asignación de orígenes de eventos, debe dar permiso a la función de Lambda para leer desde un flujo de datos de Kinesis. Lambda necesita los siguientes permisos para administrar los recursos relacionados con el flujo de datos de Kinesis:

La política administrada de AWS AWSLambdaKinesisExecutionRole incluye estos permisos. Agregue esta política administrada a la función, tal como se describe en el siguiente procedimiento.

AWS Management Console
Cómo añadir permisos de Kinesis a la función
  1. Abra la página Funciones de la consola de Lambda y seleccione su función.

  2. En la pestaña Configuración, elija Permisos.

  3. En el panel de Roles de ejecución, en Nombre del rol, elija el enlace al rol de ejecución de la función. Este enlace abre la página para ese rol en la consola de IAM.

  4. En el panel Políticas de permisos, elija Agregar permisos y, a continuación, elija Adjuntar políticas.

  5. En el campo de búsqueda, escriba AWSLambdaKinesisExecutionRole.

  6. Seleccione la casilla situada junto a la política y elija Añadir permisos.

AWS CLI
Cómo añadir permisos de Kinesis a la función
  • Ejecute el siguiente comando de la CLI para adjuntar la política de AWSLambdaKinesisExecutionRole al rol de ejecución de la función:

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
Cómo añadir permisos de Kinesis a la función
  • En la definición de la función, agregue la propiedad Policies, tal como se muestra en el siguiente ejemplo:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

Una vez que haya configurado los permisos necesarios, cree la asignación de orígenes de eventos.

AWS Management Console
Cómo crear la asignación de orígenes de eventos para Kinesis
  1. Abra la página Funciones de la consola de Lambda y seleccione su función.

  2. En el panel Información general de la función, elija Agregar desencadenador.

  3. En Configuración del desencadenador, para el origen, seleccione Kinesis.

  4. Seleccione el flujo de Kinesis para el que quiere crear la asignación de orígenes de eventos y, si lo desea, un consumidor del flujo.

  5. (Opcional) Edite el tamaño del lote, la posición inicial y la ventana del lote para la asignación de orígenes de eventos.

  6. Elija Añadir.

Al crear la asignación de orígenes de eventos desde la consola, el rol de IAM debe tener los permisos kinesis:ListStreams y kinesis:ListStreamConsumers.

AWS CLI
Cómo crear la asignación de orígenes de eventos de Kinesis
  • Ejecute el siguiente comando de la CLI para crear una asignación de orígenes de eventos de Kinesis. Elija su propio tamaño de lote y posición inicial según su caso de uso.

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

Para especificar una ventana de procesamiento por lotes, agregue la opción --maximum-batching-window-in-seconds. Para obtener más información sobre el uso de este u otros parámetros, consulte create-event-source-mapping en la Referencia de comandos de la AWS CLI.

AWS SAM
Cómo crear la asignación de orígenes de eventos de Kinesis
  • En la definición de la función, agregue la propiedad KinesisEvent, tal como se muestra en el siguiente ejemplo:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Para obtener más información sobre cómo crear una asignación de orígenes de eventos para Kinesis Data Streams en AWS SAM, consulte Kinesis en la Guía para desarrolladores de AWS Serverless Application Model.

Posición inicial de flujos y sondeo

Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.

  • Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.

  • Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.

Este comportamiento significa que, si especifica LATEST como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como TRIM_HORIZON o AT_TIMESTAMP.

Creación de asignaciones de orígenes de eventos entre cuentas

Amazon Kinesis Data Streams admite políticas basadas en recursos. En consecuencia, puede procesar los datos ingeridos en un flujo en una Cuenta de AWS con una función de Lambda en otra cuenta.

Para crear una asignación de orígenes de eventos para la función de Lambda mediante un flujo de Kinesis en otra Cuenta de AWS, debe configurar el flujo mediante una política basada en recursos para conceder a la función de Lambda permiso para leer elementos. Para obtener información sobre cómo configurar la transmisión para permitir el acceso entre cuentas, consulte Acceso compartido con funciones de AWS Lambda entre cuentas en la Guía para desarrolladores de Amazon Kinesis Streams.

Una vez que haya configurado la transmisión con una política basada en recursos que otorgue a la función de Lambda los permisos necesarios, cree la asignación de orígenes de eventos mediante cualquiera de los métodos descritos en la sección anterior.

Si decide crear la asignación de orígenes de eventos mediante la consola Lambda, pegue el ARN de la transmisión directamente en el campo de entrada. Si quiere especificar un consumidor para la transmisión, al pegar el ARN del consumidor se rellena en forma automática el campo de transmisión.