As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Usando o Amazon Kinesis Data Streams como alvo para AWS Database Migration Service
Você pode usar AWS DMS para migrar dados para um stream de dados do Amazon Kinesis. O Amazon Kinesis Data Streams faz parte do serviço Amazon Kinesis Data Streams. É possível utilizar os fluxos de dados do Kinesis para coletar e processar grandes registros de dados em tempo real.
O fluxo de dados do Kinesis é composto por fragmentos. O estilhaço é uma sequência de registros de dados identificada de forma exclusiva em um stream. Para obter mais informações sobre fragmentos no Amazon Kinesis Data Streams, consulte o Fragmento no Guia do desenvolvedor do Amazon Kinesis Data Streams.
AWS Database Migration Service publica registros em um stream de dados do Kinesis usando JSON. Durante a conversão, o AWS DMS serializa cada registro do banco de dados de origem em um par atributo/valor no formato JSON ou em um formato de mensagem JSON_UNFORMATTED. Um formato de mensagem JSON_UNFORMATTED é uma string JSON de linha única com novo delimitador de linha. Ele permite que o Amazon Data Firehose entregue dados do Kinesis para um destino do Amazon S3 e, em seguida, consulte-os usando vários mecanismos de consulta, incluindo o Amazon Athena.
O mapeamento de objetos é utilizado para migrar os dados de uma fonte de dados compatível para um fluxo de destino. Com o mapeamento do objeto, você determina como estruturar os registros de dados no stream. Também é possível definir uma chave de partição para cada tabela, que será utilizada pelo Kinesis Data Streams para agrupar os dados em fragmentos.
Ao AWS DMS criar tabelas em um endpoint de destino do Kinesis Data Streams, ele cria tantas tabelas quanto no endpoint do banco de dados de origem. AWS DMS também define vários valores de parâmetros do Kinesis Data Streams. O custo de criação da tabela depende da quantidade de dados e do número de tabelas a serem migradas.
nota
A opção Modo SSL no AWS DMS console ou na API não se aplica a alguns serviços de streaming de dados e NoSQL, como Kinesis e DynamoDB. Eles são seguros por padrão, então AWS DMS mostra que a configuração do modo SSL é igual a nenhuma (Modo SSL = Nenhuma). Não é necessário fornecer nenhuma configuração adicional para que o endpoint utilize o SSL. Por exemplo, ao utilizar o Kinesis como um endpoint de destino, ele é seguro por padrão. Todas as chamadas de API para o Kinesis usam SSL, portanto, não há necessidade de uma opção adicional de SSL no endpoint. AWS DMS É possível colocar e recuperar dados com segurança por meio de endpoints SSL utilizando o protocolo HTTPS, que o AWS DMS utiliza por padrão ao se conectar a um fluxo de dados Kinesis.
Configurações do endpoint do Kinesis Data Streams
Ao usar os endpoints de destino do Kinesis Data Streams, você pode obter detalhes da transação e do controle KinesisSettings
usando a opção na API. AWS DMS
É possível definir as configurações da conexão de uma das seguintes maneiras:
No AWS DMS console, usando as configurações do endpoint.
Na CLI, usando a
kinesis-settings
opção do CreateEndpointcomando.
Na CLI, utilize os seguintes parâmetros de solicitação da opção kinesis-settings
:
nota
Compatibilidade com a configuração do endpoints do IncludeNullAndEmpty
está disponível nas versões 3.4.1 e superiores do AWS DMS . Mas o suporte para as outras configurações de endpoint a seguir para destinos do Kinesis Data Streams está disponível em. AWS DMS
MessageFormat
: o formato de saída dos registros criados no endpoint. O formato da mensagem éJSON
(padrão) ouJSON_UNFORMATTED
(uma única linha sem guia).-
IncludeControlDetails
: mostra informações detalhadas de controle para definição de tabelas, definição de colunas e alterações de tabelas e colunas na saída de mensagem do Kinesis. O padrão éfalse
. -
IncludeNullAndEmpty
: inclui colunas NULL e vazias no destino. O padrão éfalse
. -
IncludePartitionValue
: mostra o valor da partição na saída da mensagem do Kinesis, a menos que o tipo de partição sejaschema-table-type
. O padrão éfalse
. -
IncludeTableAlterOperations
: inclui todas as operações da linguagem de definição de dados (DDL) que alteram a tabela nos dados de controle, comorename-table
,drop-table
,add-column
,drop-column
erename-column
. O padrão éfalse
. -
IncludeTransactionDetails
: fornece informações detalhadas sobre transações do banco de dados de origem. Essas informações incluem um timestamp de confirmação, uma posição no log e valores paratransaction_id
,previous_transaction_id
etransaction_record_id
(o deslocamento de registro dentro de uma transação). O padrão éfalse
. -
PartitionIncludeSchemaTable
: prefixa os nomes de esquema e de tabela em valores de partições, quando o tipo de partição forprimary-key-type
. Isso aumenta a distribuição de dados entre estilhaços do Kinesis. Por exemplo, suponha que um esquemaSysBench
tenha milhares de tabelas, e cada tabela tenha apenas um intervalo limitado para uma chave primária. Nesse caso, a mesma chave primária é enviada de milhares de tabelas para o mesmo estilhaço, o que provoca o controle de utilização. O padrão éfalse
.
O exemplo a seguir mostra a opção kinesis-settings
em uso com um exemplo de comando create-endpoint
emitido utilizando a AWS CLI.
aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
Configurações da tarefa de carga máxima com vários threads
Para ajudar a aumentar a velocidade da transferência, AWS DMS oferece suporte a uma carga completa multisegmentada em uma instância de destino do Kinesis Data Streams. O DMS oferece suporte a esse multithreading com configurações de tarefa que incluem o seguinte:
-
MaxFullLoadSubTasks
: utilize esta opção para indicar o número máximo de tabelas de origem a serem carregadas em paralelo. O DMS carrega cada tabela na tabela de destino do Kinesis correspondente utilizando uma subtarefa dedicada. O padrão é 8; o valor máximo é 49. -
ParallelLoadThreads
— Use essa opção para especificar o número de threads AWS DMS usados para carregar cada tabela em sua tabela de destino do Kinesis. O valor máximo para um destino do Kinesis Data Streams é 32. Você pode solicitar o aumento desse limite máximo. -
ParallelLoadBufferSize
: utilize essa opção para especificar o número máximo de registros a serem armazenados em buffer utilizado pelos threads de carga paralela para carregar dados no destino do Kinesis. O valor padrão é 50. Valor máximo de 1.000. Use essa configuração comParallelLoadThreads
;ParallelLoadBufferSize
é válido somente quando há mais de um thread. -
ParallelLoadQueuesPerThread
: utilize esta opção para especificar o número de filas que cada thread simultâneo acessa para extrair registros de dados das filas e gerar uma carga em lote para o destino. O padrão é um. No entanto, para destinos do Kinesis de vários tamanhos de carga útil, o intervalo válido é de 5 a 512 filas por thread.
Configurações da tarefa de carga de CDC multithread
É possível melhorar o desempenho da captura de dados de alterações (CDC) para endpoints de destino de streaming de dados em tempo real, como o Kinesis, utilizando configurações de tarefa para modificar o comportamento da chamada da API PutRecords
. Para fazer isso, especifique o número de threads simultâneos, filas por thread e o número de registros a serem armazenados em um buffer usando as configurações da tarefa ParallelApply*
. Por exemplo, suponha que você queira executar um carregamento de CDC e aplicar 128 threads em paralelo. Você também quer acessar 64 filas por thread, com 50 registros armazenados por buffer.
Para promover o desempenho do CDC, AWS DMS oferece suporte a estas configurações de tarefas:
-
ParallelApplyThreads
— Especifica o número de threads simultâneos que são AWS DMS usados durante o carregamento do CDC para enviar registros de dados para um endpoint de destino do Kinesis. O valor padrão é zero (0) e o valor máximo é 32. -
ParallelApplyBufferSize
: especifica o número máximo de registros a serem armazenados em cada fila de buffer para threads simultâneos enviarem para um endpoint de destino do Kinesis durante uma carga de CDC. O valor padrão é 100 e o valor máximo é 1.000. Use essa opção quandoParallelApplyThreads
especificar mais de um thread. -
ParallelApplyQueuesPerThread
: especifica o número de filas que cada thread acessa para extrair registros de dados das filas e gerar uma carga em lote para um endpoint do Kinesis durante a CDC. O valor padrão é 1 e o valor máximo é 512.
Ao usar configurações da tarefa ParallelApply*
, o partition-key-type
padrão é a primary-key
da tabela, não o schema-name.table-name
.
Utilizar uma imagem anterior para visualizar valores originais de linhas da CDC para um fluxo de dados do Kinesis como destino
Ao gravar atualizações da CDC em um destino de streaming de dados, como o Kinesis, é possível visualizar os valores originais de linhas do banco de dados de origem antes da alteração por uma atualização. Para tornar isso possível, AWS DMS preenche uma imagem anterior dos eventos de atualização com base nos dados fornecidos pelo mecanismo do banco de dados de origem.
Diferentes mecanismos de banco de dados de origem fornecem diferentes quantidades de informações para uma imagem anterior:
-
O Oracle fornece atualizações para colunas somente se elas forem alteradas.
-
O PostgreSQL fornece somente dados para colunas que fazem parte da chave primária (alterada ou não). Para fornecer dados para todas as colunas (alteradas ou não), você precisa definir
REPLICA_IDENTITY
comoFULL
em vez deDEFAULT
. Observe que você deve escolher cuidadosamente a configuraçãoREPLICA_IDENTITY
para cada tabela. Se você definirREPLICA_IDENTITY
comoFULL
, todos os valores da coluna serão gravados continuamente no registro em log de gravação antecipada (WAL). Isso pode causar problemas de desempenho ou de recursos com tabelas que são atualizadas com frequência. -
O MySQL geralmente fornece dados para todas as colunas, exceto para os tipos de dados BLOB e CLOB (alterados ou não).
Para habilitar a criação de imagem anterior para adicionar valores originais do banco de dados de origem à saída do AWS DMS , use a configuração de tarefa BeforeImageSettings
ou o parâmetro add-before-image-columns
. Esse parâmetro aplica uma regra de transformação de coluna.
BeforeImageSettings
adiciona um novo atributo JSON a cada operação de atualização com valores coletados do sistema de banco de dados de origem, conforme mostrado a seguir.
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
nota
Aplique somente BeforeImageSettings
às AWS DMS tarefas que contêm um componente do CDC, como carga total mais tarefas do CDC (que migram dados existentes e replicam as alterações em andamento), ou às tarefas somente do CDC (que replicam somente as alterações de dados). Não aplique BeforeImageSettings
a tarefas que são somente de carga total.
Para opções BeforeImageSettings
, aplica-se o seguinte:
-
Defina a opção
EnableBeforeImage
comotrue
para habilitar a criação de imagem anterior. O padrão éfalse
. -
Use a opção
FieldName
para atribuir um nome ao novo atributo JSON. QuandoEnableBeforeImage
fortrue
,FieldName
será necessário e não poderá estar vazio. -
A opção
ColumnFilter
especifica uma coluna a ser adicionada usando imagem anterior. Para adicionar somente colunas que fazem parte das chaves primárias da tabela, use o valor padrão,pk-only
. Para adicionar qualquer coluna que tenha um valor de imagem anterior, useall
. Observe que a imagem anterior não contém colunas com tipos de dados LOB, como CLOB ou BLOB."BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
nota
Os destinos do Amazon S3 não são compatíveis com BeforeImageSettings
. Para destinos do S3, utilize somente a regra de transformação add-before-image-columns
para executar a criação da imagem anterior durante a CDC.
Usar uma regra de transformação de imagem anterior
Como alternativa às configurações de tarefa, é possível usar o parâmetro add-before-image-columns
, que aplica uma regra de transformação de coluna. Com esse parâmetro, é possível ativar a imagem anterior durante a CDC em destinos de streaming de dados, como o Kinesis.
Usando add-before-image-columns
em uma regra de transformação, é possível aplicar um controle mais refinado dos resultados da imagem anterior. As regras de transformação permitem que você use um localizador de objetos que oferece controle sobre as tabelas selecionadas para a regra. Além disso, é possível encadear regras de transformação, o que permite que regras diferentes sejam aplicadas a tabelas diferentes. Depois, você poderá manipular as colunas produzidas usando outras regras.
nota
Não use o parâmetro add-before-image-columns
junto com a configuração da tarefa BeforeImageSettings
na mesma tarefa. Em vez disso, use o parâmetro ou a configuração, mas não ambos, para uma única tarefa.
Um tipo de regra transformation
com o parâmetro add-before-image-columns
de uma coluna deve fornecer uma seção before-image-def
. Por exemplo:
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
O valor de column-prefix
precede um nome de coluna e o valor padrão de column-prefix
é BI_
. O valor de column-suffix
é anexado ao nome da coluna e o padrão é vazio. Não defina column-prefix
e column-suffix
como strings vazias.
Escolha um valor para column-filter
. Para adicionar somente colunas que fazem parte das chaves primárias da tabela, escolha pk-only
. Escolha non-lob
para adicionar somente colunas que não sejam do tipo LOB. Ou escolha all
para adicionar qualquer coluna que tenha um valor de imagem anterior.
Exemplo de uma regra de transformação de imagem anterior
A regra de transformação no exemplo a seguir adiciona uma nova coluna chamada BI_emp_no
no destino. Portanto, uma instrução como UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
preenche o campo BI_emp_no
com 1. Ao gravar atualizações da CDC em destinos do Amazon S3, a coluna BI_emp_no
possibilita identificar qual linha original foi atualizada.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
Para obter informações sobre como usar a ação da regra add-before-image-columns
, consulte Regras de transformação e ações.
Pré-requisitos para usar um stream de dados do Kinesis como destino para AWS Database Migration Service
Função do IAM para usar um stream de dados do Kinesis como destino para AWS Database Migration Service
Antes de configurar um stream de dados do Kinesis como destino AWS DMS, certifique-se de criar uma função do IAM. Essa função deve permitir AWS DMS assumir e conceder acesso aos fluxos de dados do Kinesis para os quais estão sendo migrados. O conjunto mínimo de permissões de acesso é mostrado na seguinte política do IAM.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
O perfil que você utiliza para a migração para um fluxo de dados do Kinesis deve ter as seguintes permissões:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:
region
:accountID
:stream/streamName
" } ] }
Acessando um stream de dados do Kinesis como destino para AWS Database Migration Service
Na AWS DMS versão 3.4.7 e superior, para se conectar a um endpoint do Kinesis, você deve fazer o seguinte:
Configure o DMS para usar endpoints da VPC. Para obter mais informações sobre a configuração de endpoints da VPC, consulte Configurar endpoints da VPC como endpoints de origem e de destino do AWS.
Configure o DMS para usar rotas públicas; ou seja, torne a instância de replicação pública. Para obter mais informações sobre instâncias de replicação públicas, consulte Instâncias de replicação públicas e privadas.
Limitações ao usar o Kinesis Data Streams como destino para AWS Database Migration Service
Aplicam-se as seguintes limitações ao utilizar o Kinesis Data Streams como destino:
-
AWS DMS publica cada atualização em um único registro no banco de dados de origem como um registro de dados em um determinado stream de dados do Kinesis, independentemente das transações. No entanto, é possível incluir detalhes da transação para cada registro de dados utilizando parâmetros relevantes da API do
KinesisSettings
. -
O modo Full LOB não é compatível.
-
O tamanho máximo do LOB compatível é 1 MB.
-
Os Kinesis Data Streams não é compatível com a desduplicação. As aplicações que consumem dados de um fluxo precisam tratar os registros duplicados. Para obter mais informações, consulte Tratar registros duplicados no Guia do desenvolvedor do Amazon Kinesis Data Streams.
-
AWS DMS suporta as duas formas a seguir para chaves de partição:
-
SchemaName.TableName
: uma combinação de esquema e nome da tabela. -
${AttributeName}
: o valor de um dos campos no JSON ou a chave primária da tabela no banco de dados de origem.
-
-
Para obter informações sobre como criptografar dados em repouso no Kinesis Data Streams, consulte Proteção de dados no Kinesis Data Streams, no Guia do desenvolvedor do AWS Key Management Service .
-
O
BatchApply
não é compatível com um endpoint do Kinesis. A utilização da aplicação em lote (por exemplo, a configuraçãoBatchApplyEnabled
da tarefa de metadados de destino) para um destino do Kafka pode resultar em perda de dados. -
Os destinos do Kinesis só são compatíveis com um stream de dados do Kinesis na mesma AWS conta e na Região da AWS mesma instância de replicação.
-
Ao migrar de uma fonte MySQL, os dados não incluem BeforeImage os tipos de dados CLOB e BLOB. Para ter mais informações, consulte Utilizar uma imagem anterior para visualizar valores originais de linhas da CDC para um fluxo de dados do Kinesis como destino.
-
AWS DMS não suporta a migração de valores do tipo de
BigInt
dados com mais de 16 dígitos. Para contornar essa limitação, você pode usar a regra de transformação a seguir para converter a colunaBigInt
em uma string. Para obter mais informações sobre regras transformação, consulte Regras de transformação e ações.{ "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
Utilizar o mapeamento de objetos para migrar dados para um fluxo de dados do Kinesis
AWS DMS usa regras de mapeamento de tabelas para mapear dados da fonte para o stream de dados do Kinesis de destino. Para mapear dados para um fluxo de destino, utilize um tipo de regra de mapeamento de tabela chamado mapeamento de objetos. Utilize o mapeamento de objetos para definir como os registros de dados na origem são mapeados para os registros de dados publicados para o fluxo de dados do Kinesis.
O fluxo de dados do Kinesis não tem estrutura predefinida além de uma chave de partição. Em uma regra de mapeamento de objeto, os valores possíveis de um partition-key-type
para registros de dados são schema-table
, transaction-id
, primary-key
constant
e attribute-name
.
Para criar uma regra de mapeamento de objetos, especifique rule-type
como object-mapping
. Essa regra especifica o tipo de mapeamento de objeto que você deseja usar.
A estrutura da regra é a seguinte:
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
AWS DMS atualmente suporta map-record-to-record
e map-record-to-document
como os únicos valores válidos para o rule-action
parâmetro. Essas configurações afetam valores que não são excluídos como parte da lista de atributos exclude-columns
. Os map-record-to-document
valores map-record-to-record
e especificam como AWS DMS manipula esses registros por padrão. Esses valores não afetam os mapeamentos de atributos de forma alguma.
Utilize map-record-to-record
ao migrar de um banco de dados relacional para um fluxo de dados do Kinesis. Esse tipo de regra utiliza o valor taskResourceId.schemaName.tableName
encontrado no banco de dados relacional como a chave de partição no fluxo de dados do Kinesis e cria um atributo para cada coluna no banco de dados de origem.
Ao utilizar map-record-to-record
, observe o seguinte:
Essa configuração afeta somente as colunas excluídas pela lista
exclude-columns
.Para cada coluna desse tipo, AWS DMS cria um atributo correspondente no tópico de destino.
AWS DMS cria esse atributo correspondente independentemente de a coluna de origem ser usada em um mapeamento de atributos.
Utilize map-record-to-document
para colocar colunas de origem em um único documento sem formatação no fluxo de destino apropriado utilizando o nome do atributo “_doc”. O AWS DMS coloca os dados em um único mapa sem formatação na origem chamada “_doc
”. Esse posicionamento se aplica a qualquer coluna na tabela de origem que não aparece na lista de atributos exclude-columns
.
Uma maneira de compreender o map-record-to-record
é vê-lo em ação. Para este exemplo, suponha que você está começando com uma linha de tabela do banco de dados relacional com a seguinte estrutura de dados:
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
29/02/1988 |
Para migrar essas informações de um esquema chamado Test
para um fluxo de dados do Kinesis, crie regras para mapear os dados para o fluxo de destino. A regra a seguir ilustra o mapeamento.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
Veja a seguir uma ilustração do formato do registro resultante no fluxo de dados do Kinesis:
-
StreamName: XXX
-
PartitionKey: Test.Customers //schmaname.tableName
-
Data: //A seguinte mensagem do JSON
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
No entanto, suponha que você utilize as mesmas regras, mas altere o parâmetro rule-action
para map-record-to-document
e exclua determinadas colunas. A regra a seguir ilustra o mapeamento.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }
Nesse caso, as colunas não listadas no parâmetro exclude-columns
, FirstName
, LastName
, StoreId
e DateOfBirth
são mapeadas para _doc
. Veja a seguir o formato do registro resultante.
{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }
Reestruturação de dados com mapeamento de atributo
É possível reestruturar os dados enquanto estiver migrando-os para um fluxo de dados do Kinesis utilizando um mapa de atributo. Por exemplo, você pode combinar vários campos na origem em um único campo no destino. O mapa de atributo a seguir ilustra como reestruturar os dados.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
Para definir um valor constante para partition-key
, especifique um valor de partition-key
. Por exemplo, é possível fazer isso para forçar o armazenamento de todos os dados em um único fragmento. O mapeamento a seguir ilustra esse método.
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
nota
O valor do partition-key
para um registro de controle para uma tabela específica é TaskId.SchemaName.TableName
. O valor do partition-key
para um registro de controle específico para uma tarefa é o TaskId
daquele registro. A especificação de um valor do partition-key
no mapeamento do objeto não tem impacto sobre o partition-key
no caso dos registros de controle.
Formato de mensagem do Kinesis Data Streams
A saída JSON é simplesmente uma lista de pares chave/valor. Um formato de mensagem JSON_UNFORMATTED é uma string JSON de linha única com novo delimitador de linha.
AWS DMS fornece os seguintes campos reservados para facilitar o consumo dos dados do Kinesis Data Streams:
- RecordType
-
O tipo de registro pode ser dados ou controle. Os registros de dados representam as linhas reais na origem. Os registros de controle são relacionados a importantes eventos no stream, como a reinicialização de uma tarefa, por exemplo.
- Operation
-
Para registros de dados, a operação pode ser
load
,insert
,update
oudelete
.Para registros de controle, a operação pode ser
create-table
,rename-table
,drop-table
,change-columns
,add-column
,drop-column
,rename-column
oucolumn-type-change
. - SchemaName
-
O esquema de origem para o registro. Esse campo pode estar vazio para um registro de controle.
- TableName
-
A tabela de origem para um registro. Esse campo pode estar vazio para um registro de controle.
- Timestamp
-
A marca de data e hora de quando a mensagem do JSON foi criada. O campo é formatado com o formato ISO 8601.