自 2024 年 10 月 31 日起,Amazon Monitron 將不再開放給新客戶。如果您想要使用 服務,請在該日期之前註冊。現有客戶可以繼續正常使用服務。如需類似 Amazon Monitron 的功能,請參閱我們的部落格文章
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
從 Kinesis v1 遷移至 v2
如果您目前正在使用 v1 資料結構描述,您可能已經將資料傳送至 Amazon S3,或使用 Lambda 進一步處理資料串流承載。
將資料結構描述更新至 v2
如果您已使用 v1 結構描述設定資料串流,您可以執行下列動作來更新資料匯出程序:
-
開啟您的 Amazon Monitron 主控台。
-
導覽至您的專案。
-
停止目前的即時資料匯出 。
-
開始即時資料匯出以建立新的資料串流。
-
選取新建立的資料串流。
-
選擇開始即時資料匯出 。此時,新的結構描述會透過資料串流傳送您的承載。
-
(選用) 前往 Kinesis 主控台並刪除舊資料串流。
-
使用 v2 結構描述為新建立的資料串流設定新的交付方法。
您的新串流現在會將符合 v2 結構描述的承載交付至您的新儲存貯體。我們建議您使用兩個不同的儲存貯體來具有一致的格式,以備您想要處理這些儲存貯體中的所有資料時使用。例如,使用 Athena 和 等其他服務 AWS Glue。
注意
如果您要將資料交付至 Amazon S3,請了解如何將匯出的資料儲存在 Amazon S3 中,以取得如何使用 v2 結構描述將資料交付至 Amazon S3 的詳細資訊。
注意
如果您使用 Lambda 函數來處理承載,請了解如何使用 Lambda 處理資料。您也可以參閱使用 Lambda 更新一節,以取得詳細資訊。
使用 Lambda 更新資料處理
使用 Lambda 更新資料處理需要您考慮 v2 資料串流現在是以事件為基礎。您的初始 v1 Lambda 程式碼可能類似於下列內容:
import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...
由於 v1 資料結構描述位於棄用路徑上,因此先前的 Lambda 程式碼無法與所有新的資料串流搭配使用。
下列 Python 範例程式碼將使用資料結構描述 v2 處理來自 Kinesis 串流的事件。此程式碼使用新eventType
參數將處理導向至適當的處理常式:
import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions