Usare Apache Kafka come obiettivo per AWS Database Migration Service - AWS Servizio di migrazione del Database

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Usare Apache Kafka come obiettivo per AWS Database Migration Service

È possibile utilizzarlo AWS DMS per migrare i dati in un cluster Apache Kafka. Apache Kafka è una piattaforma di streaming distribuita. È possibile utilizzare Apache Kafka per l'inserimento e l'elaborazione dei dati di streaming in tempo reale.

AWS offre anche Amazon Managed Streaming for Apache Kafka (MSKAmazon) da utilizzare come destinazione. AWS DMS Amazon MSK è un servizio di streaming Apache Kafka completamente gestito che semplifica l'implementazione e la gestione delle istanze Apache Kafka. Funziona con le versioni open source di Apache Kafka e accedi alle MSK istanze Amazon come AWS DMS destinazioni esattamente come qualsiasi istanza di Apache Kafka. Per ulteriori informazioni, consulta What is AmazonMSK? nella Amazon Managed Streaming for Apache Kafka Developer Guide.

Un cluster Kafka archivia i flussi di record in categorie denominate argomenti che sono suddivisi in partizioni. Le partizioni sono sequenze di record di dati (messaggi) identificate in modo univoco in un argomento. Le partizioni possono essere distribuite tra più broker in un cluster per consentire l'elaborazione parallela dei record dell'argomento. Per ulteriori informazioni su argomenti e partizioni e la loro distribuzione in Apache Kafka, consulta Argomenti e registri e Distribuzione.

Il tuo cluster Kafka può essere un'MSKistanza Amazon, un cluster in esecuzione su un'EC2istanza Amazon o un cluster locale. Un'MSKistanza Amazon o un cluster su un'EC2istanza Amazon può trovarsi nella stessa istanza VPC o in un'altra. Se il cluster è on-premise, è possibile utilizzare il server dei nomi on-premise dell'istanza di replica per risolvere il nome host del cluster. Per informazioni sulla configurazione di un server dei nomi per l'istanza di replica, consulta Utilizzo del server dei nomi in locale. Per ulteriori informazioni sulla configurazione di una rete, consulta Configurazione di una rete per un'istanza di replica.

Quando utilizzi un MSK cluster Amazon, assicurati che il relativo gruppo di sicurezza consenta l'accesso dalla tua istanza di replica. Per informazioni sulla modifica del gruppo di sicurezza per un MSK cluster Amazon, consulta Modifica del gruppo di sicurezza di un MSK cluster Amazon.

AWS Database Migration Service pubblica record su un argomento di Kafka utilizzando. JSON Durante la conversione, AWS DMS serializza ogni record dal database di origine in una coppia attributo-valore in formato. JSON

È possibile usare la mappatura degli oggetti per migrare i dati da qualsiasi origine dati supportata a un cluster Kafka di destinazione. Con la mappatura degli oggetti, determini il modo in cui strutturare i record di dati nell'argomento di destinazione. Puoi inoltre definire una chiave di partizionamento per ogni tabella che viene utilizzata da Kafka per raggruppare i dati nelle sue partizioni.

Attualmente, AWS DMS supporta un singolo argomento per attività. Per una singola attività con più tabelle, tutti i messaggi vengono indirizzati a un unico argomento. Ogni messaggio include una sezione di metadati che identifica lo schema e la tabella di destinazione. AWS DMS le versioni 3.4.6 e successive supportano la replica multitopica utilizzando la mappatura degli oggetti. Per ulteriori informazioni, consulta Replica di più argomenti con la mappatura degli oggetti.

Impostazioni endpoint Apache Kafka

