Tutorial: Uso de Lambda con Kinesis Data Streams - AWS Lambda

Tutorial: Uso de Lambda con Kinesis Data Streams

En este tutorial, creará una función de Lambda para consumir eventos de un flujo de datos de Amazon Kinesis.

  1. Una aplicación personalizada escribe los registros en el flujo.

  2. AWS Lambda sondea el flujo y, cuando detecta registros nuevos en él, llama a la función de Lambda.

  3. AWS Lambda ejecuta la función de Lambda asumiendo el rol de ejecución que se especificó en el momento de crear la función de Lambda.

Requisitos previos

En este tutorial, se presupone que tiene algunos conocimientos sobre las operaciones básicas de Lambda y la consola de Lambda. Si aún no lo ha hecho, siga las instrucciones de Cree una función de Lambda con la consola. para crear su primera función de Lambda.

Para completar los siguientes pasos, necesita la versión 2 de la AWS CLI. Los comandos y la salida esperada se enumeran en bloques separados:

aws --version

Debería ver los siguientes datos de salida:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

Para comandos largos, se utiliza un carácter de escape (\) para dividir un comando en varias líneas.

En Linux y macOS, use su administrador de intérprete de comandos y paquetes preferido.

nota

En Windows, algunos comandos de la CLI de Bash que se utilizan habitualmente con Lambda (por ejemplo, zip) no son compatibles con los terminales integrados del sistema operativo. Para obtener una versión de Ubuntu y Bash integrada con Windows, instale el subsistema de Windows para Linux. Los comandos de la CLI de ejemplo de esta guía utilizan el formato Linux. Los comandos que incluyen documentos JSON en línea deben reformatearse si utiliza la CLI de Windows.

Creación del rol de ejecución

Cree el rol de ejecución que concederá a su función permiso para obtener acceso a los recursos de AWS.

Para crear un rol de ejecución
  1. Abra la página Roles en la consola de IAM.

  2. Elija Crear rol.

  3. Cree un rol con las propiedades siguientes.

    • Trusted entity (Entidad de confianza): AWS Lambda.

    • Permisos: AWSLambdaKinesisExecutionRole.

    • Nombre de rol: lambda-kinesis-role.

La política AWSLambdaKinesisExecutionRole tiene permisos que la función necesita para leer elementos de Kinesis y escribir registros a Registros de CloudWatch.

Creación de la función

Cree una función de Lambda: que procese los mensajes de Kinesis. El código de función registra el ID del evento y los datos del evento del registro de Kinesis en Registros de CloudWatch.

En este tutorial, se utiliza el tiempo de ejecución de Node.js 18.x, pero también hemos proporcionado archivos de código de ejemplo en otros lenguajes de tiempo de ejecución. Puede seleccionar la pestaña del siguiente cuadro para ver el código del tiempo de ejecución que le interesa. El código JavaScript que usará en este paso está en el primer ejemplo que se muestra en la pestaña JavaScript.

.NET
AWS SDK for .NET
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
Go
SDK para Go V2
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "log" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { if len(kinesisEvent.Records) == 0 { log.Printf("empty Kinesis event received") return nil } for _, record := range kinesisEvent.Records { log.Printf("processed Kinesis event with EventId: %v", record.EventID) recordDataBytes := record.Kinesis.Data recordDataText := string(recordDataBytes) log.Printf("record data: %v", recordDataText) // TODO: Do interesting work based on the new data } log.Printf("successfully processed %v records", len(kinesisEvent.Records)) return nil } func main() { lambda.Start(handler) }
Java
SDK para Java 2.x
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }
JavaScript
SDK para JavaScript (v3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Uso de un evento de Kinesis con Lambda mediante TypeScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
SDK para PHP
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante PHP.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Kinesis\KinesisHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends KinesisHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleKinesis(KinesisEvent $event, Context $context): void { $this->logger->info("Processing records"); $records = $event->getRecords(); foreach ($records as $record) { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK para Python (Boto3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")
Ruby
SDK para Ruby
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Kinesis con Lambda mediante Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue => err $stderr.puts "An error occurred #{err}" raise err end end puts "Successfully processed #{event['Records'].length} records." end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('UTF-8') # Placeholder for actual async work # You can use Ruby's asynchronous programming tools like async/await or fibers here. return data end
Rust
SDK para Rust
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumir un evento de Kinesis con Lambda mediante Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
Cómo crear la función
  1. Cree un directorio para el proyecto y, a continuación, cambie a ese directorio.

    mkdir kinesis-tutorial cd kinesis-tutorial
  2. Copie el código de muestra de JavaScript en un nuevo archivo con el nombre index.js.

  3. Cree un paquete de implementación.

    zip function.zip index.js
  4. Cree una función de Lambda con el comando create-function.

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-kinesis-role

Probar la función de Lambda

Invoque la función de Lambda manualmente mediante el comando de la CLI de invoke AWS Lambda y un evento de Kinesis de muestra.

Probar la función de Lambda
  1. Copie el siguiente JSON en un archivo y guárdelo como input.txt.

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
  2. Utilice el comando invoke para enviar el evento a la función.

    aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

    La opción cli-binary-format es obligatoria si va a utilizar la versión 2 de la AWS CLI. Para que esta sea la configuración predeterminada, ejecute aws configure set cli-binary-format raw-in-base64-out. Para obtener más información, consulte Opciones de la línea de comandos globales compatibles con AWS CLI en la Guía del usuario de la AWS Command Line Interface versión 2.

    La respuesta se guardará en el archivo out.txt.

Creación de un flujo de Kinesis

Utilice el comando create-stream para crear un flujo.

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

Ejecute el siguiente comando describe-stream para obtener el ARN del flujo.

aws kinesis describe-stream --stream-name lambda-stream

Debería ver los siguientes datos de salida:

{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}

Utilice el ARN del flujo en el siguiente paso para asociar el flujo a la función de Lambda.

Añadir un origen de eventos en AWS Lambda

Ejecute el siguiente comando add-event-source de la AWS CLI.

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

Tenga en cuenta el ID de mapeo para un uso posterior. Para obtener una lista de mapeos de orígenes de eventos, ejecute el comando list-event-source-mappings.

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream

En la respuesta, puede verificar que el valor de estado es enabled. Las asignaciones de orígenes de eventos se pueden deshabilitar para poner en pausa temporalmente el sondeo sin perder de registros.

Prueba de la configuración

Para probar la asignación de orígenes de eventos, agregue los registros de eventos a su flujo de Kinesis. El valor de --data es una cadena que la CLI codifica en base64 antes de enviarlo a Kinesis. Puede ejecutar el mismo comando más de una vez para añadir varios registros al flujo.

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda utiliza el rol de ejecución para leer los registros desde el flujo. A continuación, se invoca la función de Lambda y se pasan lotes de registros. La función descodifica los datos de cada registro y los registra, enviando la salida a Registros de CloudWatch. Puede ver los registros en la consola de CloudWatch.

Eliminación de sus recursos

A menos que desee conservar los recursos que creó para este tutorial, puede eliminarlos ahora. Si elimina los recursos de AWS que ya no utiliza, evitará gastos innecesarios en su Cuenta de AWS.

Cómo eliminar el rol de ejecución
  1. Abra la página Roles en la consola de IAM.

  2. Seleccione el rol de ejecución que creó.

  3. Elija Eliminar.

  4. Si desea continuar, escriba el nombre del rol en el campo de entrada de texto y elija Delete (Eliminar).

Cómo eliminar la función de Lambda
  1. Abra la página de Funciones en la consola de Lambda.

  2. Seleccione la función que ha creado.

  3. Elija Acciones, Eliminar.

  4. Escriba delete en el campo de entrada de texto y elija Delete (Eliminar).

Para eliminar el flujo de Kinesis
  1. Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. Seleccione el flujo que ha creado.

  3. Elija Actions (Acciones), Delete (Eliminar).

  4. Introduzca delete en el campo de entrada de texto.

  5. Elija Eliminar.