透過自我管理的 Apache Kafka 事件來源使用事件篩選 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

透過自我管理的 Apache Kafka 事件來源使用事件篩選

您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選如何運作的一般資訊,請參閱控制 Lambda 傳送給函數的事件

本節著重於自我管理的 Apache Kafka 事件來源的事件篩選。

自我管理的 Apache 卡夫卡事件篩選基礎知識

假設生產者正在將消息寫入自我管理的 Apache Kafka 集群中的某個主題,無論是以有效JSON格式還是純字符串。範例記錄如下所示,訊息在 value 欄位中會轉換為 Base64 編碼字串。

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

假設您的 Apache 卡夫卡製作人正在以下JSON格式將消息寫入您的主題。

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

您可以使用 value 索引鍵來篩選記錄。假設您只想篩選 device_ID 以字母 AB 開頭的記錄。FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

為了增加清晰度,以下是以普通方式Pattern擴展過濾器的值JSON。

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

您可以使用控制台 AWS CLI 或 AWS SAM 模板添加過濾器。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 建立具有這些篩選條件的新事件來源對應,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

若要使用新增此篩選器 AWS SAM,請將下列程式碼片段新增至事件來源的YAML範本。

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

使用自我管理的 Apache Kafka,您還可以過濾消息為純字符串的記錄。假設您想忽略字串為「錯誤」的訊息。FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

為了增加清晰度,以下是以普通方式Pattern擴展過濾器的值JSON。

{ "value": [ { "anything-but": [ "error" ] } ] }

您可以使用控制台 AWS CLI 或 AWS SAM 模板添加過濾器。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 建立具有這些篩選條件的新事件來源對應,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

若要使用新增此篩選器 AWS SAM,請將下列程式碼片段新增至事件來源的YAML範本。

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

自我管理的 Apache 卡夫卡消息必須是 UTF -8 個編碼的字符串,無論是純字符串還是格式。JSON這是因為 Lambda 在應用過濾條件之前將卡夫卡字節數組解碼為 UTF -8。如果您的訊息使用其他編碼 (例如 UTF -16 或)ASCII,或訊息格式與格式不相符,Lambda 只會處理中繼資料篩選器。FilterCriteria下表摘要說明特定行為:

傳入訊息 格式 訊息屬性的篩選條件模式格式 產生的動作

純文字的字串

純文字的字串

根據您的篩選條件標準之 Lambda 篩選條件。

純文字的字串

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

純文字的字串

有效 JSON

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效 JSON

純文字的字串

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效 JSON

有效 JSON

根據您的篩選條件標準之 Lambda 篩選條件。

非 UTF -8 編碼字符串

JSON,普通字符串,或沒有模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。