자습서: Amazon MSK 이벤트 소스 매핑을 사용하여 간접적으로 Lambda 함수 간접 호출 - AWS Lambda

자습서: Amazon MSK 이벤트 소스 매핑을 사용하여 간접적으로 Lambda 함수 간접 호출

이 자습서에서는 다음을 수행합니다.

  • 기존 Amazon MSK 클러스터와 동일한 AWS 계정에서 Lambda 함수를 생성합니다.

  • Lambda가 Amazon MSK와 통신할 수 있도록 네트워킹과 인증을 구성합니다.

  • 주제에 이벤트가 표시될 때 Lambda 함수를 실행하는 Lambda Amazon MSK 이벤트 소스 매핑을 설정합니다.

이 단계를 완료한 후 Amazon MSK로 이벤트가 전송되면 사용자 지정 Lambda 코드를 사용하여 해당 이벤트를 자동으로 처리하도록 Lambda 함수를 설정할 수 있습니다.

이 기능으로 무엇을 할 수 있나요?

예제 솔루션: MSK 이벤트 소스 매핑을 사용하여 고객에게 실시간 점수를 제공하세요.

다음 시나리오를 생각해 보세요. 회사에서 고객이 스포츠 경기와 같은 실시간 이벤트에 대한 정보를 볼 수 있는 웹 애플리케이션을 호스팅하고 있습니다. 게임의 정보 업데이트는 Amazon MSK의 Kafka 주제를 통해 팀에 제공됩니다. 개발하는 애플리케이션 내에서 고객에게 라이브 이벤트의 업데이트된 보기를 제공하기 위해 MSK 주제의 업데이트를 사용하는 솔루션을 설계하려고 합니다. 다음과 같은 설계 접근 방식을 결정했습니다. 클라이언트 애플리케이션은 AWS에서 호스팅되는 서버리스 백엔드와 통신합니다. 클라이언트는 Amazon API Gateway WebSocket API를 사용하여 WebSocket 세션을 통해 연결됩니다.

이 솔루션에서는 MSK 이벤트를 읽고 일부 사용자 지정 로직을 수행하여 애플리케이션 계층에 맞게 해당 이벤트를 준비한 다음 해당 정보를 API Gateway API로 전달하는 구성 요소가 필요합니다. Lambda 함수에 사용자 지정 로직을 제공한 다음 AWS Lambda Amazon MSK 이벤트 소스 매핑을 통해 직접적으로 호출하여 AWS Lambda로 이 구성 요소를 구현할 수 있습니다.

Amazon API Gateway WebSocket API를 사용하여 솔루션을 구현하는 방법에 대한 자세한 내용은 API Gateway 설명서의 WebSocket API 자습서를 참조하세요.

사전 조건

다음과 같은 사전 구성된 리소스가 있는 AWS 계정:

이러한 사전 요구 사항을 충족하려면 Amazon MSK 설명서의 Getting started using Amazon MSK를 따르는 것이 좋습니다.

  • Amazon MSK 클러스터. Getting started using Amazon MSKCreate an Amazon MSK cluster를 참조하세요.

  • 다음 구성:

    • 클러스터 보안 설정에서 IAM 역할 기반 인증활성화됨인지 확인합니다. 이렇게 하면 필요한 Amazon MSK 리소스에만 액세스하도록 Lambda 함수를 제한하여 보안이 강화됩니다. 이는 새 Amazon MSK 클러스터에서 기본적으로 활성화됩니다.

    • 클러스터 네트워킹 설정에서 퍼블릭 액세스가 꺼져 있는지 확인합니다. Amazon MSK 클러스터의 인터넷 액세스를 제한하면 데이터를 처리하는 중개자 수가 제한되어 보안이 강화됩니다. 이는 새 Amazon MSK 클러스터에서 기본적으로 활성화됩니다.

  • 이 솔루션에 사용할 Amazon MSK 클러스터의 Kafka 주제. Getting started using Amazon MSK의 Create a topic을 참조하세요.

  • Kafka 클러스터에서 정보를 검색하고 테스트를 위해 Kafka 이벤트를 주제에 전송하도록 설정된 Kafka 관리자 호스트(예: Kafka 관리 CLI와 Amazon MSK IAM 라이브러리가 설치된 Amazon EC2 인스턴스). Getting started using Amazon MSK의 Create a client machine을 참조하세요.

이러한 리소스를 설정한 후에는 AWS 계정에서 다음 정보를 수집하여 계속할 준비가 되었는지 확인하세요.

  • Amazon MSK 클러스터의 이름. 이 정보는 Amazon MSK 콘솔에서 확인할 수 있습니다.

  • Amazon MSK 클러스터용 ARN의 일부인 클러스터 UUID(Amazon MSK 콘솔에서 찾을 수 있음). 이 정보를 찾으려면 Amazon MSK 설명서의 Listing clusters에 나와 있는 절차를 따르세요.

  • Amazon MSK 클러스터와 연결된 보안 그룹. 이 정보는 Amazon MSK 콘솔에서 확인할 수 있습니다. 다음 단계에서는 이를 clusterSecurityGroups라고 합니다.

  • Amazon MSK 클러스터를 포함하는 Amazon VPC의 ID. Amazon MSK 콘솔에서 Amazon MSK 클러스터와 연결된 서브넷을 식별한 다음 Amazon VPC 콘솔에서 해당 서브넷과 연결된 Amazon VPC를 식별하여 이 정보를 찾을 수 있습니다.

  • 솔루션에 사용되는 Kafka 주제의 이름. Kafka 관리 호스트에서 Kafka topics CLI로 Amazon MSK 클러스터를 직접적으로 호출하여 이 정보를 찾을 수 있습니다. 주제 CLI에 대한 자세한 내용은 Kafka 설명서의 Adding and removing topics를 참조하세요.

  • Lambda 함수에서 사용하기에 적합한 Kafka 주제에 대한 소비자 그룹의 이름. 이 그룹은 Lambda에서 자동으로 생성할 수 있으므로 Kafka CLI를 사용하여 생성할 필요가 없습니다. 소비자 그룹을 관리해야 하는 경우 소비자 그룹 CLI에 대한 자세한 내용은 Kafka 설명서의 Managing Consumer Groups를 참조하세요.

AWS 계정의 다음 권한:

  • Lambda 함수를 생성하고 관리할 수 있는 권한

  • IAM 정책을 생성하고 Lambda 함수와 연결할 수 있는 권한

  • Amazon MSK 클러스터를 호스팅하는 Amazon VPC에서 Amazon VPC 엔드포인트를 생성하고 네트워킹 구성을 변경할 수 있는 권한

Lambda가 Amazon MSK와 통신할 수 있도록 네트워크 연결 구성

AWS PrivateLink를 사용하여 Lambda와 Amazon MSK를 연결합니다. 이를 위해 Amazon VPC 콘솔에서 인터페이스 Amazon VPC 엔드포인트를 생성할 수 있습니다. 네트워크 구성에 대한 자세한 내용은 네트워크 보안 구성 섹션을 참조하세요.

Amazon MSK 이벤트 소스 매핑이 Lambda 함수를 대신하여 실행되는 경우 Lambda 함수의 실행 역할을 수임합니다. 이 IAM 역할은 Amazon MSK 클러스터와 같이 IAM으로 보호되는 리소스에 액세스할 수 있도록 매핑 권한을 부여합니다. 구성 요소는 실행 역할을 공유하지만 다음 다이어그램과 같이 Amazon MSK 매핑과 Lambda 함수에는 해당 작업에 대한 별도의 연결 요구 사항이 있습니다.

Lambda 함수는 클러스터를 폴링하며, AWS STS를 사용하여 Lambda와 통신합니다.

