对自行管理的 Apache Kafka 事件源使用事件筛选
您可以使用事件筛选,控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息,请参阅 控制 Lambda 向您的函数发送的事件。
本节重点介绍自行管理的 Apache Kafka 事件源的事件筛选。
自行管理的 Apache Kafka 事件筛选基础知识
假设创建者以有效的 JSON 格式或纯字符串的形式将消息写入自行管理的 Apache Kafka 集群中的主题。示例记录将如下所示,value
字段中的消息会转换为 Base64 编码字符串。
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
假设 Apache Kafka 创建器以如下 JSON 格式将消息写入主题。
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
您可以使用 value
键筛选记录。假设您只想筛选 device_ID
以字母 AB 开头的记录。FilterCriteria
对象将如下所示。
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern
的值。
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。
通过自行管理的 Apache Kafka,您还可以筛选消息为纯字符串的记录。假设您想忽略字符串为“错误”的消息。FilterCriteria
对象将如下所示。
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern
的值。
{ "value": [ { "anything-but": [ "error" ] } ] }
您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。
自行管理的 Apache Kafka 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。这是因为 Lambda 在应用筛选条件之前将 Kafka 字节数组解码为 UTF-8。如果您的消息使用另一种编码,例如 UTF-16 或 ASCII,或者消息格式与 FilterCriteria
格式不匹配,则 Lambda 仅处理元数据筛选条件。下表汇总了具体行为:
传入消息格式 | 消息属性的筛选条件模式格式 | 导致的操作 |
---|---|---|
纯字符串 |
纯字符串 |
Lambda 根据您的筛选条件进行筛选。 |
纯字符串 |
数据属性中没有筛选条件模式 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
纯字符串 |
有效 JSON |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
纯字符串 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
数据属性中没有筛选条件模式 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
有效 JSON |
Lambda 根据您的筛选条件进行筛选。 |
非 UTF-8 编码字符串 |
JSON、纯字符串或无模式 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |