Lambda에서 Kinesis 데이터 스트림 이벤트 소스에 대한 폐기된 배치 레코드 유지 - AWS Lambda

Lambda에서 Kinesis 데이터 스트림 이벤트 소스에 대한 폐기된 배치 레코드 유지

Kinesis 이벤트 소스 매핑에 대한 오류 처리는 오류가 함수 간접 호출 전에 발생하는지 아니면 함수 간접 호출 중에 발생하는지 여부에 따라 달라집니다.

  • 간접 호출 전: Lambda 이벤트 소스 매핑이 제한 또는 기타 문제로 인해 함수를 간접적으로 간접 호출할 수 없는 경우 레코드가 만료되거나 이벤트 소스 매핑에 구성된 최대 기간(MaximumRecordAgeInSeconds)을 초과할 때까지 재시도합니다.

  • 간접 호출 중: 함수가 간접적으로 간접 호출되었지만 오류가 반환되는 경우 Lambda는 레코드가 만료되거나 최대 기간(MaximumRecordAgeInSeconds)을 초과하거나 구성된 재시도 할당량(MaximumRetryAttempts)에 도달할 때까지 재시도합니다. 함수 오류의 경우 실패한 배치를 두 개의 작은 배치로 분할하여 잘못된 레코드를 격리하고 시간 초과를 방지하는 BisectBatchOnFunctionError를 구성할 수도 있습니다. 배치를 분할할 때는 재시도 할당량이 소모되지 않습니다.

오류 처리에서 실패를 측정하는 경우 Lambda는 레코드를 폐기하고 스트림에서 배치 처리를 계속합니다. 기본 설정을 사용하는 경우 이는 잘못된 레코드가 영향을 받은 샤드에 대한 처리를 최대 1주 동안 차단할 수 있음을 의미합니다. 이를 방지하려면 함수의 이벤트 소스 매핑을 사용자의 사례에 적합한 최대 레코드 사용 기간 및 합당한 재시도 횟수로 구성합니다.

실패한 간접 호출에 대한 대상 구성

실패한 이벤트 소스 매핑 간접 호출 기록을 보관하려면 함수의 이벤트 소스 매핑에 대상을 추가합니다. 대상으로 전송된 각 레코드는 실패한 간접 호출에 대한 메타데이터를 포함하는 JSON 문서입니다. Amazon S3 대상의 경우 Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 전송합니다. 모든 Amazon SNS 주제, Amazon SQS 대기열 또는 S3 버킷을 대상으로 구성할 수 있습니다.

Amazon S3 대상을 사용하면 Amazon S3 이벤트 알림 기능을 통해 객체를 대상 S3 버킷에 업로드할 때 알림을 받을 수 있습니다. 실패한 배치에서 자동 처리를 수행하기 위해 다른 Lambda 함수를 간접 호출하도록 S3 이벤트 알림을 구성할 수도 있습니다.

실행 역할에 대상에 대한 권한이 있어야 합니다.

S3 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 함수의 실행 역할에 kms:GenerateDataKey를 직접 호출할 수 있는 권한도 있어야 합니다. KMS 키와 S3 버킷 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:GenerateDataKey를 허용합니다.

이 콘솔을 사용하여 장애 시 대상을 구성하려면 다음 단계를 따르세요.

  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수를 선택합니다.

  3. 함수 개요(Function overview)에서 대상 추가(Add destination)를 선택합니다.

  4. 소스의 경우 이벤트 소스 매핑 간접 호출을 선택합니다.

  5. 이벤트 소스 매핑의 경우 이 함수에 대해 구성된 이벤트 소스를 선택합니다.

  6. 조건의 경우 실패 시를 선택합니다. 이벤트 소스 매핑 간접 호출의 경우 이 조건만 수락됩니다.

  7. 대상 유형의 경우 Lambda가 간접 호출 레코드를 전송할 대상 유형을 선택합니다.

  8. Destination(대상)에서 리소스를 선택합니다.

  9. 저장을 선택합니다.

AWS Command Line Interface(AWS CLI)를 사용하여 장애 시 대상을 구성할 수도 있습니다. 예를 들어, 다음 create-event-source-mapping 명령은 SQS 장애 시 대상이 있는 이벤트 소스 매핑을 MyFunction에 추가합니다.

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

다음 update-event-source-mapping 명령은 두 번의 재시도 시도 후 또는 레코드가 1시간 이상 지난 경우 SNS 대상으로 실패한 간접 호출 레코드를 전송하도록 이벤트 소스 매핑을 업데이트합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

