本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
叫用 AWS Lambda 函數
您可以使用 Amazon S3 Batch Operations 對 Amazon S3 物件執行大規模批次操作。叫用 AWS Lambda 函數批次操作操作會 AWS Lambda 啟動函數,以對資訊清單中列出的物件執行自訂動作。本節說明如何建立 Lambda 函數以搭配 S3 批次作業使用,以及如何建立任務來叫用函數。S3 批次操作任務會使用 LambdaInvoke
操作,對資訊清單中列出的每個物件執行 Lambda 函數。
您可以使用 Amazon S3 主控台、 AWS Command Line Interface (AWS CLI)、 AWS SDKs 或 Amazon S3 Word 來使用 S3 批次操作API。 REST如需有關使用 Lambda 的詳細資訊,請參閱《AWS Lambda 開發人員指南》中的 AWS Lambda入門。
下列各節說明如何開始搭配 Lambda 使用 S3 批次作業。
主題
將 Lambda 與批次操作搭配使用
搭配 S3 Batch Operations 使用時 AWS Lambda,您必須建立專門用於 S3 Batch Operations 的新 Lambda 函數。您無法搭配 S3 批次作業重複使用現有基於 Amazon S3 事件的函數。事件函數只能接收訊息;不能傳回訊息。搭配 S3 批次作業使用的 Lambda 函數必須接受並傳回訊息。如需將 Lambda 與 Amazon S3 事件搭配使用的詳細資訊,請參閱 AWS Lambda 開發人員指南中的搭配使用 AWS Lambda 與 Amazon S3。
您建立 S3 批次作業任務來叫用 Lambda 函數。此任務對資訊清單中列出的所有物件執行相同的 Lambda 函數。您可以控制在處理清單中的物件時,使用 Lambda 函數的哪些版本。S3 Batch Operations 支援不合格的 Amazon Resource Names (ARNs)、別名和特定版本。如需詳細資訊,請參閱 AWS Lambda 開發人員指南中的AWS Lambda 版本控制簡介。
如果您為 S3 Batch Operations 任務提供使用別名或$LATEST
限定詞的函數 ARN,並且更新其中任一點的版本,S3 Batch Operations 會開始呼叫 Lambda 函數的新版本。當您想要隨著大型工作更新部分功能,此功能會很有用。如果您不希望 S3 批次操作變更使用的版本,請在建立任務時在 FunctionARN
參數中提供特定版本。
將 Lambda 和批次操作與目錄儲存貯體搭配使用
目錄儲存貯體是一種 Amazon S3 儲存貯體,專為需要一致單位數毫秒延遲的工作負載或效能關鍵型應用程式而設計。如需詳細資訊,請參閱目錄儲存貯體。
使用 Batch Operations 調用對目錄儲存貯體執行動作的 Lambda 函數時,須遵循特殊需求。例如,您必須使用更新的 JSON 結構描述來建構 Lambda 請求,並指定 InvocationSchemaVersion 2.0 (非 1.0)。此更新的結構描述可讓您指定 的選用鍵值對 UserArguments,您可以使用 來修改現有 Lambda 函數的特定參數。如需詳細資訊,請參閱使用 S3 批次操作在 Amazon S3 目錄儲存貯體中自動處理物件,以及 S3 AWS Lambda
回應代碼和結果代碼
S3 Batch Operations 會使用一或多個金鑰叫用 Lambda 函數,每個金鑰都有與其TaskID
相關聯的 。S3 Batch Operations 預期 Lambda 函數會有一個每個金鑰的結果代碼。在請求中傳送且未傳回每個金鑰結果代碼的任何任務 IDs,都會得到 欄位的結果代碼treatMissingKeysAs
。 treatMissingKeysAs
是選用的請求欄位,預設為 TemporaryFailure
。下表包含 treatMissingKeysAs
欄位的其他可能結果代碼和值。
回應代碼 | 描述 |
---|---|
Succeeded |
任務正常完成。如果您請求工作完成報告,即會在報告中包含任務的結果字串。 |
TemporaryFailure |
任務遇到暫時性的失敗,並且將在工作完成之前重新推動。會忽略結果字串。如果這是最後一次的重新推動,即會在最終報告中包含錯誤訊息。 |
PermanentFailure |
任務遇到永久的失敗。如果您請求工作完成報告,即會將任務標示為 Failed ,並包含錯誤訊息字串。會忽略來自失敗任務的結果字串。 |
建立 Lambda 函數以搭配 S3 批次作業使用
本節提供您必須搭配 Lambda 函數使用的範例 AWS Identity and Access Management (IAM) 許可。還包含一個搭配 S3 批次作業使用的 Lambda 函數範例。如果您先前從未建立 Lambda 函數,請參閱 AWS Lambda 開發人員指南中的教學課程: AWS Lambda 搭配 Amazon S3 使用 。
您必須建立專門搭配 S3 批次作業使用的 Lambda 函數。您無法重複使用現有的 Amazon S3 事件型 Lambda 函數,因為用於 S3 批次操作的 Lambda 函數必須接受並傳回特殊資料欄位。
重要
AWS Lambda 以 Java 編寫的函數接受 RequestHandlerRequestStreamHandler
界面,以進行請求和回應的自訂序列化和還原序列化。此介面允許 Lambda 將 InputStream 和 OutputStream 傳遞至 Java handleRequest
方法。
搭配 S3 批次作業使用 Lambda 函數時,請務必使用 RequestStreamHandler
界面。如果您使用RequestHandler
介面,批次任務會在完成報告中失敗,其中「Lambda 承載中傳回無效 JSON」。
如需詳細資訊,請參閱《AWS Lambda 使用者指南》中的處理常式界面。
範例 IAM 許可
以下是將 Lambda 函數與 S3 批次操作搭配使用所需的 IAM 許可範例。
範例 — S3 批次作業信任政策
以下是您可以用於批次操作 IAM 角色的信任政策範例。當您建立任務時,會指定此 IAM 角色,並授予批次操作擔任 IAM 角色的許可。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
範例 — Lambda IAM 政策
以下是 IAM 政策的範例,該政策授予 S3 Batch Operations 調用 Lambda 函數和讀取輸入資訊清單的許可。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }
範例請求和回應
本節提供 Lambda 函數的請求和回應範例。
範例 請求
以下是 Lambda 函數請求的 JSON 範例。
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:amzn-s3-demo-bucket1" } ] }
範例 回應
以下是 Lambda 函數回應的 JSON 範例。
{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }
S3 批次作業的 Lambda 函數範例
以下範例 Python Lambda 從版本控制的物件中移除了刪除標記。
如範例所示,來自 S3 批次操作的金鑰是 URL 編碼。若要將 Amazon S3 與其他 AWS 服務搭配使用,請務必對從 S3 批次操作傳遞的金鑰進行 URL 解碼。
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
建立 S3 批次作業任務以叫用 Lambda 函數
建立 S3 批次作業任務來叫用 Lambda 函數時,您必須提供下列項目:
-
Lambda 函數的 ARN (可能包含函數別名或特定版本編號)
-
具有調用函數許可的 IAM 角色
-
動作參數
LambdaInvokeFunction
如需有關建立 S3 批次作業任務的詳細資訊,請參閱建立 S3 批次操作任務和S3 批次操作支援的操作。
下列範例會建立 S3 批次操作任務,該任務會使用 叫用 Lambda 函數 AWS CLI。若要使用此範例,請取代
使用您自己的資訊。使用者輸入預留位置
aws s3control create-job --account-id
account-id
--operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:region
:account-id
:function:LambdaFunctionName
" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::amzn-s3-demo-manifest-bucket
","ETag":"ManifestETag
"}}' --report '{"Bucket":"arn:aws:s3:::amzn-s3-demo-bucket
","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix
","ReportScope":"AllTasks"}' --priority2
--role-arn arn:aws:iam::account-id
:role/BatchOperationsRole
--regionregion
--description "Lambda Function
"
在 Lambda 資訊清單中提供工作層級的資訊
當您將 AWS Lambda 函數與 S3 批次操作搭配使用時,您可能會希望每個操作的任務或金鑰隨附額外的資料。例如,您可能想要同時提供來源物件金鑰和新的物件金鑰。這樣 Lambda 函數就能以新名稱,將來源金鑰複製到新的 S3 儲存貯體。根據預設,批次操作可讓您將輸入資訊清單中的目的地儲存貯體和來源索引鍵清單指定為任務。下列範例說明如何在資訊清單中包含其他資料,以便您可以執行更複雜的 Lambda 函數。
若要在 S3 批次操作資訊清單中指定要在 Lambda 函數程式碼中使用的每個金鑰參數,請使用下列 URL 編碼的 JSON 格式。key
欄位視同 Amazon S3 物件金鑰傳遞給 Lambda 函數。但是 Lambda 函數可以解譯為包含其他值或多個金鑰,如下列範例所示。
注意
資訊清單中 key
欄位的字元數上限是 1,024。
範例 — 以 JSON 字串取代「Amazon S3 金鑰」的清單
URL 編碼版本必須提供給 S3 批次操作。
amzn-s3-demo-bucket
,{"origKey": "object1key
", "newKey": "newObject1Key
"}amzn-s3-demo-bucket
,{"origKey": "object2key
", "newKey": "newObject2Key
"}amzn-s3-demo-bucket
,{"origKey": "object3key
", "newKey": "newObject3Key
"}
範例 — 清單單URL編碼
此 URL 編碼版本必須提供給 S3 批次操作。 non-URL-encoded 版本無法運作。
amzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object1key
%22%2C%20%22newKey%22%3A%20%22newObject1Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object2key
%22%2C%20%22newKey%22%3A%20%22newObject2Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object3key
%22%2C%20%22newKey%22%3A%20%22newObject3Key
%22%7D
範例 — 以資訊清單格式將結果寫入任務報告的 Lambda 函數
此 URL 編碼資訊清單範例包含管道分隔物件索引鍵,供下列 Lambda 函數剖析。
amzn-s3-demo-bucket
,object1key
%7Cloweramzn-s3-demo-bucket
,object2key
%7Cupperamzn-s3-demo-bucket
,object3key
%7Creverseamzn-s3-demo-bucket
,object4key
%7Cdelete
此 Lambda 函數說明如何剖析編碼為 S3 批次操作資訊清單的管道分隔任務。任務會指出要套用至指定物件的修訂版作業。
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
S3 批次操作教學課程
下列教學課程會針對某些使用 Lambda 的批次操作任務顯示完整的 end-to-end 程序。在本教學課程中,您將了解如何設定批次操作來調用 Lambda 函數,以批次轉碼存放在 S3 來源儲存貯體中的影片。Lambda 函數會呼叫 AWS Elemental MediaConvert 來轉碼影片。