Migração do Kinesis v1 para v2 - Amazon Monitron

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Migração do Kinesis v1 para v2

Se você estiver usando atualmente o esquema de dados v1, talvez já esteja enviando dados para o Amazon S3 ou processando ainda mais a carga útil do fluxo de dados com o Lambda.

Atualizando o esquema de dados para v2

Se você já configurou um fluxo de dados com o esquema v1, você pode atualizar seu processo de exportação de dados fazendo o seguinte:

  1. Abra seu console Amazon Monitron.

  2. Navegue até o projeto.

  3. Pare a exportação atual de dados ao vivo.

  4. Inicie a exportação de dados ao vivo para criar um novo fluxo de dados.

  5. Selecione o fluxo de dados recém-criado.

  6. Escolha iniciar exportação de dados ao vivo. Nesse ponto, o novo esquema enviará sua carga pelo fluxo de dados.

  7. (Opcional) Acesse o console do Kinesis e exclua o fluxo de dados antigo.

  8. Configure um novo método de entrega para seu fluxo de dados recém-criado com o esquema v2.

Seu novo stream agora entrega cargas em conformidade com o esquema v2 para seu novo bucket. Recomendamos usar dois buckets distintos para ter um formato consistente, caso você queira processar todos os dados nesses buckets. Por exemplo, usando outros serviços, como Athena e AWS Glue.

nota

Se você estava entregando seus dados para o Amazon S3, saiba como armazenar dados exportados no Amazon S3 para obter detalhes sobre como entregar seus dados ao Amazon S3 com o esquema v2.

nota

Se você estava usando uma função do Lambda para processar suas cargas, saiba como processar dados com o Lambda. Você também pode consultar a seção Atualização com Lambda para obter mais informações.

Atualizando o processamento de dados com o Lambda

A atualização do processamento de dados com o Lambda exige que você considere que o fluxo de dados v2 agora é baseado em eventos. Seu código Lambda v1 inicial pode ter sido semelhante ao seguinte:

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

Como o esquema de dados v1 está em um caminho de descontinuação, o código Lambda anterior não funcionará com todos os novos fluxos de dados.

O código de amostra do Python a seguir processará eventos do stream do Kinesis com o esquema de dados v2. Esse código usa o novo parâmetro eventType para orientar o processamento para o manipulador apropriado:

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions