Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Conceitos básicos do Kinesis Data Streams para o Amazon DynamoDB

Modo de foco
Conceitos básicos do Kinesis Data Streams para o Amazon DynamoDB - Amazon DynamoDB

Esta seção descreve como usar o Kinesis Data Streams para tabelas do Amazon DynamoDB com o console do Amazon DynamoDB, a AWS Command Line Interface (AWS CLI) e a API.

Criar um fluxo de dados ativo do Amazon Kinesis

Todos esses exemplos usam a tabela Music do DynamoDB que foi criada como parte do tutorial Conceitos básicos do DynamoDB.

Para saber mais sobre como criar consumidores e conectar seu fluxo de dados do Kinesis a outros produtos da AWS, consulte Ler dados do Amazon Kinesis Data Streams no Guia do desenvolvedor do Amazon Kinesis Data Streams.

nota

Quando você estiver usando fragmentos do KDS pela primeira vez, recomendamos configurá-los para que aumentem e reduzam a escala verticalmente de acordo com os padrões de uso. Depois de acumular mais dados sobre os padrões de uso, você poderá ajustar os fragmentos em seu fluxo para fazer a correspondência.

Console
  1. Faça login no AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis/.

  2. Escolha Create data stream (Criar fluxo de dados) e siga as instruções para criar um fluxo chamado samplestream.

  3. Abra o console do DynamoDB em https://console.aws.amazon.com/dynamodb/.

  4. No painel de navegação, no lado esquerdo do console, selecione Tables (Tabelas).

  5. Escolha a tabela Music.

  6. Escolha a guia Exports and streams (Exportações e fluxos).

  7. (Opcional) Em Detalhes do stream de dados do Amazon Kinesis, você pode alterar a precisão do carimbo de data e hora do registro de microssegundos (padrão) para milissegundos.

  8. Escolha samplestream na lista suspensa.

  9. Escolha o botão Ativar.

AWS CLI
  1. Criar um fluxo de dados do Kinesis denominado samplestream usando o comando create-stream.

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    Consulte Considerações sobre gerenciamento de fragmentos para o Kinesis Data Streams antes de definir o número de fragmentos para o fluxo de dados do Kinesis.

  2. Verifique se o stream do Kinesis está ativo e pronto para uso usando o comando describe-stream.

    aws kinesis describe-stream --stream-name samplestream
  3. Habilite o streaming do Kinesis na tabela do DynamoDB usando o comando enable-kinesis-streaming-destination do DynamoDB. Substitua o valor de stream-arn pelo que foi retornado por describe-stream na etapa anterior. Opcionalmente, habilite o streaming com uma precisão mais granular (microssegundos) dos valores de carimbo de data e hora retornados em cada registro.

    Habilite o streaming com precisão de carimbo de data e hora em microssegundos:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    Ou habilite o streaming com a precisão padrão do carimbo de data e hora (milissegundos):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. Verifique se o streaming do Kinesis está ativa na tabela usando o comando describe-kinesis-streaming-destination do DynamoDB.

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. Grave os dados na tabela do DynamoDB usando o comando put-item, conforme descrito no Guia do desenvolvedor do DynamoDB.

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. Use o comando get-records da CLI do Kinesis para recuperar o conteúdo de fluxo do Kinesis. Em seguida, use o trecho de código a seguir para desserializar o conteúdo do fluxo.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. Siga as instruções no Guia do desenvolvedor do Kinesis Data Streams para criar um fluxo de dados do Kinesis chamada samplestream usando Java.

    Consulte Considerações sobre gerenciamento de fragmentos para o Kinesis Data Streams antes de definir o número de fragmentos para o fluxo de dados do Kinesis.

  2. Use o trecho de código a seguir para habilitar a transmissão do Kinesis na tabela do DynamoDB. Opcionalmente, habilite o streaming com uma precisão mais granular (microssegundos) dos valores de carimbo de data e hora retornados em cada registro.

    Habilite o streaming com precisão de carimbo de data e hora em microssegundos:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    Ou habilite o streaming com a precisão padrão do carimbo de data e hora (milissegundos):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. Siga as instruções do Kinesis Data Streams developer guide (Guia do desenvolvedor do Kinesis Data Streams) para ler o fluxo de dados criado.

  4. Use o trecho de código a seguir para desserializar o conteúdo do fluxo

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
  1. Faça login no AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis/.

  2. Escolha Create data stream (Criar fluxo de dados) e siga as instruções para criar um fluxo chamado samplestream.

  3. Abra o console do DynamoDB em https://console.aws.amazon.com/dynamodb/.

  4. No painel de navegação, no lado esquerdo do console, selecione Tables (Tabelas).

  5. Escolha a tabela Music.

  6. Escolha a guia Exports and streams (Exportações e fluxos).

  7. (Opcional) Em Detalhes do stream de dados do Amazon Kinesis, você pode alterar a precisão do carimbo de data e hora do registro de microssegundos (padrão) para milissegundos.

  8. Escolha samplestream na lista suspensa.

  9. Escolha o botão Ativar.

Alterar um fluxo de dados ativo do Amazon Kinesis

Esta seção descreve como alterar uma configuração ativa do Kinesis Data Streams para o DynamoDB usando o console, a AWS CLI e a API.

AWS Management Console

  1. Abra o console do DynamoDB em https://console.aws.amazon.com/dynamodb/.

  2. Acesse a tabela.

  3. Escolha Exportações e streams.

AWS CLI

  1. Chame describe-kinesis-streaming-destination para confirmar se o fluxo está ACTIVE.

  2. Chame UpdateKinesisStreamingDestination, como neste exemplo:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. Chame describe-kinesis-streaming-destination para confirmar se o fluxo está UPDATING.

  4. Chame describe-kinesis-streaming-destination periodicamente até que o status de streaming volte a ser ACTIVE. Normalmente, demora até 5 minutos para que as atualizações de precisão do carimbo de data e hora entrem em vigor. A atualização desse status indicará que a operação terá sido concluída e que o novo valor de precisão será aplicado em registros futuros.

  5. Grave na tabela usando putItem.

  6. Use o comando get-records do Kinesis para obter o conteúdo do fluxo.

  7. Confirme se ApproximateCreationDateTime das gravações tem a precisão desejada.

API Java

  1. Forneça um trecho de código que construa uma solicitação UpdateKinesisStreamingDestination e uma resposta UpdateKinesisStreamingDestination.

  2. Forneça um trecho de código que construa uma solicitação DescribeKinesisStreamingDestination e uma DescribeKinesisStreamingDestination response.

  3. Chame describe-kinesis-streaming-destination periodicamente até que o status de streaming volte a ser ACTIVE, indicando que a atualização foi concluída e que o novo valor de precisão será aplicado em registros futuros.

  4. Faça gravações na tabela.

  5. Faça uma leitura do fluxo e desserialize o conteúdo do fluxo.

  6. Confirme se ApproximateCreationDateTime das gravações tem a precisão desejada.

PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.