Usare Apache Kafka come obiettivo per AWS Database Migration Service - AWS Servizio di migrazione del Database

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Usare Apache Kafka come obiettivo per AWS Database Migration Service

È possibile utilizzarlo AWS DMS per migrare i dati in un cluster Apache Kafka. Apache Kafka è una piattaforma di streaming distribuita. È possibile utilizzare Apache Kafka per l'inserimento e l'elaborazione dei dati di streaming in tempo reale.

AWS offre anche Amazon Managed Streaming for Apache Kafka (Amazon MSK) da utilizzare come destinazione. AWS DMS Amazon MSK è un servizio di streaming Apache Kafka completamente gestito che semplifica l'implementazione e la gestione delle istanze Apache Kafka. Funziona con le versioni open source di Apache Kafka e accedi alle istanze Amazon MSK come AWS DMS destinazioni esattamente come qualsiasi istanza di Apache Kafka. Per ulteriori informazioni, consulta Che cos'è Amazon MSK? nella Guida per sviluppatori di Streaming gestito da Amazon per Apache Kafka.

Un cluster Kafka archivia i flussi di record in categorie denominate argomenti che sono suddivisi in partizioni. Le partizioni sono sequenze di record di dati (messaggi) identificate in modo univoco in un argomento. Le partizioni possono essere distribuite tra più broker in un cluster per consentire l'elaborazione parallela dei record dell'argomento. Per ulteriori informazioni su argomenti e partizioni e la loro distribuzione in Apache Kafka, consulta Argomenti e registri e Distribuzione.

Il cluster Kafka può essere un'istanza Amazon MSK, un cluster in esecuzione su un'istanza Amazon EC2 o un cluster on-premise. Un'istanza Amazon MSK o un cluster su un'istanza Amazon EC2 può trovarsi nello stesso VPC o in uno diverso. Se il cluster è on-premise, è possibile utilizzare il server dei nomi on-premise dell'istanza di replica per risolvere il nome host del cluster. Per informazioni sulla configurazione di un server dei nomi per l'istanza di replica, consulta Utilizzo del server dei nomi in locale. Per ulteriori informazioni sulla configurazione di una rete, consulta Configurazione di una rete per un'istanza di replica.

Quando utilizzi un cluster Amazon MSK, assicurati che il relativo gruppo di sicurezza consenta l'accesso dall'istanza di replica. Per informazioni sulla modifica del gruppo di sicurezza di un cluster Amazon MSK, consulta Modifica del gruppo di sicurezza di un cluster Amazon MSK.

AWS Database Migration Service pubblica i record relativi a un argomento di Kafka utilizzando JSON. Durante la conversione, AWS DMS serializza ogni record dal database di origine in una coppia attributo-valore in formato JSON.

È possibile usare la mappatura degli oggetti per migrare i dati da qualsiasi origine dati supportata a un cluster Kafka di destinazione. Con la mappatura degli oggetti, determini il modo in cui strutturare i record di dati nell'argomento di destinazione. Puoi inoltre definire una chiave di partizionamento per ogni tabella che viene utilizzata da Kafka per raggruppare i dati nelle sue partizioni.

Attualmente, AWS DMS supporta un singolo argomento per attività. Per una singola attività con più tabelle, tutti i messaggi vengono indirizzati a un unico argomento. Ogni messaggio include una sezione di metadati che identifica lo schema e la tabella di destinazione. AWS DMS le versioni 3.4.6 e successive supportano la replica multitopica utilizzando la mappatura degli oggetti. Per ulteriori informazioni, consulta Replica di più argomenti con la mappatura degli oggetti.

Impostazioni endpoint Apache Kafka

