Amazon 简单队列服务作为 Pip EventBridge es 中的来源 - Amazon EventBridge

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon 简单队列服务作为 Pip EventBridge es 中的来源

您可以使用 Pi EventBridge pes 接收来自亚马逊SQS队列的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标进行处理。

您可以使用管道来处理亚马逊简单队列服务 (AmazonSQS) 队列中的消息。 EventBridge 管道支持标准队列先进先出 (FIFO) 队列。借助 AmazonSQS,您可以将任务发送到队列并异步处理,从而将任务从应用程序的一个组件中卸载。

EventBridge 轮询队列并使用包含队列消息的事件同步调用您的管道。 EventBridge 批量读取消息,每批次调用管道一次。当您的管道成功处理批处理时, EventBridge 会将其消息从队列中删除。

默认情况下,可以同时 EventBridge 轮询队列中最多 10 条消息,然后将该批次发送到您的管道。为避免在记录数量较少的情况下调用管道,您可以配置批处理时间窗,让事件源缓冲最多 5 分钟的记录。在调用管道之前, EventBridge 继续轮询来自 Amazon SQS 标准队列的消息,直到出现以下情况之一:

  • 批处理时间窗过期。

  • 已达到调用负载大小配额。

  • 已达到配置的批次大小上限。

注意

如果您使用的是批处理窗口,并且您的 Amazon SQS 队列流量较低,则 EventBridge 可能需要等待 20 秒钟才能调用管道。即使您将批处理时间窗设置为少于 20 秒,情况依然如此。对于FIFO队列,记录包含与重复数据删除和排序相关的其他属性。

EventBridge 读取批次时,消息会保留在队列中,但在队列的可见性超时时间内会被隐藏。如果您的管道成功处理了批处理,则会从队列中 EventBridge 删除消息。默认情况下,如果您的管道在处理某个批次时遇到错误,则该批次中的所有消息都会在队列中重新可见。因此,管道代码必须能够多次处理同一条消息,而不会产生意外的副作用。您可以在管道响应中包括批处理项目失败次数,来修改此再处理行为。以下示例显示了包含两条消息的批次事件。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon P EventBridge ipes 中的事件筛选

标准队列

[ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ]

FIFO队列

[ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ]

扩展和处理

对于标准队列, EventBridge 使用长轮询来轮询队列,直到队列变为活动状态。当有消息可用时,最多 EventBridge 读取五个批次并将其发送到您的管道。如果消息仍然可用,则将正在读取批处理的进程的数量每分钟最多 EventBridge 增加 300 个实例。管道可以同时处理的最大批次数量为 1,000。

对于FIFO队列,按接收消息的顺序向您的管道 EventBridge 发送消息。向FIFO队列发送消息时,需要指定消息组 ID。Amazon SQS 便于按顺序将同一个群组中的消息发送给。 EventBridge EventBridge 将收到的消息分组,一次只能为一个群组发送一批。如果您的管道返回错误,则在 EventBridge 收到来自同一组的其他消息之前,管道会尝试对受影响的消息进行所有重试。

配置队列以与 Pip EventBridge es 配合使用

创建一个 Amazon SQS 队列作为管道的来源。然后配置队列,让管道有时间处理每批事件,并留出 EventBridge 时间在扩展时重试以响应限制错误。

为使您的管道有时间处理每批记录,请将源队列的可见性超时至少设置为管道富集和目标组件合并运行时的六倍。如果您的管道在处理前一批次时受到限制,则额外的时间允许重试。 EventBridge

如果您的管道多次未能处理一条消息,Amazon SQS 可以将其发送到死信队列。当您的管道返回错误时,请将其 EventBridge 保留在队列中。可见性超时发生后,再次 EventBridge收到消息。要在多次接收之后将消息发送到第二个队列,请在源队列上配置死信队列。

注意

确保在源队列上配置死信队列,而不是在管道上配置。您在管道上配置的死信队列用于管道的异步调用队列,而不是用于源队列。

如果您的管道返回错误,或者由于达到并发上限而无法调用,则处理可能会成功,但需要额外尝试。要在将消息发送到死信队列之前给予更多处理机会,请将源队列重新驱动策略的 maxReceiveCount 至少设置为 5

报告批处理项目失败

在 EventBridge 使用和处理来自源的流数据时,默认情况下,它会检查批次的最高序号,但前提是批处理完全成功时。为避免重新处理失败批次中已成功处理的消息,您可以配置富集或目标,返回对象来指示哪些消息处理成功、哪些失败。这称为部分批处理响应。

有关更多信息,请参阅 部分批处理故障

成功和失败的条件

如果您返回以下任意一项,则将批次 EventBridge 视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果您返回以下任何内容,则会将批次 EventBridge 视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

EventBridge 根据您的重试策略重试失败。