

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Usare Apache Kafka come obiettivo per AWS Database Migration Service
<a name="CHAP_Target.Kafka"></a>

È possibile utilizzarlo AWS DMS per migrare i dati in un cluster Apache Kafka. Apache Kafka è una piattaforma di streaming distribuita. È possibile utilizzare Apache Kafka per l'inserimento e l'elaborazione dei dati di streaming in tempo reale.

AWS offre anche Amazon Managed Streaming for Apache Kafka (Amazon MSK) da utilizzare come destinazione. AWS DMS Amazon MSK è un servizio di streaming Apache Kafka completamente gestito che semplifica l'implementazione e la gestione delle istanze Apache Kafka. Funziona con le versioni open source di Apache Kafka e accedi alle istanze Amazon MSK come AWS DMS destinazioni esattamente come qualsiasi istanza di Apache Kafka. Per ulteriori informazioni, consulta [Che cos'è Amazon MSK?](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) nella *Guida per sviluppatori di Streaming gestito da Amazon per Apache Kafka*.

Un cluster Kafka archivia i flussi di record in categorie denominate argomenti che sono suddivisi in partizioni. Le *partizioni* sono sequenze di record di dati (messaggi) identificate in modo univoco in un argomento. Le partizioni possono essere distribuite tra più broker in un cluster per consentire l'elaborazione parallela dei record dell'argomento. Per ulteriori informazioni su argomenti e partizioni e la loro distribuzione in Apache Kafka, consulta [Argomenti e registri](https://kafka.apache.org/documentation/#intro_topics) e [Distribuzione](https://kafka.apache.org/documentation/#intro_distribution).

Il cluster Kafka può essere un'istanza Amazon MSK, un cluster in esecuzione su un'istanza Amazon EC2 o un cluster on-premise. Un'istanza Amazon MSK o un cluster su un'istanza Amazon EC2 può trovarsi nello stesso VPC o in uno diverso. Se il cluster è on-premise, è possibile utilizzare il server dei nomi on-premise dell'istanza di replica per risolvere il nome host del cluster. Per informazioni sulla configurazione di un server dei nomi per l'istanza di replica, consulta [Utilizzo del server dei nomi in locale](CHAP_BestPractices.md#CHAP_BestPractices.Rte53DNSResolver). Per ulteriori informazioni sulla configurazione di una rete, consulta [Configurazione di una rete per un'istanza di replica](CHAP_ReplicationInstance.VPC.md).

Quando utilizzi un cluster Amazon MSK, assicurati che il relativo gruppo di sicurezza consenta l'accesso dall'istanza di replica. Per informazioni sulla modifica del gruppo di sicurezza di un cluster Amazon MSK, consulta [Modifica del gruppo di sicurezza di un cluster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/change-security-group.html).

AWS Database Migration Service pubblica i record relativi a un argomento di Kafka utilizzando JSON. Durante la conversione, AWS DMS serializza ogni record dal database di origine in una coppia attributo-valore in formato JSON.

È possibile usare la mappatura degli oggetti per migrare i dati da qualsiasi origine dati supportata a un cluster Kafka di destinazione. Con la mappatura degli oggetti, determini il modo in cui strutturare i record di dati nell'argomento di destinazione. Puoi inoltre definire una chiave di partizionamento per ogni tabella che viene utilizzata da Kafka per raggruppare i dati nelle sue partizioni. 

Attualmente, AWS DMS supporta un singolo argomento per attività. Per una singola attività con più tabelle, tutti i messaggi vengono indirizzati a un unico argomento. Ogni messaggio include una sezione di metadati che identifica lo schema e la tabella di destinazione. AWS DMS le versioni 3.4.6 e successive supportano la replica multitopica utilizzando la mappatura degli oggetti. Per ulteriori informazioni, consulta [Replica di più argomenti con la mappatura degli oggetti](#CHAP_Target.Kafka.MultiTopic).

**Impostazioni endpoint Apache Kafka**

È possibile specificare i dettagli della connessione tramite le impostazioni dell'endpoint nella AWS DMS console o l'`--kafka-settings`opzione nella CLI. I requisiti per ogni impostazione sono i seguenti:
+ `Broker`: specifica le posizioni di uno o più broker nel cluster Kafka sotto forma di elenco di voci separate da virgole per ogni `broker-hostname:port`. Un esempio è `"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"`. Questa impostazione può specificare le posizioni di uno o tutti i broker del cluster. Tutti i broker del cluster comunicano per gestire il partizionamento dei record di dati migrati verso l'argomento.
+ `Topic` (facoltativo): specifica il nome dell'argomento con una lunghezza massima di 255 lettere e simboli. È possibile utilizzare punto (.), carattere di sottolineatura (\$1) e segno meno (-). I nomi degli argomenti con un punto (.) o un carattere di sottolineatura (\$1) possono interferire con le strutture dati interne. Nel nome dell'argomento utilizzare uno dei due simboli ma non entrambi. Se non specificate il nome di un argomento, AWS DMS lo usa `"kafka-default-topic"` come argomento di migrazione.
**Nota**  
Se desiderate AWS DMS creare un argomento di migrazione specificato dall'utente o l'argomento predefinito, impostatelo `auto.create.topics.enable = true` come parte della configurazione del cluster Kafka. Per ulteriori informazioni, consulta [Limitazioni nell'utilizzo di Apache Kafka come destinazione per AWS Database Migration Service](#CHAP_Target.Kafka.Limitations)
+ `MessageFormat`: il formato di output per i record creati nell'endpoint. Il formato del messaggio è `JSON` (predefinito) o `JSON_UNFORMATTED` (una singola riga senza tabulazione).
+ `MessageMaxBytes`: la dimensione massima in byte per i record creati nell'endpoint. Il valore di default è 1.000.000.
**Nota**  
È possibile utilizzare solo AWS CLI/SDK per passare `MessageMaxBytes` a un valore non predefinito. Ad esempio utilizza il seguente comando per modificare l'endpoint Kafka esistente e cambiare `MessageMaxBytes`.  

  ```
  aws dms modify-endpoint --endpoint-arn your-endpoint 
  --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...",
  Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  ```
+ `IncludeTransactionDetails`: fornisce informazioni dettagliate sulle transazioni dal database di origine. Tali informazioni includono un timestamp di commit, una posizione nel log e valori per `transaction_id`, `previous_transaction_id` e `transaction_record_id`(l'offset del record all'interno di una transazione). Il valore predefinito è `false`.
+ `IncludePartitionValue`: mostra il valore della partizione nell'output dei messaggi di Kafka, a meno che il tipo della partizione non sia `schema-table-type`. Il valore predefinito è `false`.
+ `PartitionIncludeSchemaTable`: aggiunge ai nomi di schemi e tabelle il prefisso con i valori di partizione, quando il tipo di partizione è `primary-key-type`. In questo modo si aumenta la distribuzione dei dati tra le partizioni di Kafka. Ad esempio, si supponga che uno schema `SysBench` includa migliaia di tabelle e che ogni tabella faccia riferimento solo a un intervallo limitato di valori della chiave primaria. In questo caso, la stessa chiave primaria viene inviata da migliaia di tabelle alla stessa partizione, causando un rallentamento. Il valore predefinito è `false`.
+ `IncludeTableAlterOperations`: include tutte le operazioni DDL (Data Definition Language) che modificano i dati di controllo della tabella, ad esempio `rename-table`, `drop-table`, `add-column`, `drop-column` e `rename-column`. Il valore predefinito è `false`. 
+ `IncludeControlDetails`: mostra le informazioni dettagliate del controllo per la definizione di tabelle, la definizione di colonne e le modifiche di tabelle e colonne nell'output dei messaggi Kafka. Il valore predefinito è `false`.
+ `IncludeNullAndEmpty`: include le colonne vuote e NULL nella destinazione. Il valore predefinito è `false`.
+ `SecurityProtocol`: imposta una connessione sicura a un endpoint di destinazione Kafka utilizzando Transport Layer Security (TLS). Le opzioni includono `ssl-authentication`, `ssl-encryption` e `sasl-ssl`. L'utilizzo di `sasl-ssl` richiede `SaslUsername` e `SaslPassword`.
+ `SslEndpointIdentificationAlgorithm`— Imposta la verifica del nome host per il certificato. Questa impostazione è supportata nella AWS DMS versione 3.5.1 e successive. Le opzioni sono le seguenti: 
  + `NONE`: Disattiva la verifica del nome host del broker nella connessione del client.
  + `HTTPS`: Abilita la verifica del nome host del broker nella connessione del client.
+ `useLargeIntegerValue`— Usa fino a 18 cifre int invece di emettere int come doppi, disponibile a partire dalla AWS DMS versione 3.5.4. Il valore predefinito è false.

È possibile utilizzare le impostazioni per aumentare la velocità del trasferimento. Per farlo, AWS DMS supporta il pieno carico multithread in un cluster Apache Kafka di destinazione. AWS DMS supporta il multithreading con le impostazioni delle attività seguenti:
+ `MaxFullLoadSubTasks`— Utilizzate questa opzione per indicare il numero massimo di tabelle di origine da caricare in parallelo. AWS DMS carica ogni tabella nella tabella di destinazione Kafka corrispondente utilizzando una sottoattività dedicata. Il valore predefinito è 8; il valore il massimo è 49.
+ `ParallelLoadThreads`— Utilizzate questa opzione per specificare il numero di thread da utilizzare per caricare ogni tabella nella relativa tabella di destinazione Kafka. AWS DMS Il valore massimo per una destinazione Apache Kafka è 32. Puoi chiedere che questo limite massimo venga aumentato.
+ `ParallelLoadBufferSize`: utilizza questa opzione per specificare il numero massimo di record da archiviare nel buffer usato dai thread di caricamento parallelo per caricare i dati nella destinazione Kafka. Il valore predefinito è 50. Il valore massimo è 1.000. Utilizzare questo parametro con `ParallelLoadThreads`; `ParallelLoadBufferSize` è valido solo quando è presente più di un thread.
+ `ParallelLoadQueuesPerThread`: utilizza questa opzione per specificare il numero di code a cui accede ogni thread simultaneo per eliminare i record di dati dalle code e generare un carico batch per la destinazione. Il valore di default è 1. Il numero massimo è 512.

È possibile migliorare le prestazioni dell'acquisizione dei dati di modifica (CDC) per gli endpoint Kafka ottimizzando le impostazioni delle attività per thread paralleli e operazioni in blocco. A tale scopo, è possibile specificare il numero di thread simultanei, di code per thread e di record da memorizzare in un buffer utilizzando le impostazioni delle attività `ParallelApply*`. Ad esempio, si supponga di voler eseguire un carico CDC e applicare 128 thread in parallelo. Si desidera inoltre accedere a 64 code per thread, con 50 record memorizzati per buffer. 

Per promuovere le prestazioni del CDC, AWS DMS supporta le seguenti impostazioni delle attività:
+ `ParallelApplyThreads`— specifica il numero di thread simultanei che vengono AWS DMS utilizzati durante un caricamento CDC per inviare i record di dati a un endpoint di destinazione Kafka. Il valore predefinito è zero (0) e il valore massimo è 32.
+ `ParallelApplyBufferSize`: specifica il numero massimo di record da archiviare in ogni coda di buffer per eseguire il push dei thread simultanei a un endpoint di destinazione Kafka durante un carico CDC. Il valore predefinito è 100 e il valore massimo è 1.000. Utilizzare questa opzione quando `ParallelApplyThreads` specifica più di un thread. 
+ `ParallelApplyQueuesPerThread`: specifica il numero di code a cui ogni thread accede per eliminare i record di dati dalle code e generare un carico batch per un endpoint Kafka durante la CDC. Il valore di default è 1. Il numero massimo è 512.

Quando si utilizzano le impostazioni delle attività `ParallelApply*`, l'impostazione di `partition-key-type` predefinita è la `primary-key` della tabella, non `schema-name.table-name`.

## Connessione a Kafka utilizzando Transport Layer Security (TLS)
<a name="CHAP_Target.Kafka.TLS"></a>

Il cluster Kafka accetta solo connessioni protette con Transport Layer Security (TLS). Con DMS, è possibile utilizzare una qualsiasi delle seguenti tre opzioni di protocollo di sicurezza per proteggere la connessione degli endpoint Kafka.

**Crittografia SSL (`server-encryption`)**  
I client convalidano l'identità del server tramite il certificato del server. Quindi viene stabilita una connessione crittografata tra server e client.

**Autenticazione SSL (`mutual-authentication`)**  
Server e client convalidano l'identità reciprocamente tramite i propri certificati. Quindi viene stabilita una connessione crittografata tra server e client.

**SASL-SSL (`mutual-authentication`)**  
Il metodo Simple Authentication and Security Layer (SASL) sostituisce il certificato del client con un nome utente e una password per convalidare l'identità del client. In particolare, si forniscono un nome utente e una password registrati dal server in modo che il server possa convalidare l'identità del client. Quindi viene stabilita una connessione crittografata tra server e client.

**Importante**  
Apache Kafka e Amazon MSK accettano certificati risolti. Questa è una limitazione nota di Kafka e Amazon MSK da risolvere. Per ulteriori informazioni, consulta [Apache Kafka issues, KAFKA-3700](https://issues.apache.org/jira/browse/KAFKA-3700).  
Se utilizzi Amazon MSK, prendi in considerazione l'utilizzo delle liste di controllo degli accessi (ACLs) come soluzione alternativa a questa limitazione nota. Per ulteriori informazioni sull'utilizzo ACLs, consulta la sezione [Apache Kafka ACLs](https://docs.aws.amazon.com//msk/latest/developerguide/msk-acls.html) della *Amazon Managed Streaming for Apache Kafka Developer* Guide.  
Se utilizzi un cluster Kafka autogestito, consulta il [commento datato 21 ottobre 2018](https://issues.apache.org/jira/browse/KAFKA-3700?focusedCommentId=16658376) per informazioni sulla configurazione del cluster.

### Utilizzo della crittografia SSL con Amazon MSK o un cluster Kafka autogestito
<a name="CHAP_Target.Kafka.TLS.SSLencryption"></a>

È possibile utilizzare la crittografia SSL per proteggere la connessione di un endpoint ad Amazon MSK o a un cluster Kafka autogestito. Quando usi il metodo di autenticazione con crittografia SSL, i client convalidano l'identità di un server tramite il certificato del server. Quindi viene stabilita una connessione crittografata tra server e client.

**Per utilizzare la crittografia SSL per connettersi ad Amazon MSK**
+ Configura l'impostazione dell'endpoint del protocollo di sicurezza (`SecurityProtocol`) utilizzando l'opzione `ssl-encryption` quando crei l'endpoint Kafka di destinazione. 

  L'esempio JSON che segue imposta il protocollo di sicurezza come crittografia SSL.

```
"KafkaSettings": {
    "SecurityProtocol": "ssl-encryption", 
}
```

**Per utilizzare la crittografia SSL per un cluster Kafka autogestito**

1. Se utilizzi un'autorità di certificazione (CA, Certification Authority) privata nel cluster Kafka on-premise, carica il certificato CA privato e ottieni un nome della risorsa Amazon (ARN). 

1. Configura l'impostazione dell'endpoint del protocollo di sicurezza (`SecurityProtocol`) utilizzando l'opzione `ssl-encryption` quando crei l'endpoint Kafka di destinazione. L'esempio JSON che segue imposta il protocollo di sicurezza come `ssl-encryption`.

   ```
   "KafkaSettings": {
       "SecurityProtocol": "ssl-encryption", 
   }
   ```

1. Se utilizzi una CA privata, imposta `SslCaCertificateArn` nell'ARN che hai ottenuto nella prima fase precedente.

### Utilizzo dell'autenticazione SSL
<a name="CHAP_Target.Kafka.TLS.SSLauthentication"></a>

È possibile utilizzare l'autenticazione SSL per proteggere la connessione di un endpoint ad Amazon MSK o a un cluster Kafka autogestito.

Per abilitare l'autenticazione e la crittografia del client utilizzando l'autenticazione SSL per la connessione ad Amazon MSK, procedi come segue:
+ Prepara una chiave privata e un certificato pubblico per Kafka.
+ Carica i certificati nella gestione certificati DMS.
+ Crea un endpoint di destinazione Kafka con il certificato corrispondente specificato nelle impostazioni dell'endpoint Kafka. ARNs 

**Per preparare una chiave privata e un certificato pubblico per Amazon MSK**

1. Crea un'istanza EC2 e configura un client per utilizzare l'autenticazione come descritto nelle fasi da 1 a 9 della sezione [Client Authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html) della *Guida per sviluppatori di Streaming gestito da Amazon per Apache Kafka*.

   Dopo aver completato queste fasi, avrai un Certificate-ARN (l'ARN del certificato pubblico salvato in ACM) e una chiave privata contenuta in un file `kafka.client.keystore.jks`.

1. Recupera il certificato pubblico e copia il certificato nel file `signed-certificate-from-acm.pem` utilizzando il comando seguente:

   ```
   aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
   ```

   Il comando restituisce informazioni simili a quelle mostrate nell'esempio seguente:

   ```
   {"Certificate": "123", "CertificateChain": "456"}
   ```

   Quindi copia l'equivalente di `"123"` nel file `signed-certificate-from-acm.pem`.

1. Recupera la chiave privata importando la chiave `msk-rsa` da `kafka.client.keystore.jks to keystore.p12`, come mostrato nel seguente esempio.

   ```
   keytool -importkeystore \
   -srckeystore kafka.client.keystore.jks \
   -destkeystore keystore.p12 \
   -deststoretype PKCS12 \
   -srcalias msk-rsa-client \
   -deststorepass test1234 \
   -destkeypass test1234
   ```

1. Utilizza il comando seguente per esportare `keystore.p12` nel formato `.pem`. 

   ```
   Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
   ```

   Viene visualizzato il messaggio **Enter PEM pass phrase** che richiede la chiave applicata per crittografare il certificato.

1. Rimuovi gli attributi contenitore e gli attributi chiave dal file `.pem` per assicurarti che la prima riga inizi con la stringa seguente.

   ```
                                   ---BEGIN ENCRYPTED PRIVATE KEY---
   ```

**Per caricare un certificato pubblico e una chiave privata nella gestione certificati DMS e testare la connessione ad Amazon MSK**

1. Carica nella gestione certificati DMS utilizzando il comando seguente.

   ```
   aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert
   aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
   ```

1. Crea un endpoint di destinazione Amazon MSK e verifica la connessione per assicurarti che l'autenticazione TLS funzioni.

   ```
   aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings 
   '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", 
   "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:",
   "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}'
   aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
   ```

**Importante**  
È possibile utilizzare l'autenticazione SSL per proteggere una connessione a un cluster Kafka autogestito. In alcuni casi, si potrebbe utilizzare un'autorità di certificazione (CA) privata nel cluster Kafka on-premise. In tal caso, carica la catena di CA, il certificato pubblico e la chiave privata nella gestione certificati DMS. Quindi, utilizza il corrispondente nome della risorsa Amazon (ARN) nelle impostazioni degli endpoint quando crei l'endpoint di destinazione Kafka on-premise.

**Per preparare una chiave privata e un certificato firmato per un cluster Kafka autogestito**

1. Genera una coppia di chiavi come nel seguente esempio.

   ```
   keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password 
   -keypass your-key-passphrase -dname "CN=your-cn-name" 
   -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
   ```

1. Genera una richiesta di firma del certificato (CSR, Certificate Sign Request). 

   ```
   keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password 
   -keypass your-key-password
   ```

1. Usa la CA nel truststore del cluster per firmare la CSR. Se non disponi di una CA, puoi creare una CA privata.

   ```
   openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days                            
   ```

1. Importa `ca-cert` nel truststore e nel keystore del server. Se non si dispone di un truststore, utilizza il comando seguente per crearlo e importare `ca-cert `. 

   ```
   keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
   keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
   ```

1. Firma il certificato.

   ```
   openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem 
   -days validate-days -CAcreateserial -passin pass:ca-password
   ```

1. Importa il certificato firmato nel keystore.

   ```
   keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password 
   -keypass your-key-password
   ```

1. Utilizza il seguente comando per importare la chiave `on-premise-rsa` da `kafka.server.keystore.jks` a `keystore.p12`.

   ```
   keytool -importkeystore \
   -srckeystore kafka.server.keystore.jks \
   -destkeystore keystore.p12 \
   -deststoretype PKCS12 \
   -srcalias on-premise-rsa \
   -deststorepass your-truststore-password \
   -destkeypass your-key-password
   ```

1. Utilizza il comando seguente per esportare `keystore.p12` nel formato `.pem`.

   ```
   Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
   ```

1. Carica `encrypted-private-server-key.pem` e `signed-certificate.pem` e `ca-cert` nella gestione certificati DMS.

1. Crea un endpoint utilizzando il valore restituito. ARNs

   ```
   aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings 
   '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", 
   "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", 
   "SslCaCertificateArn": "your-ca-certificate-arn"}'
                               
   aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
   ```

### Utilizzo dell'autenticazione SASL-SSL per connettersi ad Amazon MSK
<a name="CHAP_Target.Kafka.TLS.SSL-SASL"></a>

Il metodo Simple Authentication and Security Layer (SASL) utilizza un nome utente e una password per convalidare l'identità di un client e stabilisce una connessione crittografata tra server e client.

Per utilizzare SASL, devi prima creare un nome utente e una password sicuri quando configuri il cluster Amazon MSK. Per una descrizione di come configurare un nome utente e una password sicuri per un cluster Amazon MSK, consulta [Configurazione dell' SASL/SCRAM autenticazione per un cluster Amazon MSK nella Amazon Managed Streaming for](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-tutorial) *Apache Kafka Developer Guide*.

Quindi, quando crei l'endpoint di destinazione Kafka, imposta l'impostazione dell'endpoint del protocollo di sicurezza (`SecurityProtocol`) utilizzando l'opzione `sasl-ssl`. Imposti anche le opzioni `SaslUsername` e `SaslPassword`. Assicurati che siano coerenti con il nome utente e la password sicuri creati durante la configurazione del cluster Amazon MSK, come mostrato nel seguente esempio JSON.

```
                   
"KafkaSettings": {
    "SecurityProtocol": "sasl-ssl",
    "SaslUsername":"Amazon MSK cluster secure user name",
    "SaslPassword":"Amazon MSK cluster secure password"                    
}
```

**Nota**  
Attualmente, AWS DMS supporta solo SASL-SSL pubblico supportato da CA. DMS non supporta SASL-SSL per l'uso con Kafka autogestito supportato da una CA privata.
Per l'autenticazione SASL-SSL, supporta il meccanismo SCRAM-SHA-512 per impostazione predefinita. AWS DMS AWS DMS le versioni 3.5.0 e successive supportano anche il meccanismo Plain. Per supportare il meccanismo Plain, imposta il parametro `SaslMechanism` del tipo di dati API `KafkaSettings` su `PLAIN`. Il tipo di dati `PLAIN` è supportato da Kafka, ma non da Amazon Kafka (MSK).

## Utilizzo di un'immagine precedente per visualizzare i valori originali delle righe CDC per Apache Kafka come destinazione
<a name="CHAP_Target.Kafka.BeforeImage"></a>

Quando si scrivono aggiornamenti CDC su una destinazione di streaming dati come Kafka, è possibile visualizzare i valori originali di una riga di database di origine prima di apportare modifiche da un aggiornamento. Per rendere possibile ciò, AWS DMS compila un'*immagine precedente degli eventi di aggiornamento in base ai dati forniti* dal motore di database di origine. 

Diversi motori di database di origine forniscono diverse quantità di informazioni per un'immagine precedente: 
+ Oracle fornisce aggiornamenti alle colonne solo se cambiano. 
+ PostgreSQL fornisce solo i dati per le colonne che fanno parte della chiave primaria (modificata o meno). Se è in uso la replica logica e REPLICA IDENTITY FULL è impostato per la tabella di origine, è possibile ottenere informazioni complete prima e dopo sulla riga scritta WALs e disponibile qui.
+ MySQL generalmente fornisce dati per tutte le colonne (modificate o meno).

Per consentire prima dell'imaging di aggiungere valori originali dal database di origine all'output AWS DMS , utilizzare l'impostazione dell'attività `BeforeImageSettings` o il parametro `add-before-image-columns`. Questo parametro applica una regola di trasformazione della colonna. 

`BeforeImageSettings` aggiunge un nuovo attributo JSON a ogni operazione di aggiornamento con valori raccolti dal sistema di database di origine, come illustrato di seguito.

```
"BeforeImageSettings": {
    "EnableBeforeImage": boolean,
    "FieldName": string,  
    "ColumnFilter": pk-only (default) / non-lob / all (but only one)
}
```

**Nota**  
Applicare `BeforeImageSettings` alle attività CDC a pieno carico e alle attività CDC (che eseguono la migrazione dei dati esistenti e replicano le modifiche in corso) o solo alle attività CDC (che replicano solo le modifiche dei dati). Non applicare `BeforeImageSettings` alle attività a pieno carico.

Per le opzioni `BeforeImageSettings`, si applica quanto segue:
+ Impostare l'opzione `EnableBeforeImage` su `true` da abilitare prima dell'imaging. Il valore predefinito è `false`. 
+ Utilizzare l'opzione `FieldName` per assegnare un nome al nuovo attributo JSON. Quando `EnableBeforeImage` è `true`, `FieldName` è richiesto e non può essere vuoto.
+ L'opzione `ColumnFilter` specifica una colonna da aggiungere utilizzando l'imaging precedente. Per aggiungere solo colonne che fanno parte delle chiavi primarie della tabella, utilizzare il valore predefinito, `pk-only`. Per aggiungere solo colonne non di tipo LOB, utilizzare `non-lob`. Per aggiungere qualsiasi colonna con un valore immagine prima, utilizzare `all`. 

  ```
  "BeforeImageSettings": {
      "EnableBeforeImage": true,
      "FieldName": "before-image",
      "ColumnFilter": "pk-only"
    }
  ```

### Utilizzo di una regola di trasformazione dell'immagine precedente
<a name="CHAP_Target.Kafka.BeforeImage.Transform-Rule"></a>

In alternativa alle impostazioni delle attività, è possibile utilizzare il parametro `add-before-image-columns`, che applica una regola di trasformazione delle colonne. Con questo parametro, è possibile abilitare l'imaging precedente durante il CDC su destinazioni di streaming dati come Kafka.

Utilizzando `add-before-image-columns` in una una regola di trasformazione, è possibile applicare un controllo più dettagliato dei risultati dell'immagine precedente. Le regole di trasformazione consentono di utilizzare un localizzatore di oggetti che consente di controllare le tabelle selezionate per la regola. Inoltre, è possibile concatenare le regole di trasformazione, consentendo l'applicazione di regole diverse a tabelle diverse. È quindi possibile manipolare le colonne prodotte utilizzando altre regole. 

**Nota**  
Non utilizzare il parametro `add-before-image-columns` insieme all'impostazione dell'attività `BeforeImageSettings` all'interno della stessa attività. Utilizzare invece il parametro o l'impostazione, ma non entrambi, per una singola attività.

Un tipo di regola `transformation` regola con il parametro `add-before-image-columns` per una colonna deve fornire una sezione `before-image-def`. Di seguito viene riportato un esempio.

```
    {
      "rule-type": "transformation",
      …
      "rule-target": "column",
      "rule-action": "add-before-image-columns",
      "before-image-def":{
        "column-filter": one-of  (pk-only / non-lob / all),
        "column-prefix": string,
        "column-suffix": string,
      }
    }
```

Il valore di `column-prefix` viene anteposto a un nome di colonna e il valore predefinito di `column-prefix` è `BI_`. Il valore di `column-suffix` viene aggiunto al nome della colonna e il valore predefinito è vuoto. Non impostare sia `column-prefix` sia `column-suffix` sull'opzione per svuotare le stringhe.

Scegliere un valore per `column-filter`. Per aggiungere solo colonne che fanno parte delle chiavi primarie della tabella, scegliere `pk-only` . Scegliere `non-lob` per aggiungere solo colonne non di tipo LOB. Oppure scegliere `all` per aggiungere qualsiasi colonna con un valore immagine precedente.

### Esempio di una regola di trasformazione dell'immagine precedente
<a name="CHAP_Target.Kafka.BeforeImage.Example"></a>

La regola di trasformazione nell'esempio seguente aggiunge una nuova colonna chiamata `BI_emp_no` nella destinazione. Quindi una dichiarazione come `UPDATE employees SET emp_no = 3 WHERE emp_no = 1;` popola il campo `BI_emp_no` con 1. Quando si scrivono aggiornamenti CDC alle destinazioni Amazon S3, la colonna `BI_emp_no` indica quale riga originale è stata aggiornata.

```
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "1",
      "object-locator": {
        "schema-name": "%",
        "table-name": "%"
      },
      "rule-action": "include"
    },
    {
      "rule-type": "transformation",
      "rule-id": "2",
      "rule-name": "2",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "%",
        "table-name": "employees"
      },
      "rule-action": "add-before-image-columns",
      "before-image-def": {
        "column-prefix": "BI_",
        "column-suffix": "",
        "column-filter": "pk-only"
      }
    }
  ]
}
```

Per informazioni sull'utilizzo dell'operazione della regola `add-before-image-columns`, consulta [Operazioni e regole di trasformazione](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md).

## Limitazioni nell'utilizzo di Apache Kafka come destinazione per AWS Database Migration Service
<a name="CHAP_Target.Kafka.Limitations"></a>

Utilizzando Apache Kafka come destinazione valgono le seguenti limitazioni:
+ AWS DMS Gli endpoint target Kafka non supportano il controllo degli accessi IAM per Amazon Managed Streaming for Apache Kafka (Amazon MSK).
+ La modalità LOB completa non è supportata.
+ Specificate un file di configurazione Kafka per il vostro cluster con proprietà che consentano di creare automaticamente nuovi argomenti. AWS DMS Includere l'impostazione `auto.create.topics.enable = true`. Se si utilizza Amazon MSK, è possibile specificare la configurazione predefinita alla creazione del cluster Kafka, quindi modificare l'impostazione `auto.create.topics.enable` su `true`. Per ulteriori informazioni sulle impostazioni di configurazione predefinite, consulta [La configurazione predefinita di Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html) nella *Guida per gli sviluppatori di Streaming gestito da Amazon per Apache Kafka*. Se devi modificare un cluster Kafka esistente creato utilizzando Amazon MSK, esegui il AWS CLI comando `aws kafka create-configuration` per aggiornare la configurazione di Kafka, come nell'esempio seguente:

  ```
  14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration
  {
      "LatestRevision": {
          "Revision": 1,
          "CreationTime": "2019-09-06T14:39:37.708Z"
      },
      "CreationTime": "2019-09-06T14:39:37.708Z",
      "Name": "kafka-configuration",
      "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3"
  }
  ```

  Qui, `//~/kafka_configuration` è il file di configurazione creato con le impostazioni delle proprietà richieste.

  Se utilizzi la tua istanza Kafka installata su Amazon EC2, modifica la configurazione del cluster Kafka con l'impostazione per AWS DMS consentire `auto.create.topics.enable = true` la creazione automatica di nuovi argomenti, utilizzando le opzioni fornite con l'istanza.
+ AWS DMS pubblica ogni aggiornamento di un singolo record nel database di origine come un unico record di dati (messaggio) in un determinato argomento Kafka indipendentemente dalle transazioni.
+ AWS DMS supporta i seguenti quattro moduli per le chiavi di partizione:
  + `SchemaName.TableName`: una combinazione del nome dello schema e quello della tabella.
  + `${AttributeName}`: il valore di uno dei campi nel formato JSON o la chiave primaria della tabella del database di origine.
  + `transaction-id`: L'ID della transazione CDC. Tutti i record all'interno della stessa transazione vengono inseriti nella stessa partizione.
  + `constant`: un valore letterale fisso per ogni record indipendentemente dalla tabella o dai dati. Tutti i record vengono inviati allo stesso valore della chiave di partizione «costante», che fornisce un ordinamento globale rigoroso in tutte le tabelle.

  ```
  {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "TransactionIdPartitionKey",
      "rule-action": "map-record-to-document",
      "object-locator": {
          "schema-name": "onprem",
          "table-name": "it_system"
      },
      "mapping-parameters": {
          "partition-key-type": "transaction-id | constant | attribute-name | schema-table"
      }
  }
  ```
+ L'impostazione dell'`IncludeTransactionDetails`endpoint è supportata solo quando l'endpoint di origine è Oracle, SQL Server, PostgreSQL o MySQL. Per altri tipi di endpoint di origine, i dettagli della transazione non verranno inclusi.
+ `BatchApply` non è supportato per un endpoint Kafka. L'utilizzo dell'applicazione in batch, ad esempio l'impostazione dell'attività dei metadati di destinazione `BatchApplyEnabled`, per una destinazione Kafka potrebbe causare la perdita di dati.
+ AWS DMS non supporta la migrazione di valori di tipo di `BigInt` dati con più di 16 cifre. Per aggirare questa limitazione puoi utilizzare la seguente regola di trasformazione per convertire la colonna `BigInt` in una stringa. Per ulteriori informazioni sulle regole di trasformazione, consulta [Operazioni e regole di trasformazione](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md).

  ```
  {
      "rule-type": "transformation",
      "rule-id": "id",
      "rule-name": "name",
      "rule-target": "column",
      "object-locator": {
          "schema-name": "valid object-mapping rule action",
          "table-name": "",
          "column-name": ""
      },
      "rule-action": "change-data-type",
      "data-type": {
          "type": "string",
          "length": 20
      }
  }
  ```
+ AWS DMS Gli endpoint target Kafka non supportano Amazon MSK servless.
+ Quando si definiscono regole di mappatura, non sono supportate sia la regola di mappatura degli oggetti che una regola di trasformazione. È necessario impostare una sola regola. 
+ AWS DMS supporta l'autenticazione SASL per le versioni di Apache Kafka fino alla 3.8. Se utilizzi Kafka 4.0 o versioni successive, puoi connetterti solo senza autenticazione SASL.
+ AWS DMS non supporta i dati di origine contenenti `'\0'` caratteri incorporati quando si utilizza Kafka come endpoint di destinazione. I dati contenenti `'\0'` caratteri incorporati verranno troncati al primo carattere. `'\0'`

## Utilizzo della mappatura degli oggetti per la migrazione dei dati in un argomento Kafka
<a name="CHAP_Target.Kafka.ObjectMapping"></a>

AWS DMS utilizza regole di mappatura delle tabelle per mappare i dati dall'argomento Kafka di origine a quello di destinazione. Per mappare i dati a un argomento di destinazione, è necessario utilizzare una regola di mappatura delle tabelle denominata mappatura degli oggetti. La mappatura degli oggetti consente di definire il modo in cui i record di dati nell'origine vengono mappati ai record di dati pubblicati nell'argomento Kafka. 

Gli argomenti Kafka non dispongono di una struttura preimpostata oltre a una chiave di partizione.

**Nota**  
Non è necessario utilizzare la mappatura degli oggetti. È possibile utilizzare la normale mappatura delle tabelle per varie trasformazioni. Tuttavia, il tipo di chiave della partizione segue questi comportamenti predefiniti:   
La chiave primaria viene utilizzata come chiave di partizione per il pieno carico.
Se non vengono utilizzate le impostazioni delle attività di applicazione parallela, `schema.table` viene usato come chiave di partizione per la CDC.
Se vengono utilizzate le impostazioni delle attività di applicazione parallela, la chiave primaria viene utilizzata come chiave di partizione per CDC.

Per creare una regola di mappatura degli oggetti, è necessario impostare il parametro `rule-type` su `object-mapping`. Questa regola specifica il tipo di mappatura degli oggetti da utilizzare. 

Di seguito è riportata la struttura per la regola.

```
{
    "rules": [
        {
            "rule-type": "object-mapping",
            "rule-id": "id",
            "rule-name": "name",
            "rule-action": "valid object-mapping rule action",
            "object-locator": {
                "schema-name": "case-sensitive schema name",
                "table-name": ""
            }
        }
    ]
}
```

AWS DMS attualmente supporta `map-record-to-record` e `map-record-to-document` è l'unico valore valido per il parametro. `rule-action` Queste impostazioni influiscono sui valori che non sono esclusi come parte dell'elenco degli attributi `exclude-columns`. I `map-record-to-document` valori `map-record-to-record` and specificano come AWS DMS gestisce questi record per impostazione predefinita. Questi valori non influiscono in alcun modo sulle mappature degli attributi. 

Nel caso di migrazione da un database relazionale a un argomento Kafka utilizza `map-record-to-record`. Questo tipo di regola utilizza il valore `taskResourceId.schemaName.tableName` dal database relazionale come chiave di partizione nell'argomento Kafka e crea un attributo per ogni colonna nel database di origine. 

Quando utilizzi `map-record-to-record`, tieni presente quanto segue:
+ Questa impostazione ha effetto solo sulle colonne escluse dall'elenco `exclude-columns`.
+ Per ogni colonna di questo tipo, AWS DMS crea un attributo corrispondente nell'argomento di destinazione.
+ AWS DMS crea questo attributo corrispondente indipendentemente dal fatto che la colonna di origine venga utilizzata in una mappatura degli attributi. 

Per comprendere il funzionamento di `map-record-to-record`, è opportuno esaminarne il comportamento in azione. Per questo esempio, supponiamo che tu stia iniziando con una riga di tabella del database relazionale con la struttura e i dati seguenti.


| FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth | 
| --- | --- | --- | --- | --- | --- | --- | --- | 
| Randy | Marsh | 5 | 221B Baker Street | 1234567890 | 31 Spooner Street, Quahog  | 9876543210 | 02/29/1988 | 

Per eseguire la migrazione di queste informazioni da uno schema denominato `Test` verso un argomento Kafka, crea le regole per mappare i dati sull'argomento di destinazione. La regola seguente illustra la mappatura. 

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "DefaultMapToKafka",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            }
        }
    ]
}
```

Dato un argomento Kafka e una chiave di partizione (in questo caso, `taskResourceId.schemaName.tableName`), il seguente esempio illustra il formato di record risultante per l'argomento di destinazione Kafka utilizzando i dati di esempio: 

```
  {
     "FirstName": "Randy",
     "LastName": "Marsh",
     "StoreId":  "5",
     "HomeAddress": "221B Baker Street",
     "HomePhone": "1234567890",
     "WorkAddress": "31 Spooner Street, Quahog",
     "WorkPhone": "9876543210",
     "DateOfBirth": "02/29/1988"
  }
```

**Topics**
+ [Ristrutturazione dei dati con la mappatura degli attributi](#CHAP_Target.Kafka.AttributeMapping)
+ [Replica di più argomenti con la mappatura degli oggetti](#CHAP_Target.Kafka.MultiTopic)
+ [Formato dei messaggi per Apache Kafka](#CHAP_Target.Kafka.Messageformat)

### Ristrutturazione dei dati con la mappatura degli attributi
<a name="CHAP_Target.Kafka.AttributeMapping"></a>

Utilizzando una mappa degli attributi puoi modificare la struttura dei dati mentre li stai migrando verso un argomento Kafka. Ad esempio, potresti voler combinare più campi nell'origine in un unico campo nella destinazione. La seguente mappa degli attributi illustra come ristrutturare i dati.

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKafka",
            "rule-action": "map-record-to-record",
            "target-table-name": "CustomerData",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            },
            "mapping-parameters": {
                "partition-key-type": "attribute-name",
                "partition-key-name": "CustomerName",
                "exclude-columns": [
                    "firstname",
                    "lastname",
                    "homeaddress",
                    "homephone",
                    "workaddress",
                    "workphone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${lastname}, ${firstname}"
                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "document",
                        "attribute-sub-type": "json",
                        "value": {
                            "Home": {
                                "Address": "${homeaddress}",
                                "Phone": "${homephone}"
                            },
                            "Work": {
                                "Address": "${workaddress}",
                                "Phone": "${workphone}"
                            }
                        }
                    }
                ]
            }
        }
    ]
}
```

Per impostare un valore costante per`partition-key`, specifica`"partition-key-type: "constant"`, questo imposta il valore della partizione su. `constant` Ad esempio, potresti eseguire questa operazione per forzare la memorizzazione di tutti i dati in una singola partizione. Questo approccio viene illustrato nella mappatura seguente. 

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "1",
            "rule-name": "TransformToKafka",
            "rule-action": "map-record-to-document",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer"
            },
            "mapping-parameters": {
                "partition-key-type": "constant",
                "exclude-columns": [
                    "FirstName",
                    "LastName",
                    "HomeAddress",
                    "HomePhone",
                    "WorkAddress",
                    "WorkPhone"
                ],
                "attribute-mappings": [
                    {
                        "attribute-name": "CustomerName",
                        "value": "${FirstName},${LastName}"
                    },
                    {
                        "attribute-name": "ContactDetails",
                        "value": {
                            "Home": {
                                "Address": "${HomeAddress}",
                                "Phone": "${HomePhone}"
                            },
                            "Work": {
                                "Address": "${WorkAddress}",
                                "Phone": "${WorkPhone}"
                            }
                        }
                    },
                    {
                        "attribute-name": "DateOfBirth",
                        "value": "${DateOfBirth}"
                    }
                ]
            }
        }
    ]
}
```

**Nota**  
Il valore `partition-key` per un record di controllo per una tabella specifica è `TaskId.SchemaName.TableName`. Il valore `partition-key` per un record di controllo per un'attività specifica è il `TaskId` del record. La specifica si un valore `partition-key` nella mappatura degli oggetti non influisce sul parametro `partition-key` per un record di controllo.  
 Quando `partition-key-type` è impostato su `attribute-name` in una regola di mappatura della tabella, è necessario specificare`partition-key-name`, che deve fare riferimento a una colonna della tabella di origine o a una colonna personalizzata definita nella mappatura. Inoltre, `attribute-mappings` deve essere fornito per definire il modo in cui le colonne di origine vengono mappate all'argomento Kafka di destinazione.

### Replica di più argomenti con la mappatura degli oggetti
<a name="CHAP_Target.Kafka.MultiTopic"></a>

Per impostazione predefinita, AWS DMS le attività migrano tutti i dati di origine su uno degli argomenti di Kafka seguenti:
+ Come specificato nel campo **Argomento** dell'endpoint di destinazione. AWS DMS 
+ quello specificato da `kafka-default-topic` se il campo **Argomento** dell'endpoint di destinazione non è compilato e l'impostazione Kafka `auto.create.topics.enable` è impostata su `true`.

Con le versioni AWS DMS del motore 3.4.6 e successive, è possibile utilizzare l'`kafka-target-topic`attributo per mappare ogni tabella di origine migrata su un argomento separato. Ad esempio, le regole di mappatura degli oggetti riportate di seguito migrano le tabelle di origine `Customer` e `Address` agli argomenti Kafka `customer_topic` e `address_topic` rispettivamente. Allo stesso tempo, AWS DMS migra tutte le altre tabelle di origine, inclusa la `Bills` tabella nello `Test` schema, all'argomento specificato nell'endpoint di destinazione.

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "MapToKafka1",
            "rule-action": "map-record-to-record",
            "kafka-target-topic": "customer_topic",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer" 
            },
            "partition-key-type": "constant"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "3",
            "rule-name": "MapToKafka2",
            "rule-action": "map-record-to-record",
            "kafka-target-topic": "address_topic",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Address"
            },
            "partition-key-type": "constant"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "4",
            "rule-name": "DefaultMapToKafka",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Bills"
            }
        }
    ]
}
```

Utilizzando la replica di più argomenti Kafka, è possibile raggruppare e migrare le tabelle di origine su argomenti Kafka separati utilizzando un'unica attività di replica.

### Formato dei messaggi per Apache Kafka
<a name="CHAP_Target.Kafka.Messageformat"></a>

L'output JSON è semplicemente un elenco di coppie chiave-valore. 

**RecordType**  
Il tipo di record può essere relativo ai dati o al controllo. I *record di dati *rappresentano le righe effettive nell'origine. I *record di controllo* sono per eventi importanti nel flusso, ad esempio un riavvio dell'attività.

**Operation**  
Per i record di dati, l'operazione può essere `load`, `insert`, `update` o `delete`.  
Per i record di controllo, l'operazione può essere `create-table`, `rename-table`, `drop-table`, `change-columns`, `add-column`, `drop-column`, `rename-column` o `column-type-change`.

**SchemaName**  
Lo schema di origine per il record. Questo campo può essere vuoto per un record di controllo.

**TableName**  
La tabella di origine per il record. Questo campo può essere vuoto per un record di controllo.

**Time stamp**  
Il timestamp relativo al momento della creazione del messaggio JSON. Il campo viene formattato con il formato ISO 8601.

Il seguente esempio di messaggio JSON illustra un tipo di dati con tutti i metadati aggiuntivi.

```
{ 
   "data":{ 
      "id":100000161,
      "fname":"val61s",
      "lname":"val61s",
      "REGION":"val61s"
   },
   "metadata":{ 
      "timestamp":"2019-10-31T22:53:59.721201Z",
      "record-type":"data",
      "operation":"insert",
      "partition-key-type":"primary-key",
      "partition-key-value":"sbtest.sbtest_x.100000161",
      "schema-name":"sbtest",
      "table-name":"sbtest_x",
      "transaction-id":9324410911751,
      "transaction-record-id":1,
      "prev-transaction-id":9324410910341,
      "prev-transaction-record-id":10,
      "commit-timestamp":"2019-10-31T22:53:55.000000Z",
      "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209"
   }
}
```

Il seguente esempio di messaggio JSON illustra un tipo di controllo.

```
{ 
   "control":{ 
      "table-def":{ 
         "columns":{ 
            "id":{ 
               "type":"WSTRING",
               "length":512,
               "nullable":false
            },
            "fname":{ 
               "type":"WSTRING",
               "length":255,
               "nullable":true
            },
            "lname":{ 
               "type":"WSTRING",
               "length":255,
               "nullable":true
            },
            "REGION":{ 
               "type":"WSTRING",
               "length":1000,
               "nullable":true
            }
         },
         "primary-key":[ 
            "id"
         ],
         "collation-name":"latin1_swedish_ci"
      }
   },
   "metadata":{ 
      "timestamp":"2019-11-21T19:14:22.223792Z",
      "record-type":"control",
      "operation":"create-table",
      "partition-key-type":"task-id",
      "schema-name":"sbtest",
      "table-name":"sbtest_t1"
   }
}
```