对 Kinesis 事件源使用事件筛选 - AWS Lambda

对 Kinesis 事件源使用事件筛选

您可以使用事件筛选,控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息,请参阅 控制 Lambda 向您的函数发送的事件

本节重点介绍 Kinesis 事件源的事件筛选。

Kinesis 事件筛选基础知识

假设创建器将 JSON 格式的数据放入 Kinesis 数据流。示例记录如下所示,data 字段中的 JSON 数据会转换为 Base64 编码字符串。

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

只要创建器放入流中的数据是有效的 JSON,您就可以使用 data 键,通过事件筛选来筛选记录。假设创建器将如下 JSON 格式的记录放入 Kinesis 流。

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

要仅筛选订单类型为“购买”的记录,FilterCriteria 对象将如下所示。

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern 的值。

{ "data": { "order": { "type": [ "buy" ] } } }

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

Console

要使用控制台添加此筛选条件,请按照 将筛选条件附加到事件源映射(控制台) 中的说明,为筛选条件输入以下字符串。

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

要使用 AWS Command Line Interface(AWS CLI)创建包含这些筛选条件的新事件源映射,请运行以下命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

要将这些筛选条件添加到现有事件源映射中,请运行以下命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

要使用 AWS SAM 添加此筛选条件,请将以下代码段添加到事件源的 YAML 模板中。

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

要正确筛选 Kinesis 源中的事件,数据字段及其筛选条件都必须为有效的 JSON 格式。如果任一字段不为有效的 JSON 格式,Lambda 将会丢弃消息或引发异常。下表汇总了具体行为:

传入数据格式 数据属性中的筛选条件模式格式 导致的操作

有效 JSON

有效 JSON

Lambda 根据您的筛选条件进行筛选。

有效 JSON

数据属性中没有筛选条件模式

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

非 JSON

Lambda 在事件源映射创建或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

非 JSON

有效 JSON

Lambda 将丢弃记录。

非 JSON

数据属性中没有筛选条件模式

Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。

非 JSON

非 JSON

Lambda 在事件源映射创建或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

筛选 Kinesis 聚合记录

使用 Kinesis,您可以将多条记录聚合到单条 Kinesis Data Streams 记录中,以提高数据吞吐量。使用 Kinesis 增强扇出功能时,Lambda 只能将筛选条件应用于聚合记录。不支持使用标准 Kinesis 筛选聚合记录。使用增强扇出功能时,您可以配置 Kinesis 专用吞吐量使用者作为 Lambda 函数的触发器。然后,Lambda 会筛选聚合记录,并仅传递符合筛选条件的记录。

要了解有关 Kinesis 记录聚合的更多信息,请参阅“Kinesis Producer Library(KPL)关键概念”页面上的聚合部分。要了解有关将 Lambda 与 Kinesis 增强扇出功能结合使用的更多信息,请参阅 AWS 计算博客上的使用 Amazon Kinesis Data Streams 增强扇出功能和 AWS Lambda 提高实时流处理性能