Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Usando o Apache Kafka como alvo para AWS Database Migration Service

Modo de foco
Usando o Apache Kafka como alvo para AWS Database Migration Service - AWS Database Migration Service

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Você pode usar AWS DMS para migrar dados para um cluster Apache Kafka. O Apache Kafka é uma plataforma de streaming distribuída. É possível utilizar o Apache Kafka para a ingestão e o processamento de dados de streaming em tempo real.

AWS também oferece Amazon Managed Streaming para Apache Kafka (MSKAmazon) para usar como destino. AWS DMS MSKA Amazon é um serviço de streaming Apache Kafka totalmente gerenciado que simplifica a implementação e o gerenciamento de instâncias do Apache Kafka. Ele funciona com versões de código aberto do Apache Kafka, e você acessa MSK instâncias da Amazon como AWS DMS destinos, exatamente como qualquer instância do Apache Kafka. Para obter mais informações, consulte O que é a AmazonMSK? no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Um cluster do Kafka armazena fluxos de registros em categorias chamadas tópicos que são divididos em partições. As partições são sequências de registros de dados identificadas exclusivamente (mensagens) em um tópico. As partições podem ser distribuídas entre vários agentes em um cluster para permitir o processamento paralelo dos registros de um tópico. Para obter mais informações sobre tópicos e partições e sua distribuição no Apache Kafka, consulte Tópicos e logs e Distribuição.

Seu cluster Kafka pode ser uma MSK instância da Amazon, um cluster executado em uma EC2 instância da Amazon ou um cluster local. Uma MSK instância da Amazon ou um cluster em uma EC2 instância da Amazon pode estar na mesma instância VPC ou em uma diferente. Se o cluster estiver on-premises, será possível utilizar o seu próprio servidor de nomes on-premises para a instância de replicação para resolver o nome do host do cluster. Para obter informações sobre como configurar um servidor de nomes na instância de replicação, consulte Utilização do seu próprio servidor de nomes on-premises. Para obter mais informações sobre a configuração de uma rede, consulte Configurar uma rede para uma instância de replicação.

Ao usar um MSK cluster da Amazon, certifique-se de que seu grupo de segurança permita acesso a partir da sua instância de replicação. Para obter informações sobre como alterar o grupo de segurança de um MSK cluster da Amazon, consulte Alteração do grupo de segurança de um MSK cluster da Amazon.

AWS Database Migration Service publica registros em um tópico do Kafka usando. JSON Durante a conversão, AWS DMS serializa cada registro do banco de dados de origem em um par atributo-valor no formato. JSON

Para migrar os dados de uma fonte de dados compatível para um cluster de destino do Kafka, utilize o mapeamento de objetos. Com o mapeamento de objetos, você determina como estruturar os registros de dados no tópico de destino. Também é possível definir uma chave de partição para cada tabela, que será utilizada pelo Apache Kafka para agrupar os dados em suas partições.

Atualmente, AWS DMS oferece suporte a um único tópico por tarefa. Para uma única tarefa com várias tabelas, todas as mensagens vão para um único tópico. Cada mensagem inclui uma seção de metadados que identifica o esquema e a tabela de destino. AWS DMS as versões 3.4.6 e superiores oferecem suporte à replicação multitópica usando mapeamento de objetos. Para obter mais informações, consulte Replicação de multitópico utilizando o mapeamento de objetos.

Configurações do endpoint do Apache Kafka

