Lambda에서 자체 관리형 Apache Kafka 메시지 처리 - AWS Lambda

Lambda에서 자체 관리형 Apache Kafka 메시지 처리

참고

Lambda 함수 이외의 대상으로 데이터를 전송하거나 데이터를 전송하기 전에 데이터를 보강하려는 경우 Amazon EventBridge 파이프를 참조하세요.

Kafka 클러스터를 이벤트 소스로 추가

이벤트 소스 매핑을 생성하려면 Lambda 콘솔, AWS SDK 또는 AWS Command Line Interface(AWS CLI)를 사용해 Kafka를 Lambda 함수 트리거로 추가합니다.

이 단원에서는 Lambda 콘솔 및 AWS CLI를 사용해 이벤트 소스 매핑을 생성하는 방법을 설명합니다.

사전 조건

  • 자체 관리형 Apache Kafka 클러스터. Lambda는 Apache Kafka 버전 0.10.1.0 이상을 지원합니다.

  • 자체 관리형 Kafka 클러스터에서 사용하는 AWS 리소스에 액세스할 수 있는 권한이 있는 실행 역할입니다.

사용자 지정이 가능한 소비자 그룹 ID

Kafka를 이벤트 소스로 설정할 때 소비자 그룹 ID를 지정할 수 있습니다. 이 소비자 그룹 ID는 Lambda 함수에 가입하려는 Kafka 소비자 그룹의 기존 식별자입니다. 이 기능을 사용하여 진행 중인 모든 Kafka 레코드 처리 설정을 다른 소비자에서 Lambda로 원활하게 마이그레이션할 수 있습니다.

소비자 그룹 ID를 지정하고 해당 소비자 그룹 내에 다른 활성 폴러가 있는 경우 Kafka는 모든 소비자에게 메시지를 배포합니다. 즉, Lambda는 Kafka 주제에 대한 모든 메시지를 수신하지는 않습니다. Lambda가 주제에 있는 모든 메시지를 처리하도록 하려면 해당 소비자 그룹의 다른 모든 폴러를 끄십시오.

또한 소비자 그룹 ID를 지정했는데 Kafka가 동일한 ID를 가진 유효한 기존 소비자 그룹을 찾으면 Lambda는 이벤트 소스 매핑을 위한 StartingPosition 파라미터를 무시합니다. 대신 Lambda는 소비자 그룹의 커밋된 오프셋에 따라 레코드 처리를 시작합니다. 소비자 그룹 ID를 지정했는데 Kafka가 기존 소비자 그룹을 찾을 수 없는 경우, Lambda는 StartingPosition에서 지정된 대로 이벤트 소스를 구성합니다.

지정하는 소비자 그룹 ID는 모든 Kafka 이벤트 소스 중에서 고유해야 합니다. 지정된 소비자 그룹 ID로 Kafka 이벤트 소스 매핑을 생성한 후에는 이 값을 업데이트할 수 없습니다.

자체 관리형 Kafka 클러스터 추가(콘솔)

다음 단계에 따라 자체 관리형 Apache Kafka 클러스터 및 Kafka 주제를 Lambda 함수의 트리거로 추가합니다.

