Amazon 簡單隊列服務作為 EventBridge 管道中的源 - Amazon EventBridge

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon 簡單隊列服務作為 EventBridge 管道中的源

您可以使用 EventBridge 管道從 Amazon SQS 隊列接收記錄。然後,您可以選擇性地篩選或增強這些記錄,然後再將其傳送至可用的目的地進行處理。

您可以使用管道來處理 Amazon 簡單佇列服務 (AmazonSQS) 佇列中的訊息。 EventBridge 管道支援標準佇列先進先出 (FIFO) 佇列。使用 AmazonSQS,您可以將任務傳送到佇列並非同步處理,從應用程式的一個元件卸載任務。

EventBridge 輪詢佇列,並與包含佇列訊息的事件同步呼叫管道。 EventBridge 批量讀取消息,並為每個批次調用管道一次。當管道成功處理批次時, EventBridge 會從佇列中刪除其訊息。

依預設,會同時 EventBridge 輪詢佇列中最多 10 個訊息,並將該批次傳送至管道。為避免調用具有少量記錄的管道,您可設定批次間隔,要求事件來源緩衝記錄最長達五分鐘。在叫用管道之前,請 EventBridge 繼續輪詢來自 Amazon SQS 標準佇列的訊息,直到發生下列其中一種情況為止:

  • 批次化視窗即會到期。

  • 已達到調用有效負載大小配額。

  • 已達到批次大小可設定的上限。

注意

如果您使用的是批次處理視窗,且 Amazon SQS 佇列的流量很低,最多 EventBridge 可能需要等待 20 秒鐘才能叫用管道。即使您將批次時間範圍設定為低於 20 秒也是如此。對於FIFO佇列,記錄包含與重複資料刪除和排序相關的其他屬性。

EventBridge 讀取批次時,訊息會保留在佇列中,但會在佇列的可見性逾時長度下隱藏。如果您的管道成功處理批次,請從佇列中 EventBridge 刪除訊息。根據預設,如果管道在處理批次時遇到錯誤,則該批次中的所有訊息會再次顯示在佇列中。因此,您的管道程式碼必須能夠多次處理相同的訊息,而不會產生副作用。您可以在管道回應中包含批次項目失敗的資訊,以便修改此重新處理行為。下列範例顯示兩則訊息之批次的事件。

範例事件

下列範例事件顯示管道接收的資訊。您可以使用此事件來建立和篩選事件模式,或定義輸入轉換。並非所有欄位都可以篩選。如需有關所能篩選欄位的詳細資訊,請參閱 Amazon EventBridge 管道中的事件過濾

標準佇列

[ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ]

FIFO隊列

[ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ]

擴展和處理

對於標準佇列, EventBridge 使用長輪詢來輪詢佇列,直到佇列變為作用中為止。當訊息可用時,最多可 EventBridge 讀取五個批次,並將它們傳送至管道。如果仍然可以使用訊息,則每分鐘最多可 EventBridge 增加 300 個讀取批次的處理序數目。一個管道可同時處理的批次數量上限為 1,000。

對於FIFO佇列,請按照接收訊息的順序 EventBridge 將訊息傳送至管道。當您將訊息傳送至FIFO佇列時,您可以指定訊息群組識別碼。Amazon SQS 有助於將同一組中的消息交付給 EventBridge,按順序。 EventBridge 將接收的郵件排序為群組,一次只傳送一個群組的批次。如果您的管道傳回錯誤,管道會先嘗試所有受影響訊息的重試,然後再 EventBridge 接收來自同一群組的其他訊息。

配置要與 EventBridge 管道搭配使用的佇列

創建一個 Amazon SQS 隊列作為管道的來源。然後設定佇列,讓管道有時間處理每批事件,並在擴展時重試節流錯誤。 EventBridge

為讓您的管道有時間處理每個記錄批次,請將來源佇列的可見性逾時設定為管道擴充程序和目標元件合併執行期的至少六倍。如果您的管道在處理上一個批次時被節流,則額外的時間允許重試。 EventBridge

如果您的管道無法多次處理訊息,Amazon SQS 可以將其傳送到無效字母佇列。當您的管道返回錯誤時,請將其 EventBridge 保留在隊列中。發生可見性逾時後,再次 EventBridge接收訊息。若要在多次接收後將訊息傳送至第二個佇列,請在來源佇列上設定無效字母佇列。

注意

請確定在來源佇列上設定無效字母佇列,而不是在管道上。您在管道上設定的無效字母佇列用於佇列的非同步調用佇列,而不是事件來源佇列。

如果您的管道因為以達到並行上限而傳回錯誤或無法調用,可能可以透過進行額外的嘗試來使處理成功。若要讓訊息在傳送到無效字母佇列前更有機會受到處理,請將來源佇列再驅動政策上的 maxReceiveCount 設置值至少為 5

報告批次項目失敗

當使 EventBridge 用和處理來源的串流資料時,依預設,它會檢查點為批次的最高序號,但只有在批次完全成功時才會檢查點。若要避免重新處理失敗批次中成功處理過的訊息,您可以設定擴充或目標,傳回哪些訊息成功,哪些失敗的物件。我們將其稱為部分批次回應。

如需詳細資訊,請參閱 部分批次失敗

成功與失敗條件

如果您傳回下列任一項目,請 EventBridge 將批次視為完全成功:

  • 空白 batchItemFailure 清單

  • Null batchItemFailure 清單

  • 空白 EventResponse

  • Null EventResponse

如果您傳回下列任一項目,則 EventBridge 會將批次視為完全失敗:

  • 空白字串 itemIdentifier

  • Null itemIdentifier

  • 具有錯誤金鑰名稱的 itemIdentifier

EventBridge 根據您的重試策略重試失敗。