O Amazon Managed Streaming for Apache Kafka (Amazon MSK) facilita a ingestão e o processamento de dados de streaming em tempo real com um serviço Apache Kafka totalmente gerenciado e altamente disponível.
O Apache Kafka
Por conta desses recursos, o Apache Kafka é frequentemente usado para criar pipelines de dados de streaming em tempo real. Um pipeline de dados processa e movimenta dados de um sistema para outro de maneira confiável e pode ser uma parte importante da adoção de uma estratégia de banco de dados com propósito específico, facilitando o uso de vários bancos de dados, cada um comportando diferentes casos de uso.
O Amazon DynamoDB é um destino comum nesses pipelines de dados para comportar aplicações que usam modelos de dados de chave-valor ou de documentos e desejam escalabilidade ilimitada com performance consistente de milissegundos de um dígito.
Como funciona
Uma integração entre o Amazon MSK e o DynamoDB usa uma função do Lambda para consumir registros do Amazon MSK e gravá-los no DynamoDB.

O Lambda pesquisa internamente por novas mensagens do Amazon MSK e, depois, invoca de maneira síncrona a função do Lambda de destino. A carga útil de eventos da função do Lambda contém lotes de mensagens do Amazon MSK. Para a integração entre o Amazon MSK e o DynamoDB, a função do Lambda grava essas mensagens no DynamoDB.
Configurar uma integração entre o Amazon MSK e o DynamoDB
nota
É possível baixar os recursos usados neste exemplo no repositório do GitHub
As etapas abaixo mostram como configurar uma integração de exemplo entre o Amazon MSK e o Amazon DynamoDB. O exemplo representa dados gerados por dispositivos da Internet das Coisas (IoT) e ingeridos no Amazon MSK. À medida que os dados são ingeridos no Amazon MSK, eles podem ser integrados a serviços de analytics ou ferramentas de terceiros compatíveis com o Apache Kafka, possibilitando vários casos de uso de analytics. A integração do DynamoDB também fornece uma pesquisa de chave-valor de registros de dispositivos individuais.
Este exemplo demonstrará como um script Python grava dados de sensores de IoT no Amazon MSK. Depois, uma função do Lambda grava itens com a chave de partição “deviceid
” no DynamoDB.
O modelo fornecido do CloudFormation criará os seguintes recursos: um bucket do Amazon S3, uma Amazon VPC, um cluster do Amazon MSK e um AWS CloudShell para testar operações de dados.
Para gerar dados de teste, crie um tópico do Amazon MSK e, depois, crie uma tabela do DynamoDB. É possível usar o Session Manager no console de gerenciamento para fazer login no sistema operacional do CloudShell e executar scripts Python.
Depois que você executar o modelo do CloudFormation, poderá concluir a criação dessa arquitetura executando as operações a seguir.
-
Execute o modelo do CloudFormation
S3bucket.yaml
para criar um bucket do S3. Para quaisquer scripts ou operações subsequentes, execute-os na mesma região. InsiraForMSKTestS3
como o nome da pilha do CloudFormation.Depois que esse processo for concluído, anote a saída do nome do bucket do S3 em Saídas. Você precisará do nome na Etapa 3.
-
Faça upload do arquivo ZIP
fromMSK.zip
no bucket do S3 recém-criado. -
Execute o modelo do CloudFormation
VPC.yaml
para criar uma VPC, um cluster do Amazon MSK e uma função do Lambda. Na tela de entrada de parâmetros, insira o nome do bucket do S3 que você criou na Etapa 1, onde o bucket do S3 é solicitado. Defina o nome da pilha do CloudFormation comoForMSKTestVPC
. -
Prepare o ambiente para executar scripts Python no CloudShell. É possível usar o CloudShell no AWS Management Console. Para ter mais informações sobre o uso do CloudShell, consulte Getting started with AWS CloudShell. Depois que você iniciar o CloudShell, crie um CloudShell pertencente à VPC que você acabou de criar para se conectar ao cluster do Amazon MSK. Crie o CloudShell em uma sub-rede privada. Preencha os seguintes campos:
-
Nome: pode ser definido como qualquer nome. Um exemplo é MSK-VPC
-
VPC: selecione MSKTest
-
Sub-rede: selecione Sub-rede privada MSKTest (AZ1)
-
SecurityGroup: selecione ForMSKSecurityGroup
Quando o CloudShell pertencente à sub-rede privada for iniciado, execute o seguinte comando:
pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
-
-
Baixe scripts Python do bucket do S3.
aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip
-
Confira o console de gerenciamento e defina as variáveis de ambiente para o URL do agente e o valor da região nos scripts Python. Confira o endpoint do agente do cluster Amazon MSK no console de gerenciamento.
-
Defina as variáveis de ambiente no CloudShell. Se você estiver usando a região do Oeste dos EUA (Oregon):
export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
-
Execute os scripts Python a seguir.
Crie um tópico do Amazon MSK:
python ./createTopic.py
Crie uma tabela do DynamoDB:
python ./createTable.py
Grave os dados de teste no tópico do Amazon MSK:
python ./kafkaDataGen.py
-
Confira as métricas do CloudWatch para os recursos criados do Amazon MSK, do Lambda e do DynamoDB e verifique os dados armazenados na tabela
device_status
usando o DynamoDB Data Explorer para garantir que todos os processos tenham sido executados corretamente. Se cada processo for executado sem erros, você poderá conferir se os dados de teste gravados do CloudShell no Amazon MSK também foram gravados no DynamoDB. -
Quando terminar esse exemplo, exclua os recursos criados neste tutorial. Exclua as duas pilhas do CloudFormation:
ForMSKTestS3
eForMSKTestVPC
. Se a exclusão das pilhas for concluída com êxito, todos os recursos serão excluídos.
Próximas etapas
nota
Se você criou recursos ao seguir este exemplo, lembre-se de excluí-los para evitar cobranças inesperadas.
A integração identificou uma arquitetura que vincula o Amazon MSK e o DynamoDB para permitir que dados de fluxos comportem workloads OLTP. A partir daqui, pesquisas mais complexas podem ser realizadas vinculando o DynamoDB ao OpenSearch Service. Pense na integração com o EventBridge para necessidades mais complexas baseadas em eventos, e extensões como o Amazon Managed Service for Apache Flink para requisitos de maior throughput e menor latência.