Tutorial: Usar o Lambda com o Kinesis Data Streams - AWS Lambda

Tutorial: Usar o Lambda com o Kinesis Data Streams

Neste tutorial, você criará uma função do Lambda para consumir eventos de um fluxo de dados do Amazon Kinesis.

  1. O aplicativo personalizado grava registros no fluxo.

  2. O AWS Lambda sonda a transmissão e, quando detecta novos registros, invoca sua função do Lambda.

  3. O AWS Lambda executa a função do Lambda assumindo a função de execução especificada no momento da criação da função do Lambda.

Pré-requisitos

Este tutorial presume que você tenha algum conhecimento de operações básicas do Lambda e do console do Lambda. Caso ainda não tenha feito isso, siga as instruções em Criar uma função do Lambda com o console para criar sua primeira função do Lambda.

Para concluir as etapas a seguir, a AWS Command Line Interface (AWS CLI) versão 2 será necessária. Os comandos e a saída esperada são mostrados em blocos separados:

aws --version

A seguinte saída deverá ser mostrada:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

Para comandos longos, um caractere de escape (\) é usado para dividir um comando em várias linhas.

No Linux e no macOS, use seu gerenciador preferido de pacotes e de shell.

nota

No Windows, alguns comandos da CLI do Bash que você costuma usar com o Lambda (como zip) não são compatíveis com os terminais integrados do sistema operacional. Para obter uma versão do Ubuntu com o Bash integrada no Windows, instale o Subsistema do Windows para Linux. Os exemplos de comandos da CLI neste guia usam a formatação Linux. Os comandos que incluem documentos JSON em linha deverão ser reformatados se você estiver usando a CLI do Windows.

Criar a função de execução

Crie a função de execução que dá à sua função permissão para acessar recursos do AWS.

Para criar uma função de execução
  1. Abra a página Roles (Funções) no console do IAM.

  2. Selecione Create role (Criar função).

  3. Crie uma função com as propriedades a seguir.

    • Trusted entity (Entidade confiável): AWS Lambda.

    • Permissões: AWSLambdaKinesisExecutionRole.

    • Role name (Nome da função): lambda-kinesis-role.

A política AWSLambdaKinesisExecutionRole tem as permissões necessárias para a função ler itens do Kinesis e gravar logs no CloudWatch Logs.

Criar a função

Crie uma função do Lambda que processe suas mensagens do Kinesis. O código da função registra o ID do evento e os dados do evento do registro do Kinesis no CloudWatch Logs.

Este tutorial usa o runtime do Node.js 18.x, mas também fornecemos exemplos de arquivos em outras linguagens de runtime. Você pode selecionar a guia na caixa a seguir para ver o código do runtime do seu interesse. O código JavaScript que você usará nesta etapa é o primeiro exemplo mostrado na guia JavaScript.

.NET
AWS SDK for .NET
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
Go
SDK para Go V2
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "log" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { if len(kinesisEvent.Records) == 0 { log.Printf("empty Kinesis event received") return nil } for _, record := range kinesisEvent.Records { log.Printf("processed Kinesis event with EventId: %v", record.EventID) recordDataBytes := record.Kinesis.Data recordDataText := string(recordDataBytes) log.Printf("record data: %v", recordDataText) // TODO: Do interesting work based on the new data } log.Printf("successfully processed %v records", len(kinesisEvent.Records)) return nil } func main() { lambda.Start(handler) }
Java
SDK para Java 2.x
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }
JavaScript
SDK para JavaScript (v3)
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Consumir um evento do Kinesis com o Lambda usando TypeScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
SDK para PHP
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando PHP.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Kinesis\KinesisHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends KinesisHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleKinesis(KinesisEvent $event, Context $context): void { $this->logger->info("Processing records"); $records = $event->getRecords(); foreach ($records as $record) { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK para Python (Boto3).
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")
Ruby
SDK para Ruby
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue => err $stderr.puts "An error occurred #{err}" raise err end end puts "Successfully processed #{event['Records'].length} records." end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('UTF-8') # Placeholder for actual async work # You can use Ruby's asynchronous programming tools like async/await or fibers here. return data end
Rust
SDK para Rust
nota

Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consuma um evento do Kinesis com o Lambda usando Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
Para criar a função
  1. Crie um diretório para o projeto e depois mude para esse diretório.

    mkdir kinesis-tutorial cd kinesis-tutorial
  2. Copie o código JavaScript de amostra em um novo arquivo denominado index.js.

  3. Crie um pacote de implantação.

    zip function.zip index.js
  4. Crie uma função do Lambda com o comando create-function.

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-kinesis-role

Testar a função do Lambda

Invoque sua função do Lambda manualmente usando o comando invoke da CLI do AWS Lambda e um evento do Kinesis de amostra.

Para testar a função do Lambda
  1. Copie o JSON a seguir em um arquivo e salve-o como input.txt.

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
  2. Use o comando invoke para enviar o evento para a função.

    aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

    A opção cli-binary-format será necessária se você estiver usando a AWS CLI versão 2. Para que essa seja a configuração padrão, execute aws configure set cli-binary-format raw-in-base64-out. Para obter mais informações, consulte A AWS CLI comporta opções de linha de comando globais no Guia do usuário da AWS Command Line Interface versão 2.

    A resposta é salva no out.txt.

Criar uma transmissão do Kinesis

Use o comando create-stream para criar um fluxo.

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

Execute o comando describe-stream a seguir para obter o ARN do stream.

aws kinesis describe-stream --stream-name lambda-stream

A seguinte saída deverá ser mostrada:

{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

Você usa o ARN do stream na próxima etapa para associar o stream à sua função do Lambda.

Adicionar uma fonte de eventos no AWS Lambda

Execute o comando AWS CLI a seguir da add-event-source.

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

Observe o ID do mapeamento para uso posterior. Você pode obter uma lista de mapeamentos de origem de evento executando o comando list-event-source-mappings a seguir.

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream

Na resposta, pode verificar que o valor do status é enabled. Os mapeamentos de origem de eventos podem ser desabilitados para pausar a pesquisa temporariamente, sem perder nenhum registro.

Testar a configuração

Para testar o mapeamento da origem do evento, adicione registros de eventos ao seu stream do Kinesis. O valor --data é uma string que a CLI codifica em base64 antes de enviá-la ao Kinesis. Você pode executar o mesmo comando mais de uma vez para adicionar vários registros ao fluxo.

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

O Lambda usa a função de execução para ler registros da transmissão. Em seguida, ele invoca sua função do Lambda, transmitindo lotes de registros. A função decodifica os dados de cada registro e os registra, enviando a saída para CloudWatch Logs. Visualize os logs no console do CloudWatch.

Limpe os recursos

Agora você pode excluir os recursos criados para este tutorial, a menos que queira mantê-los. Excluindo os recursos da AWS que você não está mais usando, você evita cobranças desnecessárias em sua Conta da AWS.

Para excluir a função de execução
  1. Abra a página Roles (Funções) no console do IAM.

  2. Selecione a função de execução que você criou.

  3. Escolha Excluir.

  4. Insira o nome do perfil no campo de entrada de texto e escolha Delete (Excluir).

Como excluir a função do Lambda
  1. Abra a página Functions (Funções) no console do Lambda.

  2. Selecione a função que você criou.

  3. Escolha Ações, Excluir.

  4. Digite delete no campo de entrada de texto e escolha Delete (Excluir).

Para excluir a transmissão do Kinesis
  1. Faça login no AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Selecione a transmissão criada.

  3. Escolha Ações, Excluir.

  4. Digite delete no campo de entrada de texto.

  5. Escolha Excluir.