È possibile specificare i dettagli della connessione tramite le impostazioni dell'endpoint nella console o l' AWS DMS opzione in. --kafka-settings CLI I requisiti per ogni impostazione sono i seguenti:

  • Broker: specifica le posizioni di uno o più broker nel cluster Kafka sotto forma di elenco di voci separate da virgole per ogni broker-hostname:port. Un esempio è "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Questa impostazione può specificare le posizioni di uno o tutti i broker del cluster. Tutti i broker del cluster comunicano per gestire il partizionamento dei record di dati migrati verso l'argomento.

  • Topic (facoltativo): specifica il nome dell'argomento con una lunghezza massima di 255 lettere e simboli. È possibile utilizzare punto (.), carattere di sottolineatura (_) e segno meno (-). I nomi degli argomenti con un punto (.) o un carattere di sottolineatura (_) possono interferire con le strutture dati interne. Nel nome dell'argomento utilizzare uno dei due simboli ma non entrambi. Se non specificate il nome di un argomento, AWS DMS lo usa "kafka-default-topic" come argomento di migrazione.

    Nota

    Se desiderate AWS DMS creare un argomento di migrazione specificato dall'utente o l'argomento predefinito, impostatelo auto.create.topics.enable = true come parte della configurazione del cluster Kafka. Per ulteriori informazioni, consulta Limitazioni nell'utilizzo di Apache Kafka come destinazione per AWS Database Migration Service

  • MessageFormat: il formato di output per i record creati nell'endpoint. Il formato del messaggio è JSON (predefinito) o JSON_UNFORMATTED (una singola riga senza tabulazione).

  • MessageMaxBytes: la dimensione massima in byte per i record creati nell'endpoint. Il valore di default è 1.000.000.

    Nota

    È possibile utilizzare il pulsante AWS CLI/solo SDK per passare MessageMaxBytes a un valore non predefinito. Ad esempio utilizza il seguente comando per modificare l'endpoint Kafka esistente e cambiare MessageMaxBytes.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails: fornisce informazioni dettagliate sulle transazioni dal database di origine. Tali informazioni includono un timestamp di commit, una posizione nel log e valori per transaction_id, previous_transaction_id e transaction_record_id(l'offset del record all'interno di una transazione). Il valore predefinito è false.

  • IncludePartitionValue: mostra il valore della partizione nell'output dei messaggi di Kafka, a meno che il tipo della partizione non sia schema-table-type. Il valore predefinito è false.

  • PartitionIncludeSchemaTable: aggiunge ai nomi di schemi e tabelle il prefisso con i valori di partizione, quando il tipo di partizione è primary-key-type. In questo modo si aumenta la distribuzione dei dati tra le partizioni di Kafka. Ad esempio, si supponga che uno schema SysBench includa migliaia di tabelle e che ogni tabella faccia riferimento solo a un intervallo limitato di valori della chiave primaria. In questo caso, la stessa chiave primaria viene inviata da migliaia di tabelle alla stessa partizione, causando un rallentamento. Il valore predefinito è false.

  • IncludeTableAlterOperations— Include tutte le operazioni del linguaggio di definizione dei dati (DDL) che modificano la tabella nei dati di controllorename-table, ad esempiodrop-table, add-columndrop-column, erename-column. Il valore predefinito è false.

  • IncludeControlDetails: mostra le informazioni dettagliate del controllo per la definizione di tabelle, la definizione di colonne e le modifiche di tabelle e colonne nell'output dei messaggi Kafka. Il valore predefinito è false.

  • IncludeNullAndEmpty— NULL Includi colonne vuote nella destinazione. Il valore predefinito è false.

  • SecurityProtocol— Imposta una connessione sicura a un endpoint di destinazione Kafka utilizzando Transport Layer Security (). TLS Le opzioni includono ssl-authentication, ssl-encryption e sasl-ssl. L'utilizzo di sasl-ssl richiede SaslUsername e SaslPassword.

  • SslEndpointIdentificationAlgorithm— Imposta la verifica del nome host per il certificato. Questa impostazione è supportata nella AWS DMS versione 3.5.1 e successive. Le opzioni sono le seguenti:

    • NONE: Disattiva la verifica del nome host del broker nella connessione del client.

    • HTTPS: Abilita la verifica del nome host del broker nella connessione del client.

  • useLargeIntegerValue— Usa fino a 18 cifre int invece di emettere int come doppi, disponibile a partire dalla AWS DMS versione 3.5.4. Il valore predefinito è false.

