Pré-processar dados usando uma função do Lambda - Guia do desenvolvedor do Amazon Kinesis Data Analytics SQL para aplicativos

Para novos projetos, recomendamos que você use o novo Managed Service para Apache Flink Studio em vez do Kinesis Data Analytics for Applications. SQL O Managed Service for Apache Flink Studio combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicativos sofisticados de processamento de stream em minutos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Pré-processar dados usando uma função do Lambda

nota

Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics for SQL. Para obter mais informações, consulte Limites.

Se os dados em seu stream precisarem de conversão, transformação, enriquecimento ou filtragem de formato, você poderá pré-processar os dados usando uma função. AWS Lambda Você pode fazer isso antes que o código SQL do aplicativo seja executado ou antes que seu aplicativo crie um esquema para seu streaming de dados.

Usar uma função do Lambda para pré-processamento de registros é útil nos seguintes cenários:

  • Transformar registros de outros formatos (como KPL ou GZIP) em formatos que o Kinesis Data Analytics pode analisar. O Kinesis Data Analytics atualmente é compatível com formatos de dados JSON ou CSV.

  • Expandir dados em um formato mais acessível para operações como agregação ou detecção de anomalias. Por exemplo, se vários valores de dados são armazenados juntos em uma string, você pode expandir os dados em colunas separadas.

  • Enriquecer dados com outros serviços da Amazon, como extrapolação ou correção de erros.

  • Aplicar transformação complexa de string em campos de registro.

  • Filtrar dados para limpar os dados.

Usar uma função do Lambda para pré-processamento de registros

Ao criar o aplicativo do Kinesis Data Analytics, você habilita o pré-processamento do Lambda na página Conectar a uma fonte.

Para usar uma função do Lambda para pré-processar registros em um aplicativo do Kinesis Data Analytics
  1. Faça login AWS Management Console e abra o console do Managed Service for Apache Flink em https://console.aws.amazon.com/kinesisanalytics.

  2. Na página Conectar a uma fonte do seu aplicativo, escolha Habilitada na seção Registrar pré-processamento com AWS Lambda.

  3. Para usar uma função do Lambda que você já tenha criado, escolha a função na lista suspensa Função do Lambda.

  4. Para criar uma nova função do Lambda de um dos modelos de pré-processamento do Lambda, escolha o modelo na lista suspensa. Em seguida, escolha View template name > in Lambda (Exibir <nome do modelo> no Lambda) para editar a função.

  5. Para criar uma nova função do Lambda, selecione Criar nova. Para obter informações sobre a criação de uma função Lambda, consulte Criar uma função HelloWorld Lambda e explorar o console no Guia do desenvolvedor.AWS Lambda

  6. Escolha a versão da função do Lambda a ser usada. Para usar a versão mais recente, escolha $LATEST.

Quando você escolhe ou cria uma função do Lambda para pré-processamento de registros, os registros são pré-processados antes da execução do código SQL do seu aplicativo ou o aplicativo gera um esquema dos registros.

Permissões de pré-processamento do Lambda

Para usar o pré-processamento do Lambda, o perfil do IAM do aplicativo requer a seguinte política de permissões:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Métricas de pré-processamento do Lambda

Você pode usar CloudWatch a Amazon para monitorar o número de invocações do Lambda, bytes processados, sucessos e falhas, e assim por diante. Para obter informações sobre CloudWatch métricas emitidas pelo pré-processamento do Kinesis Data Analytics Lambda, consulte Métricas do Amazon Kinesis Analytics.

Usando AWS Lambda com a Kinesis Producer Library

A Kinesis Producer Library (KPL) agrega pequenos registros formatados pelo usuário em registros maiores de até 1 MB para usar melhor a throughput do Amazon Kinesis Data Streams. A Kinesis Client Library (KCL) para Java é compatível com a desagregação desses registros. No entanto, você deve usar um módulo especial para desagregar os registros ao usar AWS Lambda como consumidor de seus streams.

Para obter o código e as instruções do projeto necessários, consulte os Módulos de desagregação da Kinesis Producer Library para saber mais. AWS LambdaGitHub Você pode usar os componentes desse projeto para processar dados serializados KPL AWS Lambda em Java, Node.js e Python. Você também pode usar esses componentes como parte de um aplicativo da KCL de várias linguagens.

Modelo de dados de entrada de eventos de pré-processamento de dados ou modelo de resposta de registros

Para pré-processar registros, sua função do Lambda precisa estar em conformidade com os modelos de dados de entrada de eventos e modelos de resposta de registros exigidos.

Modelo de dados de entrada de eventos

O Kinesis Data Analytics lê continuamente os dados do seu stream de dados do Kinesis ou do stream de entrega do Firehose. Para cada lote de registros recuperado, o serviço gerencia como esse lote é enviado à sua função do Lambda. Sua função recebe uma lista de registros como entrada. Dentro da função, você segue a lista e aplica a lógica de negócios para cumprir requisitos de pré-processamento (como enriquecimento ou conversão de formato de dados).

