Apache Kafka - AWS IoT Core

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Kafka

Apache Kafka(Kafka) 작업은 Amazon Managed Streaming for Apache Kafka(AmazonMSK), Confluent Cloud와 같은 타사 공급자가 관리하는 Apache Kafka 클러스터 또는 자체 관리형 Apache Kafka 클러스터로 직접 메시지를 전송합니다. Kafka 규칙 작업을 사용하면 IoT 데이터를 Kafka 클러스터로 라우팅할 수 있습니다. 이를 통해 스트리밍 분석, 데이터 통합, 시각화 및 미션 크리티컬 비즈니스 애플리케이션과 같은 다양한 목적을 위해 고성능 데이터 파이프라인을 구축할 수 있습니다.

참고

이 주제에서는 Apache Kafka 플랫폼 및 관련 개념에 대해 잘 알고 있다고 가정합니다. Apache Kafka에 대한 자세한 내용은 Apache Kafka를 참조하세요. MSK Serverless는 지원되지 않습니다. MSK 서버리스 클러스터는 Apache Kafka 규칙 작업이 현재 지원하지 않는 IAM 인증을 통해서만 수행할 수 있습니다. Confluent AWS IoT Core 를 사용하여를 구성하는 방법에 대한 자세한 내용은 Confluent 활용 및 IoT 디바이스 및 데이터 관리 문제 AWS 해결을 참조하세요.

요구 사항

이 규칙 작업은 다음 요구 사항을 충족해야 합니다.

  • ec2:CreateNetworkInterface, , , ec2:DescribeNetworkInterfaces, ec2:CreateNetworkInterfacePermission, ec2:DeleteNetworkInterfaceec2:DescribeSubnets, 및 ec2:DescribeVpcs ec2:DescribeVpcAttribute ec2:DescribeSecurityGroups 작업을 수행하도록 맡을 AWS IoT 수 있는 IAM 역할입니다. 이 역할은 Amazon Virtual Private Cloud에 대한 탄력적인 네트워크 인터페이스를 생성 및 관리하여 Kafka 브로커에 도달할 수 있습니다. 자세한 내용은 AWS IoT 규칙에 필요한 액세스 권한 부여 단원을 참조하십시오.

    AWS IoT 콘솔에서가이 규칙 작업을 수행하도록 허용하는 역할을 선택하거나 생성할 수 AWS IoT Core 있습니다.

    네트워크 인터페이스에 대한 자세한 내용은 Amazon EC2 사용 설명서탄력적 네트워크 인터페이스를 참조하세요.

    지정한 역할에 연결된 정책은 다음 예시와 같습니다.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • AWS Secrets Manager 를 사용하여 Kafka 브로커에 연결하는 데 필요한 자격 증명을 저장하는 경우 secretsmanager:GetSecretValuesecretsmanager:DescribeSecret 작업을 수행하는 데 맡을 AWS IoT Core 수 있는 IAM 역할을 생성해야 합니다.

    지정한 역할에 연결된 정책은 다음 예시와 같습니다.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Amazon Virtual Private Cloud(Amazon ) 내에서 Apache Kafka 클러스터를 실행할 수 있습니다VPC. Amazon VPC 대상을 생성하고 서브넷의 NAT 게이트웨이를 사용하여의 메시지를 퍼블릭 Kafka 클러스터 AWS IoT 로 전달해야 합니다. 규칙 엔진은 AWS IoT VPC 대상에 나열된 각 서브넷에 네트워크 인터페이스를 생성하여 트래픽을 로 직접 라우팅합니다VPC. VPC 대상을 생성하면 AWS IoT 규칙 엔진이 VPC 규칙 작업을 자동으로 생성합니다. VPC 규칙 작업에 대한 자세한 내용은 섹션을 참조하세요가상 프라이빗 클라우드(VPC) 대상.

  • 고객 관리형 AWS KMS key (KMS 키)를 사용하여 저장 데이터를 암호화하는 경우 서비스에는 호출자를 대신하여 KMS 키를 사용할 수 있는 권한이 있어야 합니다. 자세한 내용은 Amazon Managed Streaming for Apache Kafka 개발자 안내서의 Amazon MSK 암호화를 참조하세요.

파라미터

이 작업으로 AWS IoT 규칙을 생성할 때 다음 정보를 지정해야 합니다.

destinationArn

VPC 대상의 Amazon 리소스 이름(ARN)입니다. VPC 대상 생성에 대한 자세한 내용은 섹션을 참조하세요가상 프라이빗 클라우드(VPC) 대상.

주제

메시지에 대한 Kafka 주제는 Kafka 브로커로 전송됩니다.

대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. 자세한 내용은 대체 템플릿 단원을 참조하세요.

key(선택 사항)

Kafka 메시지 키입니다.

대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. 자세한 내용은 대체 템플릿 단원을 참조하십시오.

헤더(선택 사항)

지정한 Kafka 헤더 목록입니다. 각 헤더는 Kafka 작업을 생성할 때 지정할 수 있는 키-값 페어입니다. 메시지 페이로드를 수정하지 않고도 이러한 헤더를 사용하여 IoT 클라이언트의 데이터를 다운스트림 Kafka 클러스터로 라우팅할 수 있습니다.

대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. Kafka 작업의 헤더에서 인라인 규칙의 함수를 대체 템플릿으로 전달하는 방법을 이해하려면 예시를 참조하세요. 자세한 내용은 대체 템플릿 단원을 참조하십시오.

참고

바이너리 형식의 헤더는 지원되지 않습니다.

partition(선택 사항)

Kafka 메시지 파티션입니다.

대체 템플릿을 사용하여 이 필드를 대체할 수 있습니다. 자세한 내용은 대체 템플릿 단원을 참조하십시오.

clientProperties

Apache Kafka 프로듀서 클라이언트의 속성을 정의하는 객체입니다.

acks(선택 사항)

프로듀서가 요청이 완료되었음을 고려하기 전에 서버가 수신해야 하는 승인 수입니다.

값으로 0을 지정하면 프로듀서는 서버의 승인을 기다리지 않습니다. 서버에서 메시지를 받지 못하면 프로듀서는 메시지를 전송하려고 다시 시도하지 않습니다.

유효한 값: -1, 0, 1, all. 기본값은 1입니다.

bootstrap.servers

Kafka 클러스터에 대한 초기 연결을 설정하는 데 사용되는 호스트 및 포트 페어 목록(예: host1:port1, host2:port2)입니다.

compression.type(선택 사항)

생산자가 생성한 모든 데이터에 대한 압축 유형입니다.

유효한 값:none ,gzip ,snappy ,lz4 .zstd 기본값은 none입니다.

security.protocol

Kafka 브로커에 연결하는 데 사용되는 보안 프로토콜입니다.

유효한 값: SSL, SASL_SSL. 기본값은 SSL입니다.

key.serializer

ProducerRecord와 함께 제공되는 키 객체를 바이트로 변환하는 방법을 지정합니다.

유효한 값: StringSerializer.

value.serializer

ProducerRecord와 함께 제공하는 값 객체를 바이트로 변환하는 방법을 지정합니다.

유효한 값: ByteBufferSerializer.

ssl.truststore

base64 형식의 truststore 파일 또는 AWS Secrets Manager의 truststore 파일 위치입니다. Amazon 인증 기관(CA)에서 사용자의 트러스트 스토어를 신뢰하는 경우에는 이 값이 필요하지 않습니다.

이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 자격 증명을 저장하는 경우 get_secret SQL 함수를 사용하여이 필드의 값을 검색할 수 있습니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요. get_secret SQL 함수에 대한 자세한 내용은 섹션을 참조하세요get_secret(secretId, secretType, 키, roleArn). truststore가 파일 형식인 경우 SecretBinary 파라미터를 사용합니다. truststore가 문자열 형식인 경우 SecretString 파라미터를 사용합니다.

이 값의 최대 크기는 65KB입니다.

ssl.truststore.password

truststore의 암호입니다. 이 값은 truststore의 암호를 만든 경우에만 필요합니다.

ssl.keystore

keystore 파일입니다. SSLsecurity.protocol의 값으로 지정할 때 이 값이 필요합니다.

이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다. 이 필드의 값을 검색하려면 get_secret SQL 함수를 사용합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요. get_secret SQL 함수에 대한 자세한 내용은 섹션을 참조하세요get_secret(secretId, secretType, 키, roleArn). SecretBinary 파라미터를 사용합니다.

ssl.keystore.password

keystore 파일의 스토어 암호입니다. ssl.keystore에 값이 지정된 경우 이 값이 필요합니다.

이 필드의 값은 일반 텍스트일 수 있습니다. 이 필드는 대체 템플릿도 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다. 이 필드의 값을 검색하려면 get_secret SQL 함수를 사용합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요. get_secret SQL 함수에 대한 자세한 내용은 섹션을 참조하세요get_secret(secretId, secretType, 키, roleArn). SecretString 파라미터를 사용합니다.

ssl.key.password

keystore 파일에 있는 프라이빗 키의 암호입니다.

이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다. 이 필드의 값을 검색하려면 get_secret SQL 함수를 사용합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요. get_secret SQL 함수에 대한 자세한 내용은 섹션을 참조하세요get_secret(secretId, secretType, 키, roleArn). SecretString 파라미터를 사용합니다.

