

# Tutorial: Usar um mapeamento de origem do evento do Amazon MSK para invocar uma função do Lambda
<a name="services-msk-tutorial"></a>

Neste tutorial, você fará o seguinte:
+ Criar uma função do Lambda na mesma conta da AWS que um cluster do Amazon MSK existente.
+ Configurar a rede e a autenticação para que o Lambda se comunique com o Amazon MSK.
+ Configurar um mapeamento de origem do evento do Amazon MSK do Lambda, o qual executa sua função do Lambda quando os eventos aparecem no tópico.

Depois de concluir essas etapas, quando os eventos forem enviados para o Amazon MSK, você poderá configurar uma função do Lambda para processar esses eventos automaticamente com seu próprio código do Lambda personalizado.

 **O que você pode fazer com esse recurso?** 

**Exemplo de solução: use um mapeamento de origem de evento do MSK para fornecer pontuações ao vivo para seus clientes.**

Considere o seguinte cenário: sua empresa hospeda uma aplicação Web em que seus clientes podem ver informações sobre eventos ao vivo, como competições esportivas. As atualizações de informações do jogo são fornecidas à sua equipe por meio de um tópico do Kafka no Amazon MSK. Você deseja criar uma solução que consuma atualizações do tópico do MSK para fornecer uma visão atualizada do evento ao vivo aos clientes dentro de uma aplicação desenvolvida por você. Você optou pela seguinte abordagem de design: suas aplicações clientes se comunicarão com um backend sem servidor hospedado na AWS. Os clientes se conectarão por meio de sessões de websocket usando a API de WebSocket do Amazon API Gateway.

Nessa solução, você precisa de um componente que leia eventos do MSK, execute uma lógica personalizada para preparar esses eventos para a camada de aplicação e, em seguida, encaminhe essas informações para a API do API Gateway. É possível implementar esse componente com o AWS Lambda, fornecendo sua lógica personalizada em uma função do Lambda e, em seguida, chamando-a com um mapeamento de origem de eventos do Amazon MSK do AWS Lambda.

Para obter mais informações sobre a implementação de soluções usando a API de WebSocket do Amazon API Gateway, consulte os [tutoriais da API de WebSocket](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html) na documentação do API Gateway.

## Pré-requisitos
<a name="w2aad101c23c15c35c19"></a>

Uma conta da AWS com os seguintes recursos pré-configurados:

**Para cumprir esses pré-requisitos, recomendamos seguir a seção [Conceitos básicos do uso do Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html) na documentação do Amazon MSK.**
+ Um cluster do Amazon MSK. Consulte [Criar um cluster do Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html) em *Conceitos básicos do uso do Amazon MSK*.
+ A seguinte configuração:
  + Certifique-se de que a **autenticação baseada em perfis do IAM** esteja **habilitada** nas configurações de segurança do cluster. Isso melhora sua segurança ao limitar sua função do Lambda para acessar somente os recursos necessários do Amazon MSK. Esse comportamento é habilitado por padrão em todos os novos clusters do Amazon MSK.
  + Certifique-se de que o **acesso público** esteja desativado nas configurações de rede do cluster. Restringir o acesso do cluster do Amazon MSK à Internet melhora sua segurança ao limitar o número de intermediários que lidam com seus dados. Esse comportamento é habilitado por padrão em todos os novos clusters do Amazon MSK.
+ Um tópico do Kafka no cluster do Amazon MSK para usar nesta solução. Consulte [Criar um tópico](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html) em *Conceitos básicos do uso do Amazon MSK*.
+ Um host administrativo do Kafka configurado para recuperar informações do cluster do Kafka e enviar eventos do Kafka para seu tópico para teste, como uma instância do Amazon EC2 com a CLI de administração do Kafka e a biblioteca do IAM do Amazon MSK instalada. Consulte [Criar uma máquina cliente](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html) em *Conceitos básicos do uso do Amazon MSK*.

Depois de configurar esses recursos, reúna as seguintes informações da sua conta da AWS para confirmar que está tudo pronto para continuar.
+ O nome do cluster do Amazon MSK. Essas informações podem ser encontradas no console do Amazon MSK.
+ O UUID do cluster, parte do ARN do seu cluster do Amazon MSK, o qual pode ser encontrado no console do Amazon MSK. Siga os procedimentos em [Listar clusters](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html) na documentação do Amazon MSK para encontrar essas informações.
+ Os grupos de segurança associados ao cluster do Amazon MSK. Essas informações podem ser encontradas no console do Amazon MSK. Nas etapas a seguir, faça referência a elas como *clusterSecurityGroups*.
+ O ID da Amazon VPC contendo seu cluster do Amazon MSK. É possível encontrar essas informações identificando sub-redes associadas ao seu cluster do Amazon MSK no console do Amazon MSK e, em seguida, identificando a Amazon VPC associada à sub-rede no console da Amazon VPC.
+ O nome do tópico do Kafka usado na solução. É possível encontrar essas informações chamando o cluster do Amazon MSK com a CLI de `topics` do Kafka no host administrativo do Kafka. Para obter mais informações sobre a CLI de tópicos, consulte [Adicionar e remover tópicos](https://kafka.apache.org/documentation/#basic_ops_add_topic) na documentação do Kafka.
+ O nome de um grupo de consumidores para seu tópico do Kafka, adequado para uso por sua função do Lambda. Esse grupo pode ser criado automaticamente pelo Lambda, então não é necessário criá-lo com a CLI do Kafka. Se você precisar gerenciar seus grupos de consumidores, para saber mais sobre a CLI de grupos de consumidores, consulte [Gerenciar grupos de consumidores](https://kafka.apache.org/documentation/#basic_ops_consumer_group) na documentação do Kafka.

As seguintes permissões em sua conta da AWS:
+ Permissão para criar e gerenciar uma função do Lambda
+ Permissão para criar políticas do IAM e associá-las à sua função do Lambda.
+ Permissão para criar endpoints da Amazon VPC e alterar a configuração de rede na Amazon VPC que hospeda seu cluster do Amazon MSK.

### Instalar o AWS Command Line Interface
<a name="install_aws_cli"></a>

Se você ainda não instalou a AWS Command Line Interface, siga as etapas em [Instalar ou atualizar a versão mais recente da AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) para instalá-la.

O tutorial requer um terminal de linha de comando ou um shell para executar os comandos. No Linux e no macOS, use o gerenciador de pacotes e de shell de sua preferência.

**nota**  
No Windows, alguns comandos da CLI do Bash que você costuma usar com o Lambda (como `zip`) não são compatíveis com os terminais integrados do sistema operacional. Para obter uma versão do Ubuntu com o Bash integrada no Windows, [instale o Subsistema do Windows para Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Configurar a conectividade de rede para que o Lambda se comunique com o Amazon MSK
<a name="w2aad101c23c15c35c21"></a>

 Use AWS PrivateLink para conectar o Lambda e o Amazon MSK. Isso pode ser feito com a criação de endpoints da Amazon VPC de interface no console do Amazon VPC. Para obter mais informações sobre a configuração de rede, consulte [Configurar o cluster do Amazon MSK e a rede do Amazon VPC para o Lambda](with-msk-cluster-network.md). 

Quando um mapeamento de origem de evento do Amazon MSK é executado em nome de uma função do Lambda, ele assume o perfil de execução da função do Lambda. Essa função do IAM autoriza o mapeamento a acessar recursos protegidos pelo IAM, como seu cluster do Amazon MSK. Embora os componentes compartilhem uma função de execução, o mapeamento do Amazon MSK e sua função do Lambda têm requisitos de conectividade separados para suas respectivas tarefas, conforme mostrado no diagrama a seguir.

![\[\]](http://docs.aws.amazon.com/pt_br/lambda/latest/dg/images/msk_tut_network.png)


Seu mapeamento da origem do evento pertence ao seu grupo de segurança do cluster do Amazon MSK. Nesta etapa de rede, crie endpoints do Amazon VPC da VPC do cluster do Amazon MSK para conectar o mapeamento da origem do evento aos serviços do Lambda e STS. Proteja esses endpoints para aceitar tráfego do seu grupo de segurança de cluster do Amazon MSK. Em seguida, ajuste os grupos de segurança do cluster do Amazon MSK para permitir que o mapeamento da origem do evento se comunique com o cluster do Amazon MSK.

 É possível configurar as etapas a seguir usando o Console de gerenciamento da AWS.

**Para configurar os endpoints da Amazon VPC de interface para conectar o Lambda e o Amazon MSK**

1. Crie um grupo de segurança para os endpoints da Amazon VPC de interface, *endpointSecurityGroup*, que permite tráfego TCP de entrada na porta 443 proveniente de *clusterSecurityGroups*. Siga o procedimento em [Criar um grupo de segurança](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group) na documentação do Amazon EC2 para criar um grupo de segurança. Em seguida, siga o procedimento em [Adicionar regras a um grupo de segurança](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) na documentação do Amazon EC2 para adicionar as regras apropriadas. 

   **Crie um grupo de segurança com as seguintes informações:**

   Ao adicionar suas regras de entrada, crie uma regra para cada grupo de segurança em *clusterSecurityGroups*. Para cada regra:
   + Em **Tipo**, selecione **HTTPS**.
   + Em **Origem**, selecione um dos *clusterSecurityGroups*.

1.  Crie um endpoint conectando o serviço do Lambda à Amazon VPC contendo seu cluster do Amazon MSK. Siga o procedimento em [Criar um endpoint de interface](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html).

   **Crie um endpoint de interface com as seguintes informações:**
   + Em **Nome do serviço**, selecione `com.amazonaws.regionName.lambda`, onde *regionName* hospeda sua função do Lambda.
   + Para **VPC**, selecione a Amazon VPC que contém seu cluster do Amazon MSK.
   + Em **Grupos de segurança**, selecione o *endpointSecurityGroup* que você criou anteriormente.
   + Para **Sub-redes**, selecione as sub-redes que hospedam seu cluster do Amazon MSK.
   + Para **Política**, forneça o documento de política a seguir, que protege o endpoint para uso pela entidade principal do serviço do Lambda para a ação `lambda:InvokeFunction`.

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + Certifique-se de que a opção **Habilitar nome de DNS** permaneça selecionada.

1.  Crie um endpoint conectando o serviço do AWS STS à Amazon VPC que contém seu cluster do Amazon MSK. Siga o procedimento em [Criar um endpoint de interface](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html).

   **Crie um endpoint de interface com as seguintes informações:**
   + Em **Nome do serviço**, selecione AWS STS.
   + Para **VPC**, selecione a Amazon VPC que contém seu cluster do Amazon MSK.
   + Em **Grupos de segurança**, selecione *endpointSecurityGroup*.
   + Para **Sub-redes**, selecione as sub-redes que hospedam seu cluster do Amazon MSK.
   + Para **Política**, forneça o documento de política a seguir, que protege o endpoint para uso pela entidade principal do serviço do Lambda para a ação `sts:AssumeRole`.

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + Certifique-se de que a opção **Habilitar nome de DNS** permaneça selecionada.

1. Para cada grupo de segurança associado ao seu cluster do Amazon MSK, ou seja, em *clusterSecurityGroups*, permita o seguinte:
   + Permita todo o tráfego TCP de entrada e saída na porta 9098 para todos os *clusterSecurityGroups*, inclusive dentro dele mesmo.
   + Permita todo o tráfego TCP de saída na porta 443.

   Parte desse tráfego é permitido por regras padrão do grupo de segurança. Portanto, se seu cluster estiver vinculado a um único grupo de segurança e esse grupo tiver regras padrão, regras adicionais não serão necessárias. Para adicionar regras do grupo de segurança, siga o procedimento em [Adicionar regras a um grupo de segurança](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) na documentação do Amazon EC2.

   **Adicione regras aos seus grupos de segurança com as seguintes informações:**
   + Para cada regra de entrada ou regra de saída para a porta 9098, forneça
     + Em **Tipo**, selecione **TCP personalizado**.
     + Em **Intervalo de portas**, forneça 9098.
     + Em **Origem**, forneça um dos *clusterSecurityGroups*.
   + Para cada regra de entrada da porta 443, em **Tipo**, selecione **HTTPS**.

## Crie um perfil do IAM para que o Lambda leia seu tópico do Amazon MSK
<a name="w2aad101c23c15c35c23"></a>

Identifique os requisitos de autenticação para que o Lambda leia seu tópico do Amazon MSK e, em seguida, defina-os em uma política. Crie uma função, *lambdaAuthRole*, que autorize o Lambda a usar essas permissões. Autorize ações no seu cluster Amazon MSK usando ações do IAM do `kafka-cluster`. Em seguida, autorize o Lambda a realizar as ações do Amazon MSK `kafka` e do Amazon EC2 necessárias para descobrir e se conectar ao seu cluster do Amazon MSK, bem como as ações do CloudWatch para que o Lambda possa registrar em log o que fez.

**Para descrever os requisitos de autenticação para que o Lambda leia a partir do Amazon MSK**

1. Escreva um documento de política do IAM (um documento JSON), *clusterAuthPolicy*, que permita que o Lambda leia seu tópico do Kafka no seu cluster do Amazon MSK usando seu grupo de consumidores do Kafka. O Lambda exige que um grupo de consumidores do Kafka seja definido durante a leitura.

   Altere o modelo a seguir para se alinhar aos seus pré-requisitos:

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   Para obter mais informações, consulte : [Configuração de permissões do Lambda para mapeamentos da origem do evento do Amazon MSK](with-msk-permissions.md). Ao redigir sua política:
   + Substitua *us-east-1* e *111122223333* por Região da AWS e por Conta da AWS de seu cluster Amazon MSK.
   + Em *mskClusterName*, forneça o nome do seu cluster do Amazon MSK.
   + Em *cluster-uuid*, forneça o UUID no ARN para seu cluster do Amazon MSK.
   + Em *mskTopicName*, forneça o nome do seu tópico do Kafka.
   + Em *mskGroupName*, forneça o nome do seu grupo de consumidores do Kafka.

1. Identifique as permissões do Amazon MSK, do Amazon EC2 e do CloudWatch necessárias para que o Lambda descubra e conecte seu cluster Amazon MSK e registre esses eventos no log.

   A política gerenciada `AWSLambdaMSKExecutionRole` define de forma permissiva as permissões necessárias. Use-a nas etapas a seguir.

   Em um ambiente de produção, avalie `AWSLambdaMSKExecutionRole` para restringir sua política de perfil de execução com base no princípio do privilégio mínimo e, em seguida, escreva uma política para seu perfil que substitua essa política gerenciada.

Para obter detalhes sobre a linguagem de políticas do IAM, consulte a [Documentação do IAM](https://docs.aws.amazon.com//iam/).

Agora que você escreveu seu documento de política, crie uma política do IAM para poder anexá-la ao seu perfil. É possível fazer isso usando o console com o procedimento a seguir.

**Para criar uma política do IAM a partir do seu documento de política**

1. Faça login no Console de gerenciamento da AWS e abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. No painel de navegação à esquerda, escolha **Políticas**. 

1. Escolha **Criar política**.

1. Na seção **Editor de políticas**, escolha a opção **JSON**.

1. Cole *clusterAuthPolicy*.

1. Quando terminar de adicionar as permissões à política, escolha **Avançar**.

1. Na página **Revisar e criar**, digite um **nome de política** e uma **descrição** (opcional) para a política que você está criando. Revise **Permissões definidas nessa política** para ver as permissões que são concedidas pela política.

1. Escolha **Criar política** para salvar sua nova política.

Para obter mais informações, consulte [Criar políticas do IAM](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html) na documentação do IAM.

Agora que você tem as políticas apropriadas do IAM, crie uma perfil e anexe-o a ela. É possível fazer isso usando o console com o procedimento a seguir.

**Como criar uma função de execução no console do IAM**

1. Abra a página [Roles (Funções)](https://console.aws.amazon.com/iam/home#/roles) no console do IAM.

1. Selecione **Create role** (Criar função).

1. Em **Tipo de entidade confiável**, selecione **Serviço da AWS**.

1. Em **Use case** (Caso de uso), escolha **Lambda**.

1. Escolha **Próximo**.

1. Selecione as políticas a seguir:
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. Escolha **Próximo**.

1. Em **Nome do perfil**, insira *lambdaAuthRole* e, em seguida, escolha **Criar perfil**.

Para obter mais informações, consulte [Definir permissões de uma função do Lambda com um perfil de execução](lambda-intro-execution-role.md).

## Crie uma função do Lambda para ler do seu tópico do Amazon MSK
<a name="w2aad101c23c15c35c25"></a>

Crie uma função do Lambda configurada para usar seu perfil do IAM. É possível criar sua função do Lambda usando o console.

**Para criar uma função do Lambda usando sua configuração de autenticação**

1.  Abra o console do Lambda e selecione **Criar função** no cabeçalho. 

1. Selecione **Criar do zero**.

1. Em **Nome da função**, forneça um nome apropriado de sua escolha.

1. Em **Runtime**, escolha a versão **Mais recente com suporte** do `Node.js` para usar o código fornecido neste tutorial.

1. Escolha **Alterar perfil de execução padrão**.

1. Selecione **Usar perfil existente**.

1. Em **Perfil existente**, selecione *lambdaAuthRole*.

Em um ambiente de produção, geralmente é necessário adicionar mais políticas ao perfil de execução da sua função do Lambda para processar de forma significativa seus eventos do Amazon MSK. Para obter mais informações sobre como adicionar políticas ao seu perfil, consulte [Adicionar ou remover permissões de identidade](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console) na documentação do IAM.

## Criar um mapeamento da origem do evento para sua função do Lambda
<a name="w2aad101c23c15c35c27"></a>

Seu mapeamento de origem de eventos do Amazon MSK fornece ao serviço do Lambda as informações necessárias para invocar seu Lambda quando eventos apropriados do Amazon MSK ocorrem. É possível criar um mapeamento do Amazon MSK usando o console. Crie um acionador do Lambda e, em seguida, o mapeamento de origem de eventos será configurado automaticamente.

**Para criar um acionador do Lambda (e mapeamento de origem de eventos)**

1. Navegue até a página de visão geral da sua função do Lambda.

1. Na seção de visão geral da função, selecione **Adicionar acionador** no canto inferior esquerdo.

1. No menu suspenso **Selecionar uma origem**, selecione **Amazon MSK**.

1. Não defina a **autenticação**.

1. Em **Cluster do MSK**, selecione o nome do seu cluster.

1. Em **Tamanho do lote**, insira 1. Essa etapa facilita o teste desse recurso, mas não é um valor ideal em ambientes de produção.

1. Em **Nome do tópico**, forneça o nome do seu tópico do Kafka.

1. Para **ID do grupo de consumidores**, forneça o ID do seu grupo de consumidores do Kafka.

## Atualize a função do Lambda para ler seus dados de streaming
<a name="w2aad101c23c15c35c29"></a>

 O Lambda fornece informações sobre eventos do Kafka por meio do parâmetro de método de evento. Para obter um exemplo de estrutura de um evento do Amazon MSK, consulte [Evento de exemplo](with-msk.md#msk-sample-event). Depois de entender como interpretar os eventos do Amazon MSK encaminhados pelo Lambda, você pode alterar o código da função do Lambda para usar as informações fornecidas por eles. 

 Forneça o seguinte código à sua função do Lambda para registrar em log o conteúdo de um evento do Amazon MSK do Lambda para fins de teste: 

------
#### [ .NET ]

**SDK para .NET**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumir um evento do Amazon MSK com o Lambda usando .NET.  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**SDK para Go V2**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumo de um evento do Amazon MSK com o Lambda usando Go.  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumir um evento do Amazon MSK com o Lambda usando Java.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumir um evento do Amazon MSK com o Lambda usando JavaScript.  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
Consumir um evento do Amazon MSK com o Lambda usando TypeScript.  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**SDK para PHP**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumo de um evento do Amazon MSK com o Lambda usando PHP.  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3).**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumir um evento do Amazon MSK com o Lambda usando Python.  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumir um evento do Amazon MSK com o Lambda usando Ruby.  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**SDK para Rust**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consumo de um evento do Amazon MSK com o Lambda usando o Rust.  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

É possível fornecer código de função para o Lambda usando o console.

**Para atualizar o código da função usando o editor de código do console**

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda e selecione a função.

1. Selecione a guia **Código**.

1. No painel **Código-fonte**, selecione o arquivo de código-fonte e edite-o no editor de código integrado.

1. Na seção **DEPLOY**, escolha **Implantar** para atualizar o código da função:  
![\[\]](http://docs.aws.amazon.com/pt_br/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## Teste sua função do Lambda para verificar se ela está conectada ao seu tópico do Amazon MSK
<a name="w2aad101c23c15c35c31"></a>

Agora você pode verificar se seu Lambda está sendo invocado pela origem do evento inspecionando os logs de eventos do CloudWatch.

**Para verificar se sua função do Lambda está sendo invocada**

1. Use seu host de administração do Kafka para gerar eventos do Kafka usando a CLI do `kafka-console-producer`. Para obter mais informações, consulte [Escrever alguns eventos no tópico na](https://kafka.apache.org/documentation/#quickstart_send) documentação do Kafka. Envie eventos suficientes para preencher o lote definido pelo tamanho do lote para seu mapeamento de origem de eventos definido na etapa anterior, caso contrário, o Lambda aguardará a invocação de mais informações.

1. Se sua função for executada, o Lambda escreverá o que aconteceu com o CloudWatch. No console, navegue até a página de detalhes da função do Lambda.

1. Selecione a guia **Configuration** (Configuração).

1. Na barra lateral, selecione **Ferramentas de monitoramento e operações**.

1. Identifique o **Grupo de logs do CloudWatch** em **Configuração de log**. O grupo de logs deve começar com `/aws/lambda`. Escolha o link para o grupo de logs.

1. No console do CloudWatch, inspecione os **Eventos de logs** em busca dos eventos de log que o Lambda enviou para o fluxo de logs. Identifique se há eventos de log contendo a mensagem do seu evento do Kafka, como na imagem a seguir. Se houver, você conectou com êxito uma função do Lambda ao Amazon MSK com um mapeamento de origem de eventos do Lambda.  
![\[\]](http://docs.aws.amazon.com/pt_br/lambda/latest/dg/images/msk_tut_log.png)