

# Amazon MSK에서 Lambda 사용
<a name="with-msk"></a>

[Amazon Managed Streaming for Apache Kafka(Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)는 Apache Kafka를 사용하여 스트리밍 데이터를 처리하는 애플리케이션의 구축 및 실행을 위해 사용할 수 있는 완전관리형 서비스입니다. Amazon MSK는 Kafka 클러스터의 설정, 스케일링, 관리를 간소화합니다. 또한 Amazon MSK는 여러 Availability Zones에 맞게, 그리고 AWS Identity and Access Management(IAM)를 통한 보안을 위해 애플리케이션을 쉽게 구성할 수 있도록 도와줍니다.

이 장에서는 Amazon MSK 클러스터를 Lambda 함수의 이벤트 소스로 사용하는 방법을 설명합니다. Amazon MSK를 Lambda와 통합하는 일반적인 프로세스에는 다음 단계가 포함됩니다.

1. **[클러스터 및 네트워크 설정](with-msk-cluster-network.md)** - 먼저 [Amazon MSK 클러스터](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)를 설정합니다. 여기에는 Lambda가 클러스터에 액세스할 수 있도록 올바른 네트워킹 구성이 포함됩니다.

1. **[이벤트 소스 매핑 설정](with-msk-configure.md)** - 그런 다음 Lambda가 Amazon MSK 클러스터를 함수에 안전하게 연결하는 데 필요한 [이벤트 소스 매핑](invocation-eventsourcemapping.md) 리소스를 생성합니다.

1. **[함수 및 권한 설정](with-msk-permissions.md)** - 마지막으로 함수가 올바르게 설정되었고 [실행 역할](lambda-intro-execution-role.md)에 필요한 권한이 있는지 확인합니다.

**참고**  
이제 Lambda 또는 Amazon MSK 콘솔에서 직접 Amazon MSK 이벤트 소스 매핑을 생성 및 관리할 수 있습니다. 두 콘솔 모두 보다 간편한 구성 프로세스를 위해 필요한 Lambda 실행 역할 권한 설정을 자동으로 처리하는 옵션을 제공합니다.

Amazon MSK 클러스터와 Lambda 통합을 설정하는 방법의 예는 [자습서: Amazon MSK 이벤트 소스 매핑을 사용하여 간접적으로 Lambda 함수 간접 호출](services-msk-tutorial.md), AWS Compute Blog의 [Using Amazon MSK as an event source for AWS Lambda](https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/) 및 Amazon MSK Labs의 [ Amazon MSK Lambda Integration](https://amazonmsk-labs.workshop.aws/en/msklambda.html)을 참조하세요.

**Topics**
+ [예제 이벤트](#msk-sample-event)
+ [Lambda를 위한 Amazon MSK 클러스터 및 Amazon VPC 네트워크 구성](with-msk-cluster-network.md)
+ [Amazon MSK 이벤트 소스 매핑에 대한 Lambda 권한 구성](with-msk-permissions.md)
+ [Lambda용 Amazon MSK 이벤트 소스 구성](with-msk-configure.md)
+ [자습서: Amazon MSK 이벤트 소스 매핑을 사용하여 간접적으로 Lambda 함수 간접 호출](services-msk-tutorial.md)

## 예제 이벤트
<a name="msk-sample-event"></a>

Lambda는 함수를 간접 호출할 때 이벤트 파라미터의 메시지 배치를 보냅니다. 이벤트 페이로드에는 메시지 배열이 포함됩니다. 각 배열 항목에는 Amazon MSK 주제 및 파티션 식별자에 대한 세부 정보와 함께 타임스탬프 및 base64로 인코딩된 메시지가 포함됩니다.

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

# Lambda를 위한 Amazon MSK 클러스터 및 Amazon VPC 네트워크 구성
<a name="with-msk-cluster-network"></a>

AWS Lambda 함수를 Amazon MSK 클러스터에 연결하려면 클러스터와 해당 클러스터가 상주하는 [Amazon Virtual Private Cloud(VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html)를 올바르게 구성해야 합니다. 이 페이지에서는 클러스터와 VPC를 구성하는 방법을 설명합니다. 클러스터와 VPC가 이미 올바르게 구성된 경우 [Lambda용 Amazon MSK 이벤트 소스 구성](with-msk-configure.md) 섹션을 참조하여 이벤트 소스 매핑을 구성합니다.

**Topics**
+ [Lambda 및 MSK 통합을 위한 네트워크 구성 요구 사항 개요](#msk-network-requirements)
+ [MSK 이벤트 소스에 대한 NAT 게이트웨이 구성](#msk-nat-gateway)
+ [MSK 이벤트 소스에 대한 AWS PrivateLink 엔드포인트 구성](#msk-vpc-privatelink)

## Lambda 및 MSK 통합을 위한 네트워크 구성 요구 사항 개요
<a name="msk-network-requirements"></a>

Lambda와 MSK 통합에 필요한 네트워킹 구성은 애플리케이션의 네트워크 아키텍처에 따라 다릅니다. 이 통합에는 Amazon MSK 클러스터, Lambda 함수, Lambda 이벤트 소스 매핑이라는 3개의 주요 리소스가 관련되어 있습니다. 이러한 각 리소스는 서로 다른 VPC에 있습니다.
+ Amazon MSK 클러스터는 일반적으로 사용자가 관리하는 VPC의 프라이빗 서브넷에 상주합니다.
+ Lambda 함수는 Lambda가 소유한 AWS 관리형 VPC에 상주합니다.
+ Lambda 이벤트 소스 매핑은 함수가 포함된 VPC와는 별도로 Lambda가 소유한 다른 AWS 관리형 VPC에 상주합니다.

[이벤트 소스 매핑](invocation-eventsourcemapping.md)은 MSK 클러스터와 Lambda 함수 사이의 중간 리소스입니다. 이벤트 소스 매핑에는 두 가지 기본 작업이 있습니다. 먼저 MSK 클러스터에서 새로운 메시지를 폴링합니다. 그런 다음 해당 메시지와 함께 Lambda 함수를 간접적으로 호출합니다. 이들 3개의 리소스는 서로 다른 VPC에 있으므로 폴링 및 간접 호출 작업 모두 VPC 간 네트워크 직접 호출이 필요합니다.

이벤트 소스 매핑에 대한 네트워크 구성 요구 사항은 다음 다이어그램과 같이 [프로비저닝된 모드](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)와 온디맨드 모드 중 무엇을 사용하는지에 따라 다릅니다.

![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/MSK-esm-network-overview.png)


Lambda 이벤트 소스 매핑이 MSK 클러스터에서 새로운 메시지를 폴링하는 방식은 두 모드 모두에서 동일합니다. 이벤트 소스 매핑과 MSK 클러스터 간의 연결을 설정하기 위해 Lambda는 프라이빗 서브넷에 [하이퍼플레인 ENI](configuration-vpc.md#configuration-vpc-enis)를 생성하거나 기존 ENI가 있는 경우 이를 재사용하여 보안 연결을 설정합니다. 다이어그램에 나와 있듯이 이 하이퍼플레인 ENI는 Lambda 함수가 아닌 MSK 클러스터의 서브넷과 보안 그룹 구성을 사용합니다.

클러스터에서 메시지를 폴링한 후 Lambda가 함수를 간접적으로 호출하는 방법은 각 모드에서 다릅니다.
+ 프로비저닝된 모드에서 Lambda는 이벤트 소스 매핑 VPC와 함수 VPC 간의 연결을 자동으로 처리합니다. 따라서 함수를 간접적으로 호출하기 위해 추가적인 네트워킹 구성 요소가 필요하지 않습니다.
+ 온디맨드 모드에서 Lambda 이벤트 소스 매핑은 고객 관리형 VPC를 통한 경로를 통해 함수를 간접적으로 호출합니다. 이러한 이유로 VPC의 퍼블릭 서브넷에 [NAT 게이트웨이](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)를 구성하거나 Lambda, [AWS Security Token Service(STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html), [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)(선택 사항)에 대한 액세스를 제공하는 VPC의 프라이빗 서브넷에 [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html) 엔드포인트를 구성해야 합니다. 이러한 옵션 중 하나를 올바르게 구성하면 함수를 간접적으로 호출하는 데 필요한 VPC와 Lambda 관리형 런타임 VPC 간 연결이 허용됩니다.

NAT 게이트웨이를 사용하면 프라이빗 서브넷의 리소스가 퍼블릭 인터넷에 액세스할 수 있습니다. 이 구성을 사용하면 Lambda 함수를 간접적으로 호출하기 전에 트래픽이 인터넷을 통과하게 됩니다. AWS PrivateLink 엔드포인트를 사용하면 프라이빗 서브넷이 퍼블릭 인터넷을 통과하지 않고도 AWS 서비스 또는 기타 프라이빗 VPC 리소스에 안전하게 연결할 수 있습니다. 이러한 리소스를 구성하는 방법에 대한 자세한 내용은 [MSK 이벤트 소스에 대한 NAT 게이트웨이 구성](#msk-nat-gateway) 또는 [MSK 이벤트 소스에 대한 AWS PrivateLink 엔드포인트 구성](#msk-vpc-privatelink) 섹션을 참조하세요.

지금까지는 MSK 클러스터가 VPC 내의 프라이빗 서브넷에 상주한다고 가정했는데, 이는 더 일반적인 경우입니다. 그러나 MSK 클러스터가 VPC 내의 퍼블릭 서브넷에 있더라도 보안 연결을 활성화하도록 AWS PrivateLink 엔드포인트를 구성해야 합니다. 다음 표에는 MSK 클러스터와 Lambda 이벤트 소스 매핑을 구성하는 방법에 따른 네트워킹 구성 요구 사항이 요약되어 있습니다.


| MSK 클러스터 위치(고객 관리형 VPC 내) | Lambda 이벤트 소스 매핑 스케일링 모드 | 필수 네트워킹 구성 | 
| --- | --- | --- | 
|  Private subnet  |  온디맨드 모드  |  Lambda, AWS STS 및 Secrets Manager(선택 사항)에 대한 액세스를 활성화하기 위한 NAT 게이트웨이(VPC의 퍼블릭 서브넷 내) 또는 AWS PrivateLink 엔드포인트(VPC의 프라이빗 서브넷 내)  | 
|  Public subnet  |  온디맨드 모드  |  Lambda, AWS STS 및 Secrets Manager(선택 사항)에 대한 액세스를 활성화하기 위한 AWS PrivateLink 엔드포인트(VPC의 퍼블릭 서브넷 내)  | 
|  Private subnet  |  프로비저닝된 모드  |  없음  | 
|  Public subnet  |  프로비저닝된 모드  |  없음  | 

또한 MSK 클러스터와 연결된 보안 그룹이 올바른 포트를 통해 트래픽을 허용해야 합니다. 다음 보안 그룹 규칙이 구성되어 있는지 확인합니다.
+ **인바운드 규칙** - 기본 브로커 포트의 모든 트래픽을 허용합니다. MSK가 사용하는 포트는 클러스터의 인증 유형에 따라 다릅니다. IAM 인증의 경우 `9098`, SASL/SCRAM의 경우 `9096`, TLS의 경우 `9094`입니다. 또는 자체 참조 보안 그룹 규칙을 사용하여 동일한 보안 그룹 내의 인스턴스에서 액세스를 허용할 수 있습니다.
+ **아웃바운드 규칙** - 함수가 다른 AWS 서비스와 통신해야 하는 경우 외부 대상에 대한 포트 `443`의 모든 트래픽을 허용합니다. 다른 AWS 서비스와 통신할 필요가 없는 경우 자체 참조 보안 그룹 규칙을 사용하여 브로커에 대한 액세스를 제한할 수도 있습니다.
+ **Amazon VPC 엔드포인트 인바운드 규칙** - Amazon VPC 엔드포인트를 사용하는 경우 엔드포인트와 연결된 보안 그룹이 클러스터의 보안 그룹에서 포트 `443`의 인바운드 트래픽을 허용해야 합니다.

## MSK 이벤트 소스에 대한 NAT 게이트웨이 구성
<a name="msk-nat-gateway"></a>

이벤트 소스 매핑이 클러스터에서 메시지를 폴링하도록 허용하도록 NAT 게이트웨이를 구성하고 VPC를 통한 경로를 통해 함수를 간접적으로 호출할 수 있습니다. 이는 이벤트 소스 매핑이 온디맨드 모드를 사용하고 클러스터가 VPC의 프라이빗 서브넷 내에 상주하는 경우에만 필요합니다. 클러스터가 VPC의 퍼블릭 서브넷에 상주하거나 이벤트 소스 매핑이 프로비저닝된 모드를 사용하는 경우 NAT 게이트웨이를 구성할 필요가 없습니다.

[NAT 게이트웨이](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)를 사용하면 프라이빗 서브넷의 리소스가 퍼블릭 인터넷에 액세스할 수 있습니다. Lambda에 대한 프라이빗 연결이 필요한 경우 [MSK 이벤트 소스에 대한 AWS PrivateLink 엔드포인트 구성](#msk-vpc-privatelink) 섹션을 대신 참조하세요.

NAT 게이트웨이를 구성한 후 적절한 라우팅 테이블을 구성해야 합니다. 이렇게 하면 프라이빗 서브넷의 트래픽이 NAT 게이트웨이를 통해 퍼블릭 인터넷으로 라우팅될 수 있습니다.

![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/MSK-NAT-Gateway.png)


다음 단계에서는 콘솔을 사용하여 NAT 게이트웨이를 구성하는 방법을 안내합니다. 각 가용 영역(AZ)에 대해 필요에 따라 이 단계를 반복합니다.

**NAT 게이트웨이와 적절한 라우팅을 구성하려면 다음을 수행하세요(콘솔).**

1. 다음 사항에 유의하여 [NAT 게이트웨이 생성](https://docs.aws.amazon.com/vpc/latest/userguide/nat-gateway-working-with.html)의 단계를 따릅니다.
   + NAT 게이트웨이는 항상 퍼블릭 서브넷에 상주해야 합니다. [퍼블릭 연결](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)이 있는 NAT 게이트웨이를 생성합니다.
   + MSK 클러스터가 여러 AZ에 복제되는 경우 AZ당 하나의 NAT 게이트웨이를 생성합니다. 예를 들어, 각 AZ에서 클러스터를 포함하는 프라이빗 서브넷 1개와 NAT 게이트웨이를 포함하는 퍼블릭 서브넷 1개가 VPC에 있어야 합니다. 3개의 AZ를 사용하는 설정의 경우 3개의 프라이빗 서브넷, 3개의 퍼블릭 서브넷, 3개의 NAT 게이트웨이를 갖게 됩니다.

1. NAT 게이트웨이를 생성한 후 [Amazon VPC 콘솔](https://console.aws.amazon.com/vpc/)을 열고 왼쪽 메뉴에서 **라우팅 테이블**을 선택합니다.

1. **라우팅 테이블 생성**을 선택합니다.

1. 이 라우팅 테이블을 MSK 클러스터가 포함된 VPC와 연결합니다. 선택 사항으로 라우팅 테이블의 이름을 입력합니다.

1. **라우팅 테이블 생성**을 선택합니다.

1. 방금 생성한 라우팅 테이블을 선택합니다.

1. **서브넷 연결** 탭에서 **서브넷 연결 편집**을 선택합니다.
   + 이 라우팅 테이블을 MSK 클러스터가 포함된 프라이빗 서브넷과 연결합니다.

1. **라우팅 편집(Edit routes)**을 선택합니다.

1. **라우팅 추가**를 선택합니다.

   1. **Destination(대상)**에 `0.0.0.0/0`을 선택합니다.

   1. **대상**에서 **NAT 게이트웨이**를 선택합니다.

   1. 검색 상자에서 1단계에서 생성한 NAT 게이트웨이를 선택합니다. MSK 클러스터가 포함된 프라이빗 서브넷(6단계에서 이 라우팅 테이블과 연결한 프라이빗 서브넷)과 동일한 AZ의 NAT 게이트웨이여야 합니다.

1. **변경 사항 저장**을 선택합니다.

## MSK 이벤트 소스에 대한 AWS PrivateLink 엔드포인트 구성
<a name="msk-vpc-privatelink"></a>

클러스터에서 메시지를 폴링하도록 허용하도록 AWS PrivateLink 엔드포인트를 구성하고 VPC를 통한 경로를 통해 함수를 간접적으로 호출할 수 있습니다. 이러한 엔드포인트는 MSK 클러스터가 다음에 액세스할 수 있도록 허용해야 합니다.
+ Lambda 서비스
+ [AWS Security Token Service(STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)
+ [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 서비스(선택 사항). 클러스터 인증에 필요한 보안 암호가 Secrets Manager에 저장되어 있는 경우 필수입니다.

이벤트 소스 매핑이 온디맨드 모드를 사용하는 경우에만 PrivateLink 엔드포인트 구성이 필요합니다. 이벤트 소스 매핑이 프로비저닝된 모드를 사용하는 경우 Lambda가 필요한 연결을 설정합니다.

PrivateLink 엔드포인트는 AWS PrivateLink를 통해 AWS 서비스에 대한 보안 프라이빗 액세스를 허용합니다. 또는 MSK 클러스터에 퍼블릭 인터넷에 대한 액세스 권한을 부여하도록 NAT 게이트웨이를 구성하려면 [MSK 이벤트 소스에 대한 NAT 게이트웨이 구성](#msk-nat-gateway) 섹션을 참조하세요.

VPC 엔드포인트를 구성한 후 MSK 클러스터에 Lambda, STS 및 Secrets Manager(선택 사항)에 대한 직접 및 프라이빗 액세스 권한이 있어야 합니다.

![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/MSK-PrivateLink-Endpoints.png)


다음 단계에서는 콘솔을 사용하여 PrivateLink 엔드포인트를 구성하는 방법을 안내합니다. 각 엔드포인트(Lambda, STS, Secrets Manager)에 대해 필요에 따라 다음 단계를 반복합니다.

**VPC PrivateLink 엔드포인트를 구성하려면 다음을 수행하세요(콘솔).**

1. [Amazon VPC 콘솔](https://console.aws.amazon.com/vpc/)을 열고 왼쪽 메뉴에서 **엔드포인트**를 선택합니다.

1. **엔드포인트 생성**을 선택합니다.

1. 선택 사항으로 엔드포인트의 이름을 입력합니다.

1. **유형**에서 **AWS 서비스**를 선택합니다.

1. **서비스**에서 서비스 이름 입력을 시작합니다. 예를 들어 Lambda에 연결할 엔드포인트를 생성하려면 검색 상자에 `lambda`를 입력합니다.

1. 결과에 현재 리전의 서비스 엔드포인트가 표시되어야 합니다. 예를 들어, 미국 동부(버지니아 북부) 리전에서는 `com.amazonaws.us-east-2.lambda`가 표시되어야 합니다. 이 서비스를 선택합니다.

1. **네트워크 설정**에서 MSK 클러스터가 포함된 VPC를 선택합니다.

1. **서브넷**에서 MSK 클러스터가 있는 AZ를 선택합니다.
   + 각 AZ의 **서브넷 ID**에서 MSK 클러스터가 포함된 프라이빗 서브넷을 선택합니다.

1. **보안 그룹**에서 MSK 클러스터와 연결된 보안 그룹을 선택합니다.

1. **엔드포인트 생성**을 선택합니다.

기본적으로 Amazon VPC 엔드포인트에는 리소스에 대한 광범위한 액세스를 허용하는 개방형 IAM 정책이 있습니다. 모범 사례는 해당 엔드포인트를 사용하여 필요한 작업을 수행하도록 이러한 정책을 제한하는 것입니다. 예를 들어 Secrets Manager 엔드포인트의 경우 함수의 실행 역할만 보안 암호에 액세스할 수 있도록 정책을 수정할 수 있습니다.

**Example VPC 엔드포인트 정책 – Secrets Manager 엔드포인트**  

```
{
    "Statement": [
        {
            "Action": "secretsmanager:GetSecretValue",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws::iam::123456789012:role/my-role"
                ]
            },
            "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
        }
    ]
}
```

AWS STS 및 Lambda 엔드포인트의 경우 직접 호출 위탁자를 Lambda 서비스 위탁자로 제한할 수 있습니다. 그러나 이러한 정책에서는 `"Resource": "*"`를 사용해야 합니다.

**Example VPC 엔드포인트 정책 - AWS STS 엔드포인트**  

```
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

**Example VPC 엔드포인트 정책 - Lambda 엔드포인트**  

```
{
    "Statement": [
        {
            "Action": "lambda:InvokeFunction",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

# Amazon MSK 이벤트 소스 매핑에 대한 Lambda 권한 구성
<a name="with-msk-permissions"></a>

Amazon MSK 클러스터에 액세스하려면 함수와 이벤트 소스 매핑에 다양한 Amazon MSK API 작업을 수행할 수 있는 권한이 필요합니다. 함수의 [실행 역할](lambda-intro-execution-role.md)에 이러한 권한을 추가합니다. 사용자에게 액세스 권한이 필요한 경우 사용자 또는 역할에 대한 ID 정책에 필요한 권한을 추가합니다.

[AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 관리형 정책에는 Amazon MSK Lambda 이벤트 소스 매핑에 필요한 최소 권한이 포함되어 있습니다. 다음 작업을 통해 권한 프로세스를 간소화할 수 있습니다.
+ [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 관리형 정책을 실행 역할에 연결합니다.
+ Lambda 콘솔에서 권한을 생성하도록 합니다. [콘솔에서 Amazon MSK 이벤트 소스 매핑을 생성](msk-esm-create.md#msk-console)하면 Lambda에서 실행 역할을 평가하고 권한이 누락된 경우 알립니다. **권한 생성**을 선택하여 실행 역할을 자동으로 업데이트합니다. 실행 역할 정책을 수동으로 생성하거나 수정한 경우 또는 정책이 여러 역할에 연결된 경우에는 효과가 없습니다. [실패 시 대상](kafka-on-failure.md) 또는 [AWS Glue Schema Registry](services-consume-kafka-events.md)와 같은 고급 기능을 사용하는 경우에도 실행 역할에 추가 권한이 필요할 수 있습니다.

**Topics**
+ [필수 권한](#msk-required-permissions)
+ [선택적 권한](#msk-optional-permissions)

## 필수 권한
<a name="msk-required-permissions"></a>

Amazon MSK 이벤트 소스 매핑에 대한 Lambda 함수 실행 역할에는 다음 권한이 필요합니다. 이러한 권한은 [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 관리형 정책에 포함됩니다.

### CloudWatch Logs 권한
<a name="msk-basic-permissions"></a>

다음 권한을 통해 Lambda는 Amazon CloudWatch Logs에 로그를 생성하고 저장할 수 있습니다.
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

### MSK 클러스터 권한
<a name="msk-cluster-permissions"></a>

다음 권한을 통해 Lambda는 사용자를 대신하여 Amazon MSK 클러스터에 액세스할 수 있습니다.
+ [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)
+ [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)
+ [kafka:GetBootstrapBrokers](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html)

[kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html) 대신 [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)를 사용하는 것이 좋습니다. v2 권한은 프로비저닝된 Amazon MSK 클러스터와 서버리스 Amazon MSK 클러스터 모두에서 작동합니다. 정책에는 이 권한 중 하나만 필요합니다.

### VPC 권한
<a name="msk-vpc-permissions"></a>

다음 권한을 통해 Lambda는 Amazon MSK 클러스터에 연결할 때 네트워크 인터페이스를 생성 및 관리할 수 있습니다.
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## 선택적 권한
<a name="msk-optional-permissions"></a>

 Lambda 함수에 또한 다음 권한이 필요할 수 있습니다.
+ 교차 계정 Amazon MSK 클러스터에 액세스합니다. 교차 계정 이벤트 소스 매핑의 경우 실행 역할에 [kafka:DescribeVpcConnection](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connection-arn.html)이 필요합니다. 교차 계정 이벤트 소스 매핑을 생성하는 IAM 보안 주체에는 [kafka:ListVpcConnections](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connections.html)가 필요합니다.
+ [SASL/SCRAM 인증](msk-cluster-auth.md#msk-sasl-scram)을 사용하는 경우 SCRAM 보안 암호에 액세스 이렇게 하면 함수가 사용자 이름과 암호를 사용하여 Kafka에 연결할 수 있습니다.
+ SASL/SCRAM 또는 [mTLS 인증을](msk-cluster-auth.md#msk-mtls) 사용하는 경우 Secrets Manager 보안 암호 설명 이렇게 하면 함수가 보안 연결에 필요한 자격 증명이나 인증서를 검색할 수 있습니다.
+ AWS Secrets Manager 보안 암호가 AWS KMS 고객 관리형 키로 암호화된 경우 AWS KMS 고객 관리형 키에 액세스합니다.
+ 인증과 함께 스키마 레지스트리를 사용하는 경우 스키마 레지스트리 시크릿에 액세스합니다.
  + AWS Glue 스키마 레지스트리의 경우: 함수에 `glue:GetRegistry` 및 `glue:GetSchemaVersion` 권한이 필요합니다. 이를 통해 함수가 AWS Glue에 저장된 메시지 형식 규칙을 조회하고 사용할 수 있습니다.
  + `BASIC_AUTH` 또는 `CLIENT_CERTIFICATE_TLS_AUTH`를 사용하는 [Confluent 스키마 레지스트리](https://docs.confluent.io/platform/current/schema-registry/security/index.html)의 경우: 함수에 인증 자격 증명이 포함된 시크릿에 대한 `secretsmanager:GetSecretValue` 권한이 필요합니다. 이렇게 하면 함수가 Confluent 스키마 레지스트리에 액세스하는 데 필요한 사용자 이름/암호 또는 인증서를 검색할 수 있습니다.
  + Private CA Certificate의 경우: 함수에 인증서가 포함된 시크릿에 대한 secretsmanager:GetSecretValue 권한이 필요합니다. 이렇게 하면 함수가 사용자 지정 인증서를 사용하는 스키마 레지스트리의 ID를 확인할 수 있습니다.
+ 이벤트 소스 매핑에 IAM 인증을 사용하는 경우 주제에서 Kafka 클러스터 소비자 그룹에 액세스하고 메시지를 폴링합니다.

 이는 다음과 같은 필수 권한에 해당합니다.
+ [kafka:ListScramSecrets](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html) - Kafka 인증을 위한 SCRAM 시크릿 나열 허용
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html) - Secrets Manager에서 시크릿 검색 활성화
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) - AWS KMS를 사용하여 암호화된 데이터의 복호화 허용
+ [glue:GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) - AWS Glue 스키마 레지스트리에 대한 액세스 허용
+ [glue:GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) - AWS Glue 스키마 레지스트리에서 특정 스키마 버전 검색 활성화
+ [kafka-cluster:Connect](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - 클러스터에 연결하고 인증할 수 있는 권한 부여
+ [kafka-cluster:AlterGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Apache Kafka의 READ GROUP ACL에 해당하는 클러스터에서 그룹에 참여할 수 있는 권한 부여
+ [kafka-cluster:DescribeGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Apache Kafka의 DESCRIBE GROUP ACL에 해당하는 클러스터에서 그룹을 설명할 수 있는 권한 부여
+ [kafka-cluster:DescribeTopic](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Apache Kafka의 DESCRIBE TOPIC ACL에 해당하는 클러스터에서 주제를 설명할 수 있는 권한 부여
+ [kafka-cluster:ReadData](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Apache Kafka의 READ TOPIC ACL에 해당하는 클러스터에서 주제의 데이터를 읽을 수 있는 권한 부여

 또한 실패한 간접 호출의 레코드를 장애 시 대상으로 전송하려면 대상 유형에 따라 다음과 같은 권한이 필요합니다: 
+ Amazon SQS 대상의 경우: [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) - Amazon SQS 대기열로 메시지 전송 허용
+ Amazon SNS 대상의 경우: [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) - Amazon SNS 주제에 메시지 게시 허용
+ Amazon S3 버킷 대상의 경우: [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 및 [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBucket.html) - Amazon S3 버킷에서 객체 쓰기 및 나열 활성화

일반적인 인증 및 권한 부여 오류 문제를 해결할 경우 [Kafka 이벤트 소스 매핑 오류 문제 해결](with-kafka-troubleshoot.md) 섹션을 참조하세요.

# Lambda용 Amazon MSK 이벤트 소스 구성
<a name="with-msk-configure"></a>

Amazon MSK 클러스터를 Lambda 함수의 이벤트 소스로 사용하려면 두 리소스를 연결하는 [이벤트 소스 매핑](invocation-eventsourcemapping.md)을 생성합니다. 이 페이지에서는 Amazon MSK에 대한 이벤트 소스 매핑을 생성하는 방법을 설명합니다.

이 페이지에서는 MSK 클러스터와 해당 클러스터가 있는 [Amazon Virtual Private Cloud(VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html)를 이미 올바르게 구성했다고 가정합니다. 클러스터나 VPC를 설정해야 하는 경우 [Lambda를 위한 Amazon MSK 클러스터 및 Amazon VPC 네트워크 구성](with-msk-cluster-network.md) 섹션을 참조하세요. 오류 처리를 위한 재시도 동작을 구성하려면 [Kafka 이벤트 소스의 오류 처리 제어 구성](kafka-retry-configurations.md) 항목을 참조하세요.

**Topics**
+ [Amazon MSK 클러스터를 이벤트 소스로 사용](#msk-esm-overview)
+ [Lambda에서 Amazon MSK 클러스터 인증 방법 구성](msk-cluster-auth.md)
+ [Amazon MSK 이벤트 소스에 대한 Lambda 이벤트 소스 매핑 생성](msk-esm-create.md)
+ [Lambda에서 교차 계정 이벤트 소스 매핑 생성](msk-cross-account.md)
+ [Lambda에서의 모든 Amazon MSK 이벤트 소스 구성 파라미터](msk-esm-parameters.md)

## Amazon MSK 클러스터를 이벤트 소스로 사용
<a name="msk-esm-overview"></a>

Apache Kafka 또는 Amazon MSK 클러스터를 Lambda 함수의 트리거로 추가하면 해당 클러스터가 [이벤트 소스](invocation-eventsourcemapping.md)로 사용됩니다.

Lambda는 사용자가 지정한 [시작 위치](kafka-starting-positions.md)를 기반으로 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 요청에서 `Topics`로 지정한 Kafka 주제에서 이벤트 데이터를 읽습니다. 성공적인 처리 후, Kafka 토픽은 Kafka 클러스터에 커밋됩니다.

Lambda는 각 Kafka 주제 파티션에 대해 순차적으로 메시지를 읽습니다. 단일 Lambda 페이로드에는 여러 파티션의 메시지가 포함될 수 있습니다. 사용 가능한 레코드가 더 있는 경우 Lambda는 함수가 주제를 따라잡을 때까지 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 요청에서 지정한 BatchSize 값을 기반으로 배치로 레코드를 계속 처리합니다.

Lambda는 각 배치를 처리한 후 해당 배치에 있는 메시지의 오프셋을 커밋합니다. 함수가 배치의 어떤 메시지에 대해 오류를 반환하면 Lambda는 처리가 성공하거나 메시지가 만료될 때까지 전체 메시지 배치를 다시 시도합니다. 모든 재시도에 실패한 레코드를 실패 시 대상으로 전송하여 나중에 처리하도록 할 수 있습니다.

**참고**  
Lambda 함수의 최대 제한 시간은 일반적으로 15분이지만 Amazon MSK, 자체 관리형 Apache Kafka, Amazon DocumentDB, ActiveMQ 및 RabbitMQ용 Amazon MQ에 대한 이벤트 소스 매핑은 최대 제한 시간이 14분인 함수만 지원합니다.

# Lambda에서 Amazon MSK 클러스터 인증 방법 구성
<a name="msk-cluster-auth"></a>

Lambda는 Amazon MSK 클러스터에 액세스하고 레코드를 검색하고 다른 태스크를 수행할 수 있는 권한이 필요합니다. Amazon MSK는 MSK 클러스터를 통해 인증하는 여러 가지 방법을 지원합니다.

**Topics**
+ [인증되지 않은 액세스](#msk-unauthenticated)
+ [SASL/SCRAM 인증](#msk-sasl-scram)
+ [상호 TLS 인증](#msk-mtls)
+ [IAM 인증](#msk-iam-auth)
+ [Lambda이 부트스트랩 브로커를 선택하는 방법](#msk-bootstrap-brokers)

## 인증되지 않은 액세스
<a name="msk-unauthenticated"></a>

인터넷을 통해 클러스터에 액세스하는 클라이언트가 없는 경우 인증되지 않은 액세스를 사용할 수 있습니다.

## SASL/SCRAM 인증
<a name="msk-sasl-scram"></a>

Lambda는 SHA-512 해시 함수와 Transport Layer Security(TLS) 암호화를 통해 [Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism(SASL/SCRAM)](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html) 인증을 지원합니다. Lambda가 클러스터에 연결하려면 인증 자격 증명(사용자 이름 및 암호)을 Secrets Manager 보안 암호에 저장하고 이벤트 소스 매핑을 구성할 때 이 보안 암호를 참조합니다.

Secrets Manager 사용에 대한 자세한 내용은 *Amazon Managed Streaming for Apache Kafka 개발자 안내서*의 [Sign-in credentials authentication with Secrets Manager](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html)를 참조하세요.

**참고**  
Amazon MSK는 SASL/PLAIN 인증을 지원하지 않습니다.

## 상호 TLS 인증
<a name="msk-mtls"></a>

상호 TLS(mTLS)는 클라이언트와 서버 간의 양방향 인증을 제공합니다. 클라이언트는 서버가 클라이언트를 확인할 수 있도록 인증서를 서버로 전송합니다. 서버는 또한 클라이언트가 서버를 확인할 수 있도록 클라이언트에 인증서를 전송합니다.

Amazon MSK와 Lambda 통합의 경우 MSK 클러스터는 서버 역할을 하고 Lambda는 클라이언트 역할을 합니다.
+ Lambda가 MSK 클러스터를 확인하려면 Secrets Manager에서 클라이언트 인증서를 보안 암호로 구성하고 이벤트 소스 매핑 구성에서 이 인증서를 참조합니다. 클라이언트 인증서는 서버의 신뢰 저장소에 있는 인증 기관(CA)에서 서명해야 합니다.
+ MSK 클러스터는 서버 인증서도 Lambda로 전송합니다. 서버 인증서는 AWS 트러스트 스토어에 있는 인증 기관(CA)에서 서명해야 합니다.

Amazon MSK는 자체 서명 서버 인증서를 지원하지 않습니다. Amazon MSK의 모든 브로커는 기본적으로 Lambda가 신뢰하는 [Amazon Trust Services CA](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html)에서 서명한 [퍼블릭 인증서](https://www.amazontrust.com/repository/)를 사용하기 때문입니다.

### mTLS 암호 구성
<a name="mtls-auth-secret"></a>

CLIENT\$1CERTIFICATE\$1TLS\$1AUTH 비밀 정보에 인증서 필드와 프라이빗 키 필드가 필요합니다. 암호화된 프라이빗 키의 경우 비밀 정보에 프라이빗 키 암호가 필요합니다. 인증서와 프라이빗 키는 모두 PEM 형식이어야 합니다.

**참고**  
Lambda는 [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1)(PBES2가 아님) 프라이빗 키 암호화 알고리즘을 지원합니다.

인증서 필드에는 클라이언트 인증서부터 시작하여 중간 인증서가 이어지고 루트 인증서로 끝나는 인증서 목록이 포함되어야 합니다. 각 인증서는 다음 구조의 새 줄에서 시작해야 합니다.

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager는 최대 65,536바이트의 보안 정보를 지원하므로 긴 인증서 체인을 위한 충분한 공간입니다.

프라이빗 키는 다음 구조의 [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 형식이어야 합니다.

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

암호화된 프라이빗 키의 경우 다음 구조를 사용합니다.

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

다음 예제에서는 암호화된 프라이빗 키를 사용한 mTLS 인증용 비밀 정보 콘텐츠를 표시합니다. 암호화된 개인 키의 경우 비밀 정보에 프라이빗 키 암호를 포함합니다.

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

mTLS for Amazon MSK에 대한 자세한 내용과 클라이언트 인증서 생성 방법에 대한 지침은 *Amazon Managed Streaming for Apache Kafka Developer Guide*의 [Mutual TLS client authentication for Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)를 참조하세요.

## IAM 인증
<a name="msk-iam-auth"></a>

AWS Identity and Access Management(IAM)를 사용하여 MSK 클러스터에 연결하는 클라이언트의 ID를 인증할 수 있습니다. IAM 인증을 사용하면 Lambda는 함수 [실행 역할](lambda-intro-execution-role.md)의 권한에 따라 클러스터에 연결하고, 레코드를 검색하고, 기타 필요한 작업을 수행합니다. 필요한 권한이 포함된 샘플 정책은 *Amazon Managed Streaming for Apache Kafka 개발자 안내서*의 [IAM 역할을 위한 권한 부여 정책 생성](https://docs.aws.amazon.com/msk/latest/developerguide/create-iam-access-control-policies.html)을 참조하세요.

IAM 인증이 MSK 클러스터에서 활성 상태이고 보안 암호를 제공하지 않으면 Lambda는 자동으로 IAM 인증을 사용하도록 기본 설정됩니다.

Amazon MSK의 IAM 인증에 대한 자세한 내용은 [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)을 참조하세요.

## Lambda이 부트스트랩 브로커를 선택하는 방법
<a name="msk-bootstrap-brokers"></a>

Lambda는 클러스터에서 사용할 수 있는 인증 방법 및 인증을 위한 암호 제공 여부를 기반으로 [부트스트랩 브로커](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html)를 선택합니다. mTLS 또는 SASL/SCRAM에 대한 암호를 제공하면 Lambda가 자동으로 해당 인증 방법을 선택합니다. 암호를 제공하지 않으면 Lambda가 클러스터에서 활성화된 가장 강력한 인증 방법을 선택합니다. 다음은 Lambda가 가장 강력한 인증부터 가장 약한 인증까지 브로커를 선택하는 우선 순위입니다.
+ mTLS(MTL에 제공되는 암호)
+ SASL/SCRAM(SASL/SCRAM에 대해 제공되는 암호)
+ SASL IAM(암호가 제공되지 않았으며 IAM 인증이 활성화됨)
+ 인증되지 않은 TLS(암호가 제공되지 않고 IAM 인증이 활성화되지 않음)
+ 일반 텍스트(암호가 제공되지 않고 IAM 인증 및 인증되지 않은 TLS가 모두 활성화되지 않음)

**참고**  
Lambda가 가장 안전한 브로커 유형에 연결할 수 없는 경우 Lambda는 다른 (약한) 브로커 유형에 연결을 시도하지 않습니다. Lambda가 더 약한 브로커 유형을 선택하게 하려면 클러스터에서 더 강력한 인증 방법을 모두 비활성화하십시오.

# Amazon MSK 이벤트 소스에 대한 Lambda 이벤트 소스 매핑 생성
<a name="msk-esm-create"></a>

이벤트 소스 매핑을 생성하려면 Lambda 콘솔, [AWS Command Line Interface(CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 또는 [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/)를 사용할 수 있습니다.

**참고**  
이벤트 소스 매핑을 생성하면 Lambda는 MSK 클러스터가 포함된 프라이빗 서브넷에 [하이퍼플레인 ENI](configuration-vpc.md#configuration-vpc-enis)를 생성하여 Lambda가 보안 연결을 설정할 수 있도록 합니다. 이 하이퍼플레인 ENI는 Lambda 함수가 아닌 MSK 클러스터의 서브넷과 보안 그룹 구성을 사용합니다.

다음 콘솔 단계에서는 Amazon MSK 클러스터를 Lambda 함수의 트리거로 추가합니다. 그러면 내부적으로 이벤트 소스 매핑 리소스가 생성됩니다.

**Lambda 함수에 Amazon MSK 트리거를 추가하려면(콘솔)**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. Amazon MSK 트리거를 추가할 Lambda 함수의 이름을 선택합니다.

1. **함수 개요(Function overview)**에서 **트리거 추가(Add trigger)**를 선택합니다.

1. **트리거 구성**에서 **MSK**를 선택합니다.

1. Kafka 클러스터 세부 정보를 지정하려면 다음을 수행합니다.

   1. **MSK 클러스터**에 해당 클러스터를 선택합니다.

   1. **주제 이름**에 메시지를 사용할 Kafka 주제의 이름을 입력합니다.

   1. **소비자 그룹 ID**에서 가입할 Kafka 소비자 그룹의 ID(해당되는 경우)를 입력합니다. 자세한 내용은 [Lambda에서 사용자 지정이 가능한 소비자 그룹 ID](kafka-consumer-group-id.md) 섹션을 참조하세요.

1. **클러스터 인증**에 대해 필요에 따라 구성합니다. 클러스터 인증에 대한 자세한 내용은 [Lambda에서 Amazon MSK 클러스터 인증 방법 구성](msk-cluster-auth.md) 섹션을 참조하세요.
   + 연결을 설정할 때 Lambda가 MSK 클러스터로 인증을 수행하도록 하려면 **인증 사용**을 켭니다. 인증이 권장됩니다.
   + 인증을 사용하는 경우 **인증 방법**에서 사용할 인증 방법을 선택합니다.
   + 인증을 사용하는 경우 **Secrets Manager 키**에서 클러스터에 액세스하는 데 필요한 인증 자격 증명이 포함된 Secrets Manager 키를 선택합니다.

1. **이벤트 폴러 구성**에서 필요에 따라 구성합니다.
   + 생성 직후 트리거를 활성화하려면 **트리거 활성화**를 선택합니다.
   + 이벤트 소스 매핑에 대해 **프로비저닝된 모드를 구성**할지 여부를 선택합니다. 자세한 내용은 [Lambda에서의 Apache Kafka 이벤트 폴러 스케일링 모드](kafka-scaling-modes.md) 섹션을 참조하세요.
     + 프로비저닝 모드를 구성하는 경우 **최소 이벤트 폴러**의 값, **최대 이벤트 폴러**의 값, PollerGroupName 값(선택 사항)을 입력하여 동일한 이벤트 소스 VPC 내에서 여러 ESM의 그룹화를 지정합니다.
   + **시작 위치**에서 Lambda가 스트림에서 읽기를 시작하는 방법을 선택합니다. 자세한 내용은 [Lambda에서의 Apache Kafka 폴링 및 스트림 시작 위치](kafka-starting-positions.md) 섹션을 참조하세요.

1. **배치 처리**에서 필요에 따라 구성합니다. 일괄 처리에 대한 자세한 내용은 [일괄 처리 동작](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching) 섹션을 참조하세요.

   1. **배치 크기(Batch size)**에 단일 배치에서 검색할 최대 메시지 수를 입력합니다.

   1. **배치 창**에서 Lambda가 함수를 간접적으로 호출하기 전에 레코드를 수집하는 데 걸리는 최대 시간(초)을 입력합니다.

1. **필터링**에서 필요에 따라 구성합니다. 필터링에 대한 자세한 내용은 [Amazon MSK 및 자체 관리형 Apache Kafka 이벤트 소스에서 이벤트 필터링](kafka-filtering.md) 섹션을 참조하세요.
   + **필터 기준**에서 필터 기준 정의를 추가하여 이벤트를 처리할지 여부를 결정합니다.

1. **장애 처리**에서 필요에 따라 구성합니다. 장애 처리에 대한 자세한 내용은 [Amazon MSK 및 자체 관리형 Apache Kafka 이벤트 소스에 대한 폐기된 배치 캡처](kafka-on-failure.md) 섹션을 참조하세요.
   + **장애 시 대상**에서 장애 시 대상의 ARN을 지정합니다.

1. **태그**에 이 이벤트 소스 매핑과 연결할 태그를 입력합니다.

1. 트리거를 생성하려면 **추가**를 선택합니다.

[create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 명령과 함께 AWS CLI를 사용하여 이벤트 소스 매핑을 생성할 수도 있습니다. 다음 예시에서는 `LATEST` 메시지부터 시작하여 Lambda 함수 `my-msk-function`을 `AWSKafkaTopic` 주제에 매핑하는 이벤트 소스 매핑을 생성합니다. 또한 이 명령은 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 객체를 사용하여 클러스터에 연결할 때 [SASL/SCRAM](msk-cluster-auth.md#msk-sasl-scram) 인증을 사용하도록 Lambda에 지시합니다.

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

클러스터가 [mTLS authentication](msk-cluster-auth.md#msk-mtls)을 사용하는 경우 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 객체를 포함하며 이는 `CLIENT_CERTIFICATE_TLS_AUTH` 및 Secrets Manager 키 ARN을 지정합니다. 다음 명령을 참조하세요.

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

클러스터가 [IAM 인증](msk-cluster-auth.md#msk-iam-auth)을 사용하는 경우 [ SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 객체가 필요하지 않습니다. 다음 명령을 참조하세요.

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

# Lambda에서 교차 계정 이벤트 소스 매핑 생성
<a name="msk-cross-account"></a>

[다중 VPC 프라이빗 연결](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html)을 사용하여 Lambda 함수를 다른 AWS 계정의 클러스터에 프로비저닝된 MSK 클러스터에 연결할 수 있습니다. 다중 VPC 연결은 AWS PrivateLink를 사용하며, 이는 모든 트래픽을 AWS 네트워크 내에 유지합니다.

**참고**  
서버리스 MSK 클러스터에는 계정 간 이벤트 소스 매핑을 생성할 수 없습니다.

계정 간 이벤트 소스 매핑을 생성하려면 먼저 [MSK 클러스터의 다중 VPC 연결을 구성](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)해야 합니다. 이벤트 소스 매핑을 생성할 때는 다음 예에서 표시된 것처럼 클러스터 ARN 대신 관리형 VPC 연결 ARN을 사용합니다. [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 작업도 MSK 클러스터가 사용하는 인증 유형에 따라 달라집니다.

**Example — IAM 인증을 사용하는 클러스터의 계정 간 이벤트 소스 매핑 생성**  
클러스터가 [IAM 역할 기반 인증](msk-cluster-auth.md#msk-iam-auth)을 사용하는 경우 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 객체가 필요하지 않습니다. 예제:  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

**Example — SASL/SCRAM 인증을 사용하는 클러스터의 계정 간 이벤트 소스 매핑 생성**  
클러스터가 [SASL/SCRAM 인증](msk-cluster-auth.md#msk-sasl-scram)을 사용하는 경우 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 객체를 포함하며 이는 `SASL_SCRAM_512_AUTH` 및 Secrets Manager 비밀 ARN을 지정합니다.  
SASL/SCRAM 인증을 통한 계정 간 Amazon MSK 이벤트 소스 매핑에 암호를 사용하는 방법에는 두 가지가 있습니다.  
+ Lambda 함수 계정에서 시크릿을 생성하고 클러스터 암호와 동기화합니다. [로테이션을 생성](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)하여 두 암호가 동기화된 상태를 유지합니다. 이 옵션을 사용하면 함수 계정에서 시크릿을 제어할 수 있습니다.
+ MSK 클러스터와 연결된 암호를 사용합니다. 이 암호는 Lambda 함수 계정에 대한 교차 계정 액세스를 허용해야 합니다. 자세한 내용은 [다른 계정에 있는 사용자의 AWS Secrets Manager 암호에 대한 권한](https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_examples_cross.html)을 참조하세요.

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

**Example — mTLS 인증을 사용하는 클러스터의 계정 간 이벤트 소스 매핑 생성**  
클러스터가 [mTLS](msk-cluster-auth.md#msk-mtls)을 사용하는 경우 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 객체를 포함하며 이는 `CLIENT_CERTIFICATE_TLS_AUTH` 및 Secrets Manager 비밀 ARN을 지정합니다. 암호는 클러스터 계정 또는 Lambda 함수 계정에 저장할 수 있습니다.  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

# Lambda에서의 모든 Amazon MSK 이벤트 소스 구성 파라미터
<a name="msk-esm-parameters"></a>

모든 Lambda 이벤트 소스 유형은 동일한 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 및 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 작업을 공유합니다. 그러나 다음 표와 같이 일부 파라미터만 Amazon MSK에 적용됩니다.


| 파라미터 | 필수 | 기본값 | 참고 | 
| --- | --- | --- | --- | 
|  AmazonManagedKafkaEventSourceConfig  |  N  |  기본적으로 고유한 값으로 설정되는 소비자 그룹 ID 필드를 포함합니다.  |  생성 시에만 설정할 수 있음  | 
|  BatchSize  |  N  |  100  |  최대값: 10,000  | 
|  DestinationConfig  |  N  |  해당 사항 없음  |  [Amazon MSK 및 자체 관리형 Apache Kafka 이벤트 소스에 대한 폐기된 배치 캡처](kafka-on-failure.md)  | 
|  활성  |  N  |  True  |    | 
|  BisectBatchOnFunctionError  |  N  |  False  |  [Kafka 이벤트 소스의 오류 처리 제어 구성](kafka-retry-configurations.md)  | 
|  FunctionResponseTypes  |  N  |  해당 사항 없음  |  [Kafka 이벤트 소스의 오류 처리 제어 구성](kafka-retry-configurations.md)  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1(무제한)  |  [Kafka 이벤트 소스의 오류 처리 제어 구성](kafka-retry-configurations.md)  | 
|  MaximumRetryAttempts  |  N  |  -1(무제한)  |  [Kafka 이벤트 소스의 오류 처리 제어 구성](kafka-retry-configurations.md)  | 
|  EventSourceArn  |  Y  | 해당 사항 없음 |  생성 시에만 설정할 수 있음  | 
|  FilterCriteria  |  N  |  해당 사항 없음  |  [Lambda가 함수로 보내는 이벤트에 대한 제어](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  |  해당 사항 없음  |    | 
|  KMSKeyArn  |  N  |  해당 사항 없음  |  [필터 기준 암호화](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500ms  |  [일괄 처리 동작](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`: 지정하지 않은 경우 기본값(1) `MaximumPollers`: 지정하지 않은 경우 기본값(200) `PollerGroupName`: 해당 사항 없음  |  [프로비저닝된 모드](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SourceAccessConfigurations  |  N  |  자격 증명 없음  |  이벤트 소스에 대한 CLIENT\$1CERTIFICATE\$1TLS\$1AUTH(MutualTLS) 또는 SASL/SCRAM  | 
|  StartingPosition  |  Y  | 해당 사항 없음 |  AT\$1TIMESTAMP, TRIM\$1HORIZON, 또는 LATEST 생성 시에만 설정할 수 있음  | 
|  StartingPositionTimestamp  |  N  |  해당 사항 없음  |  StartingPosition이 AT\$1TIMESTAMP로 설정된 경우에만 필요합니다.  | 
|  Tags  |  N  |  해당 사항 없음  |  [이벤트 소스 매핑에 태그 사용](tags-esm.md)  | 
|  주제  |  Y  | 해당 사항 없음 |  Kafka 주제 이름 생성 시에만 설정할 수 있음  | 

**참고**  
`PollerGroupName`을 지정하면 동일한 Amazon VPC 내의 여러 ESM이 이벤트 폴러 단위(EPU) 용량을 공유할 수 있습니다. 이 옵션을 사용하면 ESM의 프로비저닝된 모드 비용을 최적화할 수 있습니다. ESM 그룹화 요구 사항:  
ESM이 동일한 Amazon VPC 내에 위치
폴러 그룹당 최대 100개의 ESM
그룹 내 모든 ESM의 최대 폴러 합계가 2,000을 초과할 수 없음
`PollerGroupName`을 업데이트하여 ESM을 다른 그룹으로 이동하거나 `PollerGroupName`을 빈 문자열("")로 설정하여 그룹에서 ESM을 제거할 수 있습니다.

# 자습서: Amazon MSK 이벤트 소스 매핑을 사용하여 간접적으로 Lambda 함수 간접 호출
<a name="services-msk-tutorial"></a>

이 자습서에서는 다음을 수행합니다.
+ 기존 Amazon MSK 클러스터와 동일한 AWS 계정에서 Lambda 함수를 생성합니다.
+ Lambda가 Amazon MSK와 통신할 수 있도록 네트워킹과 인증을 구성합니다.
+ 주제에 이벤트가 표시될 때 Lambda 함수를 실행하는 Lambda Amazon MSK 이벤트 소스 매핑을 설정합니다.

이 단계를 완료한 후 Amazon MSK로 이벤트가 전송되면 사용자 지정 Lambda 코드를 사용하여 해당 이벤트를 자동으로 처리하도록 Lambda 함수를 설정할 수 있습니다.

 **이 기능으로 무엇을 할 수 있나요?**

**예제 솔루션: MSK 이벤트 소스 매핑을 사용하여 고객에게 실시간 점수를 제공하세요.**

다음 시나리오를 생각해 보세요. 회사에서 고객이 스포츠 경기와 같은 실시간 이벤트에 대한 정보를 볼 수 있는 웹 애플리케이션을 호스팅하고 있습니다. 게임의 정보 업데이트는 Amazon MSK의 Kafka 주제를 통해 팀에 제공됩니다. 개발하는 애플리케이션 내에서 고객에게 라이브 이벤트의 업데이트된 보기를 제공하기 위해 MSK 주제의 업데이트를 사용하는 솔루션을 설계하려고 합니다. 다음과 같은 설계 접근 방식을 결정했습니다. 클라이언트 애플리케이션은 AWS에서 호스팅되는 서버리스 백엔드와 통신합니다. 클라이언트는 Amazon API Gateway WebSocket API를 사용하여 WebSocket 세션을 통해 연결됩니다.

이 솔루션에서는 MSK 이벤트를 읽고 일부 사용자 지정 로직을 수행하여 애플리케이션 계층에 맞게 해당 이벤트를 준비한 다음 해당 정보를 API Gateway API로 전달하는 구성 요소가 필요합니다. Lambda 함수에 사용자 지정 로직을 제공한 다음 AWS Lambda Amazon MSK 이벤트 소스 매핑을 통해 직접적으로 호출하여 AWS Lambda로 이 구성 요소를 구현할 수 있습니다.

Amazon API Gateway WebSocket API를 사용하여 솔루션을 구현하는 방법에 대한 자세한 내용은 API Gateway 설명서의 [WebSocket API 자습서](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html)를 참조하세요.

## 사전 조건
<a name="w2aad101c23c15c35c19"></a>

다음과 같은 사전 구성된 리소스가 있는 AWS 계정:

**이러한 사전 요구 사항을 충족하려면 Amazon MSK 설명서의 [Getting started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html)를 따르는 것이 좋습니다.**
+ Amazon MSK 클러스터. *Getting started using Amazon MSK*의 [Create an Amazon MSK cluster](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html)를 참조하세요.
+ 다음 구성:
  + 클러스터 보안 설정에서 **IAM 역할 기반 인증**이 **활성화됨**인지 확인합니다. 이렇게 하면 필요한 Amazon MSK 리소스에만 액세스하도록 Lambda 함수를 제한하여 보안이 강화됩니다. 이는 새 Amazon MSK 클러스터에서 기본적으로 활성화됩니다.
  + 클러스터 네트워킹 설정에서 **퍼블릭 액세스**가 꺼져 있는지 확인합니다. Amazon MSK 클러스터의 인터넷 액세스를 제한하면 데이터를 처리하는 중개자 수가 제한되어 보안이 강화됩니다. 이는 새 Amazon MSK 클러스터에서 기본적으로 활성화됩니다.
+ 이 솔루션에 사용할 Amazon MSK 클러스터의 Kafka 주제. **Getting started using Amazon MSK의 [Create a topic](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html)을 참조하세요.
+ Kafka 클러스터에서 정보를 검색하고 테스트를 위해 Kafka 이벤트를 주제에 전송하도록 설정된 Kafka 관리자 호스트(예: Kafka 관리 CLI와 Amazon MSK IAM 라이브러리가 설치된 Amazon EC2 인스턴스). **Getting started using Amazon MSK의 [Create a client machine](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html)을 참조하세요.

이러한 리소스를 설정한 후에는 AWS 계정에서 다음 정보를 수집하여 계속할 준비가 되었는지 확인하세요.
+ Amazon MSK 클러스터의 이름. 이 정보는 Amazon MSK 콘솔에서 확인할 수 있습니다.
+ Amazon MSK 클러스터용 ARN의 일부인 클러스터 UUID(Amazon MSK 콘솔에서 찾을 수 있음). 이 정보를 찾으려면 Amazon MSK 설명서의 [Listing clusters](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html)에 나와 있는 절차를 따르세요.
+ Amazon MSK 클러스터와 연결된 보안 그룹. 이 정보는 Amazon MSK 콘솔에서 확인할 수 있습니다. 다음 단계에서는 이를 *clusterSecurityGroups*라고 합니다.
+ Amazon MSK 클러스터를 포함하는 Amazon VPC의 ID. Amazon MSK 콘솔에서 Amazon MSK 클러스터와 연결된 서브넷을 식별한 다음 Amazon VPC 콘솔에서 해당 서브넷과 연결된 Amazon VPC를 식별하여 이 정보를 찾을 수 있습니다.
+ 솔루션에 사용되는 Kafka 주제의 이름. Kafka 관리 호스트에서 Kafka `topics` CLI로 Amazon MSK 클러스터를 직접적으로 호출하여 이 정보를 찾을 수 있습니다. 주제 CLI에 대한 자세한 내용은 Kafka 설명서의 [Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic)를 참조하세요.
+ Lambda 함수에서 사용하기에 적합한 Kafka 주제에 대한 소비자 그룹의 이름. 이 그룹은 Lambda에서 자동으로 생성할 수 있으므로 Kafka CLI를 사용하여 생성할 필요가 없습니다. 소비자 그룹을 관리해야 하는 경우 소비자 그룹 CLI에 대한 자세한 내용은 Kafka 설명서의 [Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group)를 참조하세요.

AWS 계정의 다음 권한:
+ Lambda 함수를 생성하고 관리할 수 있는 권한
+ IAM 정책을 생성하고 Lambda 함수와 연결할 수 있는 권한
+ Amazon MSK 클러스터를 호스팅하는 Amazon VPC에서 Amazon VPC 엔드포인트를 생성하고 네트워킹 구성을 변경할 수 있는 권한

### AWS Command Line Interface 설치
<a name="install_aws_cli"></a>

아직 AWS Command Line Interface를 설치하지 않은 경우 [AWS CLI의 최신 버전 설치 또는 업데이트](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)에서 설명하는 단계에 따라 설치하세요.

이 자습서에서는 명령을 실행할 셸 또는 명령줄 터미널이 필요합니다. Linux 및 macOS에서는 선호하는 셸과 패키지 관리자를 사용합니다.

**참고**  
Windows에서는 Lambda와 함께 일반적으로 사용하는 일부 Bash CLI 명령(예:`zip`)은 운영 체제의 기본 제공 터미널에서 지원되지 않습니다. Ubuntu와 Bash의 Windows 통합 버전을 가져오려면 [Linux용 Windows Subsystem을 설치](https://docs.microsoft.com/en-us/windows/wsl/install-win10)합니다.

## Lambda가 Amazon MSK와 통신할 수 있도록 네트워크 연결 구성
<a name="w2aad101c23c15c35c21"></a>

 AWS PrivateLink를 사용하여 Lambda와 Amazon MSK를 연결합니다. 이를 위해 Amazon VPC 콘솔에서 인터페이스 Amazon VPC 엔드포인트를 생성할 수 있습니다. 네트워크 구성에 대한 자세한 내용은 [Lambda를 위한 Amazon MSK 클러스터 및 Amazon VPC 네트워크 구성](with-msk-cluster-network.md) 섹션을 참조하세요.

Amazon MSK 이벤트 소스 매핑이 Lambda 함수를 대신하여 실행되는 경우 Lambda 함수의 실행 역할을 수임합니다. 이 IAM 역할은 Amazon MSK 클러스터와 같이 IAM으로 보호되는 리소스에 액세스할 수 있도록 매핑 권한을 부여합니다. 구성 요소는 실행 역할을 공유하지만 다음 다이어그램과 같이 Amazon MSK 매핑과 Lambda 함수에는 해당 작업에 대한 별도의 연결 요구 사항이 있습니다.

![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/msk_tut_network.png)


이벤트 소스 매핑은 Amazon MSK 클러스터 보안 그룹에 속합니다. 이 네트워킹 단계에서는 Amazon MSK 클러스터 VPC에서 Amazon VPC 엔드포인트를 생성하여 Lambda 및 STS 서비스에 이벤트 소스 매핑을 연결합니다. 이러한 엔드포인트를 보호하여 Amazon MSK 클러스터 보안 그룹의 트래픽을 허용하세요. 그런 다음 Amazon MSK 클러스터 보안 그룹을 조정하여 이벤트 소스 매핑이 Amazon MSK 클러스터와 통신할 수 있도록 합니다.

 AWS Management Console을 사용하여 다음 단계를 구성할 수 있습니다.

**Lambda와 Amazon MSK를 연결하도록 인터페이스 Amazon VPC 엔드포인트를 구성하려면 다음을 수행하세요.**

1. 443에서 *clusterSecurityGroups*의 인바운드 TCP 트래픽을 허용하는 인터페이스 Amazon VPC 엔드포인트에 대한 보안 그룹인 *endpointSecurityGroup*을 생성합니다. Amazon EC2 설명서의 [보안 그룹 생성](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group)에 나와 있는 절차에 따라 보안 그룹을 생성합니다. 그런 다음 Amazon EC2 설명서의 [보안 그룹에 규칙 추가](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) 절차에 따라 적절한 규칙을 추가합니다.

   **다음 정보로 보안 그룹 생성:**

   인바운드 규칙을 추가할 때 *clusterSecurityGroups*에서 각 보안 그룹에 대한 규칙을 생성합니다. 각 규칙에 대해 다음을 수행합니다.
   + **타입**에 대해 **HTTPS**를 선택합니다.
   + **소스**로 *clusterSecurityGroups* 중 하나를 선택합니다.

1.  Amazon MSK 클러스터가 포함된 Amazon VPC에 Lambda 서비스를 연결하는 엔드포인트를 생성합니다. [인터페이스 엔드포인트 생성](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html) 절차를 따릅니다.

   **다음 정보로 인터페이스 엔드포인트 생성:**
   + **서비스 이름**으로 `com.amazonaws.regionName.lambda`를 선택합니다. 여기서 *regionName*은 Lambda 함수를 호스팅합니다.
   + **VPC**로 Amazon MSK 클러스터가 포함된 Amazon VPC를 선택합니다.
   + **보안 그룹**으로 앞에서 생성한 *endpointSecurityGroup*을 선택합니다.
   + **서브넷**으로 Amazon MSK 클러스터를 호스팅하는 서브넷을 선택합니다.
   + **정책**으로 Lambda 서비스 주체가 `lambda:InvokeFunction` 작업에 사용할 엔드포인트를 보호하는 다음 정책 문서를 제공합니다.

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + **DNS 이름 활성화**가 설정된 상태로 유지되는지 확인합니다.

1.  Amazon MSK 클러스터가 포함된 Amazon VPC에 AWS STS 서비스를 연결하는 엔드포인트를 생성합니다. [인터페이스 엔드포인트 생성](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html) 절차를 따릅니다.

   **다음 정보로 인터페이스 엔드포인트 생성:**
   + **서비스 이름**에서 AWS STS를 선택합니다.
   + **VPC**로 Amazon MSK 클러스터가 포함된 Amazon VPC를 선택합니다.
   + **보안 그룹**으로 *endpointSecurityGroup*을 선택합니다.
   + **서브넷**으로 Amazon MSK 클러스터를 호스팅하는 서브넷을 선택합니다.
   + **정책**으로 Lambda 서비스 주체가 `sts:AssumeRole` 작업에 사용할 엔드포인트를 보호하는 다음 정책 문서를 제공합니다.

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + **DNS 이름 활성화**가 설정된 상태로 유지되는지 확인합니다.

1. Amazon MSK 클러스터와 연결된 각 보안 그룹, 즉 *clusterSecurityGroups*에 대해 다음을 허용합니다.
   + 9098의 모든 인바운드 및 아웃바운드 TCP 트래픽을 자체 내부를 포함하여 모든 *clusterSecurityGroups*에 허용합니다.
   + 443에서 모든 아웃바운드 TCP 트래픽을 허용합니다.

   이 트래픽 중 일부는 기본 보안 그룹 규칙에 의해 허용되므로 클러스터가 단일 보안 그룹에 연결되어 있고 해당 그룹에 기본 규칙이 있는 경우 추가 규칙이 필요하지 않습니다. 보안 그룹 규칙을 조정하려면 Amazon EC2 설명서의 [보안 그룹에 규칙 추가](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) 절차를 따릅니다.

   **다음 정보로 보안 그룹에 규칙 추가**
   + 포트 9098에 대한 각 인바운드 규칙 또는 아웃바운드 규칙에 대해 다음을 제공합니다.
     + **유형**에서 **Custom TCP(사용자 지정 TCP)**를 선택합니다.
     + **포트 범위**에 9098을 입력합니다.
     + **소스**로 *clusterSecurityGroups* 중 하나를 제공합니다.
   + 포트 443에 대한 각 인바운드 규칙에 대해 **유형**으로 **HTTPS**를 선택합니다.

## Lambda가 Amazon MSK 주제에서 읽을 수 있도록 IAM 역할 생성
<a name="w2aad101c23c15c35c23"></a>

Lambda가 Amazon MSK 주제에서 읽기 위한 인증 요구 사항을 파악한 다음 정책에서 정의합니다. Lambda가 이러한 권한을 사용할 수 있도록 승인하는 역할인 *lambdaAuthRole*을 생성합니다. `kafka-cluster` IAM 작업을 사용하여 Amazon MSK 클러스터에서 작업 권한을 부여합니다. 그런 다음, Lambda가 Amazon MSK 클러스터를 검색하고 연결하는 데 필요한 Amazon MSK `kafka` 및 Amazon EC2 작업을 수행하도록 권한을 부여하고, Lambda가 수행한 작업을 기록할 수 있도록 CloudWatch 작업을 수행하도록 권한을 부여합니다.

**Lambda가 Amazon MSK에서 읽기 위한 인증 요구 사항을 설명하려면 다음을 수행하세요.**

1. Lambda가 Kafka 소비자 그룹을 사용하여 Amazon MSK 클러스터의 Kafka 주제에서 읽을 수 있도록 허용하는 IAM 정책 문서(JSON 문서)인 *clusterAuthPolicy*를 작성합니다. Lambda는 읽을 때 Kafka 소비자 그룹을 설정해야 합니다.

   사전 요구 사항에 맞게 다음 템플릿 변경

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   자세한 내용은 [Amazon MSK 이벤트 소스 매핑에 대한 Lambda 권한 구성](with-msk-permissions.md)을 참조하세요. 정책을 작성할 때
   + *us-east-1* 및 *111122223333*를 Amazon MSK 클러스터의 AWS 리전 및 AWS 계정으로 변경합니다.
   + *mskClusterName*으로 Amazon MSK 클러스터의 이름을 제공합니다.
   + *cluster-uuid*로 Amazon MSK 클러스터의 ARN에 UUID를 제공합니다.
   + *mskTopicName*으로 Kafka 주제의 이름을 제공합니다.
   + *mskGroupName*으로 Kafka 소비자 그룹의 이름을 제공합니다.

1. Lambda가 Amazon MSK 클러스터를 검색 및 연결하고 해당 이벤트를 기록하는 데 필요한 Amazon MSK, Amazon EC2 및 CloudWatch 권한을 식별합니다.

   `AWSLambdaMSKExecutionRole` 관리형 정책은 필요한 권한을 허용적으로 정의합니다. 다음 단계에서 사용합니다.

   프로덕션 환경에서는 `AWSLambdaMSKExecutionRole`을 평가하여 최소 권한 원칙에 따라 실행 역할 정책을 제한한 다음 이 관리형 정책을 대체하는 역할에 대한 정책을 작성합니다.

IAM 정책 언어에 대한 자세한 내용은 [IAM 설명서](https://docs.aws.amazon.com//iam/)를 참조하세요.

정책 문서를 작성했으니 이제 IAM 정책을 생성하여 역할에 연결할 수 있습니다. 다음 절차에 따라 콘솔을 사용하여 이 작업을 수행할 수 있습니다.

**정책 문서에서 IAM 정책을 생성하려면 다음을 수행하세요.**

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)에서 IAM 콘솔을 엽니다.

1. 왼쪽의 탐색 창에서 **정책**을 선택합니다.

1. **정책 생성**을 선택합니다.

1. **정책 편집기** 섹션에서 **JSON** 옵션을 선택합니다.

1. *clusterAuthPolicy*를 붙여넣습니다.

1. 정책에 권한 추가를 완료했으면 **다음**을 선택합니다.

1. **검토 및 생성** 페이지에서 생성하는 정책에 대한 **정책 이름**과 **설명**(선택 사항)을 입력합니다. **이 정책에 정의된 권한**을 검토하여 정책이 부여한 권한을 확인합니다.

1. **정책 생성**을 선택하고 새로운 정책을 저장합니다.

자세한 내용은 IAM 설명서의 [IAM 정책 생성](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html)을 참조하세요.

이제 적절한 IAM 정책이 있으므로 역할을 생성하고 이에 IAM 정책을 연결합니다. 다음 절차에 따라 콘솔을 사용하여 이 작업을 수행할 수 있습니다.

**IAM 콘솔에서 실행 역할을 생성하려면**

1. IAM 콘솔에서 [역할(Roles)](https://console.aws.amazon.com/iam/home#/roles) 페이지를 엽니다.

1. **역할 생성**을 선택합니다.

1. **신뢰할 수 있는 엔터티 유형**에서 **AWS 서비스**를 선택합니다.

1. **사용 사례**에서 **Lambda**를 선택합니다.

1. **다음**을 선택합니다.

1. 다음 정책을 선택합니다.
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. **다음**을 선택합니다.

1. **역할 이름**에 *lambdaAuthRole*을 입력한 다음 **역할 생성**을 선택합니다.

자세한 내용은 [실행 역할로 Lambda 함수 권한 정의](lambda-intro-execution-role.md) 섹션을 참조하세요.

## Amazon MSK 주제에서 읽을 Lambda 함수 생성
<a name="w2aad101c23c15c35c25"></a>

IAM 역할을 사용하도록 구성된 Lambda 함수를 생성합니다. 콘솔을 사용하여 Lambda 함수를 생성할 수 있습니다.

**인증 구성을 사용하여 Lambda 함수를 생성하려면 다음을 수행하세요.**

1.  Lambda 콘솔을 열고 헤더에서 **함수 생성**을 선택합니다.

1. **새로 작성**을 선택합니다.

1. **함수 이름**으로 원하는 적절한 이름을 제공합니다.

1. **런타임**으로 이 자습서에 제공된 코드를 사용하려면 **지원되는 최신** 버전의 `Node.js`를 선택합니다.

1. **기본 실행 역할 변경**을 선택합니다.

1. **기존 역할 사용**을 선택합니다.

1. **기존 역할**로 *lambdaAuthRole*을 선택합니다.

프로덕션 환경에서는 일반적으로 Amazon MSK 이벤트를 의미 있게 처리하기 위해 Lambda 함수의 실행 역할에 추가 정책을 추가해야 합니다. 역할에 정책을 추가하는 방법에 대한 자세한 내용은 IAM 설명서의 [ID 권한 추가 또는 제거](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console)를 참조하세요.

## Lambda 함수에 대한 이벤트 소스 매핑 생성
<a name="w2aad101c23c15c35c27"></a>

Amazon MSK 이벤트 소스 매핑은 적절한 Amazon MSK 이벤트가 발생할 때 Lambda를 간접적으로 간접 호출하는 데 필요한 정보를 Lambda 서비스에 제공합니다. 콘솔을 사용하여 Amazon MSK 매핑을 생성할 수 있습니다. Lambda 트리거를 생성하면 이벤트 소스 매핑이 자동으로 설정됩니다.

**Lambda 트리거 및 이벤트 소스 매핑을 생성하려면 다음을 수행하세요.**

1. Lambda 함수의 개요 페이지로 이동합니다.

1. 함수 개요 섹션의 왼쪽 하단에서 **트리거 추가**를 선택합니다.

1. **소스 선택** 드롭다운에서 **Amazon MSK**를 선택합니다.

1. **인증**을 설정하지 마세요.

1. **MSK 클러스터**로 클러스터의 이름을 선택합니다.

1. **배치 크기**로 1을 입력합니다. 이 단계를 수행하면 이 기능을 더 쉽게 테스트할 수 있지만 프로덕션에서는 이상적인 값이 아닙니다.

1. **주제 이름**으로 Kafka 주제의 이름을 입력합니다.

1. **소비자 그룹 ID**로 Kafka 소비자 그룹의 ID를 제공합니다.

## 스트리밍 데이터를 읽도록 Lambda 함수 업데이트
<a name="w2aad101c23c15c35c29"></a>

 Lambda는 이벤트 메서드 파라미터를 통해 Kafka 이벤트에 대한 정보를 제공합니다. Amazon MSK 이벤트의 예제 구조는 [예제 이벤트](with-msk.md#msk-sample-event) 섹션을 참조하세요. Lambda가 전달한 Amazon MSK 이벤트를 해석하는 방법을 이해한 후에는 제공된 정보를 사용하도록 Lambda 함수 코드를 변경할 수 있습니다.

 테스트 목적으로 Lambda Amazon MSK 이벤트의 내용을 기록하려면 Lambda 함수에 다음 코드를 제공하세요.

------
#### [ .NET ]

**SDK for .NET**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
.NET을 사용하여 Lambda로 Amazon MSK 이벤트 사용  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Go를 사용하여 Lambda로 Amazon MSK 이벤트 사용  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Java를 사용하여 Lambda로 Amazon MSK 이벤트를 사용합니다.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
JavaScript를 사용하여 Lambda로 Amazon MSK 이벤트를 사용합니다.  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
TypeScript를 사용하여 Lambda로 Amazon MSK 이벤트를 사용합니다.  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
PHP를 사용하여 Lambda로 Amazon MSK 이벤트 사용  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python(Boto3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Python을 사용하여 Lambda로 Amazon MSK 이벤트 사용  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Ruby를 사용하여 Lambda로 Amazon MSK 이벤트 사용  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Rust를 사용하여 Lambda로 Amazon MSK 이벤트 사용  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

콘솔을 사용하여 Lambda에 함수 코드를 제공할 수 있습니다.

**콘솔 코드 편집기를 사용하여 함수 코드 업데이트**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 열고 함수를 선택합니다.

1. **코드** 탭을 선택합니다.

1. **코드 소스** 창에서 소스 코드 파일을 선택하고 통합 코드 편집기에서 편집합니다.

1. **배포** 섹션에서 **배포**를 선택하여 함수의 코드를 업데이트하세요.  
![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## Lambda 함수를 테스트하여 Amazon MSK 주제에 연결되어 있는지 확인합니다.
<a name="w2aad101c23c15c35c31"></a>

이제 CloudWatch 이벤트 로그를 검사하여 Lambda가 이벤트 소스에 의해 간접적으로 간접 호출되고 있는지 여부를 확인할 수 있습니다.

**Lambda 함수가 간접적으로 간접 호출되고 있는지 확인하려면 다음을 수행하세요.**

1. Kafka 관리 호스트를 사용하여 `kafka-console-producer` CLI를 통해 Kafka 이벤트를 생성합니다. 자세한 내용은 Kafka 설명서의 [Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send)을 참조하세요. 이전 단계에서 정의된 이벤트 소스 매핑에 대해 배치 크기로 정의된 배치를 채우기에 충분한 이벤트를 전송합니다. 그렇지 않으면 Lambda가 추가 정보가 간접적으로 간접 호출될 때까지 기다립니다.

1. 함수가 실행되면 Lambda는 CloudWatch에 발생한 일을 기록합니다. 콘솔에서 Lambda 함수의 세부 정보 페이지로 이동합니다.

1. [**구성(Configuration)**] 탭을 선택합니다.

1. 사이드바에서 **모니터링 및 운영 도구**를 선택합니다.

1. **로깅 구성**에서 **CloudWatch 로그 그룹**을 식별합니다. 로그 그룹은 `/aws/lambda`로 시작해야 합니다. 로그 그룹의 링크를 선택합니다.

1. CloudWatch 콘솔의 **로그 이벤트**에서 Lambda가 로그 스트림으로 전송한 로그 이벤트가 있는지 검사합니다. 다음 이미지와 같이 Kafka 이벤트의 메시지가 포함된 로그 이벤트가 있는지 확인합니다. 로그 이벤트가 있으면 Lambda 이벤트 소스 매핑을 사용하여 Lambda 함수를 Amazon MSK에 성공적으로 연결한 것입니다.  
![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/msk_tut_log.png)