从 Kinesis v1 迁移到 v2 - Amazon Monitron

从2024年10月31日起,亚马逊Monitron将不再向新客户开放。如果您想使用该服务,请在该日期之前注册。现有客户可以继续照常使用该服务。如需了解与 Amazon Monitron 类似的功能,请参阅我们的博客文章

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

从 Kinesis v1 迁移到 v2

如果您当前使用的是 v1 数据架构,您可能已经在向 Amazon S3 发送数据,或者已经在使用 Lambda 进一步处理数据流负载。

将数据架构更新到 v2

如果您已经使用 v1 架构配置了数据流,则可以通过执行以下操作来更新数据导出流程:

  1. 打开您的 Amazon Monitron 控制台。

  2. 导航到您的项目。

  3. 停止当前实时数据导出

  4. 启动实时数据导出以创建新数据流。

  5. 选择新创建的数据流。

  6. 选择启动实时数据导出。此时,新架构将通过数据流发送您的负载。

  7. (可选)转到 Kinesis 控制台并删除您的旧数据流。

  8. 使用 v2 架构为新创建的数据流配置新传输方式。

您的新数据流现在会将符合 v2 架构的负载传输到您的新桶。我们建议您使用两个不同的桶来保证格式一致,以防您要处理这些桶中的所有数据。例如,使用其他服务,例如 Athena 和。 AWS Glue

注意

如果您要将数据传输到 Amazon S3,请了解如何将导出的数据存储在 Amazon S3 中,进而详细了解如何使用 v2 架构将数据传输到 Amazon S3。

注意

如果您要使用 Lambda 函数来处理负载,请了解如何使用 Lambda 处理数据。您还可以参阅使用 Lambda 进行更新部分以获取更多信息。

使用 Lambda 更新数据处理

使用 Lambda 更新数据处理时需要注意,v2 数据流现在基于事件。您的初始 v1 Lambda 代码可能与下面类似:

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

由于 v1 数据架构正处于弃用过程中,因此以前的 Lambda 代码并不适用于所有新数据流。

以下 Python 示例代码将使用数据架构 v2 处理来自 Kinesis 流的事件。此代码使用新 eventType 参数将处理定向到合适的处理程序:

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions