对自行管理的 Apache Kafka 事件源使用事件筛选 - AWS Lambda

对自行管理的 Apache Kafka 事件源使用事件筛选

您可以使用事件筛选,控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息,请参阅 控制 Lambda 向您的函数发送的事件

本节重点介绍自行管理的 Apache Kafka 事件源的事件筛选。

自行管理的 Apache Kafka 事件筛选基础知识

假设创建者以有效的 JSON 格式或纯字符串的形式将消息写入自行管理的 Apache Kafka 集群中的主题。示例记录将如下所示,value 字段中的消息会转换为 Base64 编码字符串。

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

假设 Apache Kafka 创建器以如下 JSON 格式将消息写入主题。

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

您可以使用 value 键筛选记录。假设您只想筛选 device_ID 以字母 AB 开头的记录。FilterCriteria 对象将如下所示。

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern 的值。

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

Console

要使用控制台添加此筛选条件,请按照 将筛选条件附加到事件源映射(控制台) 中的说明,为筛选条件输入以下字符串。

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

要使用 AWS Command Line Interface(AWS CLI)创建包含这些筛选条件的新事件源映射,请运行以下命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

要将这些筛选条件添加到现有事件源映射中,请运行以下命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

要使用 AWS SAM 添加此筛选条件,请将以下代码段添加到事件源的 YAML 模板中。

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

通过自行管理的 Apache Kafka,您还可以筛选消息为纯字符串的记录。假设您想忽略字符串为“错误”的消息。FilterCriteria 对象将如下所示。

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern 的值。

{ "value": [ { "anything-but": [ "error" ] } ] }

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

Console

要使用控制台添加此筛选条件,请按照 将筛选条件附加到事件源映射(控制台) 中的说明,为筛选条件输入以下字符串。

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

要使用 AWS Command Line Interface(AWS CLI)创建包含这些筛选条件的新事件源映射,请运行以下命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

要将这些筛选条件添加到现有事件源映射中,请运行以下命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

要使用 AWS SAM 添加此筛选条件,请将以下代码段添加到事件源的 YAML 模板中。

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

自行管理的 Apache Kafka 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。这是因为 Lambda 在应用筛选条件之前将 Kafka 字节数组解码为 UTF-8。如果您的消息使用另一种编码,例如 UTF-16 或 ASCII,或者消息格式与 FilterCriteria 格式不匹配,则 Lambda 仅处理元数据筛选条件。下表汇总了具体行为:

传入消息格式 消息属性的筛选条件模式格式 导致的操作

纯字符串

纯字符串

Lambda 根据您的筛选条件进行筛选。

纯字符串

数据属性中没有筛选条件模式

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。

纯字符串

有效 JSON

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

纯字符串

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

数据属性中没有筛选条件模式

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

有效 JSON

Lambda 根据您的筛选条件进行筛选。

非 UTF-8 编码字符串

JSON、纯字符串或无模式

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。