아파치 카프카를 타겟으로 사용 AWS Database Migration Service - AWS 데이터베이스 마이그레이션 서비스

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

아파치 카프카를 타겟으로 사용 AWS Database Migration Service

를 사용하여 데이터를 Apache AWS DMS Kafka 클러스터로 마이그레이션할 수 있습니다. Apache Kafka는 분산 스트리밍 플랫폼입니다. Apache Kafka를 사용하여 스트리밍 데이터의 실시간 수집 및 처리를 수행할 수 있습니다.

AWS 또한 타겟으로 사용할 수 있는 Apache Kafka용 아마존 매니지드 스트리밍 (Amazon MSK) 도 제공합니다. AWS DMS Amazon MSK는 Apache Kafka 인스턴스의 구현 및 관리를 단순화하는 완전 관리형 Apache Kafka 스트리밍 서비스입니다. 오픈 소스 Apache Kafka 버전에서 작동하며, 다른 Apache Kafka 인스턴스와 마찬가지로 Amazon MSK 인스턴스에 AWS DMS 대상으로 액세스할 수 있습니다. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서Amazon MSK란 무엇인가요?를 참조하세요.

Kafka 클러스터는 여러 파티션으로 구분된 주제라는 범주에 레코드 스트림을 저장합니다. 파티션은 주제에서 고유하게 식별되는 데이터 레코드(메시지) 시퀀스입니다. 파티션은 주제 레코드의 병렬 처리가 가능하도록 한 클러스터의 여러 브로커로 분산될 수 있습니다. Apache Kafka의 주제 및 파티션과 분산에 관한 자세한 내용은 주제 및 로그분산을 참조하십시오.

Kafka 클러스터는 Amazon MSK 인스턴스, Amazon EC2 인스턴스에서 실행되는 클러스터 또는 온프레미스 클러스터일 수 있습니다. Amazon MSK 인스턴스 또는 Amazon EC2 인스턴스의 클러스터는 동일한 VPC에 있을 수도 있고 다른 VPC에 있을 수도 있습니다. 클러스터가 온프레미스인 경우 복제 인스턴스용 자체 온프레미스 이름 서버를 사용하여 클러스터의 호스트 이름을 확인할 수 있습니다. 복제 인스턴스에서 이름 서버를 설정하는 방법에 관한 자세한 내용은 자체 온프레미스 이름 서버 사용 단원을 참조하십시오. 네트워크 설정에 관한 자세한 내용은 복제 인스턴스용으로 네트워크 설정 단원을 참조하십시오.

Amazon MSK 클러스터를 사용할 때는 해당 보안 그룹이 복제 인스턴스에서의 액세스를 허용하는지 확인하십시오. Amazon MSK 클러스터의 보안 그룹 변경에 관한 자세한 내용은 Amazon MSK 클러스터의 보안 그룹 변경을 참조하십시오.

AWS Database Migration Service JSON을 사용하여 Kafka 주제에 레코드를 게시합니다. 변환 중 AWS DMS 는 각 레코드를 소스 데이터베이스로부터 JSON 형식의 속성-값 페어로 직렬화합니다.

지원되는 데이터 소스에서 대상 Kafka 클러스터로 데이터를 마이그레이션하려면 객체 매핑을 사용합니다. 객체 매핑을 통해 데이터 레코드를 대상 주제에 구조화하는 방법을 결정합니다. 또한 각 테이블에 대한 파티션 키를 정의합니다. Apache Kafka가 이러한 테이블을 사용해 데이터를 파티션으로 그룹화합니다.

현재는 작업당 단일 주제를 AWS DMS 지원합니다. 테이블이 여러 개 있는 단일 작업의 경우 모든 메시지가 단일 주제로 이동합니다. 각 메시지에는 대상 스키마와 테이블을 식별하는 메타데이터 섹션이 포함되어 있습니다. AWS DMS 버전 3.4.6 이상에서는 객체 매핑을 사용한 멀티토픽 복제를 지원합니다. 자세한 설명은 객체 매핑을 사용한 멀티주제 복제 섹션을 참조하세요.

Apache Kafka 엔드포인트 설정

AWS DMS 콘솔의 엔드포인트 설정 또는 CLI의 --kafka-settings 옵션을 통해 연결 세부 정보를 지정할 수 있습니다. 각 설정의 요구 사항은 다음과 같습니다.

  • Broker – Kafka 클러스터에 있는 하나 이상의 브로커 위치를 쉼표로 구분된 각 broker-hostname:port의 목록 형식으로 지정합니다. 예를 들면, "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"입니다. 이 설정은 클러스터 내 일부 또는 모든 브로커의 위치를 지정할 수 있습니다. 모든 클러스터 브로커는 주제로 마이그레이션된 데이터 레코드의 분할을 처리하기 위해 통신합니다.

  • Topic – (선택 사항) 최대 길이 255개 문자 및 기호로 주제 이름을 지정합니다. 마침표(.), 밑줄(_) 및 빼기(-)를 사용할 수 있습니다. 마침표(.) 또는 밑줄(_)이 있는 주제 이름이 내부 데이터 구조에서 충돌할 수 있습니다. 주제 이름에 이러한 기호 중 하나만 사용하고 둘 다 사용하지는 마십시오. 주제 이름을 지정하지 않는 경우 를 마이그레이션 주제로 AWS DMS 사용합니다"kafka-default-topic".

    참고

    지정한 마이그레이션 주제 또는 기본 주제를 AWS DMS 만들려면 Kafka 클러스터 구성의 auto.create.topics.enable = true 일부로 설정하십시오. 자세한 내용은 Apache Kafka를 대상으로 사용할 때의 제한 사항 AWS Database Migration Service 단원을 참조하세요.

  • MessageFormat – 엔드포인트에 생성된 레코드의 출력 형식입니다. 메시지 형식은 JSON(기본값) 또는 JSON_UNFORMATTED(탭이 없는 한 줄)입니다.

  • MessageMaxBytes – 엔드포인트에 생성된 레코드의 최대 크기(바이트)입니다. 기본값은 1,000,000입니다.

    참고

    AWS CLI/SDK를 사용하여 기본값이 아닌 값으로 변경할 MessageMaxBytes 수만 있습니다. 예를 들어 기존 Kafka 엔드포인트를 수정하고 MessageMaxBytes를 변경하려면 다음 명령을 사용합니다.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – 소스 데이터베이스의 자세한 트랜잭션 정보를 제공합니다. 이 정보에는 커밋 타임스탬프, 로그 위치 및 transaction_id, previous_transaction_id, transaction_record_id(트랜잭션 내의 레코드 오프셋)에 대한 값이 포함됩니다. 기본값은 false입니다.

  • IncludePartitionValue – 파티션 유형이 schema-table-type이 아닌 경우 Kafka 메시지 출력에 파티션 값을 표시합니다. 기본값은 false입니다.

  • PartitionIncludeSchemaTable – 파티션 유형이 primary-key-type인 경우 스키마 및 테이블 이름을 파티션 값에 접두사로 지정합니다. 이렇게 하면 Kafka 파티션 간의 데이터 분산이 증가합니다. 예를 들어 SysBench 스키마에 수천 개의 테이블이 있고 각 테이블에 기본 키의 범위가 제한되어 있다고 가정하겠습니다. 이 경우 동일한 기본 키가 수천 개의 테이블에서 동일한 파티션으로 전송되어 제한이 발생합니다. 기본값은 false입니다.

  • IncludeTableAlterOperations – 제어 데이터의 테이블을 변경하는 데이터 정의 언어(DDL) 작업(예: rename-table, drop-table, add-column, drop-columnrename-column)을 포함합니다. 기본값은 false입니다.

  • IncludeControlDetails – Kafka 메시지 출력에서 테이블 정의, 열 정의, 테이블 및 열 변경 사항에 관한 자세한 제어 정보를 표시합니다. 기본값은 false입니다.

  • IncludeNullAndEmpty – NULL 및 빈 열을 대상에 포함합니다. 기본값은 false입니다.

  • SecurityProtocol – 전송 계층 보안(TLS)을 사용하여 Kafka 대상 엔드포인트에 대한 보안 연결을 설정합니다. 옵션에는 ssl-authentication, ssl-encryptionsasl-ssl이 포함됩니다. sasl-ssl을 사용하려면 SaslUsernameSaslPassword가 필요합니다.

  • SslEndpointIdentificationAlgorithm— 인증서에 대한 호스트 이름 확인을 설정합니다. 이 설정은 AWS DMS 버전 3.5.1 이상에서 지원됩니다. 옵션에는 다음 사항이 포함됩니다.

    • NONE: 클라이언트 연결에서 브로커의 호스트 이름 확인을 비활성화합니다.

    • HTTPS: 클라이언트 연결에서 브로커의 호스트 이름 확인을 활성화합니다.

설정을 사용하여 전송 속도를 높일 수 있습니다. 이를 위해 AWS DMS 는 Apache Kafka 대상 클러스터에 대한 멀티스레드 전체 로드를 지원합니다. AWS DMS 는 다음을 포함하는 작업 설정으로 이 멀티 스레딩을 지원합니다.

  • MaxFullLoadSubTasks— 이 옵션을 사용하여 병렬로 로드할 소스 테이블의 최대 수를 지정합니다. AWS DMS 전용 하위 작업을 사용하여 각 테이블을 해당 Kafka 대상 테이블에 로드합니다. 기본값은 8이며 최대값은 49입니다.

  • ParallelLoadThreads— 이 옵션을 사용하여 각 테이블을 Kafka 대상 테이블에 로드하는 데 AWS DMS 사용하는 스레드 수를 지정합니다. Apache Kafka 대상의 최대값은 32입니다. 이 최대 한도를 증가시키도록 요청할 수 있습니다.

  • ParallelLoadBufferSize – 이 옵션을 사용하여 병렬 로드 스레드에서 데이터를 Kafka 대상에 로드하기 위해 사용하는 버퍼에 저장할 최대 레코드 수를 지정합니다. 기본값은 50입니다. 최대값은 1,000입니다. 이 설정은 ParallelLoadThreads와 함께 사용하십시오. ParallelLoadBufferSize는 둘 이상의 스레드가 있는 경우에만 유효합니다.

  • ParallelLoadQueuesPerThread – 각 동시 스레드가 액세스하는 대기열 수를 지정하여 대기열에서 데이터 레코드를 가져오고 대상에 대한 배치 로드를 생성하려면 이 옵션을 사용합니다. 기본 값은 1입니다. 최대 한도는 512입니다.

병렬 스레드 및 대량 작업에 대한 작업 설정을 조정하여 Kafka 엔드포인트의 변경 데이터 캡처(CDC) 성능을 개선할 수 있습니다. 이렇게 하려면 ParallelApply* 작업 설정을 사용하여 동시 스레드 수, 스레드당 대기열 및 버퍼에 저장할 레코드 수를 지정해야 합니다. 예를 들어 CDC 로드를 수행하고 128개의 스레드를 병렬로 적용한다고 가정하겠습니다. 또한 버퍼당 50개 레코드가 저장된 스레드당 64개 대기열에 액세스하려고 합니다.

CDC 성능을 높이기 위해 AWS DMS 은 다음과 같은 작업 설정을 지원합니다.

  • ParallelApplyThreads— CDC 로드 중에 데이터 레코드를 Kafka 대상 엔드포인트로 푸시하는 데 AWS DMS 사용하는 동시 스레드 수를 지정합니다. 기본값은 0이고 최대값은 32입니다.

  • ParallelApplyBufferSize – CDC 로드 중에 Kafka 대상 엔드포인트로 푸시할 동시 스레드에 대한 각 버퍼 대기열에 저장할 최대 레코드 수를 지정합니다. 기본값은 100이고 최대값은 1,000입니다. ParallelApplyThreads가 둘 이상의 스레드를 지정할 때 이 옵션을 사용합니다.

  • ParallelApplyQueuesPerThread – 각 스레드가 대기열에서 데이터 레코드를 가져오고 CDC 중에 Kafka 엔드포인트에 대한 배치 로드를 생성하기 위한 대기열 수를 지정합니다. 기본 값은 1입니다. 최대 한도는 512입니다.

ParallelApply* 작업 설정을 사용할 때 partition-key-type 기본값은 테이블의 schema-name.table-name가 아니라 primary-key입니다.

전송 계층 보안(TLS)을 사용하여 Kafka에 연결

Kafka 클러스터는 전송 계층 보안(TLS)을 사용한 보안 연결만 허용합니다. DMS를 사용하면 다음 세 가지 보안 프로토콜 옵션 중 하나를 사용하여 Kafka 엔드포인트 연결을 보호할 수 있습니다.

SSL 암호화(server-encryption)

클라이언트는 서버의 인증서를 통해 서버 ID를 확인합니다. 그러면 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

SSL 인증(mutual-authentication)

서버와 클라이언트는 자체 인증서를 통해 서로 ID를 확인합니다. 그러면 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

SASL-SSL(mutual-authentication)

SASL(Simple Authentication and Security Layer) 메서드는 클라이언트 인증서를 사용자 이름 및 암호로 대체하여 클라이언트 ID를 검증합니다. 특히 서버가 클라이언트의 ID를 검증할 수 있도록 서버가 등록한 사용자 이름과 암호를 제공합니다. 그러면 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

중요

Apache Kafka와 Amazon MSK는 확인된 인증서를 수락합니다. 이는 Kafka와 Amazon MSK에서 해결해야 하는 알려진 제한 사항입니다. 자세한 내용은 Apache Kafka 문제, KAFKA-3700단원을 참조하세요.

Amazon MSK를 사용하는 경우 이 알려진 제한에 대한 해결 방법으로 액세스 제어 목록(ACL) 을 사용하는 것이 좋습니다. ACL 사용에 관한 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서Apache Kafka ACL 단원을 참조하세요.

자체 관리형 Kafka 클러스터를 사용하는 경우, 클러스터 구성에 관한 자세한 내용은 2018년 10월 21일자 설명을 참조하십시오.

Amazon MSK 또는 자체 관리형 Kafka 클러스터에서 SSL 암호화 사용

Amazon MSK 또는 자체 관리형 Kafka 클러스터에 대한 엔드포인트 연결을 보호하는 데 SSL 암호화를 사용할 수 있습니다. SSL 암호화 인증 방법을 사용하면 클라이언트가 서버의 인증서를 통해 서버의 ID를 검증합니다. 그러면 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

SSL 암호화를 사용하여 Amazon MSK에 연결하려면
  • 대상 Kafka 엔드포인트를 생성할 때 ssl-encryption 옵션을 사용하여 보안 프로토콜 엔드포인트 설정(SecurityProtocol)을 설정합니다.

    다음 JSON 예제는 보안 프로토콜을 SSL 암호화로 설정합니다.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
자체 관리형 Kafka 클러스터에 SSL 암호화를 사용하려면
  1. 온프레미스 Kafka 클러스터에서 프라이빗 CA(인증 기관)를 사용하는 경우 프라이빗 CA 인증서를 업로드하고 Amazon 리소스 이름(ARN)을 받으십시오.

  2. 대상 Kafka 엔드포인트를 생성할 때 ssl-encryption 옵션을 사용하여 보안 프로토콜 엔드포인트 설정(SecurityProtocol)을 설정합니다. 다음 JSON 예제는 보안 프로토콜을 ssl-encryption으로 설정합니다.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. 프라이빗 CA를 사용하는 경우 위의 첫 번째 단계에서 얻은 ARN에 SslCaCertificateArn을 설정하십시오.

SSL 인증 사용

Amazon MSK 또는 자체 관리형 Kafka 클러스터에 대한 엔드포인트 연결을 보호하는 데 SSL 인증을 사용할 수 있습니다.

Amazon MSK에 연결할 때 SSL 인증을 사용한 클라이언트 인증 및 암호화를 활성화하려면 다음을 수행하십시오.

  • Kafka용 프라이빗 키와 퍼블릭 인증서를 준비합니다.

  • 인증서를 DMS Certificate Manager에 업로드합니다.

  • Kafka 엔드포인트 설정에 지정된 해당 인증서 ARN을 사용하여 Kafka 대상 엔드포인트를 생성합니다.

Amazon MSK용 프라이빗 키와 퍼블릭 인증서를 준비하려면
  1. Amazon Managed Streaming for Apache Kafka 개발자 안내서클라이언트 인증 단원에 있는 1~9단계에 설명된 대로 EC2 인스턴스를 생성하고 클라이언트가 인증을 사용하도록 설정합니다.

    이러한 단계를 완료하면 인증서-ARN(ACM에 저장된 공개 인증서 ARN)과 kafka.client.keystore.jks 파일에 포함된 프라이빗 키를 갖게 됩니다.

  2. 다음 명령을 사용하여 퍼블릭 인증서를 가져오고 인증서를 signed-certificate-from-acm.pem 파일에 복사합니다.

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    이 명령은 다음 예와 유사한 정보를 반환합니다.

    {"Certificate": "123", "CertificateChain": "456"}

    그런 다음, "123"과 동일한 데이터를 signed-certificate-from-acm.pem 파일에 복사합니다.

  3. 다음 예제와 같이 kafka.client.keystore.jks to keystore.p12에서 msk-rsa 키를 가져와서 프라이빗 키를 받습니다.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. 다음 명령을 사용하여 keystore.p12.pem 형식으로 내보냅니다.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Enter PEM 암호문 메시지가 나타나고 인증서를 암호화하는 데 적용되는 키를 식별합니다.

  5. .pem 파일에서 백 속성 및 키 속성을 제거하여 첫 번째 줄이 다음 문자열로 시작되도록 합니다.

    ---BEGIN ENCRYPTED PRIVATE KEY---
퍼블릭 인증서와 프라이빗 키를 DMS 인증서 관리자에 업로드하고 Amazon MSK에 대한 연결을 테스트하려면
  1. 다음 명령을 사용하여 DMS 인증서 관리자에 업로드합니다.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Amazon MSK 대상 엔드포인트를 생성하고 연결을 테스트하여 TLS 인증이 작동하는지 확인합니다.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
중요

자체 관리형 Kafka 클러스터에 대한 연결을 보호하는 데 SSL 인증을 사용할 수 있습니다. 경우에 따라 온프레미스 Kafka 클러스터에서 프라이빗 CA(인증 기관)를 사용할 수 있습니다. 그렇다면 CA 체인, 퍼블릭 인증서, 프라이빗 키를 DMS 인증서 관리자에게 업로드하세요. 그런 다음 온프레미스 Kafka 대상 엔드포인트를 생성할 때 엔드포인트 설정에서 해당 Amazon 리소스 이름(ARN)을 사용합니다.

자체 관리형 Kafka 클러스터에 사용할 프라이빗 키와 서명된 인증서를 준비하려면
  1. 다음 예제에서 보는 것처럼 키 페어를 생성합니다.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. 인증서 서명 요청(CSR)을 생성합니다.

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. 클러스터 트러스트 스토어의 CA를 사용하여 CSR에 서명합니다. CA가 없는 경우 자체 프라이빗 CA를 생성할 수 있습니다.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. ca-cert를 서버 트러스트 스토어 및 키 스토어로 가져옵니다. 신뢰 스토어가 없는 경우에는 다음 명령을 사용하여 트러스트 스토어를 생성하고 ca-cert 를 해당 스토어로 가져옵니다.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. 인증서에 서명합니다.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. 서명된 인증서를 키 스토어로 가져옵니다.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. 다음 명령을 사용하여 kafka.server.keystore.jks에서 keystore.p12으로 on-premise-rsa 키를 가져옵니다.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. 다음 명령을 사용하여 keystore.p12.pem 형식으로 내보냅니다.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. encrypted-private-server-key.pem, signed-certificate.pemca-cert를 DMS 인증서 관리자에 업로드합니다.

  10. 반환된 ARN을 사용하여 엔드포인트를 생성합니다.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

SASL-SSL 인증을 사용하여 Amazon MSK에 연결

SASL(Simple Authentication and Security Layer) 메서드는 사용자 이름과 암호를 사용하여 클라이언트 ID를 검증하고 서버와 클라이언트 간에 암호화된 연결을 만듭니다.

SASL을 사용하려면 Amazon MSK 클러스터를 설정할 때 먼저 안전한 사용자 이름과 암호를 생성해야 합니다. Amazon MSK 클러스터의 보안 사용자 이름과 암호를 설정하는 방법에 관한 설명은 Amazon Managed Streaming for Apache Kafka 개발자 안내서Amazon MSK 클러스터용 SASL/SCRAM 인증 설정을 참조하세요.

그런 다음, Kafka 대상 엔드포인트를 생성할 때 sasl-ssl 옵션을 사용하여 보안 프로토콜 엔드포인트 설정(SecurityProtocol)을 설정합니다. SaslUsernameSaslPassword 옵션도 설정합니다. Amazon MSK 클러스터를 처음 설정할 때 다음 JSON 예제에서 보는 것처럼 이 옵션 설정이 보안 사용자 이름 및 암호와 일치하는지 확인하십시오.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
참고
  • 현재는 공용 CA 지원 AWS DMS SASL-SSL만 지원합니다. DMS는 프라이빗 CA가 지원하는 자체 관리형 Kafka와 함께 사용할 수 있는 SASL-SSL을 지원하지 않습니다.

  • SASL-SSL 인증의 경우 기본적으로 SCRAM-SHA-512 메커니즘을 AWS DMS 지원합니다. AWS DMS 버전 3.5.0 이상에서는 일반 메커니즘도 지원합니다. Plain 메커니즘을 지원하려면 KafkaSettings API 데이터 형식의 SaslMechanism 파라미터를 PLAIN으로 설정하십시오.

Apache Kafka 대상의 경우 이전 이미지를 사용하여 CDC 행의 원래 값 보기

Kafka 같은 데이터 스트리밍 대상에 CDC 업데이트를 작성하는 경우 업데이트를 통한 변경 전에 소스 데이터베이스 행의 원래 값을 볼 수 있습니다. 이를 가능하게 하기 위해 소스 데이터베이스 엔진에서 제공한 데이터를 기반으로 업데이트 이벤트의 이전 이미지를 AWS DMS 채웁니다.

소스 데이터베이스 엔진에 따라 이전 이미지에 대해 서로 다른 양의 정보를 제공합니다.

  • Oracle은 열이 변경되는 경우에만 열에 대한 업데이트를 제공합니다.

  • PostgreSQL은 기본 키의 일부인 열에 대한 데이터(변경 여부에 관계 없음)만 제공합니다. 논리적 복제를 사용 중이고 원본 테이블에 대해 REPLICA IDENTITY FULL이 설정된 경우 WAL에 기록된 행의 전체 이전 정보 및 이후 정보를 얻을 수 있으며 여기에서 확인할 수 있습니다.

  • MySQL은 일반적으로 모든 열에 대한 데이터(변경 여부에 관계 없음)를 제공합니다.

이전 이미징을 활성화하여 소스 데이터베이스의 원래 값을 AWS DMS 출력에 추가하려면 BeforeImageSettings 태스크 설정 또는 add-before-image-columns 파라미터를 사용합니다. 이 파라미터는 열 변환 규칙을 적용합니다.

BeforeImageSettings는 다음과 같이 소스 데이터베이스 시스템에서 수집된 값을 사용하여 모든 업데이트 작업에 새 JSON 속성을 추가합니다.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
참고

전체 로드와 CDC 작업(기존 데이터 마이그레이션 및 지속적인 변경 사항 복제) 또는 CDC 전용 작업(데이터 변경 사항만 복제)에 BeforeImageSettings를 적용합니다. 전체 로드 전용 작업에는 BeforeImageSettings를 적용하지 마십시오.

BeforeImageSettings 옵션의 경우, 다음이 적용됩니다.

  • 이전 이미징을 활성화하려면 EnableBeforeImage 옵션을 true로 설정합니다. 기본값은 false입니다.

  • FieldName 옵션을 사용하여 새 JSON 속성에 이름을 지정합니다. EnableBeforeImagetrue인 경우 FieldName은 필수이며 비워 둘 수 없습니다.

  • ColumnFilter 옵션은 이전 이미징을 사용하여 추가할 열을 지정합니다. 테이블 기본 키의 일부에 속하는 열만 추가하려면 기본값 pk-only를 사용하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 사용하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 사용합니다.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

이전 이미지 변환 규칙 사용

작업 설정 대신 열 변환 규칙을 적용하는 add-before-image-columns 파라미터를 사용할 수 있습니다. 이 파라미터를 사용하면 Kafka 같은 데이터 스트리밍 대상에서 CDC 중에 이전 이미징을 활성화할 수 있습니다.

변환 규칙에 add-before-image-columns를 사용하면 이전 이미지 결과를 보다 세밀하게 제어할 수 있습니다. 변환 규칙을 통해 객체 로케이터를 사용하여 규칙에 대해 선택한 테이블을 제어할 수 있습니다. 또한 변환 규칙을 함께 연결하여 테이블마다 서로 다른 규칙을 적용할 수 있습니다. 그런 다음, 다른 규칙을 사용하여 생성된 열을 조작할 수 있습니다.

참고

동일한 작업 내에서 BeforeImageSettings 작업 설정과 함께 add-before-image-columns 파라미터를 사용해서는 안 됩니다. 단일 작업에는 이 파라미터 또는 설정 중 하나만 사용하고 둘 다 사용하지는 마십시오.

해당 열에 대해 add-before-image-columns 파라미터가 있는 transformation 규칙 유형이 before-image-def 단원을 제공해야 합니다. 다음은 그 한 예입니다.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix의 값은 열 이름 앞에 추가되며, column-prefix의 기본값은 BI_입니다. column-suffix의 값은 열 이름 뒤에 추가되며, 기본값은 비어 있습니다. column-prefixcolumn-suffix를 모두 빈 문자열로 설정하지 마십시오.

column-filter에 대해 하나의 값을 선택합니다. 테이블 기본 키의 일부인 열만 추가하려면 pk-only를 선택하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 선택하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 선택합니다.

이전 이미지 변환 규칙의 예

다음 예의 변환 규칙은 대상에서 BI_emp_no라는 새 열을 추가합니다. UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 같은 문은 BI_emp_no 필드를 1로 채웁니다. Amazon S3 대상에 CDC 업데이트를 작성할 때 BI_emp_no 열을 통해 업데이트된 원래 행을 알 수 있습니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns 규칙 작업 사용에 관한 자세한 내용은 변환 규칙 및 작업 단원을 참조하십시오.

Apache Kafka를 대상으로 사용할 때의 제한 사항 AWS Database Migration Service

