Como o Lambda processa registros do Amazon Kinesis Data Streams
Você pode usar uma função do Lambda para processar registros em um fluxo de dados do Amazon Kinesis. É possível mapear uma função do Lambda em um consumidor de throughput compartilhado (iterador padrão) do Kinesis Data Stream ou em um consumidor de throughput dedicado com distribuição avançada. Para iteradores padrão, o Lambda sonda cada fragmento no stream do Kinesis em busca de registros usando o protocolo HTTP. O mapeamento da origem do evento compartilha a throughput de leitura com outros consumidores do fragmento.
Para obter detalhes sobre transmissões de dados do Kinesis, consulteLer dados do Amazon Kinesis Data Streams.
nota
O Kinesis cobra por cada fragmento e, para distribuição avançada, dados lidos da transmissão. Para obter detalhes de preço, consulte Preço do Amazon Kinesis
Tópicos
- Fluxos de sondagem e agrupamento em lotes
- Evento de exemplo
- Processar registros do Amazon Kinesis Data Streams com o Lambda
- Configurar a resposta em lote parcial com o Kinesis Data Streams e o Lambda
- Reter registros de lotes descartados para uma origem de eventos do Kinesis Data Streams no Lambda
- Implementando o processamento com estado do Kinesis Data Streams no Lambda
- Parâmetros do Lambda para mapeamento de origem de eventos do Amazon Kinesis Data Streams.
- Usar a filtragem de eventos com uma origem de eventos do Kinesis
- Tutorial: Usar o Lambda com o Kinesis Data Streams
Fluxos de sondagem e agrupamento em lotes
O Lambda lê registros do fluxo de dados e invoca sua função de maneira síncrona com um evento que contém registros de transmissão. O Lambda lê registros em lotes e invoca sua função para processar registros do lote. Cada lote contém registros de um único fragmento/fluxo de dados.
Para os fluxos de dados padrão do Kinesis, o Lambda sonda os fragmentos do fluxo para identificar os registros a uma taxa básica de uma vez por segundo. Para a distribuição avançada do Kinesis o Lambda usa uma conexão HTTP/2 para detectar os registros que estão sendo enviados por push ao Kinesis. Quando os registros estão disponíveis, o Lambda invoca a função e aguarda o resultado.
Por padrão, o Lambda invoca a função assim que os registros estão disponíveis. Se o lote que o Lambda lê da fonte de eventos tiver apenas um registro, o Lambda enviará apenas um registro à função. Para evitar a invocação da função com poucos registros, instrua a fonte de eventos para armazenar os registros em buffer por até cinco minutos, configurando uma janela de lotes. Antes de invocar a função, o Lambda continua a ler registros da fonte de eventos até coletar um lote inteiro, até que a janela de lote expire ou até que o lote atinja o limite de carga útil de 6 MB. Para ter mais informações, consulte Comportamento de lotes.
Atenção
Os mapeamentos da origem do evento do Lambda processam cada evento ao menos uma vez, podendo haver o processamento duplicado de registros. Para evitar possíveis problemas relacionados a eventos duplicados, é altamente recomendável tornar o código da função idempotente. Para saber mais, consulte Como tornar minha função do Lambda idempotente
O Lambda não espera a conclusão de nenhuma extensãoconfigurada para enviar o próximo lote para processamento. Em outras palavras, suas extensões podem continuar sendo executadas enquanto o Lambda processa o próximo lote de registros. Isso pode causar problemas de controle de utilização se você violar quaisquer configurações ou limites de simultaneidade de sua conta. Para detectar se esse é um problema em potencial, monitore suas funções e verifique se você está vendo métricas de simultaneidade mais altas do que o esperado para o seu mapeamento da origem do evento. Devido ao curto intervalo entre as invocações, o Lambda pode relatar brevemente um uso de simultaneidade maior do que o número de fragmentos. Isso pode ser verdadeiro até mesmo para funções do Lambda sem extensões.
Configure a opção ParallelizationFactor para processar um fragmento de um fluxo de dados do Kinesis com mais de uma invocação do Lambda simultaneamente. Você pode especificar o número de lotes simultâneos que o Lambda pesquisa de um fragmento por meio de um fator de paralelização de 1 (padrão) a 10. Por exemplo, quando você define ParallelizationFactor
como 2, pode ter até 200 invocações simultâneas do Lambda para processar 100 fragmentos de dados do Kinesis (embora, na prática, você possa ver valores diferentes para a métrica ConcurrentExecutions
). Isso ajuda a aumentar o throughput de processamento quando o volume de dados é volátil e o valor de IteratorAge
é alto. Quando você aumenta o número de lotes simultâneos por fragmento, o Lambda ainda garante o processamento em ordem no nível de chave de partição.
Você também pode usar ParallelizationFactor
com a agregação do Kinesis. O comportamento do mapeamento da origem do evento depende de você estar ou não usando fan-out avançado:
-
Sem fan-out avançado: todos os eventos dentro de um evento agregado devem ter a mesma chave de partição. A chave de partição também deve ser igual à do evento agregado. Se os eventos dentro do evento agregado tiverem chaves de partição diferentes, o Lambda não poderá garantir o processamento dos eventos na ordem, por chave de partição.
-
Com fan-out avançado: primeiro o Lambda decodifica o evento agregado em eventos individuais. O evento agregado pode ter uma chave de partição diferente da chave dos eventos que ele contém. Porém, os eventos que não correspondem à chave de partição são descartados e perdidos
. O Lambda não processa esses eventos e não os envia para um destino de falha configurado.
Evento de exemplo
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }