Amazon Managed Streaming para Apache Kafka (Amazon MSK) facilita la ingesta y el procesamiento de datos de streaming en tiempo real con un servicio de Apache Kafka completamente administrado y de alta disponibilidad.
Apache Kafka
Gracias a estas características, Apache Kafka se suele utilizar para crear canalizaciones de datos de streaming en tiempo real. Una canalización de datos procesa y traslada los datos de un sistema a otro de forma fiable y puede ser una parte importante de la adopción de una estrategia de base de datos personalizada, ya que facilita el uso de varias bases de datos que admiten diferentes casos de uso.
Amazon DynamoDB es un destino común en estas canalizaciones de datos para respaldar aplicaciones que utilizan modelos de datos de clave-valor o de documentos y desean una escalabilidad ilimitada con un rendimiento constante de milisegundos de un solo dígito.
Temas
Funcionamiento
Una integración entre Amazon MSK y DynamoDB utiliza una función de Lambda para consumir registros de Amazon MSK y escribirlos en DynamoDB.

Lambda sondea internamente nuevos mensajes de Amazon MSK y, a continuación, invoca de forma sincrónica la función de Lambda de destino. La carga útil de eventos de la función de Lambda contiene lotes de mensajes de Amazon MSK. Para la integración entre Amazon MSK y DynamoDB, la función de Lambda escribe estos mensajes en DynamoDB.
Configuración de una integración entre Amazon MSK y DynamoDB
nota
Puede descargar los recursos utilizados en este ejemplo en el siguiente repositorio de GitHub
En los pasos siguientes se muestra cómo establecer una integración de muestra entre Amazon MSK y Amazon DynamoDB. En el ejemplo se representan datos generados por dispositivos del Internet de las cosas (IoT) e ingeridos en Amazon MSK. A medida que se realiza la ingesta de datos en Amazon MSK, estos pueden integrarse con servicios de análisis o herramientas de terceros compatibles con Apache Kafka, lo que habilita diversos casos de uso de análisis. La integración de DynamoDB también proporciona la búsqueda de valores clave de los registros de dispositivos individuales.
En este ejemplo se demostrará cómo un script de Python escribe datos de sensores IoT en Amazon MSK. A continuación, una función de Lambda escribe los elementos con la clave de partición “deviceid
” en DynamoDB.
La plantilla de CloudFormation proporcionada creará los siguientes recursos: un bucket de Amazon S3, una Amazon VPC, un clúster de Amazon MSK y un AWS CloudShell para probar las operaciones de datos.
Para generar los datos de prueba, cree un tema de Amazon MSK y, a continuación, cree una tabla de DynamoDB. Puede utilizar Session Manager desde la consola de administración para iniciar sesión en el sistema operativo de CloudShell y ejecutar scripts de Python.
Después de ejecutar la plantilla de CloudFormation, puede terminar de crear esta arquitectura mediante las siguientes operaciones.
-
Ejecute la plantilla de CloudFormation
S3bucket.yaml
para crear un bucket de S3. Para cualquier script u operación posterior, ejecútelos en la misma región. IntroduzcaForMSKTestS3
como el nombre de la pila de CloudFormation.Una vez hecho esto, anote el nombre de la salida del bucket de S3 en Salidas. Necesitará el nombre en el paso 3.
-
Cargue el archivo ZIP descargado
fromMSK.zip
en el bucket de S3 que acaba de crear. -
Ejecute la plantilla de CloudFormation
VPC.yaml
para crear una VPC, un clúster de Amazon MSK y una función de Lambda. En la pantalla de introducción de parámetros, introduzca el nombre del bucket de S3 que ha creado en el paso 1, donde se solicita el bucket de S3. Establezca el de la pila de CloudFormation aForMSKTestVPC
. -
Prepare el entorno para ejecutar scripts de Python en CloudShell. Puede usar CloudShell en la AWS Management Console. Para obtener más información sobre el uso de CloudShell, consulte Introducción a AWS CloudShell. Después de iniciar CloudShell, cree un CloudShell que pertenezca a la VPC que acaba de crear para conectarse al clúster de Amazon MSK. Cree el CloudShell en una subred privada. Rellene los siguientes campos:
-
Nombre: se puede establecer a cualquier nombre. Un ejemplo es MSK-VPC
-
VPC: seleccione MSKTest
-
Subred: seleccione Subred privada MSKTest (AZ1)
-
SecurityGroup: seleccione ForMSKSecurityGroup
Una vez iniciado el CloudShell perteneciente a la subred privada, ejecute el siguiente comando:
pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
-
-
Descargue los scripts de Python del bucket de S3.
aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip
-
Consulte la consola de administración y establezca las variables de entorno para la URL de agente y el valor de región en los scripts de Python. Consulte el punto de conexión de agente de clúster de Amazon MSK en la consola de administración.
-
Establezca las variables de entorno en el CloudShell. Si utiliza la región Oeste de EE. UU. (Oregón):
export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
-
Ejecute los siguientes scripts de Python.
Crear un tema de Amazon MSK:
python ./createTopic.py
Crear una tabla de DynamoDB:
python ./createTable.py
Escribir los datos de prueba en el tema de Amazon MSK:
python ./kafkaDataGen.py
-
Consulte las métricas de CloudWatch correspondientes a los recursos creados de Amazon MSK, Lambda y DynamoDB, y verifique los datos almacenados en la tabla
device_status
mediante el explorador de datos de DynamoDB para asegurarse de que todos los procesos se han ejecutado correctamente. Si cada proceso se ejecuta sin errores, puede comprobar si los datos de prueba escritos desde CloudShell a Amazon MSK también se escriben en DynamoDB. -
Cuando haya terminado con este ejemplo, elimine los recursos creados en este tutorial. Elimine las dos pilas de CloudFormation:
ForMSKTestS3
yForMSKTestVPC
. Si la eliminación de la pila se completa de forma correcta, se eliminarán todos los recursos.
Pasos a seguir a continuación
nota
Si ha creado recursos mientras seguía este ejemplo, recuerde eliminarlos para evitar cargos inesperados.
La integración ha identificado una arquitectura que vincula Amazon MSK y DynamoDB para habilitar los datos de flujo a fin de respaldar las cargas de trabajo OLTP. A partir de aquí, se pueden realizar búsquedas más complejas si se vincula DynamoDB con OpenSearch Service. Considere la posibilidad de realizar una integración con EventBridge para necesidades más complejas basadas en eventos y extensiones como Amazon Managed Service para Apache Flink para requisitos de mayor rendimiento y menor latencia.