È possibile utilizzare le impostazioni per aumentare la velocità del trasferimento. Per farlo, AWS DMS supporta il pieno carico multithread in un cluster Apache Kafka di destinazione. AWS DMS supporta il multithreading con le impostazioni delle attività seguenti:

  • MaxFullLoadSubTasks— Utilizzate questa opzione per indicare il numero massimo di tabelle di origine da caricare in parallelo. AWS DMS carica ogni tabella nella tabella di destinazione Kafka corrispondente utilizzando una sottoattività dedicata. Il valore predefinito è 8; il valore il massimo è 49.

  • ParallelLoadThreads— Utilizzate questa opzione per specificare il numero di thread da utilizzare per caricare ogni tabella nella relativa tabella di destinazione Kafka. AWS DMS Il valore massimo per una destinazione Apache Kafka è 32. Puoi chiedere che questo limite massimo venga aumentato.

  • ParallelLoadBufferSize: utilizza questa opzione per specificare il numero massimo di record da archiviare nel buffer usato dai thread di caricamento parallelo per caricare i dati nella destinazione Kafka. Il valore predefinito è 50. Il valore massimo è 1.000. Utilizzare questo parametro con ParallelLoadThreads; ParallelLoadBufferSize è valido solo quando è presente più di un thread.

  • ParallelLoadQueuesPerThread: utilizza questa opzione per specificare il numero di code a cui accede ogni thread simultaneo per eliminare i record di dati dalle code e generare un carico batch per la destinazione. Il valore di default è 1. Il numero massimo è 512.

È possibile migliorare le prestazioni di change data capture (CDC) per gli endpoint Kafka ottimizzando le impostazioni delle attività per thread paralleli e operazioni in blocco. A tale scopo, è possibile specificare il numero di thread simultanei, di code per thread e di record da memorizzare in un buffer utilizzando le impostazioni delle attività ParallelApply*. Ad esempio, supponiamo di voler eseguire un CDC caricamento e applicare 128 thread in parallelo. Si desidera inoltre accedere a 64 code per thread, con 50 record memorizzati per buffer.

Per CDC migliorare le prestazioni, AWS DMS supporta le seguenti impostazioni delle attività:

  • ParallelApplyThreads— specifica il numero di thread simultanei che vengono AWS DMS utilizzati durante un CDC caricamento per inviare i record di dati a un endpoint di destinazione Kafka. Il valore predefinito è zero (0) e il valore massimo è 32.

  • ParallelApplyBufferSize— Speciifica il numero massimo di record da archiviare in ogni coda di buffer per i thread simultanei da inviare a un endpoint di destinazione Kafka durante un caricamento. CDC Il valore predefinito è 100 e il valore massimo è 1.000. Utilizzare questa opzione quando ParallelApplyThreads specifica più di un thread.

  • ParallelApplyQueuesPerThread— Speciifica il numero di code a cui ogni thread accede per estrarre i record di dati dalle code e generare un caricamento in batch per un endpoint Kafka durante. CDC Il valore di default è 1. Il numero massimo è 512.

Quando si utilizzano le impostazioni delle attività ParallelApply*, l'impostazione di partition-key-type predefinita è la primary-key della tabella, non schema-name.table-name.

Connessione a Kafka tramite Transport Layer Security () TLS

Un cluster Kafka accetta connessioni sicure utilizzando Transport Layer Security (). TLS ConDMS, è possibile utilizzare una qualsiasi delle seguenti tre opzioni di protocollo di sicurezza per proteggere una connessione endpoint Kafka.

SSLcrittografia () server-encryption

I client convalidano l'identità del server tramite il certificato del server. Quindi viene stabilita una connessione crittografata tra server e client.

SSLautenticazione (mutual-authentication)

Server e client convalidano l'identità reciprocamente tramite i propri certificati. Quindi viene stabilita una connessione crittografata tra server e client.

SASL-SSL (mutual-authentication)

Il metodo Simple Authentication and Security Layer (SASL) sostituisce il certificato del client con un nome utente e una password per convalidare l'identità del client. In particolare, si forniscono un nome utente e una password registrati dal server in modo che il server possa convalidare l'identità del client. Quindi viene stabilita una connessione crittografata tra server e client.

Importante

Apache Kafka e Amazon MSK accettano certificati risolti. Questa è una limitazione nota di Kafka e Amazon MSK da risolvere. Per ulteriori informazioni, consulta Problemi con Apache Kafka, -3700. KAFKA