Você pode especificar os detalhes da conexão por meio das configurações do endpoint no AWS DMS console ou da --kafka-settings opção noCLI. Os requisitos para cada configuração são os seguintes:

  • Broker: especifique a localização de um ou mais agentes no cluster do Kafka na forma de uma lista separada por vírgulas de cada broker-hostname:port. Um exemplo é "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Essa configuração pode especificar os locais de qualquer um ou de todos os agentes no cluster. Todos os agentes de cluster se comunicam para lidar com o particionamento de registros de dados migrados para o tópico.

  • Topic: (opcional) especifique o nome do tópico com um comprimento máximo de 255 letras e símbolos. É possível utilizar ponto (.), sublinhado (_) e sinal de subtração (-). Os nomes de tópicos com um ponto (.) ou sublinhado (_) podem colidir em estruturas de dados internas. Utilize qualquer um, mas não esses dois símbolos no nome do tópico. Se você não especificar um nome de tópico, AWS DMS use "kafka-default-topic" como tópico de migração.

    nota

    Para AWS DMS criar um tópico de migração especificado por você ou o tópico padrão, defina-o auto.create.topics.enable = true como parte da configuração do cluster do Kafka. Para ter mais informações, consulte Limitações ao usar o Apache Kafka como alvo para AWS Database Migration Service

  • MessageFormat: o formato de saída dos registros criados no endpoint. O formato da mensagem é JSON (padrão) ou JSON_UNFORMATTED (uma única linha sem guia).

  • MessageMaxBytes: o tamanho máximo em bytes dos registros criados no endpoint. O padrão é 1.000.000.

    nota

    Você só pode usar AWS CLI/SDKpara mudar MessageMaxBytes para um valor não padrão. Por exemplo, para modificar o endpoint existente do Kafka e alterar MessageMaxBytes, utilize o comando a seguir.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails: fornece informações detalhadas sobre transações do banco de dados de origem. Essas informações incluem um timestamp de confirmação, uma posição no log e valores para transaction_id, previous_transaction_id e transaction_record_id (o deslocamento de registro dentro de uma transação). O padrão é false.

  • IncludePartitionValue: mostra o valor da partição na saída da mensagem do Kafka, a menos que o tipo de partição seja schema-table-type. O padrão é false.

  • PartitionIncludeSchemaTable: prefixa os nomes de esquema e de tabela em valores de partições, quando o tipo de partição for primary-key-type. Isso aumenta a distribuição de dados entre partições do Kafka. Por exemplo, suponha que um esquema SysBench tenha milhares de tabelas, e cada tabela tenha apenas um intervalo limitado para uma chave primária. Nesse caso, a mesma chave primária é enviada de milhares de tabelas para a mesma partição, o que provoca o controle de utilização. O padrão é false.

  • IncludeTableAlterOperations— Inclui todas as operações de linguagem de definição de dados (DDL) que alteram a tabela nos dados de controle rename-tabledrop-table, comoadd-column,drop-column,, rename-column e. O padrão é false.

  • IncludeControlDetails: mostra informações detalhadas de controle para definição de tabela, definição de coluna e alterações de tabela e coluna na saída de mensagem do Kafka. O padrão é false.

  • IncludeNullAndEmpty— Inclua NULL e esvazie colunas no alvo. O padrão é false.

  • SecurityProtocol— Define uma conexão segura com um endpoint de destino do Kafka usando Transport Layer Security (). TLS As opções incluem ssl-authentication, ssl-encryption e sasl-ssl. A utilização de sasl-ssl requer SaslUsername e SaslPassword.

  • SslEndpointIdentificationAlgorithm: define a verificação do nome de host para o certificado. Essa configuração é compatível com o AWS DMS versão 3.5.1 e posteriores. As opções incluem o seguinte:

    • NONE: desabilite a verificação do nome do host do broker na conexão do cliente.

    • HTTPS: habilite a verificação do nome do host do broker na conexão do cliente.

  • useLargeIntegerValue— Use int de até 18 dígitos em vez de converter ints como duplos, disponível a partir da AWS DMS versão 3.5.4. O padrão é falso.

É possível utilizar configurações para ajudar a aumentar a velocidade da transferência. Para fazer isso, o AWS DMS é compatível com uma carga multithreaded completa para um cluster de destino do Apache Kafka. O AWS DMS é compatível com esse multithreading com configurações de tarefa que incluem o seguinte:

  • MaxFullLoadSubTasks— Use essa opção para indicar o número máximo de tabelas de origem a serem carregadas paralelamente. AWS DMS carrega cada tabela em sua tabela de destino correspondente do Kafka usando uma subtarefa dedicada. O padrão é 8; o valor máximo é 49.

  • ParallelLoadThreads— Use essa opção para especificar o número de segmentos AWS DMS usados para carregar cada tabela em sua tabela de destino do Kafka. O valor máximo para um destino do Apache Kafka é 32. Você pode solicitar o aumento desse limite máximo.

  • ParallelLoadBufferSize: utilize esta opção para especificar o número máximo de registros a serem armazenados no buffer utilizado pelos threads de carregamento paralelo utilizam para carregar dados no destino do Kafka. O valor padrão é 50. Valor máximo de 1.000. Use essa configuração com ParallelLoadThreads; ParallelLoadBufferSize é válido somente quando há mais de um thread.

  • ParallelLoadQueuesPerThread: utilize esta opção para especificar o número de filas que cada thread simultâneo acessa para extrair registros de dados das filas e gerar uma carga em lote para o destino. O padrão é um. O máximo é 512.

Você pode melhorar o desempenho da captura de dados de alteração (CDC) para endpoints do Kafka ajustando as configurações de tarefas para encadeamentos paralelos e operações em massa. Para fazer isso, especifique o número de threads simultâneos, filas por thread e o número de registros a serem armazenados em um buffer usando as configurações da tarefa ParallelApply*. Por exemplo, suponha que você queira realizar uma CDC carga e aplicar 128 threads em paralelo. Você também quer acessar 64 filas por thread, com 50 registros armazenados por buffer.

Para promover o CDC desempenho, AWS DMS oferece suporte às seguintes configurações de tarefas:

  • ParallelApplyThreads— Especifica o número de segmentos simultâneos AWS DMS usados durante um CDC carregamento para enviar registros de dados para um endpoint de destino do Kafka. O valor padrão é zero (0) e o valor máximo é 32.

  • ParallelApplyBufferSize— Especifica o número máximo de registros a serem armazenados em cada fila de buffer para que threads simultâneos sejam enviados para um endpoint de destino do Kafka durante um carregamento. CDC O valor padrão é 100 e o valor máximo é 1.000. Use essa opção quando ParallelApplyThreads especificar mais de um thread.

  • ParallelApplyQueuesPerThread— Especifica o número de filas que cada thread acessa para retirar registros de dados das filas e gerar um carregamento em lote para um endpoint do Kafka durante. CDC O padrão é um. O máximo é 512.

Ao usar configurações da tarefa ParallelApply*, o partition-key-type padrão é a primary-key da tabela, não o schema-name.table-name.

Conectando-se ao Kafka usando o Transport Layer Security () TLS

Um cluster Kafka aceita conexões seguras usando Transport Layer Security ()TLS. ComDMS, você pode usar qualquer uma das três opções de protocolo de segurança a seguir para proteger uma conexão de endpoint Kafka.

SSLcriptografia (server-encryption)

Os clientes validam a identidade do servidor por meio do certificado do servidor. Uma conexão criptografada é feita entre o servidor e o cliente.

SSLautenticação (mutual-authentication)

O servidor e o cliente validam a identidade entre si por meio de seus próprios certificados. Uma conexão criptografada é feita entre o servidor e o cliente.

SASL-SSL (mutual-authentication)

O método Simple Authentication and Security Layer (SASL) substitui o certificado do cliente por um nome de usuário e senha para validar a identidade do cliente. Especificamente, forneça um nome de usuário e uma senha que o servidor registrou para que o servidor possa validar a identidade de um cliente. Uma conexão criptografada é feita entre o servidor e o cliente.

Importante

O Apache Kafka e a Amazon MSK aceitam certificados resolvidos. Essa é uma limitação conhecida do Kafka e da Amazon que deve ser MSK resolvida. Para obter mais informações, consulte Problemas do Apache Kafka, -3700. KAFKA