O modelo de entrada para sua função de pré-processamento varia um pouco, dependendo se os dados foram recebidos de um stream de dados do Kinesis ou de um stream de entrega do Firehose.

Se a fonte for um stream de entrega do Firehose, o modelo de dados de entrada do evento será o seguinte:

Modelo de solicitação de dados do Kinesis Data Firehose

Campo Descrição
invocationId O Id de invocação do Lambda (GUID aleatório).
applicationArn O nome do recurso da Amazon (ARN) do aplicativo Kinesis Data Analytics
streamArn ARN do fluxo de entrega
registros
Campo Descrição
recordId ID de registro (GUID aleatório)
kinesisFirehoseRecordMetadata
Campo Descrição
approximateArrivalTimestamp Tempo de chegada aproximado do registro do fluxo de entrega
data Carga útil de registros de origem codificada em Base64

O exemplo a seguir mostra a entrada de um fluxo de entrega do Firehose:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Se a origem for um fluxo de dados do Kinesis, o modelo de dados de entrada de eventos será o seguinte:

Modelo de solicitação de dados do Kinesis Streams

Campo Descrição
invocationId O Id de invocação do Lambda (GUID aleatório).
applicationArn ARN do aplicativo Kinesis Data Analytics
streamArn ARN do fluxo de entrega
registros
Campo Descrição
recordId ID de registro baseado no número de sequência de registro do Kinesis
kinesisStreamRecordMetadata
Campo Descrição
sequenceNumber Número de sequência do registro do stream do Kinesis
partitionKey Chave de partição do registro do stream do Kinesis
shardId ShardId do registro do fluxo do Kinesis
approximateArrivalTimestamp Tempo de chegada aproximado do registro do fluxo de entrega
data Carga útil de registros de origem codificada em Base64

O exemplo a seguir mostra a entrada de um streaming de dados do Kinesis:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Modelo de resposta de registros

Todos os registros retornados da função de pré-processamento do Lambda (com IDs de registro) que são enviados para a função do Lambda devem ser retornados. Eles devem conter os seguintes parâmetros, ou o Kinesis Data Analytics os rejeita e aponta uma falha de pré-processamento de dados. A parte de carga útil de dados do registro pode ser transformada para cumprir requisitos de pré-processamento.

Modelo de resposta de dados

registros
Campo Descrição
recordId O ID do registro é transmitido do Kinesis Data Analytics para o Lambda durante a invocação. O registro transformado deve conter o mesmo ID de registro. Qualquer incompatibilidade entre o ID do registro original e o ID do registro transformado é considerada uma falha de pré-processamento de dados.
result O status da transformação de dados do registro. Os valores possíveis são:
  • Ok: o registro foi transformado com êxito. O Kinesis Data Analytics ingere o registro para processamento de SQL.

  • Dropped: o registro foi eliminado intencionalmente por sua lógica de processamento. O Kinesis Data Analytics elimina o registro para processamento de SQL. O campo de carga útil de dados é opcional para um registro Dropped.

  • ProcessingFailed: não foi possível transformar o registro. O Kinesis Data Analytics considera uma falha no processamento pela função do Lambda e grava um erro no stream de erros. Para obter mais informações sobre o stream de erros, consulte Como tratar erros. O campo de carga útil de dados é opcional para um registro ProcessingFailed.

data A carga útil dos dados transformados, após a codificação base64. Cada carga de dados pode conter vários documentos JSON se o formato de dados de ingestão do aplicativo for JSON. Ou cada carga pode conter várias linhas CSV (com um delimitador de linha especificado em cada linha) se o formato de dados de ingestão do aplicativo for CSV. O serviço Kinesis Data Analytics analisa e processa dados com êxito com vários documentos JSON ou linhas CSV na mesma carga útil de dados.

O exemplo a seguir mostra a saída de uma função Lambda:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Falhas de pré-processamento de dados comuns

Veja a seguir os motivos comuns pelos quais o pré-processamento pode falhar.

  • Nem todos os registros (com IDs de registro) em um lote que são enviados para a função do Lambda são retornados para o serviço do Kinesis Data Analytics.

  • A resposta não tem o ID do registro, o status ou o campo de carga útil de dados. O campo de carga útil de dados é opcional para um registro Dropped ou ProcessingFailed.

  • O tempo limite da função do Lambda não é suficiente para pré-processar os dados.

  • A resposta da função do Lambda excede os limites de resposta impostos pelo serviço AWS Lambda .

Para falhas de processamento de dados, o Kinesis Data Analytics continua a fazer novas tentativas de invocação do Lambda para o mesmo conjunto de registros até que tenha êxito. Você pode monitorar as CloudWatch métricas a seguir para obter informações sobre falhas.

  • Aplicativo do Kinesis Data Analytics MillisBehindLatest: indica o tempo de atraso do aplicativo na leitura da origem de streaming.

  • Métricas do InputPreprocessing CloudWatch aplicativo Kinesis Data Analytics: indica o número de sucessos e fracassos, entre outras estatísticas. Para obter mais informações, consulte Métricas do Amazon Kinesis Analytics.

  • AWS Lambda CloudWatch métricas e registros de funções.