이벤트 소스 매핑은 Amazon MSK 클러스터 보안 그룹에 속합니다. 이 네트워킹 단계에서는 Amazon MSK 클러스터 VPC에서 Amazon VPC 엔드포인트를 생성하여 Lambda 및 STS 서비스에 이벤트 소스 매핑을 연결합니다. 이러한 엔드포인트를 보호하여 Amazon MSK 클러스터 보안 그룹의 트래픽을 허용하세요. 그런 다음 Amazon MSK 클러스터 보안 그룹을 조정하여 이벤트 소스 매핑이 Amazon MSK 클러스터와 통신할 수 있도록 합니다.

AWS Management Console을 사용하여 다음 단계를 구성할 수 있습니다.

Lambda와 Amazon MSK를 연결하도록 인터페이스 Amazon VPC 엔드포인트를 구성하려면 다음을 수행하세요.
  1. 443에서 clusterSecurityGroups의 인바운드 TCP 트래픽을 허용하는 인터페이스 Amazon VPC 엔드포인트에 대한 보안 그룹인 endpointSecurityGroup을 생성합니다. Amazon EC2 설명서의 보안 그룹 생성에 나와 있는 절차에 따라 보안 그룹을 생성합니다. 그런 다음 Amazon EC2 설명서의 보안 그룹에 규칙 추가 절차에 따라 적절한 규칙을 추가합니다.

    다음 정보로 보안 그룹 생성:

    인바운드 규칙을 추가할 때 clusterSecurityGroups에서 각 보안 그룹에 대한 규칙을 생성합니다. 각 규칙에 대해 다음을 수행합니다.

    • 타입에 대해 HTTPS를 선택합니다.

    • 소스clusterSecurityGroups 중 하나를 선택합니다.

  2. Amazon MSK 클러스터가 포함된 Amazon VPC에 Lambda 서비스를 연결하는 엔드포인트를 생성합니다. 인터페이스 엔드포인트 생성 절차를 따릅니다.

    다음 정보로 인터페이스 엔드포인트 생성:

    • 서비스 이름으로 com.amazonaws.regionName.lambda를 선택합니다. 여기서 regionName은 Lambda 함수를 호스팅합니다.

    • VPC로 Amazon MSK 클러스터가 포함된 Amazon VPC를 선택합니다.

    • 보안 그룹으로 앞에서 생성한 endpointSecurityGroup을 선택합니다.

    • 서브넷으로 Amazon MSK 클러스터를 호스팅하는 서브넷을 선택합니다.

    • 정책으로 Lambda 서비스 주체가 lambda:InvokeFunction 작업에 사용할 엔드포인트를 보호하는 다음 정책 문서를 제공합니다.

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • DNS 이름 활성화가 설정된 상태로 유지되는지 확인합니다.

  3. Amazon MSK 클러스터가 포함된 Amazon VPC에 AWS STS 서비스를 연결하는 엔드포인트를 생성합니다. 인터페이스 엔드포인트 생성 절차를 따릅니다.

    다음 정보로 인터페이스 엔드포인트 생성:

    • 서비스 이름에서 AWS STS를 선택합니다.

    • VPC로 Amazon MSK 클러스터가 포함된 Amazon VPC를 선택합니다.

    • 보안 그룹으로 endpointSecurityGroup을 선택합니다.

    • 서브넷으로 Amazon MSK 클러스터를 호스팅하는 서브넷을 선택합니다.

    • 정책으로 Lambda 서비스 주체가 sts:AssumeRole 작업에 사용할 엔드포인트를 보호하는 다음 정책 문서를 제공합니다.

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • DNS 이름 활성화가 설정된 상태로 유지되는지 확인합니다.

  4. Amazon MSK 클러스터와 연결된 각 보안 그룹, 즉 clusterSecurityGroups에 대해 다음을 허용합니다.

    • 9098의 모든 인바운드 및 아웃바운드 TCP 트래픽을 자체 내부를 포함하여 모든 clusterSecurityGroups에 허용합니다.

    • 443에서 모든 아웃바운드 TCP 트래픽을 허용합니다.

    이 트래픽 중 일부는 기본 보안 그룹 규칙에 의해 허용되므로 클러스터가 단일 보안 그룹에 연결되어 있고 해당 그룹에 기본 규칙이 있는 경우 추가 규칙이 필요하지 않습니다. 보안 그룹 규칙을 조정하려면 Amazon EC2 설명서의 보안 그룹에 규칙 추가 절차를 따릅니다.

    다음 정보로 보안 그룹에 규칙 추가

    • 포트 9098에 대한 각 인바운드 규칙 또는 아웃바운드 규칙에 대해 다음을 제공합니다.

      • 유형에서 Custom TCP(사용자 지정 TCP)를 선택합니다.

      • 포트 범위에 9098을 입력합니다.

      • 소스clusterSecurityGroups 중 하나를 제공합니다.

    • 포트 443에 대한 각 인바운드 규칙에 대해 유형으로 HTTPS를 선택합니다.

Lambda가 Amazon MSK 주제에서 읽을 수 있도록 IAM 역할 생성

Lambda가 Amazon MSK 주제에서 읽기 위한 인증 요구 사항을 파악한 다음 정책에서 정의합니다. Lambda가 이러한 권한을 사용할 수 있도록 승인하는 역할인 lambdaAuthRole을 생성합니다. kafka-cluster IAM 작업을 사용하여 Amazon MSK 클러스터에서 작업 권한을 부여합니다. 그런 다음, Lambda가 Amazon MSK 클러스터를 검색하고 연결하는 데 필요한 Amazon MSK kafka 및 Amazon EC2 작업을 수행하도록 권한을 부여하고, Lambda가 수행한 작업을 기록할 수 있도록 CloudWatch 작업을 수행하도록 권한을 부여합니다.

Lambda가 Amazon MSK에서 읽기 위한 인증 요구 사항을 설명하려면 다음을 수행하세요.
  1. Lambda가 Kafka 소비자 그룹을 사용하여 Amazon MSK 클러스터의 Kafka 주제에서 읽을 수 있도록 허용하는 IAM 정책 문서(JSON 문서)인 clusterAuthPolicy를 작성합니다. Lambda는 읽을 때 Kafka 소비자 그룹을 설정해야 합니다.

    사전 요구 사항에 맞게 다음 템플릿 변경

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    자세한 내용은 IAM 역할 기반 인증을 참조하세요. 정책을 작성할 때

    • regionaccount-id로 Amazon MSK 클러스터를 호스팅하는 항목을 제공합니다.

    • mskClusterName으로 Amazon MSK 클러스터의 이름을 제공합니다.

    • cluster-uuid로 Amazon MSK 클러스터의 ARN에 UUID를 제공합니다.

    • mskTopicName으로 Kafka 주제의 이름을 제공합니다.

    • mskGroupName으로 Kafka 소비자 그룹의 이름을 제공합니다.

  2. Lambda가 Amazon MSK 클러스터를 검색 및 연결하고 해당 이벤트를 기록하는 데 필요한 Amazon MSK, Amazon EC2 및 CloudWatch 권한을 식별합니다.

    AWSLambdaMSKExecutionRole 관리형 정책은 필요한 권한을 허용적으로 정의합니다. 다음 단계에서 사용합니다.

    프로덕션 환경에서는 AWSLambdaMSKExecutionRole을 평가하여 최소 권한 원칙에 따라 실행 역할 정책을 제한한 다음 이 관리형 정책을 대체하는 역할에 대한 정책을 작성합니다.

IAM 정책 언어에 대한 자세한 내용은 IAM 설명서를 참조하세요.

정책 문서를 작성했으니 이제 IAM 정책을 생성하여 역할에 연결할 수 있습니다. 다음 절차에 따라 콘솔을 사용하여 이 작업을 수행할 수 있습니다.

정책 문서에서 IAM 정책을 생성하려면 다음을 수행하세요.
  1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/iam/ 에서 IAM 콘솔을 엽니다.

  2. 왼쪽의 탐색 창에서 정책을 선택합니다.

  3. 정책 생성을 선택합니다.

  4. 정책 편집기 섹션에서 JSON 옵션을 선택합니다.

  5. clusterAuthPolicy를 붙여넣습니다.

  6. 정책에 권한 추가를 완료했으면 다음을 선택합니다.

  7. 검토 및 생성 페이지에서 생성하는 정책에 대한 정책 이름설명(선택 사항)을 입력합니다. 이 정책에 정의된 권한을 검토하여 정책이 부여한 권한을 확인합니다.

  8. 정책 생성을 선택하고 새로운 정책을 저장합니다.

자세한 내용은 IAM 설명서의 IAM 정책 생성을 참조하세요.

이제 적절한 IAM 정책이 있으므로 역할을 생성하고 이에 IAM 정책을 연결합니다. 다음 절차에 따라 콘솔을 사용하여 이 작업을 수행할 수 있습니다.

IAM 콘솔에서 실행 역할을 생성하려면
  1. IAM 콘솔에서 역할(Roles) 페이지를 엽니다.

  2. 역할 생성을 선택합니다.

  3. 신뢰할 수 있는 엔터티 유형에서 AWS 서비스를 선택합니다.

  4. 사용 사례에서 Lambda를 선택합니다.

  5. Next(다음)를 선택합니다.

  6. 다음 정책을 선택합니다.

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. Next(다음)를 선택합니다.

  8. 역할 이름lambdaAuthRole을 입력한 다음 역할 생성을 선택합니다.

자세한 내용은 실행 역할로 Lambda 함수 권한 정의 단원을 참조하십시오.

Amazon MSK 주제에서 읽을 Lambda 함수 생성

IAM 역할을 사용하도록 구성된 Lambda 함수를 생성합니다. 콘솔을 사용하여 Lambda 함수를 생성할 수 있습니다.

인증 구성을 사용하여 Lambda 함수를 생성하려면 다음을 수행하세요.
  1. Lambda 콘솔을 열고 헤더에서 함수 생성을 선택합니다.

  2. 새로 작성을 선택합니다.

  3. 함수 이름으로 원하는 적절한 이름을 제공합니다.

  4. 런타임으로 이 자습서에 제공된 코드를 사용하려면 지원되는 최신 버전의 Node.js를 선택합니다.

  5. 기본 실행 역할 변경을 선택합니다.

  6. 기존 역할 사용을 선택합니다.

  7. 기존 역할lambdaAuthRole을 선택합니다.

프로덕션 환경에서는 일반적으로 Amazon MSK 이벤트를 의미 있게 처리하기 위해 Lambda 함수의 실행 역할에 추가 정책을 추가해야 합니다. 역할에 정책을 추가하는 방법에 대한 자세한 내용은 IAM 설명서의 ID 권한 추가 또는 제거를 참조하세요.

Lambda 함수에 대한 이벤트 소스 매핑 생성

Amazon MSK 이벤트 소스 매핑은 적절한 Amazon MSK 이벤트가 발생할 때 Lambda를 간접적으로 간접 호출하는 데 필요한 정보를 Lambda 서비스에 제공합니다. 콘솔을 사용하여 Amazon MSK 매핑을 생성할 수 있습니다. Lambda 트리거를 생성하면 이벤트 소스 매핑이 자동으로 설정됩니다.

Lambda 트리거 및 이벤트 소스 매핑을 생성하려면 다음을 수행하세요.
  1. Lambda 함수의 개요 페이지로 이동합니다.

  2. 함수 개요 섹션의 왼쪽 하단에서 트리거 추가를 선택합니다.

  3. 소스 선택 드롭다운에서 Amazon MSK를 선택합니다.

  4. 인증을 설정하지 마세요.

  5. MSK 클러스터로 클러스터의 이름을 선택합니다.

  6. 배치 크기로 1을 입력합니다. 이 단계를 수행하면 이 기능을 더 쉽게 테스트할 수 있지만 프로덕션에서는 이상적인 값이 아닙니다.

  7. 주제 이름으로 Kafka 주제의 이름을 입력합니다.

  8. 소비자 그룹 ID로 Kafka 소비자 그룹의 ID를 제공합니다.

스트리밍 데이터를 읽도록 Lambda 함수 업데이트

Lambda는 이벤트 메서드 파라미터를 통해 Kafka 이벤트에 대한 정보를 제공합니다. Amazon MSK 이벤트의 예제 구조는 예제 이벤트 섹션을 참조하세요. Lambda가 전달한 Amazon MSK 이벤트를 해석하는 방법을 이해한 후에는 제공된 정보를 사용하도록 Lambda 함수 코드를 변경할 수 있습니다.

테스트 목적으로 Lambda Amazon MSK 이벤트의 내용을 기록하려면 Lambda 함수에 다음 코드를 제공하세요.

.NET
AWS SDK for .NET
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

.NET을 사용하여 Lambda로 Amazon MSK 이벤트 사용

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Go
SDK for Go V2
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Go를 사용하여 Lambda로 Amazon MSK 이벤트 사용

package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
Java
SDK for Java 2.x
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Java를 사용하여 Lambda로 Amazon MSK 이벤트를 사용합니다.

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK for JavaScript (v3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

JavaScript를 사용하여 Lambda로 Amazon MSK 이벤트를 사용합니다.

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
PHP
SDK for PHP
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

PHP를 사용하여 Lambda로 Amazon MSK 이벤트 사용

<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Python을 사용하여 Lambda로 Amazon MSK 이벤트 사용

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
SDK for Ruby
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Ruby를 사용하여 Lambda로 Amazon MSK 이벤트 사용

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end

콘솔을 사용하여 Lambda에 함수 코드를 제공할 수 있습니다.

콘솔 코드 편집기를 사용하여 함수 코드 업데이트
  1. Lambda 콘솔의 함수 페이지를 열고 함수를 선택합니다.

  2. 코드 탭을 선택합니다.

  3. 코드 소스 창에서 소스 코드 파일을 선택하고 통합 코드 편집기에서 편집합니다.

  4. 배포 섹션에서 배포를 선택하여 함수의 코드를 업데이트하세요.

    Lambda 콘솔 코드 편집기에서의 배포 버튼

Lambda 함수를 테스트하여 Amazon MSK 주제에 연결되어 있는지 확인합니다.

이제 CloudWatch 이벤트 로그를 검사하여 Lambda가 이벤트 소스에 의해 간접적으로 간접 호출되고 있는지 여부를 확인할 수 있습니다.

Lambda 함수가 간접적으로 간접 호출되고 있는지 확인하려면 다음을 수행하세요.
  1. Kafka 관리 호스트를 사용하여 kafka-console-producer CLI를 통해 Kafka 이벤트를 생성합니다. 자세한 내용은 Kafka 설명서의 Write some events into the topic을 참조하세요. 이전 단계에서 정의된 이벤트 소스 매핑에 대해 배치 크기로 정의된 배치를 채우기에 충분한 이벤트를 전송합니다. 그렇지 않으면 Lambda가 추가 정보가 간접적으로 간접 호출될 때까지 기다립니다.

  2. 함수가 실행되면 Lambda는 CloudWatch에 발생한 일을 기록합니다. 콘솔에서 Lambda 함수의 세부 정보 페이지로 이동합니다.

  3. [구성(Configuration)] 탭을 선택합니다.

  4. 사이드바에서 모니터링 및 운영 도구를 선택합니다.

  5. 로깅 구성에서 CloudWatch 로그 그룹을 식별합니다. 로그 그룹은 /aws/lambda로 시작해야 합니다. 로그 그룹의 링크를 선택합니다.

  6. CloudWatch 콘솔의 로그 이벤트에서 Lambda가 로그 스트림으로 전송한 로그 이벤트가 있는지 검사합니다. 다음 이미지와 같이 Kafka 이벤트의 메시지가 포함된 로그 이벤트가 있는지 확인합니다. 로그 이벤트가 있으면 Lambda 이벤트 소스 매핑을 사용하여 Lambda 함수를 Amazon MSK에 성공적으로 연결한 것입니다.

    제공된 코드에서 추출한 이벤트 정보를 보여주는 CloudWatch의 로그 이벤트.