Se utilizzi AmazonMSK, prendi in considerazione l'utilizzo delle liste di controllo degli accessi (ACLs) come soluzione alternativa a questa limitazione nota. Per ulteriori informazioni sull'utilizzoACLs, consulta la sezione Apache Kafka ACLs della Amazon Managed Streaming for Apache Kafka Developer Guide.

Se utilizzi un cluster Kafka autogestito, consulta il commento datato 21 ottobre 2018 per informazioni sulla configurazione del cluster.

Utilizzo della SSL crittografia con Amazon MSK o un cluster Kafka autogestito

Puoi utilizzare SSL la crittografia per proteggere una connessione endpoint ad Amazon MSK o a un cluster Kafka autogestito. Quando utilizzi il metodo di autenticazione SSL crittografata, i client convalidano l'identità di un server tramite il certificato del server. Quindi viene stabilita una connessione crittografata tra server e client.

Per utilizzare SSL la crittografia per connettersi ad Amazon MSK
  • Configura l'impostazione dell'endpoint del protocollo di sicurezza (SecurityProtocol) utilizzando l'opzione ssl-encryption quando crei l'endpoint Kafka di destinazione.

    L'JSONesempio seguente imposta il protocollo di sicurezza come SSL crittografia.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Per utilizzare la SSL crittografia per un cluster Kafka autogestito
  1. Se utilizzi un'Autorità di certificazione (CA) privata nel tuo cluster Kafka locale, carica il tuo certificato CA privato e ottieni un Amazon Resource Name (). ARN

  2. Configura l'impostazione dell'endpoint del protocollo di sicurezza (SecurityProtocol) utilizzando l'opzione ssl-encryption quando crei l'endpoint Kafka di destinazione. L'JSONesempio seguente imposta il protocollo di sicurezza come. ssl-encryption

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Se utilizzi una CA privata, imposta SslCaCertificateArn quella ARN che hai ottenuto nel primo passaggio precedente.

Utilizzo SSL dell'autenticazione

Puoi utilizzare l'SSLautenticazione per proteggere una connessione endpoint ad Amazon MSK o a un cluster Kafka autogestito.

Per abilitare l'autenticazione e la crittografia dei client utilizzando SSL l'autenticazione per connettersi ad AmazonMSK, procedi come segue:

  • Prepara una chiave privata e un certificato pubblico per Kafka.

  • Carica i certificati nel gestore dei DMS certificati.

  • Crea un endpoint di destinazione Kafka con il certificato corrispondente ARNs specificato nelle impostazioni dell'endpoint Kafka.

Per preparare una chiave privata e un certificato pubblico per Amazon MSK
  1. Crea un'EC2istanza e configura un client per utilizzare l'autenticazione come descritto nei passaggi da 1 a 9 nella sezione Client Authentication della Amazon Managed Streaming for Apache Kafka Developer Guide.

    Dopo aver completato questi passaggi, avrai un Certificate- ARN (il certificato pubblico ARN salvato inACM) e una chiave privata contenuti in un file. kafka.client.keystore.jks

  2. Recupera il certificato pubblico e copia il certificato nel file signed-certificate-from-acm.pem utilizzando il comando seguente:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Il comando restituisce informazioni simili a quelle mostrate nell'esempio seguente:

    {"Certificate": "123", "CertificateChain": "456"}

    Quindi copia l'equivalente di "123" nel file signed-certificate-from-acm.pem.

  3. Recupera la chiave privata importando la chiave msk-rsa da kafka.client.keystore.jks to keystore.p12, come mostrato nel seguente esempio.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilizza il comando seguente per esportare keystore.p12 nel formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Viene visualizzato il messaggio Inserisci la PEM passphrase che identifica la chiave applicata per crittografare il certificato.

  5. Rimuovi gli attributi contenitore e gli attributi chiave dal file .pem per assicurarti che la prima riga inizi con la stringa seguente.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Per caricare un certificato pubblico e una chiave privata nel gestore dei DMS certificati e testare la connessione ad Amazon MSK
  1. Caricalo nel gestore dei DMS certificati utilizzando il seguente comando.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Crea un endpoint Amazon MSK Target e testa la connessione per assicurarti che TLS l'autenticazione funzioni.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Importante

Puoi utilizzare l'SSLautenticazione per proteggere una connessione a un cluster Kafka autogestito. In alcuni casi, si potrebbe utilizzare un'autorità di certificazione (CA) privata nel cluster Kafka on-premise. In tal caso, carica la catena CA, il certificato pubblico e la chiave privata nel gestore dei DMS certificati. Quindi, utilizza il corrispondente Amazon Resource Name (ARN) nelle impostazioni dell'endpoint quando crei l'endpoint di destinazione Kafka locale.

Per preparare una chiave privata e un certificato firmato per un cluster Kafka autogestito
  1. Genera una coppia di chiavi come nel seguente esempio.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Genera una richiesta di firma del certificato (). CSR

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Usa la CA nel tuo truststore del cluster per firmare ilCSR. Se non disponi di una CA, puoi creare una CA privata.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importa ca-cert nel truststore e nel keystore del server. Se non si dispone di un truststore, utilizza il comando seguente per crearlo e importare ca-cert .

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Firma il certificato.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importa il certificato firmato nel keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilizza il seguente comando per importare la chiave on-premise-rsa da kafka.server.keystore.jks a keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilizza il comando seguente per esportare keystore.p12 nel formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Carica encrypted-private-server-key.pem e signed-certificate.pem ca-cert al gestore dei DMS certificati.

  10. Crea un endpoint utilizzando il valore ARNs restituito.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Utilizzo SASL SSL dell'autenticazione per connettersi ad Amazon MSK

Il metodo Simple Authentication and Security Layer (SASL) utilizza un nome utente e una password per convalidare l'identità di un client e stabilisce una connessione crittografata tra server e client.

Per utilizzarliSASL, devi prima creare un nome utente e una password sicuri quando configuri il tuo MSK cluster Amazon. Per una descrizione di come configurare un nome utente e una password sicuri per un MSK cluster Amazon, consulta ConfigurazioneSASL/SCRAMautenticazione per un MSK cluster Amazon nella Amazon Managed Streaming for Apache Kafka Developer Guide.

Quindi, quando crei l'endpoint di destinazione Kafka, imposta l'impostazione dell'endpoint del protocollo di sicurezza (SecurityProtocol) utilizzando l'opzione sasl-ssl. Imposti anche le opzioni SaslUsername e SaslPassword. Assicurati che siano coerenti con il nome utente e la password sicuri che hai creato quando hai configurato per la prima volta il tuo MSK cluster Amazon, come mostrato nell'JSONesempio seguente.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Nota
  • Attualmente, AWS DMS supporta solo CA pubblica supportata da SASL -SSL. DMSnon supportaSASL, da utilizzare con Kafka autogestito supportato da una CA privata. SSL

  • Per SASL SSL l'autenticazione, AWS DMS supporta il meccanismo SCRAM - SHA -512 per impostazione predefinita. AWS DMS le versioni 3.5.0 e successive supportano anche il meccanismo Plain. Per supportare il meccanismo Plain, imposta il SaslMechanism parametro del tipo di KafkaSettings API dati suPLAIN.

Utilizzo di un'immagine precedente per visualizzare i valori originali delle CDC righe per Apache Kafka come destinazione

Quando si scrivono CDC aggiornamenti su una destinazione di streaming di dati come Kafka, è possibile visualizzare i valori originali di una riga del database di origine prima di modificarli mediante un aggiornamento. Per rendere possibile ciò, AWS DMS compila un'immagine precedente degli eventi di aggiornamento sulla base dei dati forniti dal motore del database di origine.

Diversi motori di database di origine forniscono diverse quantità di informazioni per un'immagine precedente:

  • Oracle fornisce aggiornamenti alle colonne solo se cambiano.

  • Postgre SQL fornisce solo i dati per le colonne che fanno parte della chiave primaria (modificate o meno). Se la replica logica è in uso ed REPLICA IDENTITY FULL è impostata per la tabella di origine, è possibile ottenere informazioni complete prima e dopo sulla riga scritta WALs e disponibile qui.

  • SQLIn genere, My fornisce dati per tutte le colonne (modificate o meno).

Per consentire prima dell'imaging di aggiungere valori originali dal database di origine all'output AWS DMS , utilizzare l'impostazione dell'attività BeforeImageSettings o il parametro add-before-image-columns. Questo parametro applica una regola di trasformazione della colonna.

