자체 관리형 Apache Kafka 이벤트 소스에 대한 폐기 배치 캡처
실패한 이벤트 소스 매핑 간접 호출 기록을 보관하려면 함수의 이벤트 소스 매핑에 대상을 추가합니다. 대상으로 전송된 각 레코드는 실패한 간접 호출에 대한 메타데이터를 포함하는 JSON 문서입니다. Amazon S3 대상의 경우 Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 전송합니다. 모든 Amazon SNS 주제, Amazon SQS 대기열 또는 S3 버킷을 대상으로 구성할 수 있습니다.
Amazon S3 대상을 사용하면 Amazon S3 이벤트 알림 기능을 통해 객체를 대상 S3 버킷에 업로드할 때 알림을 받을 수 있습니다. 실패한 배치에서 자동 처리를 수행하기 위해 다른 Lambda 함수를 간접 호출하도록 S3 이벤트 알림을 구성할 수도 있습니다.
실행 역할에 대상에 대한 권한이 있어야 합니다.
-
SQS 대상의 경우: sqs:SendMessage
-
SNS 대상의 경우: sns:Publish
-
S3 버킷 대상의 경우: s3:PutObject 및 s3:ListBucket
장애 시 대상 서비스를 위한 VPC 엔드포인트를 Apache Kafka 클러스터 VPC 내에 배포해야 합니다.
또한 대상에 KMS 키를 구성한 경우 대상 유형에 따라 Lambda에는 다음과 같은 권한이 필요합니다.
-
S3 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:GenerateDataKey가 필요합니다. KMS 키와 S3 버킷 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:GenerateDataKey를 허용합니다.
-
SQS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decrypt 및 kms:GenerateDataKey가 필요합니다. KMS 키와 SQS 대기열 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKey 및 kms:ReEncrypt를 허용합니다.
-
SNS 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 kms:Decrypt 및 kms:GenerateDataKey가 필요합니다. KMS 키와 SNS 주제 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:DescribeKey 및 kms:ReEncrypt를 허용합니다.
자체 관리형 Apache Kafka 이벤트 소스 매핑에 대한 장애 발생 시 대상 구성
이 콘솔을 사용하여 장애 시 대상을 구성하려면 다음 단계를 따르세요.
Lambda 콘솔의 함수 페이지
를 엽니다. -
함수를 선택합니다.
-
함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.
-
소스의 경우 이벤트 소스 매핑 간접 호출을 선택합니다.
-
이벤트 소스 매핑의 경우 이 함수에 대해 구성된 이벤트 소스를 선택합니다.
-
조건의 경우 실패 시를 선택합니다. 이벤트 소스 매핑 간접 호출의 경우 이 조건만 수락됩니다.
-
대상 유형의 경우 Lambda가 간접 호출 레코드를 전송할 대상 유형을 선택합니다.
-
Destination(대상)에서 리소스를 선택합니다.
-
저장을 선택합니다.
AWS CLI를 사용하여 장애 시 대상을 구성할 수도 있습니다. 예를 들어, 다음 create-event-source-mappingMyFunction
에 추가합니다.
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
다음 update-event-source-mappinguuid
입력과 연결된 이벤트 소스에 S3 장애 시 대상을 추가합니다.
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
대상을 제거하려면 destination-config
파라미터의 인수로 빈 문자열을 제공합니다.
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3에 대한 보안 모범 사례
함수 구성에서 대상을 제거하지 않고 대상으로 구성된 S3 버킷을 삭제하면 보안 위험이 초래될 수 있습니다. 사용자의 대상 버킷 이름을 다른 사용자가 알고 있는 경우 AWS 계정에서 버킷을 다시 생성할 수 있습니다. 실패한 간접 호출에 대한 레코드가 해당 버킷으로 전송되어 함수의 데이터가 공개될 수 있습니다.
주의
함수의 간접 호출 레코드를 다른 AWS 계정의 S3 버킷으로 전송할 수 없도록 하려면 계정의 버킷에 대한 s3:PutObject
권한을 제한하는 조건을 함수의 실행 역할에 추가합니다.
다음 예제에서는 함수의 s3:PutObject
권한을 계정의 버킷으로 제한하는 IAM 정책을 보여줍니다. 또한 이 정책에서는 Lambda가 S3 버킷을 대상으로 사용하는 데 필요한 s3:ListBucket
권한도 부여합니다.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount":
"111122223333"
} } } ] }
AWS Management Console 또는 AWS CLI를 사용하여 함수의 실행 역할에 권한 정책을 추가하려면 다음 절차의 지침을 참조하세요.
SNS 및 SQS 예제 간접 호출 레코드
다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 SQS 대기열 대상으로 전송하는 내용을 보여줍니다. recordsInfo
아래의 각 키에는 하이픈으로 구분된 Kafka 주제와 파티션이 모두 포함되어 있습니다. 예를 들어, "Topic-0"
키의 경우 Topic
은 Kafka 주제이고 0
은 파티션입니다. 각 주제 및 파티션에 대해 오프셋과 타임스탬프 데이터를 사용하여 원래의 간접 호출 레코드를 찾을 수 있습니다.
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
S3 대상 예제 간접 호출 레코드
S3 대상의 경우, Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 대상으로 전송합니다. 다음 예는 실패한 Kafka 이벤트 소스 간접 호출에 대해 Lambda가 SNS 주제 또는 S3 버킷 대상으로 전송하는 것을 보여줍니다. SQS 및 SNS 대상에 대한 이전 예제의 모든 필드 외에도 payload
필드에는 원래 간접 호출 레코드가 이스케이프된 JSON 문자열로 포함되어 있습니다.
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
작은 정보
대상 버킷에서 S3 버전 관리를 활성화하는 것이 좋습니다.