Lambda 함수에 Apache Kafka 트리거를 추가하려면(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. Lambda 함수의 이름을 선택합니다.

  3. 함수 개요(Function overview)에서 트리거 추가(Add trigger)를 선택합니다.

  4. 트리거 구성에서 다음을 수행합니다.

    1. Apache Kafka 트리거 유형을 선택합니다.

    2. Bootstrap 서버에는 클러스터의 Kafka 브로커 호스트 및 포트 페어 주소를 입력한 다음 추가를 선택합니다. 클러스터의 각 Kafka 브로커에 대해 이를 반복합니다.

    3. 주제 이름에는 클러스터에 레코드를 저장하는 데 사용되는 Kafka 주제의 이름을 입력합니다.

    4. (선택 사항) 배치 크기(Batch size)에 단일 배치에서 검색할 최대 레코드 수를 입력합니다.

    5. Batch window에서 Lambda가 함수를 호출하기 전에 레코드를 수집하는 데 걸리는 최대 시간(초)을 입력합니다.

    6. (선택 사항) Consumer group ID에서 가입할 Kafka 소비자 그룹의 ID를 입력합니다.

    7. (선택 사항) 시작 위치의 경우 최신 레코드에서 스트림 읽기를 시작하려면 최신을 선택하고, 사용 가능한 가장 빠른 레코드에서 시작하려면 수평 트리밍을 선택하고, 읽기를 시작할 타임스탬프를 지정하려면 타임스탬프를 선택합니다.

    8. (선택 사항) VPC에서 Kafka 클러스터용 Amazon VPC를 선택합니다. 그런 다음 VPC 서브넷(VPC subnets)VPC 보안 그룹(VPC security groups)을 선택합니다.

      VPC 내의 사용자만 브로커에 액세스하는 경우 이 설정이 필요합니다.

    9. (선택 사항) 인증(Authentication)에서 추가(Add)를 선택한 후 다음을 수행합니다.

      1. 클러스터에 있는 Kafka 브로커의 액세스 또는 인증 프로토콜을 선택합니다.

        • Kafka 브로커가 SASL/PLAIN 인증을 사용하는 경우 BASIC_AUTH를 선택합니다.

        • 브로커가 SASL/SCRAM 인증을 사용하는 경우 SASL_SCRAM 프로토콜 중 하나를 선택합니다.

        • mTLS 인증을 구성하는 경우 CLIENT_CERTIFICATE_TLS_AUTH 프로토콜을 선택합니다.

      2. SASL/SCRAM 또는 mTLS 인증의 경우 Kafka 클러스터에 대한 자격 증명이 포함된 Secrets Manager 비밀 키를 선택합니다.

    10. (선택 사항) 암호화(Encryption)에서 Kafka 브로커가 프라이빗 CA에서 서명한 인증서를 사용하는 경우 Kafka 브로커가 TLS 암호화에 사용하는 루트 CA 인증서가 포함된 Secrets Manager 암호를 선택합니다.

      이 설정은 SASL/SCRAM 또는 SASL/PLANE에 대한 TLS 암호화 및 mTLS 인증에 적용됩니다.

    11. 테스트를 위해 트리거를 비활성화된 상태에서 생성하려면(권장됨) 트리거 사용을 선택 해제합니다. 또는 트리거를 즉시 사용하려면 트리거 사용을 선택합니다.

  5. 트리거를 생성하려면 추가를 선택합니다.

자체 관리형 Kafka 클러스터 추가(AWS CLI)

다음 예제 AWS CLI 명령을 사용하여 Lambda 함수에 대한 자체 관리형 Apache Kafka 트리거를 생성하고 확인합니다.

SASL/SCRAM 사용

Kafka 사용자가 인터넷을 통해 Kafka 브로커에 액세스한다면 SASL/SCRAM 인증용으로 생성한 Secrets Manager 비밀 정보를 지정합니다. 다음 예제에서는 create-event-source-mapping AWS CLI 명령을 사용하여 my-kafka-function이라는 Lambda 함수를 AWSKafkaTopic이라는 Kafka 주제에 매핑합니다.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

VPC 사용

VPC 내의 Kafka 사용자만 Kafka 브로커에 액세스한다면 VPC, 서브넷 및 VPC 보안 그룹을 지정해야 합니다. 다음 예제에서는 create-event-source-mapping AWS CLI 명령을 사용하여 my-kafka-function이라는 Lambda 함수를 AWSKafkaTopic이라는 Kafka 주제에 매핑합니다.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

AWS CLI를 사용하여 상태 확인

다음 예제에서는 get-event-source-mapping AWS CLI 명령을 사용하여 생성한 이벤트 소스 매핑의 상태를 설명합니다.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

자체 관리형 Apache Kafka 구성 파라미터

모든 Lambda 이벤트 소스 유형은 동일한 CreateEventSourceMappingUpdateEventSourceMapping API 작업을 공유합니다. 그러나 파라마터 중 일부는 Apache Kafka에 적용됩니다.

파라미터 필수 기본값 참고

BatchSize

N

100

최대값: 10,000

활성

N

활성화됨

없음

FunctionName

Y

N/A

없음

FilterCriteria

N

N/A

Lambda가 함수로 보내는 이벤트에 대한 제어

MaximumBatchingWindowInSeconds

N

500ms

일괄 처리 동작

SelfManagedEventSource

Y

N/A

Kafka 브로커의 목록. 생성 시에만 설정할 수 있음

SelfManagedKafkaEventSourceConfig

N

기본적으로 고유한 값으로 설정되는 소비자 그룹 ID 필드를 포함합니다.

생성 시에만 설정할 수 있음

SourceAccessConfigurations

N

자격 증명 없음

클러스터에 대한 VPC 정보 또는 인증 자격 증명

SASL_PLAIN의 경우 BASIC_AUTH로 설정

StartingPosition

Y

N/A

AT_TIMESTAMP, TRIM_HORIZON, 또는 LATEST

생성 시에만 설정할 수 있음

StartingPositionTimestamp

N

N/A

StartingPosition이 AT_TIMESTAMP로 설정된 경우에만 필요합니다.

주제

Y

N/A

주제 이름

생성 시에만 설정할 수 있음

Kafka 클러스터를 이벤트 소스로 사용

Apache Kafka 또는 Amazon MSK 클러스터를 Lambda 함수의 트리거로 추가하면 해당 클러스터가 이벤트 소스로 사용됩니다.

Lambda는 사용자가 지정한 StartingPosition을 기반으로 CreateEventSourceMapping 요청에서 Topics로 지정한 Kafka 주제에서 이벤트 데이터를 읽습니다. 성공적인 처리 후, Kafka 토픽은 Kafka 클러스터에 커밋됩니다.

StartingPositionLATEST로 지정하면 Lambda는 주제에 속한 각 파티션의 최신 메시지에서 읽기를 시작합니다. 트리거 구성 후 Lambda가 메시지를 읽기 시작하기까지 약간의 지연이 발생할 수 있으므로 Lambda는 이 기간 중에 생성된 메시지를 읽지 않습니다.

Lambda는 지정한 하나 이상의 Kafka 주제 파티션의 레코드를 처리하고 JSON 페이로드를 함수로 보냅니다. 사용 가능한 레코드가 더 있는 경우 Lambda는 함수가 주제를 따라잡을 때까지 CreateEventSourceMapping 요청에서 지정한 BatchSize 값을 기반으로 배치로 레코드를 계속 처리합니다.

함수가 배치의 어떤 메시지에 대해 오류를 반환하면 Lambda는 처리가 성공하거나 메시지가 만료될 때까지 전체 메시지 배치를 다시 시도합니다. 모든 재시도에 실패한 레코드를 실패 시 대상으로 전송하여 나중에 처리하도록 할 수 있습니다.

참고

Lambda 함수의 최대 제한 시간은 일반적으로 15분이지만 Amazon MSK, 자체 관리형 Apache Kafka, Amazon DocumentDB, ActiveMQ 및 RabbitMQ용 Amazon MQ에 대한 이벤트 소스 매핑은 최대 제한 시간이 14분인 함수만 지원합니다. 이 제약 조건에 따라 이벤트 소스 매핑에서 함수 오류 및 재시도를 적절히 처리할 수 있습니다.

폴링 및 스트리밍 시작 위치

이벤트 소스 매핑 생성 및 업데이트 중 스트림 폴링은 최종적으로 일관됩니다.

  • 이벤트 소스 매핑 생성 중 스트림에서 이벤트 폴링을 시작하는 데 몇 분 정도 걸릴 수 있습니다.

  • 이벤트 소스 매핑 업데이트 중 스트림에서 이벤트 폴링을 중지했다가 다시 시작하는 데 몇 분 정도 걸릴 수 있습니다.

이 동작은 스트림의 시작 위치로 LATEST를 지정하면 이벤트 소스 매핑이 생성 또는 업데이트 중에 이벤트를 놓칠 수 있음을 의미합니다. 누락된 이벤트가 없도록 하기 위해서는 스트림 시작 위치를 TRIM_HORIZON 또는 AT_TIMESTAMP로 지정하세요.

Kafka 이벤트 소스의 Auto Scaling

Apache Kafka 이벤트 소스를 처음 생성하면 Lambda는 한 명의 소비자를 할당하여 Kafka 주제의 모든 파티션을 처리합니다. 각 소비자는 증가한 워크로드를 처리하기 위해 여러 프로세서를 병렬로 실행합니다. 그뿐 아니라 Lambda는 워크로드에 따라 소비자 수를 자동으로 늘리거나 줄입니다. 각 파티션에서 메시지 순서를 유지하기 위해 최대 소비자 수는 주제의 파티션당 하나의 소비자입니다.

Lambda는 1분 간격으로 주제의 모든 파티션의 소비자 오프셋 지연을 평가합니다. 지연이 너무 높으면 파티션은 Lambda가 메시지를 처리할 수 있는 것보다 더 빠르게 메시지를 수신합니다. 필요에 따라 Lambda는 주제에 소비자를 추가하거나 제거합니다. 소비자를 추가 또는 제거하는 크기 조정 프로세스는 평가 후 3분 이내에 진행됩니다.

대상 Lambda 함수가 오버로드되면 Lambda는 소비자 수를 줄입니다. 이 동작은 소비자가 검색하고 함수에 보낼 수 있는 메시지 수를 줄임으로써 함수의 워크로드를 줄입니다.

Kafka 주제의 처리량을 모니터링하려면 consumer_lagconsumer_offset 같은 Apache Kafka 소비자 지표를 확인합니다. 얼마나 많은 함수 호출이 병렬로 발생하는지 확인하기 위해 함수에 대한 동시성 지표를 확인할 수도 있습니다.

Amazon CloudWatch 지표

Lambda는 함수가 레코드를 처리하는 동안 OffsetLag 지표를 내보냅니다. 이 지표의 값은 Kafka 이벤트 소스 주제에 작성된 마지막 레코드와 함수의 소비자 그룹에서 처리한 마지막 레코드 간의 오프셋 차이입니다. 레코드가 추가되는 시점과 소비자 그룹이 이를 처리하는 시점 사이의 지연 시간을 추정하는 데 OffsetLag를 사용할 수 있습니다.

OffsetLag의 증가 추세는 함수의 소비자 그룹 폴러에 문제가 있음을 나타낼 수 있습니다. 자세한 내용은 Lambda 함수에 대한 지표 보기 단원을 참조하십시오.