Exportar secuencias de datos a la Nube de AWS (CLI) - AWS IoT Greengrass

AWS IoT Greengrass Version 1 entró en la fase de vida útil prolongada el 30 de junio de 2023. Para obtener más información, consulte la política de mantenimiento de AWS IoT Greengrass V1 Después de esta fecha, AWS IoT Greengrass V1 no se publicarán actualizaciones que proporcionen funciones, mejoras, correcciones de errores o parches de seguridad. Los dispositivos que se ejecuten AWS IoT Greengrass V1 no se verán afectados y seguirán funcionando y conectándose a la nube. Le recomendamos encarecidamente que migre a AWS IoT Greengrass Version 2, ya que añade importantes funciones nuevas y es compatible con plataformas adicionales.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Exportar secuencias de datos a la Nube de AWS (CLI)

En este tutorial se muestra cómo utilizar el AWS CLI para configurar e implementar un grupo de AWS IoT Greengrass con el gestor de secuencias habilitado. El grupo contiene una función de Lambda definida por el usuario que escribe en una secuencia en el administrador de secuencias, que luego se exporta automáticamente a la Nube de AWS.

El administrador de secuencias hace que la asimilación, el procesamiento y la exportación de flujos de datos de gran volumen sea más eficiente y fiable. En este tutorial, va a crear una función de Lambda de TransferStream que consume datos de IoT. La función de Lambda utiliza el SDK de AWS IoT Greengrass Core para crear una secuencia en el administrador de secuencias y luego leer y escribir en ella. El administrador de secuencias exporta la secuencia al Flujo de datos Kinesis. En el siguiente diagrama se muestra este flujo de trabajo.

Diagrama del flujo de trabajo de administración de secuencias.

El objetivo de este tutorial es mostrar cómo utilizan las funciones de Lambda definidas por el usuario el objeto de StreamManagerClient en el SDK de AWS IoT Greengrass Core para interactuar con el administrador de flujos. Para simplificar, la función de Lambda que va a crear en este tutorial genera datos de dispositivos simulados.

Cuando utilice la API de AWS IoT Greengrass, que incluye los comandos de Greengrass en la AWS CLI, para crear un grupo, el gestor de flujos estará desactivado por defecto. Para habilitar el administrador de secuencias en su núcleo, cree una versión de definición de función que incluya la función de Lambda del sistema GGStreamManager y una versión de grupo que haga referencia a la nueva versión de definición de función. A continuación, implemente el grupo.

Requisitos previos

Para completar este tutorial, se necesita lo siguiente:

  • Un grupo de Greengrass y un núcleo de Greengrass (versión 1.10 o posterior). Para obtener información acerca de cómo crear un núcleo y un grupo de Greengrass, consulte Empezar con AWS IoT Greengrass. El tutorial de introducción también incluye pasos para instalar el software AWS IoT Greengrass Core.

    nota

    El administrador de secuencias no es compatible con las distribuciones OpenWrt.

  • Java 8 Runtime (JDK 8) instalado en el dispositivo principal.

  • SDK de AWS IoT Greengrass Core para Python versión 1.5.0 o versiones posteriores Para usar StreamManagerClient en el SDK de AWS IoT Greengrass Core para Python, debe:

    • Instalar Python 3.7 o versiones posteriores en el dispositivo principal.

    • Incluya el SDK y sus dependencias en su paquete de implementación de la función de Lambda. Se incluyen las instrucciones en este tutorial.

    sugerencia

    Puede usar StreamManagerClient con Java o NodeJS. Para ver código de ejemplo, consulte el SDK de AWS IoT Greengrass Core para Java y el SDK de AWS IoT Greengrass Core para Node.js en GitHub.

  • Una secuencia de destino denominada MyKinesisStream creada en Amazon Kinesis Data Streams en la misma Región de AWS que su grupo de Greengrass. Para obtener más información, consulte Crear un flujo en la Guía para desarrolladores de Amazon Kinesis.

    nota

    En este tutorial, el administrador de secuencias exporta datos Kinesis Data Streams, lo que deriva en cargos a su Cuenta de AWS. Para obtener información acerca de los precios, consulte Precios de Kinesis Data Streams.

    Para no incurrir en gastos, puede ejecutar este tutorial sin crear una secuencia de datos Kinesis. En este caso, compruebe los registros para ver si el administrador de flujos intentó exportar la secuencia al flujo de datos Kinesis.

  • Una política de IAM agregada al Rol de grupo de Greengrass que permita la acción de kinesis:PutRecords en el flujo de datos de destino, tal y como se muestra en el siguiente ejemplo:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

 

El tutorial contiene los siguientes pasos generales:

Completar el tutorial debería tomarle aproximadamente 30 minutos.

Paso 1: Crear un paquete de implementación de la función de Lambda

En este paso, va a crear un paquete de implementación de funciones de Lambda que contiene código de función y dependencias de Python. Cargará este paquete más adelante cuando cree la función de Lambda en AWS Lambda. La función de Lambda utiliza el SDK de AWS IoT Greengrass Core para crear secuencias locales e interactuar con ellas.

nota

Sus funciones de Lambda definidas por el usuario deben utilizar el SDK de AWS IoT Greengrass Core para interactuar con el administrador de flujo. Para obtener más información sobre los requisitos del administrador de secuencias de Greengrass, consulte Requisitos del administrador de secuencias de Greengrass.

  1. Descargue el SDK de AWS IoT Greengrass Core para Python versión 1.5.0 o posterior.

  2. Descomprima el paquete descargado para obtener el SDK. El SDK es la carpeta greengrasssdk.

  3. Instale dependencias de paquetes para incluirlas con el SDK en su paquete de implementación de funciones de Lambda.

    1. Vaya al directorio de SDK que contiene el archivo de requirements.txt. Este archivo registra las dependencias.

    2. Instale las dependencias del SDK. Por ejemplo, ejecute el siguiente comando de pip para instalarlas en el directorio actual:

      pip install --target . -r requirements.txt
  4. Guarde la siguiente función de código de Python en un archivo local llamado "transfer_stream.py".

    sugerencia

    Para ver un ejemplo de código que utiliza Java y NodeJS, consulte el SDK de AWS IoT Greengrass Core for Java y SDK de AWS IoT Greengrass Core para Node.js en GitHub.

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. Comprima en un archivo ZIP los siguientes elementos en un archivo denominado "transfer_stream_python.zip". Este es el paquete de implementación de la función de Lambda.

    • transfer_stream.py. Lógica de la aplicación.

    • greengrasssdk. Biblioteca necesaria para las funciones de Lambda de Python Greengrass que publican mensajes MQTT.

      Las operaciones del administrador de secuencias están disponibles en la versión 1.5.0 o en las versiones posteriores del SDK de AWS IoT Greengrass Core para Python.

    • Las dependencias que instaló para el SDK de AWS IoT Greengrass Core para Python (por ejemplo, los directorios de cbor2).

    Al crear el archivo de zip, incluya solo estos elementos, no la carpeta que los contiene.

Paso 2: crear una función Lambda

  1. Cree un rol de IAM para poder pasar el ARN del rol cuando cree la función.

    JSON Expanded
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }'
    JSON Single-line
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
    nota

    AWS IoT Greengrass no utiliza este rol, ya que los permisos de las funciones de Lambda de Greengrass se especifican en la función de rol de grupo de Greengrass. En este tutorial, va a crear un rol vacío.

  2. Copie la Arn del resultado.

  3. Utilice la API de AWS Lambda para crear la función de TransferStream. El siguiente comando da por hecho que el archivo ZIP está en el directorio actual.

    • Reemplace role-arn por el Arn que ha copiado.

    aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role role-arn \ --handler transfer_stream.function_handler \ --runtime python3.7
  4. Publique una versión de la función.

    aws lambda publish-version --function-name TransferStream --description 'First version'
  5. Cree un alias a la versión publicada.

    Los grupos de Greengrass pueden hacer referencia a una función de Lambda por versión o alias (recomendado). El uso de un alias facilita la gestión de las actualizaciones del código porque no tiene que cambiar la tabla de suscripción o la definición del grupo cuando se actualiza el código de la función. En su lugar, basta con apuntar el alias a la nueva versión de la función.

    aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
    nota

    AWS IoT Greengrass no admite alias de Lambda; para versiones de $LATEST.

  6. Copie la AliasArn del resultado. Este valor se usa al configurar la función para AWS IoT Greengrass.

Ahora está listo para configurar la función para AWS IoT Greengrass.

Paso 3: Crear una versión y una definición de la función