Se você estiver usando a AmazonMSK, considere usar listas de controle de acesso (ACLs) como uma solução alternativa para essa limitação conhecida. Para obter mais informações sobre o usoACLs, consulte a seção Apache Kafka ACLs do Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Se estiver utilizando um cluster do Kafka autogerenciado, consulte Comentário datado de 18/out/21 para obter informações sobre como configurar o cluster.

Usando SSL criptografia com a Amazon MSK ou um cluster Kafka autogerenciado

Você pode usar a SSL criptografia para proteger uma conexão de endpoint com a Amazon MSK ou um cluster Kafka autogerenciado. Quando você usa o método de autenticação por SSL criptografia, os clientes validam a identidade de um servidor por meio do certificado do servidor. Uma conexão criptografada é feita entre o servidor e o cliente.

Para usar a SSL criptografia para se conectar à Amazon MSK
  • Defina a configuração do endpoint do protocolo de segurança (SecurityProtocol) utilizando a opção ssl-encryption ao criar o endpoint do Kafka de destino.

    O JSON exemplo a seguir define o protocolo de segurança como SSL criptografia.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Para usar SSL criptografia para um cluster Kafka autogerenciado
  1. Se você estiver usando uma Autoridade de Certificação (CA) privada em seu cluster Kafka local, faça o upload do seu certificado de CA privado e obtenha um Amazon Resource Name (). ARN

  2. Defina a configuração do endpoint do protocolo de segurança (SecurityProtocol) utilizando a opção ssl-encryption ao criar o endpoint do Kafka de destino. O JSON exemplo a seguir define o protocolo de segurança comossl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Se você estiver usando uma CA privada, SslCaCertificateArn defina o ARN que você obteve na primeira etapa acima.

Utilizar a autenticação SSL

Você pode usar a SSL autenticação para proteger uma conexão de endpoint com a Amazon MSK ou um cluster Kafka autogerenciado.

Para habilitar a autenticação e a criptografia do cliente usando a SSL autenticação para se conectar à AmazonMSK, faça o seguinte:

  • Prepare uma chave privada e um certificado público para o Kafka.

  • Faça upload de certificados para o gerenciador de DMS certificados.

  • Crie um endpoint de destino do Kafka com o certificado correspondente ARNs especificado nas configurações do endpoint do Kafka.

Para preparar uma chave privada e um certificado público para a Amazon MSK
  1. Crie uma EC2 instância e configure um cliente para usar a autenticação conforme descrito nas etapas de 1 a 9 na seção Autenticação de cliente do Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

    Depois de concluir essas etapas, você tem um Certificado ARN (o certificado público ARN salvo emACM) e uma chave privada contida em um kafka.client.keystore.jks arquivo.

  2. Obtenha o certificado público e copie o certificado no arquivo signed-certificate-from-acm.pem, utilizando o comando a seguir:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    O comando retorna informações semelhantes às do exemplo a seguir.

    {"Certificate": "123", "CertificateChain": "456"}

    Copie o equivalente de "123" no arquivo signed-certificate-from-acm.pem.

  3. Obtenha a chave privada importando a chave msk-rsa de kafka.client.keystore.jks to keystore.p12, conforme mostrado no exemplo a seguir.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilize o comando a seguir para exportar keystore.p12 para o formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    A mensagem PEMInserir frase secreta é exibida e identifica a chave aplicada para criptografar o certificado.

  5. Remova os atributos bag e os atributos-chave do arquivo .pem para garantir que a primeira linha comece com a sequência de caracteres a seguir.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Para carregar um certificado público e uma chave privada para o gerenciador de DMS certificados e testar a conexão com a Amazon MSK
  1. Faça o upload para o gerenciador de DMS certificados usando o comando a seguir.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Crie um endpoint de MSK destino da Amazon e teste a conexão para garantir que a TLS autenticação funcione.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Importante

Você pode usar a SSL autenticação para proteger uma conexão com um cluster Kafka autogerenciado. Em alguns casos, é possível utilizar uma Autoridade de Certificação (CA) privada no cluster do Kafka on-premises. Nesse caso, carregue sua cadeia de CA, certificado público e chave privada para o gerenciador de DMS certificados. Em seguida, use o Amazon Resource Name (ARN) correspondente nas configurações do seu endpoint ao criar seu endpoint de destino Kafka local.

Como preparar uma chave privada e um certificado assinado para um cluster do Kafka autogerenciado
  1. Gere um par de chaves como mostrado no exemplo a seguir.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Gere uma solicitação de assinatura de certificado (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Use a CA no armazenamento confiável do cluster para assinar o. CSR Se não tiver uma CA, você poderá criar sua própria CA privada.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importe ca-cert para o truststore e o keystore do servidor. Se você não tiver um truststore, utilize o seguinte comando para criar o truststore e importar ca-cert nele.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Assine o certificado.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importe o certificado assinado para o keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilize o comando a seguir para importar a chave on-premise-rsa de kafka.server.keystore.jks para keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilize o comando a seguir para exportar keystore.p12 para o formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Faça upload de encrypted-private-server-key.pemsigned-certificate.pem, e ca-cert para o gerenciador DMS de certificados.

  10. Crie um endpoint usando o retornadoARNs.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Usando SASL - SSL autenticação para se conectar à Amazon MSK

O método Simple Authentication and Security Layer (SASL) usa um nome de usuário e uma senha para validar a identidade do cliente e faz uma conexão criptografada entre o servidor e o cliente.

Para usarSASL, primeiro você cria um nome de usuário e uma senha seguros ao configurar seu MSK cluster da Amazon. Para obter uma descrição de como configurar um nome de usuário e senha seguros para um MSK cluster da Amazon, consulte Configurando SASL SCRAM /autenticando um MSK cluster da Amazon no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Crie o endpoint de destino do Kafka, defina a configuração do endpoint do protocolo de segurança (SecurityProtocol) utilizando a opção sasl-ssl. Defina também as opções SaslUsername e SaslPassword. Certifique-se de que sejam consistentes com o nome de usuário e a senha seguros que você criou quando configurou seu MSK cluster da Amazon pela primeira vez, conforme mostrado no JSON exemplo a seguir.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
nota
  • Atualmente, AWS DMS oferece suporte apenas a uma CA pública apoiada por SASL -SSL. DMSnão suporta SASL - SSL para uso com o Kafka autogerenciado que é apoiado por uma CA privada.

  • Para SASL - SSL autenticação, AWS DMS suporta o mecanismo SCRAM - SHA -512 por padrão. AWS DMS as versões 3.5.0 e superiores também suportam o mecanismo Plain. Para oferecer suporte ao mecanismo Plain, defina o SaslMechanism parâmetro do tipo de KafkaSettings API dados comoPLAIN.

Usando uma imagem anterior para visualizar os valores originais das CDC linhas do Apache Kafka como alvo

Ao gravar CDC atualizações em um destino de streaming de dados como o Kafka, você pode visualizar os valores originais de uma linha do banco de dados de origem antes de serem alterados por uma atualização. Para tornar isso possível, AWS DMS preenche uma imagem anterior dos eventos de atualização com base nos dados fornecidos pelo mecanismo do banco de dados de origem.

Diferentes mecanismos de banco de dados de origem fornecem diferentes quantidades de informações para uma imagem anterior:

  • O Oracle fornece atualizações para colunas somente se elas forem alteradas.

  • O Postgre SQL fornece somente dados para colunas que fazem parte da chave primária (alteradas ou não). Se a replicação lógica estiver em uso e REPLICA IDENTITY FULL estiver definida para a tabela de origem, você poderá obter informações completas sobre antes e depois na linha gravada no WALs e disponível aqui.

  • O My SQL geralmente fornece dados para todas as colunas (alteradas ou não).

Para habilitar a criação de imagem anterior para adicionar valores originais do banco de dados de origem à saída do AWS DMS , use a configuração de tarefa BeforeImageSettings ou o parâmetro add-before-image-columns. Esse parâmetro aplica uma regra de transformação de coluna.

BeforeImageSettingsadiciona um novo JSON atributo a cada operação de atualização com valores coletados do sistema de banco de dados de origem, conforme mostrado a seguir.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
nota

Aplique BeforeImageSettings à carga total mais CDC as tarefas (que migram dados existentes e replicam as mudanças em andamento) ou CDC somente às tarefas (que replicam somente as alterações de dados). Não aplique BeforeImageSettings a tarefas que são somente de carga total.

Para opções BeforeImageSettings, aplica-se o seguinte:

  • Defina a opção EnableBeforeImage como true para habilitar a criação de imagem anterior. O padrão é false.

  • Use a FieldName opção para atribuir um nome ao novo JSON atributo. Quando EnableBeforeImage for true, FieldName será necessário e não poderá estar vazio.

  • A opção ColumnFilter especifica uma coluna a ser adicionada usando imagem anterior. Para adicionar somente colunas que fazem parte das chaves primárias da tabela, use o valor padrão, pk-only. Para adicionar somente colunas que não sejam do LOB tipo, usenon-lob. Para adicionar qualquer coluna que tenha um valor de imagem anterior, use all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Usar uma regra de transformação de imagem anterior

Como alternativa às configurações de tarefa, é possível usar o parâmetro add-before-image-columns, que aplica uma regra de transformação de coluna. Com esse parâmetro, você pode ativar antes da geração de imagens durante alvos CDC de streaming de dados, como o Kafka.

Usando add-before-image-columns em uma regra de transformação, é possível aplicar um controle mais refinado dos resultados da imagem anterior. As regras de transformação permitem que você use um localizador de objetos que oferece controle sobre as tabelas selecionadas para a regra. Além disso, é possível encadear regras de transformação, o que permite que regras diferentes sejam aplicadas a tabelas diferentes. Depois, você poderá manipular as colunas produzidas usando outras regras.

nota

Não use o parâmetro add-before-image-columns junto com a configuração da tarefa BeforeImageSettings na mesma tarefa. Em vez disso, use o parâmetro ou a configuração, mas não ambos, para uma única tarefa.

Um tipo de regra transformation com o parâmetro add-before-image-columns de uma coluna deve fornecer uma seção before-image-def. Por exemplo:

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

O valor de column-prefix precede um nome de coluna e o valor padrão de column-prefix é BI_. O valor de column-suffix é anexado ao nome da coluna e o padrão é vazio. Não defina column-prefix e column-suffix como strings vazias.

Escolha um valor para column-filter. Para adicionar somente colunas que fazem parte das chaves primárias da tabela, escolha pk-only. Escolha non-lob adicionar somente colunas que não sejam do LOB tipo. Ou escolha all para adicionar qualquer coluna que tenha um valor de imagem anterior.

Exemplo de uma regra de transformação de imagem anterior

A regra de transformação no exemplo a seguir adiciona uma nova coluna chamada BI_emp_no no destino. Portanto, uma instrução como UPDATE employees SET emp_no = 3 WHERE emp_no = 1; preenche o campo BI_emp_no com 1. Quando você grava CDC atualizações nos destinos do Amazon S3, a BI_emp_no coluna possibilita saber qual linha original foi atualizada.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Para obter informações sobre como usar a ação da regra add-before-image-columns, consulte Regras de transformação e ações.

Limitações ao usar o Apache Kafka como alvo para AWS Database Migration Service

Aplicam-se as seguintes limitações ao utilizar o Apache Kafka como destino:

  • AWS DMS Os endpoints de destino do Kafka não oferecem suporte IAM ao controle de acesso para Amazon Managed Streaming for Apache Kafka (Amazon). MSK

  • O LOB modo completo não é suportado.

  • Especifique um arquivo de configuração do Kafka para seu cluster com propriedades que permitem AWS DMS criar novos tópicos automaticamente. Inclua a configuração, auto.create.topics.enable = true. Se você estiver usando a AmazonMSK, você pode especificar a configuração padrão ao criar seu cluster Kafka e, em seguida, alterar a auto.create.topics.enable configuração para. true Para obter mais informações sobre as configurações padrão, consulte A configuração padrão da Amazon MSK no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka. Se você precisar modificar um cluster existente do Kafka criado usando a AmazonMSK, execute o AWS CLI comando aws kafka create-configuration para atualizar sua configuração do Kafka, como no exemplo a seguir:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Aqui, //~/kafka_configuration é o arquivo configuração criado com as configurações de propriedades necessárias.

    Se você estiver usando sua própria instância do Kafka instalada na AmazonEC2, modifique a configuração do cluster Kafka com a auto.create.topics.enable = true configuração AWS DMS para permitir a criação automática de novos tópicos, usando as opções fornecidas com sua instância.

  • AWS DMS publica cada atualização em um único registro no banco de dados de origem como um registro de dados (mensagem) em um determinado tópico do Kafka, independentemente das transações.

  • AWS DMS suporta as duas formas a seguir para chaves de partição:

    • SchemaName.TableName: uma combinação de esquema e nome da tabela.

    • ${AttributeName}: o valor de um dos campos noJSON, ou a chave primária da tabela no banco de dados de origem.

  • O BatchApply não é compatível com um endpoint do Kafka. A utilização da aplicação em lote (por exemplo, a configuração da tarefa de metadados de destino BatchApplyEnabled) para um destino do Kafka pode resultar em perda de dados.

  • AWS DMS não suporta a migração de valores do tipo de BigInt dados com mais de 16 dígitos. Para contornar essa limitação, você pode usar a regra de transformação a seguir para converter a coluna BigInt em uma string. Para obter mais informações sobre regras transformação, consulte Regras de transformação e ações.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Utilizar o mapeamento de objetos para migrar dados para um tópico do Kafka

AWS DMS usa regras de mapeamento de tabelas para mapear dados da fonte para o tópico de destino do Kafka. Para mapear dados para um tópico de destino, utilize um tipo de regra de mapeamento de tabelas chamado mapeamento de objetos. Utilize o mapeamento de objetos para definir como os registros de dados na origem são mapeados para os registros de dados publicados em um tópico do Kafka.

Os tópicos do Kafka não têm uma estrutura predefinida além de uma chave de partição.

nota

Não é necessário utilizar o mapeamento de objetos. É possível utilizar o mapeamento de tabela normal para várias transformações. No entanto, o tipo de chave de partição seguirá estes comportamentos padrão:

  • A chave primária é utilizada como uma chave de partição para a carga máxima.

  • Se nenhuma configuração de tarefa de aplicação paralela for usada, schema.table é usada como uma chave de partição para. CDC

  • Se configurações de tarefas de aplicação paralela forem usadas, a chave primária será usada como uma chave de partição para. CDC

Para criar uma regra de mapeamento de objetos, especifique rule-type como object-mapping. Essa regra especifica o tipo de mapeamento de objeto que você deseja usar.

A estrutura da regra é a seguinte:

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS atualmente suporta map-record-to-record e map-record-to-document como os únicos valores válidos para o rule-action parâmetro. Essas configurações afetam valores que não são excluídos como parte da lista de atributos exclude-columns. Os map-record-to-document valores map-record-to-record e especificam como AWS DMS manipula esses registros por padrão. Esses valores não afetam os mapeamentos de atributos de forma alguma.

Utilize o map-record-to-record ao migrar de um banco de dados relacional para um tópico do Kafka. Esse tipo de regra utiliza o valor taskResourceId.schemaName.tableName encontrado no banco de dados relacional como a chave de partição no tópico do Kafka e cria um atributo para cada coluna no banco de dados de origem.

Ao utilizar map-record-to-record, observe o seguinte:

  • Essa configuração afeta somente as colunas excluídas pela lista exclude-columns.

  • Para cada coluna desse tipo, AWS DMS cria um atributo correspondente no tópico de destino.

  • AWS DMS cria esse atributo correspondente independentemente de a coluna de origem ser usada em um mapeamento de atributos.

Uma maneira de compreender o map-record-to-record é vê-lo em ação. Para este exemplo, suponha que você está começando com uma linha de tabela do banco de dados relacional com a seguinte estrutura de dados:

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

29/02/1988

Para migrar essas informações de um esquema chamado Test para um tópico do Kafka, crie regras para mapear os dados para o tópico de destino. A regra a seguir ilustra o mapeamento.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Com um tópico do Kafka e uma chave de partição determinados (neste caso, taskResourceId.schemaName.tableName), o seguinte ilustra o formato do registro resultante utilizando os nossos exemplos de dados no tópico de destino do Kafka:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Reestruturação de dados com mapeamento de atributo

É possível reestruturar os dados ao migrá-los para um tópico do Kafka utilizando um mapa de atributos. Por exemplo, você pode combinar vários campos na origem em um único campo no destino. O mapa de atributo a seguir ilustra como reestruturar os dados.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Para definir um valor constante para partition-key, especifique um valor de partition-key. Por exemplo, é possível fazer isso para forçar o armazenamento de todos os dados em uma única partição. O mapeamento a seguir ilustra esse método.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
nota

O valor do partition-key para um registro de controle para uma tabela específica é TaskId.SchemaName.TableName. O valor do partition-key para um registro de controle específico para uma tarefa é o TaskId daquele registro. A especificação de um valor do partition-key no mapeamento do objeto não tem impacto sobre o partition-key no caso dos registros de controle.

Replicação de multitópico utilizando o mapeamento de objetos

Por padrão, AWS DMS as tarefas migram todos os dados de origem para um dos seguintes tópicos do Kafka:

  • Conforme especificado no campo Tópico do endpoint de AWS DMS destino.

  • Conforme especificado por kafka-default-topic, se o campo Tópico do endpoint de destino não estiver preenchido e a configuração auto.create.topics.enable do Kafka estiver definida como true.

Com as versões 3.4.6 e posteriores do AWS DMS mecanismo, você pode usar o kafka-target-topic atributo para mapear cada tabela de origem migrada para um tópico separado. Por exemplo, as regras de mapeamento de objetos a seguir migram as tabelas de origem Customer e Address para os tópicos customer_topic e address_topic do Kafka, respectivamente. Ao mesmo tempo, AWS DMS migra todas as outras tabelas de origem, incluindo a Bills tabela no Test esquema, para o tópico especificado no endpoint de destino.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Ao utilizar a replicação de multitópico do Kafka, é possível agrupar e migrar tabelas de origem para tópicos separados do Kafka utilizando uma única tarefa de replicação.

Formato de mensagem do Apache Kafka

A JSON saída é simplesmente uma lista de pares de valores-chave.

RecordType

O tipo de registro pode ser dados ou controle. Os registros de dados representam as linhas reais na origem. Os registros de controle são relacionados a importantes eventos no stream, como a reinicialização de uma tarefa, por exemplo.

Operação

Para registros de dados, a operação pode ser load, insert, update ou delete.

Para registros de controle, a operação pode ser create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column ou column-type-change.

SchemaName

O esquema de origem para o registro. Esse campo pode estar vazio para um registro de controle.

TableName

A tabela de origem para um registro. Esse campo pode estar vazio para um registro de controle.

Timestamp

A data e hora de quando a JSON mensagem foi criada. O campo é formatado com o formato ISO 8601.

O exemplo de JSON mensagem a seguir ilustra uma mensagem de tipo de dados com todos os metadados adicionais.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

O exemplo de JSON mensagem a seguir ilustra uma mensagem de tipo de controle.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }
PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.