구성 제공자가 있는 Debezium 소스 커넥터 - Amazon Managed Streaming for Apache Kafka

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

구성 제공자가 있는 Debezium 소스 커넥터

이 예제에서는 My 호환 SQL Amazon Aurora 데이터베이스를 소스로 사용하여 Debezium My SQL 커넥터 플러그인을 사용하는 방법을 보여줍니다. 이 예제에서는 AWS Secrets Manager에서 데이터베이스 보안 인증 정보를 외부화하기 위해 오픈 소스 AWS Secrets Manager 구성 공급자도 설정했습니다. 구성 공급자에 대해 자세히 알아보려면 구성 공급자를 사용하여 민감한 정보 외부화 섹션을 참조하세요.

중요

Debezium My SQL 커넥터 플러그인은 하나의 작업만 지원하며 Amazon Connect의 자동 크기 조정 용량 모드에서는 작동하지 않습니다. MSK 대신 프로비저닝된 용량 모드를 사용하고 커넥터 구성에서 workerCount를 1로 설정해야 합니다. MSKConnect의 용량 모드에 대한 자세한 내용은 을 참조하십시오커넥터 용량.

시작하기 전 준비 사항

커넥터가 외부에 있는 서비스와 상호 작용할 수 있으려면 커넥터가 인터넷에 액세스할 수 있어야 Amazon Virtual Private Cloud합니다. AWS Secrets Manager 이 섹션의 단계에 따라 다음 작업을 완료하여 인터넷 액세스를 활성화할 수 있습니다.

  • 게이트웨이를 호스팅하고 트래픽을 내 인터넷 NAT 게이트웨이로 라우팅하는 퍼블릭 서브넷을 설정하십시오. VPC

  • 프라이빗 서브넷 트래픽을 게이트웨이로 보내는 기본 경로를 생성하십시오. NAT

자세한 내용은 Amazon MSK Connect를 위한 인터넷 액세스 활성화 단원을 참조하십시오.

사전 조건

인터넷 액세스를 활성화하기 전에 다음 항목이 필요합니다.

  • 클러스터와 관련된 Amazon Virtual Private Cloud (VPC) 의 ID. 예를 들어 vpc-123456ab입니다.

  • 내 프라이빗 IDs 서브넷의 수입니다. VPC 예를 들어 subnet-a1b2c3de, subnet-f4g5h6ij 등입니다. 커넥터를 프라이빗 서브넷으로 구성해야 합니다.

커넥터에 인터넷 액세스를 활성화하려면 다음을 수행합니다.
  1. 에서 Amazon Virtual Private Cloud https://console.aws.amazon.com/vpc/콘솔을 여십시오.

  2. 설명이 포함된 이름을 사용하여 NAT 게이트웨이의 퍼블릭 서브넷을 만들고 서브넷 ID를 기록해 둡니다. 자세한 지침은 서브넷 생성을 참조하십시오. VPC

  3. 인터넷과 VPC 통신할 수 있도록 인터넷 게이트웨이를 만들고 게이트웨이 ID를 기록해 둡니다. 인터넷 게이트웨이를 사용자 게이트웨이에 연결합니다VPC. 지침은 인터넷 게이트웨이 생성 및 연결을 참조하세요.

  4. 프라이빗 서브넷의 호스트가 퍼블릭 서브넷에 도달할 수 있도록 퍼블릭 NAT 게이트웨이를 프로비저닝하십시오. NAT게이트웨이를 생성할 때 이전에 만든 퍼블릭 서브넷을 선택하십시오. 지침은 NAT게이트웨이 생성을 참조하십시오.

  5. 라우팅 테이블을 구성합니다. 이 설정을 완료하려면 총 2개의 라우팅 테이블이 있어야 합니다. 기본 라우팅 테이블과 동시에 자동으로 생성된 기본 라우팅 테이블이 이미 있을 것입니다VPC. 이 단계에서는 퍼블릭 서브넷에 대한 추가 라우팅 테이블을 생성합니다.

    1. 다음 설정을 사용하여 프라이빗 서브넷이 트래픽을 NAT 게이트웨이로 라우팅하도록 기본 라우팅 테이블을 수정하십시오VPC. 지침은 Amazon Virtual Private Cloud사용 설명서에서 라우팅 테이블로 작업을 참조하세요.

      프라이빗 MSKC 라우팅 테이블
      속성
      [Name tag] 이 라우팅 테이블을 식별하는 데 도움이 되도록 설명이 포함된 이름 태그 지정을 권장합니다. 예: 비공개 MSKC.
      관련 서브넷 프라이빗 서브넷
      MSKConnect를 위한 인터넷 액세스를 지원하는 경로
      • 대상 주소: 0.0.0.0/0

      • 대상: NAT 게이트웨이 ID. nat-12a345bc6789efg1h를 예로 들 수 있습니다.

      내부 트래픽을 위한 로컬 경로
      • 대상 주소: 10.0.0.0/16. 이 값은 CIDR 블록에 따라 다를 수 VPC 있습니다.

      • 대상: 로컬

    2. 사용자 지정 라우팅 테이블 생성의 지침에 따라 퍼블릭 서브넷에 대한 라우팅 테이블을 생성합니다. 테이블을 생성할 때 테이블이 연결된 서브넷을 식별할 수 있도록 이름 태그 필드에 설명이 포함된 이름을 입력합니다. 예: 공개 MSKC.

    3. 다음 설정을 사용하여 퍼블릭 MSKC 라우팅 테이블을 구성하십시오.

      속성
      [Name tag] 선택한 퍼블릭 MSKC 또는 다른 설명이 포함된 이름
      관련 서브넷 게이트웨이가 있는 퍼블릭 서브넷 NAT
      MSKConnect를 위한 인터넷 액세스를 지원하는 경로
      • 대상 주소: 0.0.0.0/0

      • 대상: 사용자의 인터넷 게이트웨이 ID. igw-1a234bc5를 예로 들 수 있습니다.

      내부 트래픽을 위한 로컬 경로
      • 대상 주소: 10.0.0.0/16. 이 값은 사용자 CIDR 블록에 따라 다를 수 VPC 있습니다.

      • 대상: 로컬

Amazon MSK Connect에 대한 인터넷 액세스를 활성화했으므로 이제 커넥터를 만들 준비가 되었습니다.