sasl.mechanism

Kafka 브로커에 연결하는 데 사용되는 보안 메커니즘입니다. security.protocolSASL_SSL을 지정할 때 이 값이 필요합니다.

유효한 값: PLAIN, SCRAM-SHA-512, GSSAPI.

참고

SCRAM-SHA-512는 cn-north-1, cn-northwest-1, us-gov-east-1 및 us-gov-west-1 리전에서 지원되는 유일한 보안 메커니즘입니다.

sasl.plain.username

Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 사용자 이름입니다. security.protocolSASL_SSL, sasl.mechanismPLAIN을 지정할 때 이 값이 필요합니다.

sasl.plain.password

Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 암호입니다. security.protocolSASL_SSL, sasl.mechanismPLAIN을 지정할 때 이 값이 필요합니다.

sasl.scram.username

Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 사용자 이름입니다. security.protocolSASL_SSL, sasl.mechanismSCRAM-SHA-512을 지정할 때 이 값이 필요합니다.

sasl.scram.password

Secrets Manager에서 보안 문자열을 검색하는 데 사용되는 암호입니다. security.protocolSASL_SSL, sasl.mechanismSCRAM-SHA-512을 지정할 때 이 값이 필요합니다.

sasl.kerberos.keytab

Secrets Manager의 Kerberos 인증을 위한 keytab 파일입니다. security.protocolSASL_SSL, sasl.mechanismGSSAPI을 지정할 때 이 값이 필요합니다.

이 필드는 대체 템플릿을 지원합니다. Secrets Manager를 사용하여 Kafka 브로커에 연결하는 데 필요한 보안 인증 정보를 저장합니다. 이 필드의 값을 검색하려면 get_secret SQL 함수를 사용합니다. 대체 변수에 대한 자세한 내용은 대체 템플릿 단원을 참조하세요. get_secret SQL 함수에 대한 자세한 내용은 섹션을 참조하세요get_secret(secretId, secretType, 키, roleArn). SecretBinary 파라미터를 사용합니다.

sasl.kerberos.service.name

Apache Kafka가 실행되는 Kerberos 보안 주체 이름입니다. security.protocolSASL_SSL, sasl.mechanismGSSAPI을 지정할 때 이 값이 필요합니다.

sasl.kerberos.krb5.kdc

Apache Kafka 생산자 클라이언트가 연결하는 키 배포 센터(KDC)의 호스트 이름입니다. security.protocolSASL_SSL, sasl.mechanismGSSAPI을 지정할 때 이 값이 필요합니다.

sasl.kerberos.krb5.realm

Apache Kafka 프로듀서 클라이언트가 연결되는 영역입니다. security.protocolSASL_SSL, sasl.mechanismGSSAPI을 지정할 때 이 값이 필요합니다.

sasl.kerberos.principal

Kerberos가 Kerberos 인식 서비스에 액세스하기 위해 티켓을 할당할 수 있는 고유한 Kerberos ID입니다. security.protocolSASL_SSL, sasl.mechanismGSSAPI을 지정할 때 이 값이 필요합니다.

예시

다음 JSON 예제에서는 AWS IoT 규칙에서 Apache Kafka 작업을 정의합니다. 다음 예제에서는 Kafka 작업 헤더에서 sourceIp() 인라인 함수를 대체 템플릿으로 전달합니다.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Kerberos 설정에 대한 중요 참고 사항

  • 키 배포 센터(KDC)는 대상 내의 프라이빗 도메인 이름 시스템(DNS)을 통해 확인할 수 있어야 합니다VPC. 한 가지 가능한 접근 방식은 프라이빗 호스팅 영역에 KDC DNS 항목을 추가하는 것입니다. 이 접근법에 대한 자세한 내용은 프라이빗 호스팅 영역 작업을 참조하세요.

  • 각 에는 DNS 해결이 활성화되어 있어야 VPC 합니다. 자세한 내용은 DNS에서 사용을 참조하세요VPC.

  • VPC 대상의 네트워크 인터페이스 보안 그룹 및 인스턴스 수준 보안 그룹은 다음 포트VPC에서의 트래픽을 허용해야 합니다.

    • TCP 부트스트랩 브로커 리스너 포트의 트래픽(종종 9092이지만 9000~9100 범위 내에 있어야 함)

    • TCP 및 용 포트 88의 UDP 트래픽 KDC

  • SCRAM-SHA-512는 cn-north-1, cn-northwest-1, us-gov-east-1 및 us-gov-west-1 리전에서 지원되는 유일한 보안 메커니즘입니다.