搭配 Kinesis 事件來源使用事件篩選 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 Kinesis 事件來源使用事件篩選

您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選如何運作的一般資訊,請參閱控制 Lambda 傳送給函數的事件

本節著重於 Kinesis 事件來源的事件篩選。

Kinesis 動事件篩選基礎知識

假設生產者正在將JSON格式化的資料放入 Kinesis 資料串流中。範例記錄如下所示,JSON資料會轉換為data欄位中的 Base64 編碼字串。

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

只要生產者放入串流中的資料有效JSON,就可以使用事件篩選來使用data金鑰篩選記錄。假設生產者以下列JSON格式將記錄放入 Kinesis 串流中。

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

若僅篩選訂單類型為「購買」的記錄,FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

為了增加清晰度,以下是以普通方式Pattern擴展過濾器的值JSON。

{ "data": { "order": { "type": [ "buy" ] } } }

您可以使用控制台 AWS CLI 或 AWS SAM 模板添加過濾器。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 建立具有這些篩選條件的新事件來源對應,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

若要使用新增此篩選器 AWS SAM,請將下列程式碼片段新增至事件來源的YAML範本。

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

若要正確篩選 Kinesis 來源中的事件,資料欄位和資料欄位的篩選條件都必須是有效的JSON格式。如果任一欄位不是有效的JSON格式,Lambda 會捨棄訊息或擲回例外狀況。下表摘要說明特定行為:

傳入資料格式 資料屬性的篩選條件模式格式 產生的動作

有效 JSON

有效 JSON

根據您的篩選條件標準之 Lambda 篩選條件。

有效 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效 JSON

非 JSON

Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選器模式必須是有效的JSON格式。

非 JSON

有效 JSON

Lambda 捨棄記錄。

非 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

非 JSON

非 JSON

Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選器模式必須是有效的JSON格式。

篩選 Kinesis 彙總記錄

使用 Kinesis,可以將多個記錄彙總到單一 Kinesis 資料串流記錄中,以提高資料輸送量。Lambda 只會在您使用 Kinesis 增強型展開傳送時,將篩選條件標準套用至彙總記錄。不支援使用標準 Kinesis 篩選彙總記錄。使用增強型展開傳送時,您可以設定 Kinesis 專用輸送量取用程式,以充當 Lambda 函數的觸發條件。Lambda 接著會篩選彙總記錄,並僅傳遞符合篩選條件標準的記錄。

若要進一步了解 Kinesis 記錄彙總,請參閱 Kinesis 製作者程式庫 (KPL) 主要概念頁面上的總部分。若要進一步了解如何使用 Lambda 搭配 Kinesis 增強型散發功能,請參閱運算部落格上的 Amazon Kinesis Data Streams 增強型散發和 AWS Lambda 來提高即時串流處理效能。 AWS