Apache Kafka를 대상으로 사용할 때 다음 제한 사항이 적용됩니다.

  • AWS DMS Kafka 대상 엔드포인트는 Apache Kafka용 아마존 매니지드 스트리밍 (Amazon MSK) 에 대한 IAM 액세스 제어를 지원하지 않습니다.

  • 전체 LOB 모드는 지원되지 않습니다.

  • 새 주제를 자동으로 생성할 수 있는 속성을 포함하는 클러스터의 Kafka 구성 파일을 지정하십시오. AWS DMS auto.create.topics.enable = true 설정을 포함합니다. Amazon MSK 설정을 사용하는 경우 Kafka 클러스터를 만들 때 기본 구성을 지정한 다음 auto.create.topics.enable 설정을 true로 변경할 수 있습니다. 기본 구성 설정에 관한 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서기본 Amazon MSK 구성을 참조하세요. Amazon MSK를 사용하여 만든 기존 Kafka 클러스터를 수정해야 하는 경우 다음 예와 같이 AWS CLI 명령을 aws kafka create-configuration 실행하여 Kafka 구성을 업데이트하십시오.

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    여기서, //~/kafka_configuration은 필요한 속성 설정으로 만든 구성 파일입니다.

    Amazon EC2에 설치된 자체 Kafka 인스턴스를 사용하는 경우, 인스턴스와 함께 auto.create.topics.enable = true 제공된 옵션을 사용하여 새 주제를 자동으로 생성할 AWS DMS 수 있는 설정으로 Kafka 클러스터 구성을 수정하십시오.

  • AWS DMS 트랜잭션과 상관없이 각 업데이트를 소스 데이터베이스의 단일 레코드에 지정된 Kafka 주제에 하나의 데이터 레코드 (메시지) 로 게시합니다.

  • AWS DMS 다음과 같은 두 가지 형식의 파티션 키를 지원합니다.

    • SchemaName.TableName: 스키마와 테이블 이름의 조합입니다.

    • ${AttributeName}: JSON 내 한 필드의 값 또는 소스 데이터베이스 내 테이블의 기본 키입니다.

  • BatchApply는 Kafka 엔드포인트에는 지원되지 않습니다. Kafka 대상에 대해 Batch Apply(예: BatchApplyEnabled 대상 메타데이터 작업 설정)를 사용하면 데이터 손실이 발생할 수 있습니다.

  • AWS DMS 16자리가 넘는 BigInt 데이터 유형 값의 마이그레이션을 지원하지 않습니다. 이 제한을 해결하기 위해 다음 변환 규칙을 사용하여 BigInt 열을 문자열로 변환할 수 있습니다. 변환 규칙에 대한 자세한 내용은 변환 규칙 및 작업 섹션을 참조하세요.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

객체 매핑을 사용하여 데이터를 Kafka 주제로 마이그레이션

AWS DMS 테이블 매핑 규칙을 사용하여 원본의 데이터를 대상 Kafka 주제에 매핑합니다. 데이터를 대상 주제에 매핑하려면 객체 매핑이라는 테이블 매핑 규칙 유형을 사용합니다. 객체 매핑을 사용하여 원본의 데이터 레코드를 Kafka 주제에 게시된 데이터 레코드로 매핑하는 방법을 지정합니다.

Kafka 주제에는 파티션 키 외에 별다른 사전 설정 구조가 없습니다.

참고

객체 매핑을 사용할 필요는 없습니다. 일반 테이블 매핑을 다양한 변환에 사용할 수 있습니다. 하지만 파티션 키 유형은 다음과 같은 기본 동작을 따릅니다.

  • 기본 키는 전체 로드의 파티션 키로 사용됩니다.

  • paralle-apply 작업 설정을 사용하지 않는 경우, CDC의 파티션 키로 schema.table이 사용됩니다.

  • paralle-apply 작업 설정을 사용하지 않는 경우, CDC의 파티션 키로 기본 키가 사용됩니다.

object-mapping 규칙을 만들려면 rule-typeobject-mapping으로 지정합니다. 이 규칙은 사용할 객체 매핑의 유형을 지정합니다.

이 규칙의 구조는 다음과 같습니다.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS 현재 매개 변수에 대해 map-record-to-recordmap-record-to-document 만 유효한 값으로 지원합니다. rule-action 이러한 설정은 exclude-columns 속성 목록의 일부로 제외되지 않은 값에 영향을 줍니다. map-record-to-recordmap-record-to-document 값은 이러한 레코드를 기본적으로 AWS DMS 처리하는 방법을 지정합니다. 이러한 값은 어떤 식으로든 속성 매핑에 영향을 미치지 않습니다.

관계형 데이터베이스에서 Kafka 주제로 마이그레이션할 때 map-record-to-record를 사용합니다. 이러한 규칙 유형은 Kafka 주제의 파티션 키로 관계형 데이터베이스의 taskResourceId.schemaName.tableName 값을 사용하며 소스 데이터베이스의 각 열마다 속성을 하나씩 생성합니다.

