Kinesis 이벤트 소스를 통해 이벤트 필터링 사용 - AWS Lambda

Kinesis 이벤트 소스를 통해 이벤트 필터링 사용

이벤트 필터링을 사용하여 Lambda가 함수로 전송하는 스트림 또는 대기열의 레코드를 제어할 수 있습니다. 이벤트 필터링의 작동 방식에 대한 일반적인 내용은 Lambda가 함수로 보내는 이벤트에 대한 제어을 참조하세요.

이 섹션에서는 Kinesis 이벤트 소스에 대한 이벤트 필터링에 중점을 둡니다.

Kinesis 이벤트 필터링 기본 사항

생산자가 JSON 형식의 데이터를 Kinesis 데이터 스트림에 넣고 있다고 가정해 보겠습니다. 예제 레코드는 다음과 같으며, JSON 데이터는 data 필드에서 Base64로 인코딩된 문자열로 변환됩니다.

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

생산자가 스트림에 넣는 데이터가 유효한 JSON이면 이벤트 필터링을 사용하여 data 키로 레코드를 필터링할 수 있습니다. 생산자가 다음 JSON 형식으로 레코드를 Kinesis 스트림에 넣고 있다고 가정해 보겠습니다.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

주문 유형이 'buy'인 레코드만 필터링하려면 FilterCriteria 객체는 다음과 같습니다.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

명확성을 더하기 위해 일반 JSON으로 확장된 필터의 Pattern 값은 다음과 같습니다.

{ "data": { "order": { "type": [ "buy" ] } } }

콘솔, AWS CLI 또는 AWS SAM 템플릿을 사용하여 필터를 추가할 수 있습니다.

Console

콘솔을 사용하여 이 필터를 추가하려면 이벤트 소스 매핑에 필터 기준 연결(콘솔)의 지침을 따르고 필터 기준에 대해 다음 문자열을 입력합니다.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

AWS Command Line Interface(AWS CLI)를 사용하여 이러한 필터 기준으로 새 이벤트 소스 매핑을 생성하려면 다음 명령을 실행합니다.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

이러한 필터 기준을 기존 이벤트 소스 매핑에 추가하려면 다음 명령을 실행합니다.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

AWS SAM을 사용하여 이 필터를 추가하려면 이벤트 소스의 YAML 템플릿에 다음 코드 조각을 추가합니다.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Kinesis 소스에서 이벤트를 올바르게 필터링하려면 데이터 필드와 데이터 필드의 필터 기준이 모두 유효한 JSON 형식이어야 합니다. 두 필드 중 하나가 유효한 JSON 형식이 아닌 경우 Lambda는 해당 메시지를 삭제하거나 예외를 발생시킵니다. 다음 표에는 특정 동작이 요약되어 있습니다.

수신 데이터 형식 데이터 속성에 대한 필터 패턴 형식 결과적 작업

유효한 JSON

유효한 JSON

Lambda는 필터 기준에 따라 필터링합니다.

유효한 JSON

데이터 속성에 대한 필터 패턴 없음

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.

유효한 JSON

JSON 외

이벤트 소스 매핑 생성 또는 업데이트 시 Lambda에서 예외가 발생합니다. 데이터 속성에 대한 필터 패턴은 유효한 JSON 형식이어야 합니다.

JSON 외

유효한 JSON

Lambda는 해당 레코드를 삭제합니다.

JSON 외

데이터 속성에 대한 필터 패턴 없음

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.

JSON 외

JSON 외

이벤트 소스 매핑 생성 또는 업데이트 시 Lambda에서 예외가 발생합니다. 데이터 속성에 대한 필터 패턴은 유효한 JSON 형식이어야 합니다.

Kinesis 집계 레코드 필터링

Kinesis를 사용하면 여러 레코드를 단일 Kinesis Data Streams 레코드로 집계하여 데이터 처리량을 늘릴 수 있습니다. Lambda는 Kinesis 향상된 팬아웃을 사용할 때 집계 레코드에만 필터 기준을 적용할 수 있습니다. 표준 Kinesis를 사용하여 집계 레코드를 필터링하는 것은 지원되지 않습니다. 향상된 팬아웃을 사용하는 경우 Lambda 함수의 트리거 역할을 하도록 Kinesis 전용 처리량 소비자를 구성합니다. 그런 다음 Lambda는 집계 레코드를 필터링하고 필터 기준을 충족하는 레코드만 전달합니다.

Kinesis 레코드 집계에 대해 자세히 알아보려면 Kinesis Producer Library(KPL) 주요 개념 페이지의 집계 섹션을 참조하세요. Kinesis 향상된 팬아웃과 함께 Lambda를 사용하는 방법에 대해 자세히 알아보려면 AWS 컴퓨팅 블로그의 Amazon Kinesis Data Streams 향상된 팬아웃 및 AWS Lambda로 실시간 스트림 처리 성능 향상을 참조하세요.