Debezium 소스 커넥터 생성

  1. 사용자 지정 플러그인 생성
    1. Debezium 사이트에서 안정적인 최신 릴리스를 보려면 My SQL 커넥터 플러그인을 다운로드하십시오. 다운로드한 Debezium 릴리스 버전(버전 2.x 또는 이전 시리즈 1.x)을 기록해 둡니다. 이 절차의 뒷부분에서 사용 중인 Debezium 버전에 따라 커넥터를 생성합니다.

    2. AWS Secrets Manager 구성 공급자를 다운로드하여 압축을 풉니다.

    3. 다음 아카이브를 같은 디렉터리에 배치합니다.

      • debezium-connector-mysql 폴더

      • jcusten-border-kafka-config-provider-aws-0.1.1 폴더

    4. 이전 단계에서 만든 디렉터리를 파일로 압축한 다음 ZIP 파일을 S3 버킷에 ZIP 업로드합니다. 지침은 Amazon S3 사용 설명서에서 객체 업로드를 참조하세요.

    5. 다음을 JSON 복사하여 파일에 붙여넣습니다. 예: debezium-source-custom-plugin.json. Replace <example-custom-plugin-name> 플러그인에 원하는 이름을 붙이고,<arn-of-your-s3-bucket> 파일을 업로드한 S3 버킷의 <file-key-of-ZIP-object> 것과 S3에 업로드한 ZIP 객체의 파일 키를 사용합니다. ARN ZIP

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. JSON파일을 저장한 폴더에서 다음 AWS CLI 명령을 실행하여 플러그인을 생성합니다.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      다음 예제와 유사한 출력이 표시됩니다.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. 다음 명령을 실행하여 플러그인 상태를 확인합니다. 상태가 CREATING에서 ACTIVE로 변경됩니다. ARN자리 표시자를 이전 명령의 출력에서 가져온 ARN 것으로 바꿉니다.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. 데이터베이스 자격 AWS Secrets Manager 증명을 위한 시크릿 구성 및 생성
    1. 에서 Secrets Manager 콘솔을 엽니다 https://console.aws.amazon.com/secretsmanager/.

    2. 데이터베이스 로그인 보안 인증 정보를 저장할 새로운 비밀번호를 생성합니다. 지침은 AWS Secrets Manager사용 설명서에서 보안 암호 생성을 참조하세요.

    3. 시크릿을 복사하세요ARN.

    4. 다음 예제 정책의 Secrets Manager 권한을 서비스 실행 역할에 추가합니다. Replace <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> 당신의 ARN 비밀과 함께 말이죠.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      IAM권한을 추가하는 방법에 대한 지침은 사용 IAM 설명서의 IAM ID 권한 추가 및 제거를 참조하십시오.

  3. 구성 제공자에 대한 정보를 사용하여 사용자 지정 작업자 구성 생성
    1. 다음 작업자 구성 속성을 파일에 복사하여 자리 표시자 문자열을 시나리오에 해당하는 값으로 변경합니다. AWS Secrets Manager Config 제공자의 구성 속성에 대한 자세한 내용은 플러그인 SecretsManagerConfigProvider설명서를 참조하십시오.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. 다음 AWS CLI 명령을 실행하여 사용자 지정 작업자 구성을 생성합니다.

      다음 값을 교체합니다.

      • <my-worker-config-name> - 사용자 지정 작업자 구성을 설명하는 이름

      • <encoded-properties-file-content-string> - 이전 단계에서 복사한 일반 텍스트 속성의 base64로 인코딩된 버전

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. 커넥터 생성
    1. Debezium 버전 (2.x 또는 1.x) 에 해당하는 다음을 JSON 복사하여 새 파일에 붙여넣습니다. <placeholder> 문자열을 시나리오에 해당하는 값으로 변경합니다. 서비스 실행 역할을 설정하는 방법에 대한 자세한 내용은 MSK Connect의 IAM 역할 및 정책 섹션을 참조하세요.

      이 구성에서는 데이터베이스 보안 인증 정보를 지정할 때 일반 텍스트 대신 ${secretManager:MySecret-1234:dbusername}과 같은 변수를 사용한다는 점에 유의하세요. MySecret-1234를 보안 암호의 이름으로 변경하고 검색하려는 키의 이름을 입력합니다. 또한 사용자 지정 작업자 <arn-of-config-provider-worker-configuration> 구성으로 바꿔야 합니다ARN.

      Debezium 2.x

      Debezium 2.x 버전의 경우 다음을 JSON 복사하여 새 파일에 붙여넣습니다. 교체하십시오.<placeholder> 문자열을 시나리오에 해당하는 값으로 설정하십시오.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Debezium 1.x 버전의 경우 다음을 JSON 복사하여 새 파일에 붙여넣습니다. 교체하십시오.<placeholder> 문자열을 시나리오에 해당하는 값으로 설정하십시오.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. 이전 단계에서 JSON 파일을 저장한 폴더에서 다음 AWS CLI 명령을 실행합니다.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      다음은 명령을 성공적으로 실행했을 때 표시되는 출력의 예제입니다.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

자세한 단계가 포함된 Debezium 커넥터 예제는 Amazon MSK Connect 소개 - 관리형 커넥터를 사용하여 Apache Kafka 클러스터와 데이터 스트리밍을 참조하십시오.