Lambda での SQS イベントソースのエラーの処理 - AWS Lambda

Lambda での SQS イベントソースのエラーの処理

SQS イベントソースに関連するエラーを処理する際に、Lambda はバックオフ戦略を備えた再試行戦略を自動的に使用します。また、部分的なバッチレスポンスを返すように SQS イベントソースマッピングを設定することで、エラー処理の動作をカスタマイズすることもできます。

失敗した呼び出しに対するバックオフ戦略

呼び出しが失敗すると、Lambda はバックオフ戦略の実装中に呼び出しの再試行を試みます。バックオフ戦略は、Lambda で発生した障害が関数コード内のエラーによるものか、スロットリングによるものかに応じて若干異なります。

  • 関数コードが原因でエラーが発生した場合、Lambda は呼び出しの処理と再試行を停止します。その間、Lambda は Amazon SQS イベントソースマッピングに割り当てられた同時実行数を減らすことで、再試行を徐々にバックオフします。キューの可視性タイムアウトがタイムアウトすると、メッセージが再びキューに表示されます。

  • スロットリングが原因で呼び出しが失敗する場合、Lambda は Amazon SQS イベントソースマッピングに割り当てられた同時実行数を減らすことで、再試行を徐々にバックオフします。Lambda は、メッセージのタイムスタンプがキューの可視性タイムアウトを超過するまでメッセージを再試行し続けますが、タイムアウトした時点でメッセージをドロップします。

部分的なバッチレスポンスの実装

Lambda 関数がバッチを処理しているときにエラーが発生すると、デフォルトでそのバッチ内のすべてのメッセージが再度キューに表示され、これには Lambda が正常に処理したメッセージも含まれます。その結果、関数が同じメッセージを複数回処理することになる場合があります。

失敗したバッチ内の正常に処理されたメッセージを再処理しないようにするために、失敗したメッセージのみを再び表示するようにイベントソースマッピングを設定できます。これを部分的なバッチレスポンスと呼びます。部分的なバッチレスポンスをオンにするには、イベントソースマッピングを設定するときに FunctionResponseTypes アクション用に ReportBatchItemFailures を指定します。そうすると、関数が部分的な成功を返すようになるため、レコードでの不必要な再試行回数を減らすことができます。

ReportBatchItemFailures がアクティブ化されている場合、Lambda は、関数の呼び出しが失敗したときにメッセージポーリングをスケールダウンしません。一部のメッセージが失敗することが想定され、それらの失敗によってメッセージの処理レートに影響が及ばないようにする場合は、ReportBatchItemFailures を使用します。

注記

部分的なバッチレスポンスを使用する場合は、次の点に注意してください。

  • 関数が例外をスローする場合、バッチ全体が完全な失敗とみなされます。

  • この機能を FIFO キューで使用している場合、関数は最初の失敗後にメッセージの処理を停止し、batchItemFailures で失敗したメッセージと未処理のメッセージのすべてを返します。これは、キュー内のメッセージの順序を維持するのに役立ちます。

部分的なバッチレポートをアクティブ化するには
  1. 部分的なバッチレスポンスを実装するためのベストプラクティスを確認します。

  2. 次のコマンドを実行して、関数用に ReportBatchItemFailures をアクティブ化します。イベントソースマッピングの UUID を取得するには、list-event-source-mappings AWS CLI コマンドを実行します。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 関数コードを更新して、すべての例外をキャッチし、失敗したメッセージを batchItemFailures JSON レスポンスで返します。batchItemFailures レスポンスには、メッセージ ID のリストが itemIdentifier JSON 値として含まれている必要があります。

    例えば、メッセージ ID が id1id2id3id4、および id5 である 5 つのメッセージのバッチがあるとします。関数は、id1id3id5 を正常に処理します。メッセージ id2 および id4 がキューで再び表示されるようにするには、関数が次のレスポンスを返す必要があります。

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。

    .NET
    AWS SDK for .NET
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    .NET を使用した Lambda での SQS バッチアイテム失敗のレポート。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK for Go V2
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Go を使用した Lambda での SQS バッチアイテム失敗のレポート。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK for Java 2.x
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Java を使用した Lambda での SQS バッチアイテム失敗のレポート。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK for JavaScript (v3)
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    JavaScript を使用した Lambda での SQS バッチアイテム失敗のレポート。

    // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    TypeScript を使用して Lambda で SQS バッチ項目の失敗を報告します。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    SDK for PHP
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    PHP を使用した Lambda での SQS バッチアイテム失敗のレポート。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK for Python (Boto3)
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Python を使用した Lambda での SQS バッチアイテム失敗のレポート。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK for Ruby
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Ruby を使用した Lambda での SQS バッチアイテム失敗のレポート。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK for Rust
    注記

    GitHub には、その他のリソースもあります。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

    Rust を使用した Lambda での SQS バッチアイテム失敗のレポート。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

失敗したイベントがキューに戻らない場合は、AWS ナレッジセンターの「Lambda 関数 SQS ReportBatchItemFailures をトラブルシューティングするにはどうすればよいですか?」を参照してください。

成功条件と失敗の条件

関数が以下のいずれかを返す場合、Lambda はバッチを完全な成功として扱います。

  • 空の batchItemFailures リスト

  • null の batchItemFailures リスト

  • 空の EventResponse

  • null の EventResponse

関数が以下のいずれかを返す場合、Lambda はバッチを完全な失敗として扱います。

  • 無効な JSON レスポンス

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

  • 存在しないメッセージ ID を持つ itemIdentifier

CloudWatch メトリクス

関数がバッチ項目の失敗を正しく報告しているかどうかを判断するために、Amazon SQS メトリクスの NumberOfMessagesDeleted および ApproximateAgeOfOldestMessage を Amazon CloudWatch でモニタリングできます。

  • NumberOfMessagesDeleted は、キューから削除されたメッセージの数を追跡します。これが 0 になるということは、関数レスポンスが失敗したメッセージを正しく返していないことを示唆しています。

  • ApproximateAgeOfOldestMessage は、最も古いメッセージがキューに残っている期間を追跡します。このメトリクスの急激な増加は、関数が失敗したメッセージを正しく返していないことを示唆している可能性があります。