È possibile specificare i dettagli della connessione tramite le impostazioni dell'endpoint nella AWS DMS console o l'--kafka-settingsopzione nella CLI. I requisiti per ogni impostazione sono i seguenti:

  • Broker: specifica le posizioni di uno o più broker nel cluster Kafka sotto forma di elenco di voci separate da virgole per ogni broker-hostname:port. Un esempio è "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Questa impostazione può specificare le posizioni di uno o tutti i broker del cluster. Tutti i broker del cluster comunicano per gestire il partizionamento dei record di dati migrati verso l'argomento.

  • Topic (facoltativo): specifica il nome dell'argomento con una lunghezza massima di 255 lettere e simboli. È possibile utilizzare punto (.), carattere di sottolineatura (_) e segno meno (-). I nomi degli argomenti con un punto (.) o un carattere di sottolineatura (_) possono interferire con le strutture dati interne. Nel nome dell'argomento utilizzare uno dei due simboli ma non entrambi. Se non specificate il nome di un argomento, AWS DMS lo usa "kafka-default-topic" come argomento di migrazione.

    Nota

    Se desiderate AWS DMS creare un argomento di migrazione specificato dall'utente o l'argomento predefinito, impostatelo auto.create.topics.enable = true come parte della configurazione del cluster Kafka. Per ulteriori informazioni, consultare Limitazioni nell'utilizzo di Apache Kafka come destinazione per AWS Database Migration Service

  • MessageFormat: il formato di output per i record creati nell'endpoint. Il formato del messaggio è JSON (predefinito) o JSON_UNFORMATTED (una singola riga senza tabulazione).

  • MessageMaxBytes: la dimensione massima in byte per i record creati nell'endpoint. Il valore di default è 1.000.000.

    Nota

    È possibile utilizzare solo AWS CLI/SDK per passare MessageMaxBytes a un valore non predefinito. Ad esempio utilizza il seguente comando per modificare l'endpoint Kafka esistente e cambiare MessageMaxBytes.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails: fornisce informazioni dettagliate sulle transazioni dal database di origine. Tali informazioni includono un timestamp di commit, una posizione nel log e valori per transaction_id, previous_transaction_id e transaction_record_id(l'offset del record all'interno di una transazione). Il valore predefinito è false.

  • IncludePartitionValue: mostra il valore della partizione nell'output dei messaggi di Kafka, a meno che il tipo della partizione non sia schema-table-type. Il valore predefinito è false.

  • PartitionIncludeSchemaTable: aggiunge ai nomi di schemi e tabelle il prefisso con i valori di partizione, quando il tipo di partizione è primary-key-type. In questo modo si aumenta la distribuzione dei dati tra le partizioni di Kafka. Ad esempio, si supponga che uno schema SysBench includa migliaia di tabelle e che ogni tabella faccia riferimento solo a un intervallo limitato di valori della chiave primaria. In questo caso, la stessa chiave primaria viene inviata da migliaia di tabelle alla stessa partizione, causando un rallentamento. Il valore predefinito è false.

  • IncludeTableAlterOperations: include tutte le operazioni DDL (Data Definition Language) che modificano i dati di controllo della tabella, ad esempio rename-table, drop-table, add-column, drop-column e rename-column. Il valore predefinito è false.

  • IncludeControlDetails: mostra le informazioni dettagliate del controllo per la definizione di tabelle, la definizione di colonne e le modifiche di tabelle e colonne nell'output dei messaggi Kafka. Il valore predefinito è false.

  • IncludeNullAndEmpty: include le colonne vuote e NULL nella destinazione. Il valore predefinito è false.

  • SecurityProtocol: imposta una connessione sicura a un endpoint di destinazione Kafka utilizzando Transport Layer Security (TLS). Le opzioni includono ssl-authentication, ssl-encryption e sasl-ssl. L'utilizzo di sasl-ssl richiede SaslUsername e SaslPassword.

  • SslEndpointIdentificationAlgorithm— Imposta la verifica del nome host per il certificato. Questa impostazione è supportata nella AWS DMS versione 3.5.1 e successive. Le opzioni sono le seguenti:

    • NONE: Disattiva la verifica del nome host del broker nella connessione del client.

    • HTTPS: Abilita la verifica del nome host del broker nella connessione del client.