BeforeImageSettingsaggiunge un nuovo JSON attributo a ogni operazione di aggiornamento con valori raccolti dal sistema di database di origine, come illustrato di seguito.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Nota

Si applica BeforeImageSettings alle CDC attività a pieno carico Plus (che migrano i dati esistenti e replicano le modifiche in corso) o CDC solo alle attività (che replicano solo le modifiche ai dati). Non applicare BeforeImageSettings alle attività a pieno carico.

Per le opzioni BeforeImageSettings, si applica quanto segue:

  • Impostare l'opzione EnableBeforeImage su true da abilitare prima dell'imaging. Il valore predefinito è false.

  • Utilizzate l'FieldNameopzione per assegnare un nome al nuovo attributo. JSON Quando EnableBeforeImage è true, FieldName è richiesto e non può essere vuoto.

  • L'opzione ColumnFilter specifica una colonna da aggiungere utilizzando l'imaging precedente. Per aggiungere solo colonne che fanno parte delle chiavi primarie della tabella, utilizzare il valore predefinito, pk-only. Per aggiungere solo colonne che non sono di LOB tipo, usanon-lob. Per aggiungere qualsiasi colonna con un valore immagine prima, utilizzare all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Utilizzo di una regola di trasformazione dell'immagine precedente

In alternativa alle impostazioni delle attività, è possibile utilizzare il parametro add-before-image-columns, che applica una regola di trasformazione delle colonne. Con questo parametro, è possibile abilitare la funzione Before Imaging durante lo streaming di dati CDC su destinazioni come Kafka.

Utilizzando add-before-image-columns in una una regola di trasformazione, è possibile applicare un controllo più dettagliato dei risultati dell'immagine precedente. Le regole di trasformazione consentono di utilizzare un localizzatore di oggetti che consente di controllare le tabelle selezionate per la regola. Inoltre, è possibile concatenare le regole di trasformazione, consentendo l'applicazione di regole diverse a tabelle diverse. È quindi possibile manipolare le colonne prodotte utilizzando altre regole.

Nota

Non utilizzare il parametro add-before-image-columns insieme all'impostazione dell'attività BeforeImageSettings all'interno della stessa attività. Utilizzare invece il parametro o l'impostazione, ma non entrambi, per una singola attività.

Un tipo di regola transformation regola con il parametro add-before-image-columns per una colonna deve fornire una sezione before-image-def. Di seguito viene riportato un esempio.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

Il valore di column-prefix viene anteposto a un nome di colonna e il valore predefinito di column-prefix è BI_. Il valore di column-suffix viene aggiunto al nome della colonna e il valore predefinito è vuoto. Non impostare sia column-prefix sia column-suffix sull'opzione per svuotare le stringhe.

Scegliere un valore per column-filter. Per aggiungere solo colonne che fanno parte delle chiavi primarie della tabella, scegliere pk-only . Scegliete non-lob di aggiungere solo colonne non tipograficheLOB. Oppure scegliere all per aggiungere qualsiasi colonna con un valore immagine precedente.

Esempio di una regola di trasformazione dell'immagine precedente

La regola di trasformazione nell'esempio seguente aggiunge una nuova colonna chiamata BI_emp_no nella destinazione. Quindi una dichiarazione come UPDATE employees SET emp_no = 3 WHERE emp_no = 1; popola il campo BI_emp_no con 1. Quando scrivi CDC aggiornamenti alle destinazioni Amazon S3, la BI_emp_no colonna consente di determinare quale riga originale è stata aggiornata.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Per informazioni sull'utilizzo dell'operazione della regola add-before-image-columns, consulta Operazioni e regole di trasformazione.

Limitazioni nell'utilizzo di Apache Kafka come destinazione per AWS Database Migration Service

Utilizzando Apache Kafka come destinazione valgono le seguenti limitazioni:

  • AWS DMS Gli endpoint target Kafka non supportano il controllo degli IAM accessi per Amazon Managed Streaming for Apache Kafka (Amazon). MSK

  • La modalità completa non è supportata. LOB

  • Specificate un file di configurazione Kafka per il vostro cluster con proprietà che consentano di AWS DMS creare automaticamente nuovi argomenti. Includere l'impostazione auto.create.topics.enable = true. Se utilizzi AmazonMSK, puoi specificare la configurazione predefinita quando crei il tuo cluster Kafka, quindi modificare l'auto.create.topics.enableimpostazione in. true Per ulteriori informazioni sulle impostazioni di configurazione predefinite, consulta La MSK configurazione predefinita di Amazon nella Amazon Managed Streaming for Apache Kafka Developer Guide. Se devi modificare un cluster Kafka esistente creato utilizzando AmazonMSK, esegui il AWS CLI comando aws kafka create-configuration per aggiornare la configurazione di Kafka, come nell'esempio seguente:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Qui, //~/kafka_configuration è il file di configurazione creato con le impostazioni delle proprietà richieste.

    Se utilizzi la tua istanza Kafka installata su AmazonEC2, modifica la configurazione del cluster Kafka con l'auto.create.topics.enable = trueimpostazione per consentire AWS DMS la creazione automatica di nuovi argomenti, utilizzando le opzioni fornite con l'istanza.

  • AWS DMS pubblica ogni aggiornamento di un singolo record nel database di origine come un unico record di dati (messaggio) in un determinato argomento di Kafka indipendentemente dalle transazioni.

  • AWS DMS supporta i seguenti due moduli per le chiavi di partizione:

    • SchemaName.TableName: una combinazione del nome dello schema e quello della tabella.

    • ${AttributeName}: il valore di uno dei campi della o JSON la chiave primaria della tabella nel database di origine.

  • BatchApply non è supportato per un endpoint Kafka. L'utilizzo dell'applicazione in batch, ad esempio l'impostazione dell'attività dei metadati di destinazione BatchApplyEnabled, per una destinazione Kafka potrebbe causare la perdita di dati.

  • AWS DMS non supporta la migrazione di valori di tipi di BigInt dati con più di 16 cifre. Per aggirare questa limitazione puoi utilizzare la seguente regola di trasformazione per convertire la colonna BigInt in una stringa. Per ulteriori informazioni sulle regole di trasformazione, consulta Operazioni e regole di trasformazione.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Utilizzo della mappatura degli oggetti per la migrazione dei dati in un argomento Kafka

AWS DMS utilizza regole di mappatura delle tabelle per mappare i dati dall'argomento Kafka di origine a quello di destinazione. Per mappare i dati a un argomento di destinazione, è necessario utilizzare una regola di mappatura delle tabelle denominata mappatura degli oggetti. La mappatura degli oggetti consente di definire il modo in cui i record di dati nell'origine vengono mappati ai record di dati pubblicati nell'argomento Kafka.

Gli argomenti Kafka non dispongono di una struttura preimpostata oltre a una chiave di partizione.

Nota

Non è necessario utilizzare la mappatura degli oggetti. È possibile utilizzare la normale mappatura delle tabelle per varie trasformazioni. Tuttavia, il tipo di chiave della partizione segue questi comportamenti predefiniti:

  • La chiave primaria viene utilizzata come chiave di partizione per il pieno carico.

  • Se non vengono utilizzate impostazioni di attività di applicazione parallela, schema.table viene utilizzata come chiave di partizione per. CDC

  • Se vengono utilizzate le impostazioni delle attività di applicazione parallela, la chiave primaria viene utilizzata come chiave di partizione per. CDC

Per creare una regola di mappatura degli oggetti, è necessario impostare il parametro rule-type su object-mapping. Questa regola specifica il tipo di mappatura degli oggetti da utilizzare.

Di seguito è riportata la struttura per la regola.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS attualmente supporta map-record-to-record e è map-record-to-document l'unico valore valido per il parametro. rule-action Queste impostazioni influiscono sui valori che non sono esclusi come parte dell'elenco degli attributi exclude-columns. I map-record-to-document valori map-record-to-record and specificano come AWS DMS gestisce questi record per impostazione predefinita. Questi valori non influiscono in alcun modo sulle mappature degli attributi.

Nel caso di migrazione da un database relazionale a un argomento Kafka utilizza map-record-to-record. Questo tipo di regola utilizza il valore taskResourceId.schemaName.tableName dal database relazionale come chiave di partizione nell'argomento Kafka e crea un attributo per ogni colonna nel database di origine.

