AWS Lambda 関数の呼び出し - Amazon Simple Storage Service

AWS Lambda 関数の呼び出し

Amazon S3 バッチオペレーションを使用すると、Amazon S3 のオブジェクトに対して大規模なバッチオペレーションを実行できます。AWS Lambda の呼び出し関数バッチオペレーションは、AWS Lambda 関数を開始し、マニフェストにリストされているオブジェクトに対してカスタムアクションを実行します。このセクションでは、S3 バッチ操作で使用する Lambda 関数とその関数を呼び出すジョブを作成する方法について説明します。S3 バッチ操作ジョブでは、LambdaInvoke オペレーションを使用して、マニフェストにリストされているすべてのオブジェクトに対して Lambda 関数を実行します。

Amazon S3 コンソール、AWS Command Line Interface (AWS CLI)、AWS SDK、または Amazon S3 REST API を使用して S3 バッチオペレーションを扱うことができます。Lambda の使用について詳しくは、AWS Lambda 開発者ガイドAWS Lambda の使用を開始するを参照してください。

以下のセクションでは、S3 バッチ操作で Lambda を使用する方法について説明します。

バッチオペレーションで Lambda を使用する

S3 バッチ操作で AWS Lambda を使用するときは、S3 バッチ操作で使用するための Lambda 関数を新しく作成する必要があります。既存の Amazon S3 のイベントベースの関数を S3 バッチ操作で再利用することはできません。イベント関数はメッセージの受信のみ可能です。メッセージを返すことはできません。S3 バッチ操作で使用する Lambda 関数では、メッセージを受け取って返す必要があります。Amazon S3 のイベントで Lambda を使用する方法について詳しくは、AWS Lambda 開発者ガイドAWS Lambda を Amazon S3 に使用するを参照してください。

Lambda 関数を呼び出す S3 バッチ操作のジョブを作成します。このジョブは、マニフェストにリストされているすべてのオブジェクトに対して同じ Lambda 関数を実行します。マニフェストのオブジェクトの処理時に使用する Lambda 関数のバージョンは指定できます。S3 バッチ操作では、非修飾 Amazon リソースネーム (ARN)、エイリアス、特定のバージョンがサポートされています。詳しくは、AWS Lambda 開発者ガイドAWS Lambda 関数のバージョンを参照してください。

S3 バッチ操作のジョブにエイリアスまたは $LATEST 修飾子を使用する関数の ARN を指定し、それらのいずれかが指すバージョンを更新すると、S3 バッチ操作は新しいバージョンの Lambda 関数を呼び出します。これは、大規模なジョブで機能の一部を更新する場合に役立ちます。S3 バッチオペレーションで使用するバージョンを変更したくない場合は、ジョブの作成時に FunctionARN パラメータで特定のバージョンを指定します。

ディレクトリバケットで Lambda とバッチオペレーションを使用する

ディレクトリバケットは Amazon S3 バケットタイプの1 つであり、一貫して 1 桁ミリ秒のレイテンシーに維持する必要があるワークロードまたはパフォーマンス重視のアプリケーション向けに設計されています。詳細については、「ディレクトリバケット」を参照してください。

バッチオペレーションを使用してディレクトリバケットで動作する Lambda 関数を呼び出すには、独自の要件があります。例えば、更新された JSON スキーマを使用して Lambda リクエストを構築し、ジョブを作成する際に InvocationSchemaVersion 2.0 (1.0 ではない) を指定する必要があります。この更新されたスキーマでは、UserArguments にオプションのキーバリューペアを指定できます。これを使用して、既存の Lambda 関数の特定のパラメータを変更できます。詳細については、「AWS ストレージブログ」の「Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and AWS Lambda」を参照してください。

レスポンスと結果のコード

S3 バッチオペレーションは、それぞれに TaskID が関連付けられている 1 つ以上のキーを使用して Lambda 関数を呼び出します。S3 バッチオペレーションには、Lambda 関数からのキーごとの結果コードが必要です。キーごとの結果コードで返されないタスク ID は、リクエストコードで返され、treatMissingKeysAs フィールドの結果コードが付与されます。treatMissingKeysAs はオプションのリクエストフィールドで、デフォルトは TemporaryFailure です。次の表に、 treatMissingKeysAs フィールドの他の有効な結果コードと値を示します。

Response Code (レスポンスコード) 説明
Succeeded タスクは正常に完了しました。ジョブ完了レポートをリクエストした場合は、タスクの結果の文字列がレポートに含まれます。
TemporaryFailure タスクは一時的に失敗し、ジョブが完了する前に再始動されます。結果の文字列は無視されます。最終的な再処理である場合は、エラーメッセージが最終レポートに含まれます。
PermanentFailure タスクに固定障害が発生しました。ジョブ完了レポートをリクエストした場合、タスクは Failed としてマークされ、エラーメッセージの文字列が含まれます。失敗したタスクの結果の文字列は無視されます。

S3 バッチ操作で使用する Lambda 関数の作成

このセクションでは、Lambda 関数で使用する必要のある AWS Identity and Access Management (IAM) アクセス許可の例について説明します。また、S3 バッチ操作で使用する Lambda 関数の例も示します。Lambda 関数を作成したことがない場合は、AWS Lambda 開発者ガイドチュートリアル: Amazon S3 トリガーを使用して AWS Lambda 関数を呼び出すを参照してください。

S3 バッチ操作で使用するための Lambda 関数を作成する必要があります。既存の Amazon S3 イベントベースの Lambda 関数を再利用することはできません。これは、S3 バッチオペレーションで使用する Lambda 関数では、特別なデータフィールドを受け取って返す必要があるためです。

重要

Java で記述された AWS Lambda 関数は、RequestHandler または RequestStreamHandler ハンドラーインターフェイスのいずれかを受け入れます。ただし、S3 バッチ操作のリクエストとレスポンスの形式をサポートするため、リクエストとレスポンスのカスタムのシリアル化と逆シリアル化のために AWS Lambda は RequestStreamHandler インターフェイスを必要とします。このインターフェイスにより、Lambda は Java の handleRequest メソッドに InputStream と OutputStream ストリームを渡すことができます。

S3 バッチ操作で Lambda 関数を使用する場合は、必ず RequestStreamHandler インターフェイスを使用してください。RequestHandler インターフェイスを使用すると、バッチジョブが失敗し、完了レポートに「Invalid JSON returned in Lambda payload」と表示されます。

詳しくは、AWS Lambda ユーザーガイドハンドラーのインターフェイスを参照してください。

IAM アクセス許可の例

S3 バッチ操作で Lambda 関数を使用するために必要な IAM アクセス許可の例を以下に示します。

例 - S3 バッチ操作の信頼ポリシー

以下は、バッチ操作の IAM ロールに使用できる信頼ポリシーの例です。この IAM ロールは、ジョブを作成して、バッチ操作に IAM ロールを引き受けるアクセス許可を付与する場合に指定します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
例 - Lambda の IAM ポリシー

以下は、S3 バッチ操作に Lambda 関数を呼び出して入力のマニフェストを読み取るアクセス許可を与える IAM ポリシーの例です。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

リクエストとレスポンスの例

このセクションでは、Lambda 関数のリクエストとレスポンスの例を示します。

例 リクエスト

以下は、Lambda 関数のリクエストの JSON の例です。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:amzn-s3-demo-bucket1" } ] }
例 レスポンス

以下は、Lambda 関数のレスポンスの JSON の例です。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

S3 バッチ操作の Lambda 関数の例

次の Python Lambda の例では、バージョン管理されたオブジェクトから削除マーカーを削除します。

例に示すように、S3 バッチ操作のキーは URL エンコードされます。Amazon S3 を他の AWS のサービスとともに使用するには、S3 バッチ操作から渡されたキーを URL デコードすることが重要です。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }

Lambda 関数を呼び出す S3 バッチ操作のジョブの作成

Lambda 関数を呼び出す S3 バッチ操作のジョブを作成する場合は、以下を指定する必要があります。

  • Lambda 関数の ARN (関数のエイリアスまたは特定のバージョン番号を含む)

  • 関数を呼び出すアクセス許可を持つ IAM ロール

  • アクションパラメータ LambdaInvokeFunction

S3 バッチ操作のジョブの作成の詳細については、「S3 バッチオペレーションジョブの作成」および「S3 バッチ操作でサポートされるオペレーション」を参照してください。

次の例では、AWS CLI を使用して Lambda 関数を呼び出す S3 バッチオペレーションのジョブを作成します。この例を実行するには、user input placeholders をユーザー自身の情報に置き換えます。

aws s3control create-job --account-id account-id --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:region:account-id:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::amzn-s3-demo-manifest-bucket","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::amzn-s3-demo-bucket","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::account-id:role/BatchOperationsRole --region region --description "Lambda Function"

Lambda のマニフェストでのタスクレベルの情報の指定

S3 バッチオペレーションで AWS Lambda 関数を使用する際、操作対象の各タスク/キーに付随する追加データが必要になる場合があります。例えば、ソースオブジェクトキーと新しいオブジェクトキーの両方を提供したい場合があります。Lambda 関数は、元のキーを新しい名前で新しい S3 バケットにコピーできます。デフォルトでは、バッチオペレーションでは、ジョブへの入力マニフェストで移行先バケットとソースキーのリストのみを指定できます。以下の例では、マニフェストにデータを追加して、より複雑な Lambda 関数を実行する方法について説明します。

S3 バッチ操作のマニフェストでキーごとのパラメータを指定して Lambda 関数のコードで使用するには、以下に示すように URL エンコードされた形式の JSON を使用します。key フィールドは、Amazon S3 のオブジェクトのキーのように Lambda 関数に渡されます。ただし、以下の例に示すように、Lambda 関数で他の値や複数のキーを含めるように解釈することができます。

注記

マニフェストの key フィールドの最大文字数は 1,024 文字です。

例 - 「Amazon S3 のキー」を JSON の文字列に置き換える前のマニフェスト

URL エンコードされたバージョンを、S3 バッチ操作に渡す必要があります。

amzn-s3-demo-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} amzn-s3-demo-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} amzn-s3-demo-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}
例 - URL エンコードされたマニフェスト

この URL エンコードされたバージョンを、S3 バッチ操作に渡す必要があります。URL エンコードされていないバージョンは機能しません。

amzn-s3-demo-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D amzn-s3-demo-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D amzn-s3-demo-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D
例 - Lambda 関数とジョブのレポートに結果が書き込まれるマニフェストの形式

この URL エンコードされたマニフェストの例には、解析する次の Lambda 関数のパイプ区切りオブジェクトキーが含まれています。

amzn-s3-demo-bucket,object1key%7Clower amzn-s3-demo-bucket,object2key%7Cupper amzn-s3-demo-bucket,object3key%7Creverse amzn-s3-demo-bucket,object4key%7Cdelete

この Lambda 関数は、パイプ区切りのタスクをエンコードされた S3 バッチオペレーションのマニフェストに解析する方法を示しています。タスクは、指定されたオブジェクトに適用されているリビジョンオペレーションを示します。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }

S3 バッチ操作のチュートリアル

次のチュートリアルでは、Lambda を使用したいくつかのバッチ操作タスクにおけるエンドツーエンドの一連の手順について説明します。このチュートリアルでは、S3 ソースバケットに保存された動画のバッチトランスコーディング用に Lambda 関数を呼び出すようにバッチオペレーションを設定する方法を学習します。Lambda 関数は AWS Elemental MediaConvert を呼び出して、動画をトランスコードします。