Amazon EventBridge 管道中的事件筛选 - Amazon EventBridge

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon EventBridge 管道中的事件筛选

使用 EventBridge Pipes,您可以筛选给定源的事件,仅处理其中的一部分事件。这种筛选的工作原理与使用事件模式对 EventBridge 事件总线或 Lambda 事件源映射进行筛选的原理相同。有关事件模式的更多信息,请参阅 Amazon EventBridge 事件模式

筛选标准 FilterCriteria 对象是由筛选条件 Filters 列表组成的结构。每个筛选条件都是用于定义筛选模式 (Pattern) 的结构。Pattern 是 JSON 筛选条件规则的字符串表示。FilterCriteria 对象与以下示例类似:

{ "Filters": [ {"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" } ] }

为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern 的值。

{ "Metadata1": [ pattern1 ], "data": {"Data1": [ pattern2 ]} }

FilterCriteria 对象的主要部分是元数据属性和数据属性。

  • 元数据属性是指事件对象的字段。在示例中,FilterCriteria.Metadata1 表示元数据属性。

  • 数据属性是指事件主体的字段。在示例中,FilterCriteria.Data1 表示数据属性。

例如,假设您的 Kinesis 流包含这样的事件:

{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": {"City": "Seattle", "State": "WA", "Temperature": "46", "Month": "December" }, "approximateArrivalTimestamp": 1545084650.987 }

当事件流经您的管道时,采用 base64 编码 data 字段将如下所示:

{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Kinesis 事件的元数据属性data 对象之外的任何字段,例如 partitionKeysequenceNumber

Kinesis 事件的数据属性data 对象中的任何字段,例如 CityTemperature

当您进行筛选以匹配此事件时,可以针对解码后的字段使用筛选条件。例如,要筛选 partitionKeyCity,需要使用以下筛选条件:

{ "partitionKey": [ "1" ], "data": { "City": [ "Seattle" ] } }

创建事件筛选器后,EventBridge Pipes 可以访问事件内容。这些内容要么是 JSON 转义的,例如 Amazon SQS body 字段,要么是 base64 编码的,例如 Kinesis data 字段。如果您的数据是有效的 JSON,则您的输入模板或目标参数的 JSON 路径可以直接引用内容。例如,如果 Kinesis 事件源是有效的 JSON,您可以使用 <$.data.someKey> 引用变量。

创建事件模式时,您可以根据源 API 发送的字段进行筛选,而不是根据轮询操作添加的字段进行筛选。以下字段不能用于事件模式:

  • awsRegion

  • eventSource

  • eventSourceARN

  • eventVersion

  • eventID

  • eventName

  • invokeIdentityArn

  • eventSourceKey

消息和数据字段

每个 EventBridge Pipe 源都包含一个字段,其中包含核心消息或数据。我们将它们称为消息 字段或数据 字段。这些字段之所以特别,是因为它们可能是 JSON 转义或 base64 编码的,但当它们是有效 JSON 时,可以使用 JSON 模式对其进行筛选,就像正文没有被转义一样。这些字段的内容也可以无缝地用于输入转换器

正确筛选 Amazon SQS 消息

如果 Amazon SQS 消息不符合您的筛选条件,EventBridge 会自动从队列中删除该消息。您无需在 Amazon SQS 中手动删除这些消息。

对于 Amazon SQS,消息 body 可以是任何字符串。但如果您的 FilterCriteria 期望 body 为有效的 JSON 格式,则可能会导致问题。反之亦然,如果传入的消息 body 为有效的 JSON 格式,但您的筛选标准期望 body 为纯字符串,会导致出现意外行为。

要避免此问题,请确保 FilterCriteriabody 的格式与您从队列中收到的消息中的 body 的期望格式一致。在筛选消息之前,EventBridge 会自动评估传入消息 body 的格式以及 body 的筛选模式的格式。如果不一致,EventBridge 将会丢弃此消息。下表汇总了此评估:

传入消息 body 格式 筛选条件模式 body 格式 导致的操作

纯字符串

纯字符串

EventBridge 根据您的筛选标准进行筛选。

纯字符串

数据属性中没有筛选条件模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

纯字符串

有效 JSON

EventBridge 会丢弃此消息。

有效 JSON

纯字符串

EventBridge 会丢弃此消息。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

有效 JSON

有效 JSON

EventBridge 根据您的筛选标准进行筛选。

如果您的 FilterCriteria 不包括 body,EventBridge 将会跳过此检查。

正确筛选 Kinesis 和 DynamoDB 消息

在筛选标准处理了 Kinesis 或 DynamoDB 记录后,流迭代器会跳过此记录。如果记录不符合筛选标准,您无需从事件源手动删除记录。保留期结束后,Kinesis 和 DynamoDB 会自动删除这些旧记录。如果您希望提前删除记录,请参阅更改数据保留期

要正确筛选流事件源中的事件,数据字段和数据字段的筛选条件都必须为有效的 JSON 格式。(对于 Kinesis,数据字段为 data。对于 DynamoDB,数据字段为 dynamodb。) 如果任一字段不是有效的 JSON 格式,EventBridge 将会丢弃消息或引发异常。下表汇总了具体行为:

传入数据格式(datadynamodb 数据属性中的筛选条件模式格式 导致的操作

有效 JSON

有效 JSON

EventBridge 根据您的筛选标准进行筛选。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

有效 JSON

非 JSON

EventBridge 在管道更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

非 JSON

有效 JSON

EventBridge 会丢弃此记录。

非 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

非 JSON

非 JSON

EventBridge 在创建和更新管道时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

正确筛选 Amazon Managed Streaming for Apache Kafka、自托管 Apache Kafka 和 Amazon MQ 消息。

对于 Amazon MQ 来源,消息字段为 data。对于 Apache Kafka 源(Amazon MSK自托管 Apache Kafka),有两个消息字段:keyvalue

EventBridge 会丢弃无法匹配筛选条件中包含的所有字段的消息。对于 Apache Kafka,EventBridge 在成功调用函数后,会为匹配和不匹配的消息提交偏移。对于 Amazon MQ,EventBridge 在成功调用函数后确认匹配的消息,并在筛选不匹配的消息时确认此类消息。

Apache Kafka 和 Amazon MQ 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。这是因为 EventBridge 在应用筛选标准之前将 Apache Kafka 和 Amazon MQ 字节数组解码为 UTF-8。如果您的消息使用另一种编码,例如 UTF-16 或 ASCII,或者消息格式与 FilterCriteria 格式不匹配,则 EventBridge 仅处理元数据筛选条件。下表汇总了具体行为:

传入消息格式(datakeyvalue 消息属性的筛选条件模式格式 导致的操作

纯字符串

纯字符串

EventBridge 根据您的筛选标准进行筛选。

纯字符串

数据属性中没有筛选条件模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

纯字符串

有效 JSON

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

有效 JSON

纯字符串

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

有效 JSON

有效 JSON

EventBridge 根据您的筛选标准进行筛选。

非 UTF-8 编码字符串

JSON、纯字符串或无模式

EventBridge 根据您的筛选标准进行筛选(仅限其他元数据属性)。

Lambda ESM 和 EventBridge Pipes 之间的区别

筛选事件时,Lambda ESM 和 EventBridge Pipes 的工作方式通常相同。主要区别在于 ESM 有效负载中不存在 eventSourceKey 字段。

在管道筛选器中使用比较运算符

比较运算符使您能够构造与事件中的字段值匹配的事件模式。

有关支持在管道筛选器中使用的比较运算符的完整列表,请参阅比较运算符