Quando utilizzi map-record-to-record, tieni presente quanto segue:

  • Questa impostazione ha effetto solo sulle colonne escluse dall'elenco exclude-columns.

  • Per ogni colonna di questo tipo, AWS DMS crea un attributo corrispondente nell'argomento di destinazione.

  • AWS DMS crea questo attributo corrispondente indipendentemente dal fatto che la colonna di origine venga utilizzata in una mappatura degli attributi.

Per comprendere il funzionamento di map-record-to-record, è opportuno esaminarne il comportamento in azione. Per questo esempio, supponiamo che tu stia iniziando con una riga di tabella del database relazionale con la struttura e i dati seguenti.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Per eseguire la migrazione di queste informazioni da uno schema denominato Test verso un argomento Kafka, crea le regole per mappare i dati sull'argomento di destinazione. La regola seguente illustra la mappatura.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Dato un argomento Kafka e una chiave di partizione (in questo caso, taskResourceId.schemaName.tableName), il seguente esempio illustra il formato di record risultante per l'argomento di destinazione Kafka utilizzando i dati di esempio:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Ristrutturazione dei dati con la mappatura degli attributi

Utilizzando una mappa degli attributi puoi modificare la struttura dei dati mentre li stai migrando verso un argomento Kafka. Ad esempio, potresti voler combinare più campi nell'origine in un unico campo nella destinazione. La seguente mappa degli attributi illustra come ristrutturare i dati.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Per impostare un valore costante per partition-key, specificare un valore partition-key. Ad esempio, potresti eseguire questa operazione per forzare la memorizzazione di tutti i dati in una singola partizione. Questo approccio viene illustrato nella mappatura seguente.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Nota

Il valore partition-key per un record di controllo per una tabella specifica è TaskId.SchemaName.TableName. Il valore partition-key per un record di controllo per un'attività specifica è il TaskId del record. La specifica si un valore partition-key nella mappatura degli oggetti non influisce sul parametro partition-key per un record di controllo.

Replica di più argomenti con la mappatura degli oggetti

Per impostazione predefinita, AWS DMS le attività migrano tutti i dati di origine su uno degli argomenti di Kafka seguenti:

  • Come specificato nel campo Argomento dell'endpoint di destinazione. AWS DMS

  • quello specificato da kafka-default-topic se il campo Argomento dell'endpoint di destinazione non è compilato e l'impostazione Kafka auto.create.topics.enable è impostata su true.

Con le versioni AWS DMS del motore 3.4.6 e successive, è possibile utilizzare l'kafka-target-topicattributo per mappare ogni tabella di origine migrata su un argomento separato. Ad esempio, le regole di mappatura degli oggetti riportate di seguito migrano le tabelle di origine Customer e Address agli argomenti Kafka customer_topic e address_topic rispettivamente. Allo stesso tempo, AWS DMS migra tutte le altre tabelle di origine, inclusa la Bills tabella nello Test schema, all'argomento specificato nell'endpoint di destinazione.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Utilizzando la replica di più argomenti Kafka, è possibile raggruppare e migrare le tabelle di origine su argomenti Kafka separati utilizzando un'unica attività di replica.

Formato dei messaggi per Apache Kafka

L'JSONoutput è semplicemente un elenco di coppie chiave-valore.

RecordType

Il tipo di record può essere relativo ai dati o al controllo. I record di dati rappresentano le righe effettive nell'origine. I record di controllo sono per eventi importanti nel flusso, ad esempio un riavvio dell'attività.

Operazione

Per i record di dati, l'operazione può essere load, insert, update o delete.

Per i record di controllo, l'operazione può essere create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column o column-type-change.

SchemaName

Lo schema di origine per il record. Questo campo può essere vuoto per un record di controllo.

TableName

La tabella di origine per il record. Questo campo può essere vuoto per un record di controllo.

Timestamp

Il timestamp di quando è stato creato il JSON messaggio. Il campo è formattato con il ISO formato 8601.

Il seguente esempio di JSON messaggio illustra un messaggio di tipo di dati con tutti i metadati aggiuntivi.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

Il seguente esempio di JSON messaggio illustra un messaggio di tipo di controllo.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }