자체 관리형 Apache Kafka 이벤트 소스에 대한 폐기 배치 캡처 - AWS Lambda

자체 관리형 Apache Kafka 이벤트 소스에 대한 폐기 배치 캡처

실패한 이벤트 소스 매핑 간접 호출 기록을 보관하려면 함수의 이벤트 소스 매핑에 대상을 추가합니다. 대상으로 전송된 각 레코드는 실패한 간접 호출에 대한 메타데이터를 포함하는 JSON 문서입니다. Amazon S3 대상의 경우 Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 전송합니다. 모든 Amazon SNS 주제, Amazon SQS 대기열 또는 S3 버킷을 대상으로 구성할 수 있습니다.

Amazon S3 대상을 사용하면 Amazon S3 이벤트 알림 기능을 통해 객체를 대상 S3 버킷에 업로드할 때 알림을 받을 수 있습니다. 실패한 배치에서 자동 처리를 수행하기 위해 다른 Lambda 함수를 간접 호출하도록 S3 이벤트 알림을 구성할 수도 있습니다.

실행 역할에 대상에 대한 권한이 있어야 합니다.

장애 시 대상 서비스를 위한 VPC 엔드포인트를 Apache Kafka 클러스터 VPC 내에 배포해야 합니다.

또한 대상에 KMS 키를 구성한 경우 대상 유형에 따라 Lambda에는 다음과 같은 권한이 필요합니다.

  • S3 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:GenerateDataKey가 필요합니다. KMS 키와 S3 버킷 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:GenerateDataKey를 허용합니다.

  • SQS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decryptkms:GenerateDataKey가 필요합니다. KMS 키와 SQS 대기열 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKeykms:ReEncrypt를 허용합니다.

  • SNS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decryptkms:GenerateDataKey가 필요합니다. KMS 키와 SNS 주제 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKeykms:ReEncrypt를 허용합니다.

자체 관리형 Apache Kafka 이벤트 소스 매핑에 대한 장애 발생 시 대상 구성

이 콘솔을 사용하여 장애 시 대상을 구성하려면 다음 단계를 따르세요.

  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.

  4. 소스의 경우 이벤트 소스 매핑 간접 호출을 선택합니다.

  5. 이벤트 소스 매핑의 경우 이 함수에 대해 구성된 이벤트 소스를 선택합니다.

  6. 조건의 경우 실패 시를 선택합니다. 이벤트 소스 매핑 간접 호출의 경우 이 조건만 수락됩니다.

  7. 대상 유형의 경우 Lambda가 간접 호출 레코드를 전송할 대상 유형을 선택합니다.

  8. Destination(대상)에서 리소스를 선택합니다.

  9. 저장을 선택합니다.

AWS CLI를 사용하여 장애 시 대상을 구성할 수도 있습니다. 예를 들어, 다음 create-event-source-mapping 명령은 SQS 장애 시 대상이 있는 이벤트 소스 매핑을 MyFunction에 추가합니다.

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

다음 update-event-source-mapping 명령은 uuid 입력과 연결된 이벤트 소스에 S3 장애 시 대상을 추가합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

대상을 제거하려면 destination-config 파라미터의 인수로 빈 문자열을 제공합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon S3에 대한 보안 모범 사례

함수 구성에서 대상을 제거하지 않고 대상으로 구성된 S3 버킷을 삭제하면 보안 위험이 초래될 수 있습니다. 사용자의 대상 버킷 이름을 다른 사용자가 알고 있는 경우 AWS 계정에서 버킷을 다시 생성할 수 있습니다. 실패한 간접 호출에 대한 레코드가 해당 버킷으로 전송되어 함수의 데이터가 공개될 수 있습니다.

주의

함수의 간접 호출 레코드를 다른 AWS 계정의 S3 버킷으로 전송할 수 없도록 하려면 계정의 버킷에 대한 s3:PutObject 권한을 제한하는 조건을 함수의 실행 역할에 추가합니다.

다음 예제에서는 함수의 s3:PutObject 권한을 계정의 버킷으로 제한하는 IAM 정책을 보여줍니다. 또한 이 정책에서는 Lambda가 S3 버킷을 대상으로 사용하는 데 필요한 s3:ListBucket 권한도 부여합니다.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

AWS Management Console 또는 AWS CLI를 사용하여 함수의 실행 역할에 권한 정책을 추가하려면 다음 절차의 지침을 참조하세요.

Console
함수의 실행 역할에 권한 정책을 추가하는 방법(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 실행 역할을 수정하려는 Lambda 함수를 선택하세요.

  3. 구성 탭을 선택한 다음 사용 권한을 선택합니다.

  4. 실행 역할 탭에서 함수의 역할 이름을 선택하여 역할의 IAM 콘솔 페이지를 여세요.

  5. 다음을 수행하여 역할에 기본 권한 정책을 추가하세요.

    1. 권한 창에서 권한 추가를 선택하고 인라인 정책 생성을 선택하세요.

    2. 정책 편집기에서 JSON을 선택하세요.

    3. 추가하려는 정책을 편집기에 붙여넣고(이때 기존 JSON 대체) 다음을 선택하세요.

    4. 정책 세부 정보 아래에 정책 이름을 입력하세요.

    5. 정책 생성을 선택합니다.

AWS CLI
함수의 실행 역할에 권한 정책을 추가하는 방법(CLI)
  1. 필요한 권한이 있는 JSON 정책 문서를 생성하고 로컬 디렉터리에 저장하세요.

  2. IAM put-role-policy CLI 명령을 사용하여 함수의 실행 역할에 권한을 추가하세요. JSON 정책 문서를 저장한 디렉터리에서 다음 명령을 실행하고 역할 이름, 정책 이름 및 정책 문서를 고유한 값으로 바꾸세요.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

SNS 및 SQS 예제 간접 호출 레코드

다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 SQS 대기열 대상으로 전송하는 내용을 보여줍니다. recordsInfo 아래의 각 키에는 하이픈으로 구분된 Kafka 주제와 파티션이 모두 포함되어 있습니다. 예를 들어, "Topic-0" 키의 경우 Topic은 Kafka 주제이고 0은 파티션입니다. 각 주제 및 파티션에 대해 오프셋과 타임스탬프 데이터를 사용하여 원래의 간접 호출 레코드를 찾을 수 있습니다.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 대상 예제 간접 호출 레코드

S3 대상의 경우, Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 대상으로 전송합니다. 다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 S3 버킷 대상으로 전송하는 것을 보여줍니다. SQS 및 SNS 대상에 대한 이전 예제의 모든 필드 외에도 payload 필드에는 원래 간접 호출 레코드가 이스케이프된 JSON 문자열로 포함되어 있습니다.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
작은 정보

대상 버킷에서 S3 버전 관리를 활성화하는 것이 좋습니다.