업데이트된 설정은 비동기식으로 적용되며 프로세스가 완료된 후에야 출력에 반영됩니다. get-event-source-mapping 명령을 사용하여 현재 상태를 봅니다.

대상을 제거하려면 destination-config 파라미터의 인수로 빈 문자열을 제공합니다.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon S3에 대한 보안 모범 사례

함수 구성에서 대상을 제거하지 않고 대상으로 구성된 S3 버킷을 삭제하면 보안 위험이 초래될 수 있습니다. 사용자의 대상 버킷 이름을 다른 사용자가 알고 있는 경우 AWS 계정에서 버킷을 다시 생성할 수 있습니다. 실패한 간접 호출에 대한 레코드가 해당 버킷으로 전송되어 함수의 데이터가 공개될 수 있습니다.

주의

함수의 간접 호출 레코드를 다른 AWS 계정의 S3 버킷으로 전송할 수 없도록 하려면 계정의 버킷에 대한 s3:PutObject 권한을 제한하는 조건을 함수의 실행 역할에 추가합니다.

다음 예제에서는 함수의 s3:PutObject 권한을 계정의 버킷으로 제한하는 IAM 정책을 보여줍니다. 또한 이 정책에서는 Lambda가 S3 버킷을 대상으로 사용하는 데 필요한 s3:ListBucket 권한도 부여합니다.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

AWS Management Console 또는 AWS CLI를 사용하여 함수의 실행 역할에 권한 정책을 추가하려면 다음 절차의 지침을 참조하세요.

Console
함수의 실행 역할에 권한 정책을 추가하는 방법(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 실행 역할을 수정하려는 Lambda 함수를 선택하세요.

  3. 구성 탭을 선택한 다음 사용 권한을 선택합니다.

  4. 실행 역할 탭에서 함수의 역할 이름을 선택하여 역할의 IAM 콘솔 페이지를 여세요.

  5. 다음을 수행하여 역할에 기본 권한 정책을 추가하세요.

    1. 권한 창에서 권한 추가를 선택하고 인라인 정책 생성을 선택하세요.

    2. 정책 편집기에서 JSON을 선택하세요.

    3. 추가하려는 정책을 편집기에 붙여넣고(이때 기존 JSON 대체) 다음을 선택하세요.

    4. 정책 세부 정보 아래에 정책 이름을 입력하세요.

    5. 정책 생성을 선택합니다.

AWS CLI
함수의 실행 역할에 권한 정책을 추가하는 방법(CLI)
  1. 필요한 권한이 있는 JSON 정책 문서를 생성하고 로컬 디렉터리에 저장하세요.

  2. IAM put-role-policy CLI 명령을 사용하여 함수의 실행 역할에 권한을 추가하세요. JSON 정책 문서를 저장한 디렉터리에서 다음 명령을 실행하고 역할 이름, 정책 이름 및 정책 문서를 고유한 값으로 바꾸세요.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Amazon SNS 및 Amazon SQS 간접 호출 레코드 예제

다음 예는 실패한 Kinesis 이벤트 소스 간접 호출에 대해 Lambda가 SQS 대기열 또는 SNS 주제로 전송하는 내용을 보여줍니다. Lambda는 이러한 대상 유형에 대한 메타데이터만 전송하기 때문에 streamArn, shardId, startSequenceNumberendSequenceNumber 필드를 사용하여 전체 원본 레코드를 확보합니다. KinesisBatchInfo 속성에 표시되는 모든 필드는 항상 존재합니다.

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

이 정보를 사용하여 문제 해결을 위해 스트림에서 영향을 받은 레코드를 검색할 수 있습니다. 실제 레코드는 포함되지 않으므로 이 레코드를 처리하고 실제 레코드가 만료되고 없어지기 전에 스트림에서 해당 레코드를 검색해야 합니다.

Amazon S3 간접 호출 레코드 예제

다음 예제에서는 실패한 Kinesis 이벤트 소스 간접 호출에 대해 Lambda가 Amazon S3 버킷으로 전송하는 내용을 보여줍니다. SQS 및 SNS 대상에 대한 이전 예제의 모든 필드 외에도 payload 필드에는 원래 간접 호출 레코드가 이스케이프된 JSON 문자열로 포함되어 있습니다.

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }

간접 호출 레코드를 포함하는 S3 객체는 다음 이름 지정 규칙을 사용합니다.

aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>