map-record-to-record을 사용하는 경우 다음 사항에 유의하세요.

  • 이 설정은 exclude-columns 목록에서 제외된 열에만 영향을 줍니다.

  • 이러한 모든 열에 대해 대상 주제에 해당 속성을 AWS DMS 만듭니다.

  • AWS DMS 소스 열이 속성 매핑에 사용되는지 여부에 관계없이 해당하는 이 속성을 생성합니다.

map-record-to-record를 이해하는 한 가지 방법은 작업 중일 때 관찰하는 것입니다. 이 예에서는 다음 구조와 데이터를 사용하여 관계형 데이터베이스 테이블 행에서 시작한다고 가정합니다.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

1988/02/29

이 정보를 Test라는 스키마에서 Kafka 주제로 마이그레이션하려면 데이터를 대상 주제로 매핑하는 규칙을 생성합니다. 다음 규칙은 그 매핑 과정을 보여 줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

다음은 지정된 Kafka 주제와 파티션 키(이 경우 taskResourceId.schemaName.tableName)로 Kafka 대상 주제의 샘플 데이터를 사용하여 결과 레코드 형식을 보여줍니다.

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

속성 매핑으로 날짜 재구성

속성 맵을 사용하여 데이터를 Kafka 주제로 마이그레이션하는 동안 데이터를 재구성할 수 있습니다. 예를 들어 소스의 필드 몇 개를 대상의 단일 필드로 묶어야 하는 경우도 있을 것입니다. 다음의 속성 맵은 날짜를 재구성하는 방법을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key에 상수 값을 설정하려면 partition-key 값을 지정합니다. 예를 들어 이 방법을 통해 모든 데이터를 단일 파티션에 저장할 수 있습니다. 다음 매핑은 이러한 접근 방식을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
참고

특정 테이블을 위한 제어 레코드에 대한 partition-key 값은 TaskId.SchemaName.TableName입니다. 특정 작업을 위한 제어 레코드에 대한 partition-key 값은 해당 레코드의 TaskId입니다. partition-key 값을 객체 매핑에 지정해도 제어 레코드에 대한 partition-key에는 영향이 없습니다.

객체 매핑을 사용한 멀티주제 복제

기본적으로 AWS DMS 작업은 모든 소스 데이터를 다음 Kafka 주제 중 하나로 마이그레이션합니다.

  • AWS DMS 대상 엔드포인트의 Topic 필드에 지정된 대로

  • 대상 엔드포인트의 Topic 필드가 채워지지 않고 Kafka auto.create.topics.enable 설정이 true로 설정된 경우에는 kafka-default-topic에서 지정한 대로 마이그레이션합니다.

AWS DMS 엔진 버전 3.4.6 이상에서는 kafka-target-topic 속성을 사용하여 마이그레이션된 각 소스 테이블을 별도의 주제에 매핑할 수 있습니다. 예를 들어, 다음 객체 매핑 규칙은 원본 테이블 CustomerAddress를 Kafka 주제 customer_topicaddress_topic에 각각 마이그레이션합니다. 동시에 Test 스키마의 테이블을 포함하여 다른 모든 원본 테이블을 대상 Bills 엔드포인트에 지정된 주제로 AWS DMS 마이그레이션합니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Kafka 멀티주제 복제를 사용하면 단일 복제 작업을 사용하여 원본 테이블을 그룹화하여 별도의 Kafka 주제로 마이그레이션할 수 있습니다.

Apache Kafka 메시지 형식

JSON 출력은 단지 키-값 페어의 목록입니다.

RecordType

레코드 유형은 데이터나 제어일 수 있습니다. 데이터 레코드는 소스 내의 실제 행을 나타냅니다. 제어 레코드는 스트림 내의 중요 이벤트를 나타냅니다(예: 작업의 재시작).

Operation

데이터 레코드의 경우, 작업은 load, insert, update 또는 delete일 수 있습니다.

데이터 레코드의 경우, 작업은 create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column 또는 column-type-change일 수 있습니다.

SchemaName

레코드에 대한 원본 스키마입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

TableName

레코드에 대한 소스 테이블입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

Timestamp

JSON 메시지가 구성된 경우의 타임스탬프입니다. 이 필드는 ISO 8601 형식으로 구성되었습니다.

다음 JSON 메시지 예제는 모든 추가 메타데이터가 포함된 데이터 유형 메시지를 보여줍니다.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

다음 JSON 메시지 예제는 컨트롤 유형 메시지를 보여줍니다.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }