Processar registros do Amazon Kinesis Data Streams com o Lambda - AWS Lambda

Processar registros do Amazon Kinesis Data Streams com o Lambda

Para processar registros do Amazon Kinesis Data Streams com o Lambda, crie um consumidor para seu fluxo e, em seguida, crie um mapeamento de origem de evento do Lambda.

Configurar o stream de dados e a função

Sua função do Lambda é uma aplicação de consumidor para seu fluxo de dados. Ele processa um lote de registros por vez de cada estilhaço. É possível mapear uma função Lambda para um consumidor de throughput compartilhado (iterador padrão) ou para um consumidor de throughput dedicado com distribuição avançada.

  • Iterador padrão: o Lambda sonda cada fragmento em seu fluxo do Kinesis para identificar registros a uma taxa básica de uma vez por segundo. Quando houver mais registros disponíveis, o Lambda mantém lotes de processamento até que a função alcance o fluxo. O mapeamento da origem do evento compartilha a throughput de leitura com outros consumidores do fragmento.

  • Distribuição avançada: para minimizar a latência e maximizar o throughput da leitura, crie um consumidor de fluxo de dados com distribuição avançada. Os consumidores da distribuição avançada obtêm uma conexão dedicada para cada fragmento que não afeta outros aplicativos que fazem leitura do stream. Os consumidores de fluxos usam HTTP/2 para reduzir a latência, enviando os registros para o Lambda por meio de uma conexão de longa duração e compactando cabeçalhos de solicitação. Você pode criar um consumidor de fluxo com a API RegisterStreamConsumer do Kinesis.

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

A seguinte saída deverá ser mostrada:

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

Para aumentar a velocidade com que sua função processa registros, adicione fragmentos ao fluxo de dados. O Lambda processa registros em cada fragmento sequencialmente. Ele deixará de processar registros adicionais em um estilhaço se sua função retornar um erro. Com mais estilhaços, há mais lotes sendo processados de uma só vez, o que diminui o impacto de erros na simultaneidade.

Se a função não conseguir se expandir para lidar com o número total de lotes simultâneos, solicite um aumento de cota ou reserve simultaneidade para a função.

Criar um mapeamento da origem do evento para invocar uma função do Lambda

Para invocar sua função do Lambda com registros do fluxo de dados, crie um mapeamento da origem do evento. É possível criar vários mapeamentos de origem de eventos para processar os mesmos dados com várias funções do Lambda ou processar itens de vários fluxos de dados com uma única função. Ao processar itens de vários fluxos, cada lote conterá registros somente de um único fragmento ou transmissão.

Você pode configurar mapeamentos da origem do evento para processar registros de um fluxo em outra Conta da AWS. Para saber mais, consulte Como criar mapeamentos da origem do evento entre contas.

Antes de criar um mapeamento da origem do evento, você precisará dar permissão à função do Lambda para ler a partir de um fluxo de dados do Kinesis. O Lambda precisa das permissões a seguir para gerenciar recursos relacionados ao seu fluxo de dados do Kinesis:

A política gerenciada da AWS AWSLambdaKinesisExecutionRole inclui essas permissões. Adicione essa política gerenciada à sua função, conforme descrito no procedimento a seguir.

AWS Management Console
Para adicionar permissões do Kinesis à sua função
  1. Abra a página Funções do console do Lambda e selecione a função.

  2. Na guia Configuração, escolha Permissões.

  3. No painel Perfil de execução, em Nome do perfil, escolha o link para o perfil de execução da sua função. Esse link abre a página para esse perfil no console do IAM.

  4. No painel Políticas de permissões, escolha Adicionar permissões e, em seguida, selecione Anexar políticas.

  5. No campo de pesquisa, digite AWSLambdaKinesisExecutionRole.

  6. Marque a caixa de seleção próxima à política e escolha Adicionar permissão.

AWS CLI
Para adicionar permissões do Kinesis à sua função
  • Execute o comando da CLI a seguir para adicionar a política AWSLambdaKinesisExecutionRole ao perfil de execução da sua função:

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
Para adicionar permissões do Kinesis à sua função
  • Na definição da sua função, adicione a Policies propriedade, conforme mostrado no seguinte exemplo:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole

Depois de configurar as permissões necessárias, crie o mapeamento de origem de eventos.

AWS Management Console
Para criar o mapeamento da origem do evento do Kinesis
  1. Abra a página Funções do console do Lambda e selecione a função.

  2. No painel Visão geral da função, escolha Adicionar gatilho.

  3. Em Configuração do acionador, para a origem, selecione Kinesis.

  4. Selecione o fluxo do Kinesis para o qual você deseja criar o mapeamento da origem do evento e, opcionalmente, um consumidor do seu fluxo.

  5. (Opcional) edite o Tamanho do lote, a Posição inicial e a Janela do lote para o mapeamento da origem do evento.

  6. Escolha Adicionar.

Ao criar seu mapeamento da origem do evento no console, seu perfil do IAM deve ter as permissões kinesis:ListStreams e kinesis:ListStreamConsumers.

AWS CLI
Para criar o mapeamento da origem do evento do Kinesis
  • Execute o comando a seguir da CLI para criar um mapeamento da origem do evento do Kinesis. Escolha seu próprio tamanho de lote e posição inicial de acordo com seu caso de uso.

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

Para especificar uma janela de agrupamento em lotes, adicione a opção --maximum-batching-window-in-seconds. Para obter mais informações sobre como este e outros parâmetros, consulte create-event-source-mapping na Referência de comandos da AWS CLI.

AWS SAM
Para criar o mapeamento da origem do evento do Kinesis
  • Na definição da sua função, adicione a KinesisEvent propriedade, conforme mostrado no seguinte exemplo:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Para saber mais sobre como criar um mapeamento de origem de eventos para o Kinesis Data Streams no AWS SAM, consulte Kinesis no Guia do desenvolvedor do AWS Serverless Application Model.

Posição inicial de sondagem e fluxo

Esteja ciente de que a sondagem do fluxo durante a criação e as atualizações do mapeamento da origem do evento é, finalmente, consistente.

  • Durante a criação do mapeamento da origem do evento, pode levar alguns minutos para a sondagem de eventos do fluxo iniciar.

  • Durante as atualizações do mapeamento da origem do evento, pode levar alguns minutos para interromper e reiniciar a sondagem de eventos do fluxo.

Esse comportamento significa que, se você especificar LATEST como posição inicial do fluxo, o mapeamento da origem do evento poderá perder eventos durante a criação ou as atualizações. Para garantir que nenhum evento seja perdido, especifique a posição inicial do fluxo como TRIM_HORIZON ou AT_TIMESTAMP.

Como criar mapeamentos da origem do evento entre contas

O Amazon Kinesis Data Streams permite políticas baseadas em recursos. Por isso, você pode processar dados ingeridos em um fluxo em uma Conta da AWS com uma função do Lambda em outra conta.

Para criar um mapeamento da origem do evento para sua função do Lambda usando um fluxo do Kinesis em outra Conta da AWS, você deve configurar o fluxo usando uma política baseada em recursos para dar permissão à função do Lambda para ler itens. Para saber como configurar seu fluxo para permitir o acesso entre contas, consulte Compartilhamento de acesso com funções do AWS Lambda entre contas no Guia do desenvolvedor do Amazon Kinesis Streams.

Depois de configurar seu fluxo com uma política baseada em recursos que conceda à sua função do Lambda as permissões necessárias, crie o mapeamento da origem do evento usando qualquer um dos métodos descritos na seção anterior.

Se você optar por criar seu mapeamento da origem do evento usando o console Lambda, cole o ARN do seu fluxo diretamente no campo de entrada. Se você quiser especificar um consumidor para seu fluxo, colar o ARN do consumidor preencherá automaticamente o campo do fluxo.