Este paso crea una versión de definición de función que hace referencia a la función de Lambda GGStreamManager del sistema y a la función Lambda TransferStream definida por el usuario. Para habilitar el administrador de secuencias al usar la API AWS IoT Greengrass, la versión de definición de funciones debe incluir la función GGStreamManager.

  1. Cree una definición de función con una versión inicial que contenga las funciones de Lambda del sistema y definidas por el usuario.

    Con la siguiente versión de definición se habilita al administrador de secuencias con la configuración de parámetros predeterminada. Para configurar parámetros personalizados, debe definir variables de entorno para los parámetros correspondientes del administrador de secuencias. Para ver un ejemplo, consulte Para habilitar, deshabilitar o configurar el administrador de secuencias(CLI). AWS IoT Greengrass utiliza características predeterminadas para los parámetros que se omiten. MemorySize debería ser al menos 128000. Pinned debe establecerse en true.

    nota

    Una función de Lambda de larga duración (o anclada) se inicia automáticamente después de arrancar AWS IoT Greengrass y sigue ejecutándose en su propio contenedor. Esto contrasta con una función de Lambda bajo demanda, que se inicia cuando se la invoca y se detiene cuando no quedan tareas que ejecutar. Para obtener más información, consulte Configuración del ciclo de vida de las funciones de Lambda de Greengrass.

    • Reemplace id-función-arbitraria con un nombre para la función, como, por ejemplo, stream-manager.

    • Reemplace alias-arn por el AliasArn que ha copiado al crear el alias para la función de Lambda de TransferStream.

     

    JSON expanded
    aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{ "Functions": [ { "Id": "arbitrary-function-id", "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": { "MemorySize": 128000, "Pinned": true, "Timeout": 3 } }, { "Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": { "Executable": "transfer_stream.function_handler", "MemorySize": 16000, "Pinned": true, "Timeout": 5 } } ] }'
    JSON single
    aws greengrass create-function-definition \ --name MyGreengrassFunctions \ --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
    nota

    Timeout es necesario para la versión de definición de función, pero GGStreamManager no lo usa. Para obtener más información sobre Timeout y otras configuraciones a nivel de grupo, consulte Control de la ejecución de funciones de Greengrass Lambda utilizando la configuración específica del grupo.

  2. Copie la LatestVersionArn del resultado. Este valor se usa para añadir la versión de la definición de función a la versión de grupo que implementó en el núcleo.

Paso 4: Crear una versión y una definición del registrador

Defina la configuración de registro del grupo. Para este tutorial, debe configurar los componentes del sistema AWS IoT Greengrass, las funciones de Lambda definidas por el usuario y los conectores para que escriban los registros en el sistema de archivos del dispositivo principal. Puede usar registros para solucionar cualquier problema que pueda surgir. Para obtener más información, consulte Monitorización con registros de AWS IoT Greengrass.

  1. Cree una definición del registro que incluya una versión inicial.

    JSON Expanded
    aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{ "Loggers": [ { "Id": "1", "Component": "GreengrassSystem", "Level": "INFO", "Space": 10240, "Type": "FileSystem" }, { "Id": "2", "Component": "Lambda", "Level": "INFO", "Space": 10240, "Type": "FileSystem" } ] }'
    JSON Single-line
    aws greengrass create-logger-definition \ --name "LoggingConfigs" \ --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
  2. Copie el LatestVersionArn de la definición del registro del resultado. Este valor se usa para añadir la versión de definición del registro a la versión del grupo que implementa en el núcleo.

Paso 5: Obtener el ARN de la versión de la definición del núcleo

Obtenga el ARN de la versión de definición del núcleo para agregar a su nueva versión de grupo. Para implementar una versión de grupo, debe hacer referencia a una versión de definición de núcleo que contenga exactamente un núcleo.

  1. Obtenga los ID de la versión de grupo y grupo de Greengrass de destino. En este procedimiento, suponemos que estos son el último grupo y la última versión de grupo. La siguiente consulta devuelve el grupo creado más recientemente.

    aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"

    También puede hacer la consulta por nombre. No es necesario que los nombres de grupo sean únicos, por lo que podrían devolverse varios grupos.

    aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
    nota

    También puede encontrar estos valores en la consola de AWS IoT. El ID de grupo se muestra en la página Settings (Configuración) del grupo. Los ID de versión del grupo se muestran en la pestaña Implementaciones del grupo.

  2. Copie el Id del grupo de destino de la salida. Puede utilizar esto para obtener la versión de la definición de núcleo y al implementar el grupo.

  3. Copie el LatestVersion del resultado, que es el ID de la última versión añadida al grupo. Puede utilizar esto para obtener la versión de la definición de núcleo.

  4. Obtenga el ARN de la versión de la definición principal:

    1. Obtenga la versión de grupo.

      • Reemplace id-grupo con el Id que ha copiado para el grupo.

      • Reemplace group-version-id por el parámetro LatestVersion que ha copiado para el grupo.

      aws greengrass get-group-version \ --group-id group-id \ --group-version-id group-version-id
    2. Copie la CoreDefinitionVersionArn del resultado. Este valor se usa para añadir la versión de la definición del núcleo a la versión de grupo que implementó en el núcleo.

Paso 6: Crear una versión del grupo

Ahora, puede crear una versión de grupo que contenga las entidades que desea implementar. Para ello, cree una versión de grupo que haga referencia a la versión de destino de cada tipo de componente. Para este tutorial, incluirá una versión de definición de núcleo, una versión de definición de función y una versión de definición de registrador.

  1. Cree una versión de grupo.

    • Reemplace id-grupo con el Id que ha copiado para el grupo.

    • Reemplace core-definition-version-arn por el CoreDefinitionVersionArn que ha copiado para la nueva versión de la definición de núcleo.

    • Reemplace function-definition-version-arn con el LatestVersionArn que ha copiado para su nueva versión de definición de la función.

    • Reemplace logger-definition-version-arn con el LatestVersionArn que copió para la nueva versión de definición del registrador.

    aws greengrass create-group-version \ --group-id group-id \ --core-definition-version-arn core-definition-version-arn \ --function-definition-version-arn function-definition-version-arn \ --logger-definition-version-arn logger-definition-version-arn
  2. Copie la Version del resultado. Este es el ID de la nueva versión del grupo.

Paso 7: Crear una implementación

Implemente el grupo en el dispositivo del núcleo.

  1. Asegúrese de que el núcleo de AWS IoT Greengrass se está ejecutando. Ejecute los siguientes comandos en el terminal de Raspberry Pi según sea necesario.

    1. Para comprobar si el demonio está en ejecución:

      ps aux | grep -E 'greengrass.*daemon'

      Si la salida contiene una entrada root para /greengrass/ggc/packages/ggc-version/bin/daemon, el demonio está en ejecución.

      nota

      La versión que figura en la ruta depende de la versión del software AWS IoT Greengrass Core que esté instalada en el dispositivo del núcleo.

    2. Iniciar el daemon:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. Cree una implementación de .

    • Reemplace id-grupo con el Id que ha copiado para el grupo.

    • Reemplace id-versión-grupo con el Version que ha copiado para el nuevo grupo.

    aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id group-id \ --group-version-id group-version-id
  3. Copie la DeploymentId del resultado.

  4. Obtenga el estado de las implementaciones.

    • Reemplace id-grupo con el Id que ha copiado para el grupo.

    • Reemplace deployment-id por el parámetro DeploymentId que ha copiado para la implementación.

    aws greengrass get-deployment-status \ --group-id group-id \ --deployment-id deployment-id

    Si el estado es Success, la implementación fue correcta. Para obtener ayuda sobre la resolución de problemas, consulte Solución de problemas de AWS IoT Greengrass.

Paso 8: Probar la aplicación

La función de Lambda TransferStream genera datos simulados del dispositivo. Escribe datos en una secuencia que el administrador de secuencias exporta a la secuencia de datos de Kinesis de destino.

  1. En la consola de Amazon Kinesis , bajo las Secuencias de datos de Kinesis, seleccione MyKinesisStream.

    nota

    Si ejecutó el tutorial sin una secuencia de datos de Kinesis de destino, compruebe el archivo de registro del administrador de secuencias (GGStreamManager). Si contiene export stream MyKinesisStream doesn't exist en un mensaje de error, la prueba se ha realizado correctamente. Este error significa que el servicio intentó exportar a la secuencia, pero que la secuencia no existe.

  2. En la página MyKinesisStream, seleccione Monitoring (Supervisión). Si la prueba se realiza correctamente, debería ver los datos en los gráficos Put Records. En función de la conexión, es posible que tarde un minuto en mostrar los datos.

    importante

    Cuando haya terminado la prueba, elimine la secuencia de datos de Kinesis para evitar incurrir en más gastos.

    O ejecute el siguiente comando para detener el daemon de Greengrass. Así evitará que el núcleo envíe mensajes hasta que esté listo para continuar las pruebas.

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. Elimine la función de Lambda TransferStream del núcleo.

    1. Siga Paso 6: Crear una versión del grupo para crear una nueva versión de grupo. pero elimine la opción --function-definition-version-arn en el comando create-group-version. O bien, cree una versión de definición de función que no incluya la función de Lambda TransferStream.

      nota

      Al omitir la función de Lambda de GGStreamManager del sistema de la versión de grupo implementada, se deshabilita la administración de secuencias en el núcleo.

    2. Siga Paso 7: Crear una implementación para implementar la nueva versión de grupo.

Para ver la información de registro o solucionar problemas con las secuencias, compruebe los registros para las funciones TransferStream y GGStreamManager. Debe tener permisos de root para leer registros AWS IoT Greengrass en el sistema de archivos.

  • TransferStream escribe entradas de registro en greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log.

  • GGStreamManager escribe entradas de registro en greengrass-root/ggc/var/log/system/GGStreamManager.log.

Si necesita más información sobre la solución de problemas, puede establecer el nivel de registro de Lambda a DEBUG y, a continuación, crear e implementar una nueva versión de grupo.

Véase también