È possibile utilizzare le impostazioni per aumentare la velocità del trasferimento. Per farlo, AWS DMS supporta il pieno carico multithread in un cluster Apache Kafka di destinazione. AWS DMS supporta il multithreading con le impostazioni delle attività seguenti:

  • MaxFullLoadSubTasks— Utilizzate questa opzione per indicare il numero massimo di tabelle di origine da caricare in parallelo. AWS DMS carica ogni tabella nella corrispondente tabella di destinazione di Kafka utilizzando una sottoattività dedicata. Il valore predefinito è 8; il valore il massimo è 49.

  • ParallelLoadThreads— Utilizzate questa opzione per specificare il numero di thread da utilizzare per caricare ogni tabella nella relativa tabella di destinazione Kafka. AWS DMS Il valore massimo per una destinazione Apache Kafka è 32. Puoi chiedere che questo limite massimo venga aumentato.

  • ParallelLoadBufferSize: utilizza questa opzione per specificare il numero massimo di record da archiviare nel buffer usato dai thread di caricamento parallelo per caricare i dati nella destinazione Kafka. Il valore predefinito è 50. Il valore massimo è 1.000. Utilizzare questo parametro con ParallelLoadThreads; ParallelLoadBufferSize è valido solo quando è presente più di un thread.

  • ParallelLoadQueuesPerThread: utilizza questa opzione per specificare il numero di code a cui accede ogni thread simultaneo per eliminare i record di dati dalle code e generare un carico batch per la destinazione. Il valore di default è 1. Il numero massimo è 512.

È possibile migliorare le prestazioni dell'acquisizione dei dati di modifica (CDC) per gli endpoint Kafka ottimizzando le impostazioni delle attività per thread paralleli e operazioni in blocco. A tale scopo, è possibile specificare il numero di thread simultanei, di code per thread e di record da memorizzare in un buffer utilizzando le impostazioni delle attività ParallelApply*. Ad esempio, si supponga di voler eseguire un carico CDC e applicare 128 thread in parallelo. Si desidera inoltre accedere a 64 code per thread, con 50 record memorizzati per buffer.

Per promuovere le prestazioni del CDC, AWS DMS supporta le seguenti impostazioni delle attività:

  • ParallelApplyThreads— specifica il numero di thread simultanei che vengono AWS DMS utilizzati durante un caricamento CDC per inviare i record di dati a un endpoint di destinazione Kafka. Il valore predefinito è zero (0) e il valore massimo è 32.

  • ParallelApplyBufferSize: specifica il numero massimo di record da archiviare in ogni coda di buffer per eseguire il push dei thread simultanei a un endpoint di destinazione Kafka durante un carico CDC. Il valore predefinito è 100 e il valore massimo è 1.000. Utilizzare questa opzione quando ParallelApplyThreads specifica più di un thread.

  • ParallelApplyQueuesPerThread: specifica il numero di code a cui ogni thread accede per eliminare i record di dati dalle code e generare un carico batch per un endpoint Kafka durante la CDC. Il valore di default è 1. Il numero massimo è 512.

Quando si utilizzano le impostazioni delle attività ParallelApply*, l'impostazione di partition-key-type predefinita è la primary-key della tabella, non schema-name.table-name.

Connessione a Kafka utilizzando Transport Layer Security (TLS)

Il cluster Kafka accetta solo connessioni protette con Transport Layer Security (TLS). Con DMS, è possibile utilizzare una qualsiasi delle seguenti tre opzioni di protocollo di sicurezza per proteggere la connessione degli endpoint Kafka.

Crittografia SSL (server-encryption)

I client convalidano l'identità del server tramite il certificato del server. Quindi viene stabilita una connessione crittografata tra server e client.

Autenticazione SSL (mutual-authentication)

Server e client convalidano l'identità reciprocamente tramite i propri certificati. Quindi viene stabilita una connessione crittografata tra server e client.

SASL-SSL (mutual-authentication)

Il metodo Simple Authentication and Security Layer (SASL) sostituisce il certificato del client con un nome utente e una password per convalidare l'identità del client. In particolare, si forniscono un nome utente e una password registrati dal server in modo che il server possa convalidare l'identità del client. Quindi viene stabilita una connessione crittografata tra server e client.

Importante

Apache Kafka e Amazon MSK accettano certificati risolti. Questa è una limitazione nota di Kafka e Amazon MSK da risolvere. Per ulteriori informazioni, consulta Apache Kafka issues, KAFKA-3700.

Se utilizzi Amazon MSK, prendi in considerazione le liste di controllo degli accessi (ACL) come soluzione alternativa a questa limitazione nota. Per ulteriori informazioni sull'utilizzo delle ACL, consulta la sezione ACL Apache Kafka nella Guida per gli sviluppatori di Streaming gestito da Amazon per Apache Kafka.

Se utilizzi un cluster Kafka autogestito, consulta il commento datato 21 ottobre 2018 per informazioni sulla configurazione del cluster.

Utilizzo della crittografia SSL con Amazon MSK o un cluster Kafka autogestito

È possibile utilizzare la crittografia SSL per proteggere la connessione di un endpoint ad Amazon MSK o a un cluster Kafka autogestito. Quando usi il metodo di autenticazione con crittografia SSL, i client convalidano l'identità di un server tramite il certificato del server. Quindi viene stabilita una connessione crittografata tra server e client.

Per utilizzare la crittografia SSL per connettersi ad Amazon MSK
  • Configura l'impostazione dell'endpoint del protocollo di sicurezza (SecurityProtocol) utilizzando l'opzione ssl-encryption quando crei l'endpoint Kafka di destinazione.

    L'esempio JSON che segue imposta il protocollo di sicurezza come crittografia SSL.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Per utilizzare la crittografia SSL per un cluster Kafka autogestito
  1. Se utilizzi un'autorità di certificazione (CA, Certification Authority) privata nel cluster Kafka on-premise, carica il certificato CA privato e ottieni un nome della risorsa Amazon (ARN).

  2. Configura l'impostazione dell'endpoint del protocollo di sicurezza (SecurityProtocol) utilizzando l'opzione ssl-encryption quando crei l'endpoint Kafka di destinazione. L'esempio JSON che segue imposta il protocollo di sicurezza come ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Se utilizzi una CA privata, imposta SslCaCertificateArn nell'ARN che hai ottenuto nella prima fase precedente.

Utilizzo dell'autenticazione SSL

È possibile utilizzare l'autenticazione SSL per proteggere la connessione di un endpoint ad Amazon MSK o a un cluster Kafka autogestito.

Per abilitare l'autenticazione e la crittografia del client utilizzando l'autenticazione SSL per la connessione ad Amazon MSK, procedi come segue:

  • Prepara una chiave privata e un certificato pubblico per Kafka.

  • Carica i certificati nella gestione certificati DMS.

  • Crea un endpoint di destinazione Kafka con i corrispondenti ARN di certificati specificati nelle impostazioni degli endpoint Kafka.

Per preparare una chiave privata e un certificato pubblico per Amazon MSK
  1. Crea un'istanza EC2 e configura un client per utilizzare l'autenticazione come descritto nelle fasi da 1 a 9 della sezione Client Authentication della Guida per sviluppatori di Streaming gestito da Amazon per Apache Kafka.

    Dopo aver completato queste fasi, avrai un Certificate-ARN (l'ARN del certificato pubblico salvato in ACM) e una chiave privata contenuta in un file kafka.client.keystore.jks.

  2. Recupera il certificato pubblico e copia il certificato nel file signed-certificate-from-acm.pem utilizzando il comando seguente:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Il comando restituisce informazioni simili a quelle mostrate nell'esempio seguente:

    {"Certificate": "123", "CertificateChain": "456"}

    Quindi copia l'equivalente di "123" nel file signed-certificate-from-acm.pem.

  3. Recupera la chiave privata importando la chiave msk-rsa da kafka.client.keystore.jks to keystore.p12, come mostrato nel seguente esempio.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilizza il comando seguente per esportare keystore.p12 nel formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Viene visualizzato il messaggio Enter PEM pass phrase che richiede la chiave applicata per crittografare il certificato.

  5. Rimuovi gli attributi contenitore e gli attributi chiave dal file .pem per assicurarti che la prima riga inizi con la stringa seguente.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Per caricare un certificato pubblico e una chiave privata nella gestione certificati DMS e testare la connessione ad Amazon MSK
  1. Carica nella gestione certificati DMS utilizzando il comando seguente.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Crea un endpoint di destinazione Amazon MSK e verifica la connessione per assicurarti che l'autenticazione TLS funzioni.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Importante

È possibile utilizzare l'autenticazione SSL per proteggere una connessione a un cluster Kafka autogestito. In alcuni casi, si potrebbe utilizzare un'autorità di certificazione (CA) privata nel cluster Kafka on-premise. In tal caso, carica la catena di CA, il certificato pubblico e la chiave privata nella gestione certificati DMS. Quindi, utilizza il corrispondente nome della risorsa Amazon (ARN) nelle impostazioni degli endpoint quando crei l'endpoint di destinazione Kafka on-premise.

Per preparare una chiave privata e un certificato firmato per un cluster Kafka autogestito
  1. Genera una coppia di chiavi come nel seguente esempio.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Genera una richiesta di firma del certificato (CSR, Certificate Sign Request).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Usa la CA nel truststore del cluster per firmare la CSR. Se non disponi di una CA, puoi creare una CA privata.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importa ca-cert nel truststore e nel keystore del server. Se non si dispone di un truststore, utilizza il comando seguente per crearlo e importare ca-cert .

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Firma il certificato.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importa il certificato firmato nel keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilizza il seguente comando per importare la chiave on-premise-rsa da kafka.server.keystore.jks a keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilizza il comando seguente per esportare keystore.p12 nel formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Carica encrypted-private-server-key.pem e signed-certificate.pem e ca-cert nella gestione certificati DMS.

  10. Crea un endpoint utilizzando gli ARN restituiti.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Utilizzo dell'autenticazione SASL-SSL per connettersi ad Amazon MSK

Il metodo Simple Authentication and Security Layer (SASL) utilizza un nome utente e una password per convalidare l'identità di un client e stabilisce una connessione crittografata tra server e client.

Per utilizzare SASL, devi prima creare un nome utente e una password sicuri quando configuri il cluster Amazon MSK. Per una descrizione di come configurare un nome utente e una password sicuri per un cluster Amazon MSK, consulta Configurazione dell'autenticazione SASL/SCRAM per un cluster Amazon MSK nella Guida per sviluppatori di Streaming gestito da Amazon per Apache Kafka.

Quindi, quando crei l'endpoint di destinazione Kafka, imposta l'impostazione dell'endpoint del protocollo di sicurezza (SecurityProtocol) utilizzando l'opzione sasl-ssl. Imposti anche le opzioni SaslUsername e SaslPassword. Assicurati che siano coerenti con il nome utente e la password sicuri creati durante la configurazione del cluster Amazon MSK, come mostrato nel seguente esempio JSON.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Nota
  • Attualmente, supporta solo SASL-SSL pubblico supportato da CA AWS DMS . DMS non supporta SASL-SSL per l'uso con Kafka autogestito supportato da una CA privata.

  • Per l'autenticazione SASL-SSL, AWS DMS supporta il meccanismo SCRAM-SHA-512 per impostazione predefinita. AWS DMS le versioni 3.5.0 e successive supportano anche il meccanismo Plain. Per supportare il meccanismo Plain, imposta il parametro SaslMechanism del tipo di dati API KafkaSettings su PLAIN.

Utilizzo di un'immagine precedente per visualizzare i valori originali delle righe CDC per Apache Kafka come destinazione

Quando si scrivono aggiornamenti CDC su una destinazione di streaming dati come Kafka, è possibile visualizzare i valori originali di una riga di database di origine prima di apportare modifiche da un aggiornamento. Per rendere possibile ciò, AWS DMS compila un'immagine precedente degli eventi di aggiornamento sulla base dei dati forniti dal motore del database di origine.

Diversi motori di database di origine forniscono diverse quantità di informazioni per un'immagine precedente:

  • Oracle fornisce aggiornamenti alle colonne solo se cambiano.

  • PostgreSQL fornisce solo i dati per le colonne che fanno parte della chiave primaria (modificata o meno). Se è in uso la replica logica e REPLICA IDENTITY FULL è impostato per la tabella di origine, è possibile ottenere informazioni complete prima e dopo sulla riga scritta e disponibile nei WAL.

  • MySQL generalmente fornisce dati per tutte le colonne (modificate o meno).

Per consentire prima dell'imaging di aggiungere valori originali dal database di origine all'output AWS DMS , utilizzare l'impostazione dell'attività BeforeImageSettings o il parametro add-before-image-columns. Questo parametro applica una regola di trasformazione della colonna.

BeforeImageSettings aggiunge un nuovo attributo JSON a ogni operazione di aggiornamento con valori raccolti dal sistema di database di origine, come illustrato di seguito.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Nota

Applicare BeforeImageSettings alle attività CDC a pieno carico e alle attività CDC (che eseguono la migrazione dei dati esistenti e replicano le modifiche in corso) o solo alle attività CDC (che replicano solo le modifiche dei dati). Non applicare BeforeImageSettings alle attività a pieno carico.

Per le opzioni BeforeImageSettings, si applica quanto segue:

  • Impostare l'opzione EnableBeforeImage su true da abilitare prima dell'imaging. Il valore predefinito è false.

  • Utilizzare l'opzione FieldName per assegnare un nome al nuovo attributo JSON. Quando EnableBeforeImage è true, FieldName è richiesto e non può essere vuoto.

  • L'opzione ColumnFilter specifica una colonna da aggiungere utilizzando l'imaging precedente. Per aggiungere solo colonne che fanno parte delle chiavi primarie della tabella, utilizzare il valore predefinito, pk-only. Per aggiungere solo colonne non di tipo LOB, utilizzare non-lob. Per aggiungere qualsiasi colonna con un valore immagine prima, utilizzare all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Utilizzo di una regola di trasformazione dell'immagine precedente

In alternativa alle impostazioni delle attività, è possibile utilizzare il parametro add-before-image-columns, che applica una regola di trasformazione delle colonne. Con questo parametro, è possibile abilitare l'imaging precedente durante il CDC su destinazioni di streaming dati come Kafka.

Utilizzando add-before-image-columns in una una regola di trasformazione, è possibile applicare un controllo più dettagliato dei risultati dell'immagine precedente. Le regole di trasformazione consentono di utilizzare un localizzatore di oggetti che consente di controllare le tabelle selezionate per la regola. Inoltre, è possibile concatenare le regole di trasformazione, consentendo l'applicazione di regole diverse a tabelle diverse. È quindi possibile manipolare le colonne prodotte utilizzando altre regole.

Nota

Non utilizzare il parametro add-before-image-columns insieme all'impostazione dell'attività BeforeImageSettings all'interno della stessa attività. Utilizzare invece il parametro o l'impostazione, ma non entrambi, per una singola attività.

Un tipo di regola transformation regola con il parametro add-before-image-columns per una colonna deve fornire una sezione before-image-def. Di seguito viene riportato un esempio.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

Il valore di column-prefix viene anteposto a un nome di colonna e il valore predefinito di column-prefix è BI_. Il valore di column-suffix viene aggiunto al nome della colonna e il valore predefinito è vuoto. Non impostare sia column-prefix sia column-suffix sull'opzione per svuotare le stringhe.

Scegliere un valore per column-filter. Per aggiungere solo colonne che fanno parte delle chiavi primarie della tabella, scegliere pk-only . Scegliere non-lob per aggiungere solo colonne non di tipo LOB. Oppure scegliere all per aggiungere qualsiasi colonna con un valore immagine precedente.

Esempio di una regola di trasformazione dell'immagine precedente

La regola di trasformazione nell'esempio seguente aggiunge una nuova colonna chiamata BI_emp_no nella destinazione. Quindi una dichiarazione come UPDATE employees SET emp_no = 3 WHERE emp_no = 1; popola il campo BI_emp_no con 1. Quando si scrivono aggiornamenti CDC alle destinazioni Amazon S3, la colonna BI_emp_no indica quale riga originale è stata aggiornata.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Per informazioni sull'utilizzo dell'operazione della regola add-before-image-columns, consulta Operazioni e regole di trasformazione.

Limitazioni nell'utilizzo di Apache Kafka come destinazione per AWS Database Migration Service

Utilizzando Apache Kafka come destinazione valgono le seguenti limitazioni:

  • AWS DMS Gli endpoint target Kafka non supportano il controllo degli accessi IAM per Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • La modalità LOB completa non è supportata.

  • Specificate un file di configurazione Kafka per il vostro cluster con proprietà che consentano di creare automaticamente nuovi argomenti. AWS DMS Includere l'impostazione auto.create.topics.enable = true. Se si utilizza Amazon MSK, è possibile specificare la configurazione predefinita alla creazione del cluster Kafka, quindi modificare l'impostazione auto.create.topics.enable su true. Per ulteriori informazioni sulle impostazioni di configurazione predefinite, consulta La configurazione predefinita di Amazon MSK nella Guida per gli sviluppatori di Streaming gestito da Amazon per Apache Kafka. Se devi modificare un cluster Kafka esistente creato utilizzando Amazon MSK, esegui il AWS CLI comando aws kafka create-configuration per aggiornare la configurazione di Kafka, come nell'esempio seguente:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Qui, //~/kafka_configuration è il file di configurazione creato con le impostazioni delle proprietà richieste.

    Se utilizzi la tua istanza Kafka installata su Amazon EC2, modifica la configurazione del cluster Kafka con l'impostazione per AWS DMS consentire auto.create.topics.enable = true la creazione automatica di nuovi argomenti, utilizzando le opzioni fornite con l'istanza.

  • AWS DMS pubblica ogni aggiornamento di un singolo record nel database di origine come un unico record di dati (messaggio) in un determinato argomento Kafka indipendentemente dalle transazioni.

  • AWS DMS supporta i seguenti due moduli per le chiavi di partizione:

    • SchemaName.TableName: una combinazione del nome dello schema e quello della tabella.

    • ${AttributeName}: il valore di uno dei campi nel formato JSON o la chiave primaria della tabella del database di origine.

  • BatchApply non è supportato per un endpoint Kafka. L'utilizzo dell'applicazione in batch, ad esempio l'impostazione dell'attività dei metadati di destinazione BatchApplyEnabled, per una destinazione Kafka potrebbe causare la perdita di dati.

  • AWS DMS non supporta la migrazione di valori di tipo di BigInt dati con più di 16 cifre. Per aggirare questa limitazione puoi utilizzare la seguente regola di trasformazione per convertire la colonna BigInt in una stringa. Per ulteriori informazioni sulle regole di trasformazione, consulta Operazioni e regole di trasformazione.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Utilizzo della mappatura degli oggetti per la migrazione dei dati in un argomento Kafka

AWS DMS utilizza regole di mappatura delle tabelle per mappare i dati dall'argomento Kafka di origine a quello di destinazione. Per mappare i dati a un argomento di destinazione, è necessario utilizzare una regola di mappatura delle tabelle denominata mappatura degli oggetti. La mappatura degli oggetti consente di definire il modo in cui i record di dati nell'origine vengono mappati ai record di dati pubblicati nell'argomento Kafka.

Gli argomenti Kafka non dispongono di una struttura preimpostata oltre a una chiave di partizione.

Nota

Non è necessario utilizzare la mappatura degli oggetti. È possibile utilizzare la normale mappatura delle tabelle per varie trasformazioni. Tuttavia, il tipo di chiave della partizione segue questi comportamenti predefiniti:

  • La chiave primaria viene utilizzata come chiave di partizione per il pieno carico.

  • Se non vengono utilizzate impostazioni delle attività di applicazione parallela, schema.table viene utilizzato come chiave di partizione per CDC.

  • Se vengono utilizzate le impostazioni delle attività di applicazione parallela, la chiave primaria viene utilizzata come chiave di partizione per CDC.

Per creare una regola di mappatura degli oggetti, è necessario impostare il parametro rule-type su object-mapping. Questa regola specifica il tipo di mappatura degli oggetti da utilizzare.

Di seguito è riportata la struttura per la regola.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS attualmente supporta map-record-to-record e map-record-to-document è l'unico valore valido per il parametro. rule-action Queste impostazioni influiscono sui valori che non sono esclusi come parte dell'elenco degli attributi exclude-columns. I map-record-to-document valori map-record-to-record and specificano come AWS DMS gestisce questi record per impostazione predefinita. Questi valori non influiscono in alcun modo sulle mappature degli attributi.

Nel caso di migrazione da un database relazionale a un argomento Kafka utilizza map-record-to-record. Questo tipo di regola utilizza il valore taskResourceId.schemaName.tableName dal database relazionale come chiave di partizione nell'argomento Kafka e crea un attributo per ogni colonna nel database di origine.

Quando utilizzi map-record-to-record, tieni presente quanto segue:

  • Questa impostazione ha effetto solo sulle colonne escluse dall'elenco exclude-columns.

  • Per ogni colonna di questo tipo, AWS DMS crea un attributo corrispondente nell'argomento di destinazione.

  • AWS DMS crea questo attributo corrispondente indipendentemente dal fatto che la colonna di origine venga utilizzata in una mappatura degli attributi.

Per comprendere il funzionamento di map-record-to-record, è opportuno esaminarne il comportamento in azione. Per questo esempio, supponiamo che tu stia iniziando con una riga di tabella del database relazionale con la struttura e i dati seguenti.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Per eseguire la migrazione di queste informazioni da uno schema denominato Test verso un argomento Kafka, crea le regole per mappare i dati sull'argomento di destinazione. La regola seguente illustra la mappatura.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Dato un argomento Kafka e una chiave di partizione (in questo caso, taskResourceId.schemaName.tableName), il seguente esempio illustra il formato di record risultante per l'argomento di destinazione Kafka utilizzando i dati di esempio:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Ristrutturazione dei dati con la mappatura degli attributi

Utilizzando una mappa degli attributi puoi modificare la struttura dei dati mentre li stai migrando verso un argomento Kafka. Ad esempio, potresti voler combinare più campi nell'origine in un unico campo nella destinazione. La seguente mappa degli attributi illustra come ristrutturare i dati.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Per impostare un valore costante per partition-key, specificare un valore partition-key. Ad esempio, potresti eseguire questa operazione per forzare la memorizzazione di tutti i dati in una singola partizione. Questo approccio viene illustrato nella mappatura seguente.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Nota

Il valore partition-key per un record di controllo per una tabella specifica è TaskId.SchemaName.TableName. Il valore partition-key per un record di controllo per un'attività specifica è il TaskId del record. La specifica si un valore partition-key nella mappatura degli oggetti non influisce sul parametro partition-key per un record di controllo.

Replica di più argomenti con la mappatura degli oggetti

Per impostazione predefinita, AWS DMS le attività migrano tutti i dati di origine su uno degli argomenti di Kafka seguenti:

  • Come specificato nel campo Argomento dell'endpoint di destinazione. AWS DMS

  • quello specificato da kafka-default-topic se il campo Argomento dell'endpoint di destinazione non è compilato e l'impostazione Kafka auto.create.topics.enable è impostata su true.

Con le versioni AWS DMS del motore 3.4.6 e successive, è possibile utilizzare l'kafka-target-topicattributo per mappare ogni tabella di origine migrata su un argomento separato. Ad esempio, le regole di mappatura degli oggetti riportate di seguito migrano le tabelle di origine Customer e Address agli argomenti Kafka customer_topic e address_topic rispettivamente. Allo stesso tempo, AWS DMS migra tutte le altre tabelle di origine, inclusa la Bills tabella nello Test schema, all'argomento specificato nell'endpoint di destinazione.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Utilizzando la replica di più argomenti Kafka, è possibile raggruppare e migrare le tabelle di origine su argomenti Kafka separati utilizzando un'unica attività di replica.

Formato dei messaggi per Apache Kafka

L'output JSON è semplicemente un elenco di coppie chiave-valore.

RecordType

Il tipo di record può essere relativo ai dati o al controllo. I record di dati rappresentano le righe effettive nell'origine. I record di controllo sono per eventi importanti nel flusso, ad esempio un riavvio dell'attività.

Operazione

Per i record di dati, l'operazione può essere load, insert, update o delete.

Per i record di controllo, l'operazione può essere create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column o column-type-change.

SchemaName

Lo schema di origine per il record. Questo campo può essere vuoto per un record di controllo.

TableName

La tabella di origine per il record. Questo campo può essere vuoto per un record di controllo.

Timestamp

Il timestamp relativo al momento della creazione del messaggio JSON. Il campo viene formattato con il formato ISO 8601.

Il seguente esempio di messaggio JSON illustra un tipo di dati con tutti i metadati aggiuntivi.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

Il seguente esempio di messaggio JSON illustra un tipo di controllo.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }