Pré-processar dados usando uma função do Lambda - Guia do Desenvolvedor de Amazon Kinesis Data Analytics para aplicativos SQL

Após uma análise cuidadosa, decidimos descontinuar as aplicações do Amazon Kinesis Data Analytics para SQL em duas etapas:

1. A partir de 15 de outubro de 2025, você não poderá mais criar aplicações do Kinesis Data Analytics para SQL.

2. Excluiremos as aplicações a partir de 27 de janeiro de 2026. Você não poderá mais iniciar nem operar as aplicações do Amazon Kinesis Data Analytics para SQL. A partir dessa data, não haverá mais suporte ao Amazon Kinesis Data Analytics para SQL. Para ter mais informações, consulte Descontinuação de aplicações do Amazon Kinesis Data Analytics para SQL.

Pré-processar dados usando uma função do Lambda

nota

Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para obter mais informações, consulte Limites.

Se os dados no seu stream precisarem de conversão de formato, transformação, enriquecimento ou filtragem, você pode pré-processar esses dados usando a função AWS Lambda. Você pode fazer isso antes que o código SQL do aplicativo seja executado ou antes que seu aplicativo crie um esquema para seu streaming de dados.

Usar uma função do Lambda para pré-processamento de registros é útil nos seguintes cenários:

  • Transformar registros de outros formatos (como KPL ou GZIP) em formatos que o Kinesis Data Analytics pode analisar. O Kinesis Data Analytics atualmente é compatível com formatos de dados JSON ou CSV.

  • Expandir dados em um formato mais acessível para operações como agregação ou detecção de anomalias. Por exemplo, se vários valores de dados são armazenados juntos em uma string, você pode expandir os dados em colunas separadas.

  • Enriquecer dados com outros serviços da Amazon, como extrapolação ou correção de erros.

  • Aplicar transformação complexa de string em campos de registro.

  • Filtrar dados para limpar os dados.

Usar uma função do Lambda para pré-processamento de registros

Ao criar o aplicativo do Kinesis Data Analytics, você habilita o pré-processamento do Lambda na página Conectar a uma fonte.

Para usar uma função do Lambda para pré-processar registros em um aplicativo do Kinesis Data Analytics
  1. Faça login em AWS Management Console e abra o console do Managed Service for Apache Flink em https://console.aws.amazon.com/kinesisanalytics.

  2. Na página Conectar a uma fonte do seu aplicativo, escolha Habilitada na seção Registrar pré-processamento com AWS Lambda.

  3. Para usar uma função do Lambda que você já tenha criado, escolha a função na lista suspensa Função do Lambda.

  4. Para criar uma nova função do Lambda de um dos modelos de pré-processamento do Lambda, escolha o modelo na lista suspensa. Em seguida, escolha View template name > in Lambda (Exibir <nome do modelo> no Lambda) para editar a função.

  5. Para criar uma nova função do Lambda, selecione Criar nova. Para obter informações sobre como criar uma função do Lambda, consulte Criar uma função do Lambda HelloWorld e explorar o console no AWS Lambda Guia do desenvolvedor.

  6. Escolha a versão da função do Lambda a ser usada. Para usar a versão mais recente, escolha $LATEST.

Quando você escolhe ou cria uma função do Lambda para pré-processamento de registros, os registros são pré-processados antes da execução do código SQL do seu aplicativo ou o aplicativo gera um esquema dos registros.

Permissões de pré-processamento do Lambda

Para usar o pré-processamento do Lambda, o perfil do IAM do aplicativo requer a seguinte política de permissões:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Métricas de pré-processamento do Lambda

Use o Amazon CloudWatch para monitorar o número de invocações do Lambda, bytes processados, êxitos, falhas e assim por diante. Para obter informações sobre métricas do CloudWatch emitidas pelo pré-processamento do Kinesis Data Analytics Lambda, consulte Métricas do Amazon Kinesis Analytics.

Usar o AWS Lambda com a Kinesis Producer Library

A Kinesis Producer Library (KPL) agrega pequenos registros formatados pelo usuário em registros maiores de até 1 MB para usar melhor a throughput do Amazon Kinesis Data Streams. A Kinesis Client Library (KCL) para Java é compatível com a desagregação desses registros. No entanto, é necessário usar um módulo especial para desagregar os registros ao usar o AWS Lambda como consumidor de seus fluxos.

Para obter código e instruções necessários, consulte os Módulos de desagregação da Kinesis Producer Library para AWS Lambda no GitHub. Você pode usar os componentes deste projeto para processar dados serializados da KPL dentro do AWS Lambda em Java, Node.js, and Python. Você também pode usar esses componentes como parte de um aplicativo da KCL de várias linguagens.

Modelo de dados de entrada de eventos de pré-processamento de dados ou modelo de resposta de registros

Para pré-processar registros, sua função do Lambda precisa estar em conformidade com os modelos de dados de entrada de eventos e modelos de resposta de registros exigidos.

Modelo de dados de entrada de eventos

O Kinesis Data Analytics lê continuamente os dados do fluxo de dados do Kinesis ou fluxo de entrega do Firehose. Para cada lote de registros recuperado, o serviço gerencia como esse lote é enviado à sua função do Lambda. Sua função recebe uma lista de registros como entrada. Dentro da função, você segue a lista e aplica a lógica de negócios para cumprir requisitos de pré-processamento (como enriquecimento ou conversão de formato de dados).

O modelo de entrada da sua função de pré-processamento varia um pouco se os dados são recebidos de um fluxo de dados do Kinesis ou de um fluxo de entrega do Firehose.

Se a origem for um fluxo de entrega do Firehose, o modelo de dados de entrada de eventos será o seguinte:

Modelo de solicitação de dados do Kinesis Data Firehose

Campo Descrição
invocationId O Id de invocação do Lambda (GUID aleatório).
applicationArn O nome do recurso da Amazon (ARN) do aplicativo Kinesis Data Analytics
streamArn ARN do fluxo de entrega
registros
Campo Descrição
recordId ID de registro (GUID aleatório)
kinesisFirehoseRecordMetadata
Campo Descrição
approximateArrivalTimestamp Tempo de chegada aproximado do registro do fluxo de entrega
data Carga útil de registros de origem codificada em Base64

O exemplo a seguir mostra a entrada de um fluxo de entrega do Firehose:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Se a origem for um fluxo de dados do Kinesis, o modelo de dados de entrada de eventos será o seguinte:

Modelo de solicitação de dados do Kinesis Streams

Campo Descrição
invocationId O Id de invocação do Lambda (GUID aleatório).
applicationArn ARN do aplicativo Kinesis Data Analytics
streamArn ARN do fluxo de entrega
registros
Campo Descrição
recordId ID de registro baseado no número de sequência de registro do Kinesis
kinesisStreamRecordMetadata
Campo Descrição
sequenceNumber Número de sequência do registro do stream do Kinesis
partitionKey Chave de partição do registro do stream do Kinesis
shardId ShardId do registro do fluxo do Kinesis
approximateArrivalTimestamp Tempo de chegada aproximado do registro do fluxo de entrega
data Carga útil de registros de origem codificada em Base64

O exemplo a seguir mostra a entrada de um streaming de dados do Kinesis:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Modelo de resposta de registros

Todos os registros retornados da função de pré-processamento do Lambda (com IDs de registro) que são enviados para a função do Lambda devem ser retornados. Eles devem conter os seguintes parâmetros, ou o Kinesis Data Analytics os rejeita e aponta uma falha de pré-processamento de dados. A parte de carga útil de dados do registro pode ser transformada para cumprir requisitos de pré-processamento.

Modelo de resposta de dados

registros
Campo Descrição
recordId O ID do registro é transmitido do Kinesis Data Analytics para o Lambda durante a invocação. O registro transformado deve conter o mesmo ID de registro. Qualquer incompatibilidade entre o ID do registro original e o ID do registro transformado é considerada uma falha de pré-processamento de dados.
result O status da transformação de dados do registro. Os valores possíveis são:
  • Ok: o registro foi transformado com êxito. O Kinesis Data Analytics ingere o registro para processamento de SQL.

  • Dropped: o registro foi eliminado intencionalmente por sua lógica de processamento. O Kinesis Data Analytics elimina o registro para processamento de SQL. O campo de carga útil de dados é opcional para um registro Dropped.

  • ProcessingFailed: não foi possível transformar o registro. O Kinesis Data Analytics considera uma falha no processamento pela função do Lambda e grava um erro no stream de erros. Para obter mais informações sobre o stream de erros, consulte Como tratar erros. O campo de carga útil de dados é opcional para um registro ProcessingFailed.

data A carga útil dos dados transformados, após a codificação base64. Cada carga de dados pode conter vários documentos JSON se o formato de dados de ingestão do aplicativo for JSON. Ou cada carga pode conter várias linhas CSV (com um delimitador de linha especificado em cada linha) se o formato de dados de ingestão do aplicativo for CSV. O serviço Kinesis Data Analytics analisa e processa dados com êxito com vários documentos JSON ou linhas CSV na mesma carga útil de dados.

O exemplo a seguir mostra a saída de uma função Lambda:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Falhas de pré-processamento de dados comuns

Veja a seguir os motivos comuns pelos quais o pré-processamento pode falhar.

  • Nem todos os registros (com IDs de registro) em um lote que são enviados para a função do Lambda são retornados para o serviço do Kinesis Data Analytics.

  • A resposta não tem o ID do registro, o status ou o campo de carga útil de dados. O campo de carga útil de dados é opcional para um registro Dropped ou ProcessingFailed.

  • O tempo limite da função do Lambda não é suficiente para pré-processar os dados.

  • A resposta da função do Lambda excede os limites de resposta impostos pelo serviço AWS Lambda.

Para falhas de processamento de dados, o Kinesis Data Analytics continua a fazer novas tentativas de invocação do Lambda para o mesmo conjunto de registros até que tenha êxito. Você pode monitorar as métricas do CloudWatch a seguir para obter informações sobre os erros.

  • Aplicativo do Kinesis Data Analytics MillisBehindLatest: indica o tempo de atraso do aplicativo na leitura da origem de streaming.

  • Métricas do CloudWatch aplicativo Kinesis Data Analytics InputPreprocessing indicam o número de êxitos e falhas, além de outras estatísticas. Para obter mais informações, consulte Métricas do Amazon Kinesis Analytics.

  • AWS Lambda métricas e logs do CloudWatch da função.