

# 调用 AWS Lambda 函数
<a name="batch-ops-invoke-lambda"></a>

可以使用 Amazon S3 批量操作对 Amazon S3 对象执行大规模批量操作。批量操作**调用 AWS Lambda 函数**操作将启动 AWS Lambda 函数，来对清单中列出的对象执行自定义操作。本部分介绍如何创建 Lambda 函数来与 S3 批量操作结合使用，以及如何创建任务来调用该函数。S3 批量操作任务使用 `LambdaInvoke` 操作对清单中列出的每个对象运行 Lambda 函数。

可以使用 Amazon S3 控制台、AWS Command Line Interface（AWS CLI）、AWS SDK 或 Amazon S3 REST API 处理 S3 批量操作。有关使用 Lambda 的更多信息，请参阅《*AWS Lambda 开发人员指南*》中的[入门AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)。

以下部分说明如何开始将 S3 批量操作与 Lambda 结合使用。

**Topics**
+ [将 Lambda 与批量操作结合使用](#batch-ops-invoke-lambda-using)
+ [创建与 S3 批量操作一起使用的 Lambda 函数](#batch-ops-invoke-lambda-custom-functions)
+ [创建调用 Lambda 函数的 S3 批量操作任务](#batch-ops-invoke-lambda-create-job)
+ [在 Lambda 清单中提供任务级信息](#storing-task-level-information-in-lambda)
+ [S3 批量操作教程](#batch-ops-tutorials-lambda)

## 将 Lambda 与批量操作结合使用
<a name="batch-ops-invoke-lambda-using"></a>

将 S3 批量操作与 AWS Lambda 结合使用时，您必须创建专门用于 S3 批量操作的新 Lambda 函数。您无法在 S3 批量操作中重复使用基于事件的现有 Amazon S3 函数。事件函数只能接收消息；不返回消息。与 S3 批量操作结合使用的 Lambda 函数必须接受并返回消息。有关将 Lambda 与 Amazon S3 事件结合使用的更多信息，请参阅《*AWS Lambda 开发人员指南*》中的[将 AWS Lambda 与 Amazon S3](https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html) 结合使用。

创建调用 Lambda 函数的 S3 批量操作任务。此任务在清单中列出的所有对象上运行相同的 Lambda 函数。在处理清单中的对象时，您可以控制要使用的 Lambda 函数版本。S3 批量操作支持非限定的 Amazon Resource Name（ARN）、别名和特定版本。有关更多信息，请参阅《*AWS Lambda 开发人员指南*》中的[AWS Lambda 版本控制简介](https://docs.aws.amazon.com/lambda/latest/dg/versioning-intro.html)。

如果您提供的 S3 批量操作任务包含使用别名或 `$LATEST` 限定符的函数 ARN，并更新它们各自所指向的版本，S3 批量操作开始调用 Lambda 函数的新版本。如果您希望通过大型任务更新此方法的功能部分，这会非常有用。如果您不希望 S3 批量操作更改所使用的版本，请在创建任务时在 `FunctionARN` 参数中提供特定版本。

使用 S3 批量操作的单个 AWS Lambda 作业可以支持包含多达 200 亿个对象的清单。

### 将 Lambda 和批量操作用于目录存储桶
<a name="batch-ops-invoke-lambda-directory-buckets"></a>

目录存储桶是一种 Amazon S3 存储桶类型，专为需要一致的个位数毫秒延迟的工作负载或性能至关重要的应用程序而设计。有关更多信息，请参阅[目录存储桶](https://docs.aws.amazon.com//AmazonS3/latest/userguide/directory-buckets-overview.html)。

使用批量操作来调用处理目录存储桶的 Lambda 函数有特殊要求。例如，必须使用更新后的 JSON 架构来构建 Lambda 请求，并在创建任务时指定 [https://docs.aws.amazon.com//AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion](https://docs.aws.amazon.com//AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion) 2.0（而不是 1.0）。这一更新后的架构可让您为 [https://docs.aws.amazon.com//AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-UserArguments](https://docs.aws.amazon.com//AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-UserArguments) 指定可选的键值对，您可以使用这些键值对修改现有 Lambda 函数的某些参数。有关更多信息，请参阅 AWS 存储博客中的 [Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and AWS Lambda](https://aws.amazon.com/blogs/storage/automate-object-processing-in-amazon-s3-directory-buckets-with-s3-batch-operations-and-aws-lambda/)。

### 响应和结果代码
<a name="batch-ops-invoke-lambda-response-codes"></a>

S3 批量操作使用一个或多个键调用 Lambda 函数，每个键都有一个与之关联的 `TaskID`。S3 批量操作需要 Lambda 函数提供每个键的结果代码。对于在请求中发送的任何任务 ID，如果没有为它们返回每个键的结果代码，则将从 `treatMissingKeysAs` 字段中提供结果代码。`treatMissingKeysAs` 是可选的请求字段，默认为 `TemporaryFailure`。下表包含 `treatMissingKeysAs` 字段的其它可能的结果代码和值。


| 响应代码 | 描述 | 
| --- | --- | 
| Succeeded | 任务正常完成。如果您请求了任务完成报告，报告中将包含任务的结果字符串。 | 
| TemporaryFailure | 任务暂时失败，将在任务完成前重新启动。忽略结果字符串。如果是最后一次重新启动，最终报告将包含错误消息。 | 
| PermanentFailure | 任务永久失败。如果您请求了任务完成报告，任务将被标记为 Failed 并包含错误消息字符串。忽略失败任务的结果字符串。 | 

## 创建与 S3 批量操作一起使用的 Lambda 函数
<a name="batch-ops-invoke-lambda-custom-functions"></a>

此部分提供使用 Lambda 函数必须具有的 AWS Identity and Access Management (IAM) 权限示例。它还包含一个与 S3 批量操作一起使用的示例 Lambda 函数。如果您之前从未创建过 Lambda 函数，请参阅《*AWS Lambda 开发人员指南*》中的[教程：将 AWS Lambda 与 Amazon S3 结合使用](https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html)。

您必须创建专门与 S3 批量操作一起使用的 Lambda 函数。您无法重用基于 Amazon S3 事件的现有 Lambda 函数，因为用于 S3 批量操作的 Lambda 函数必须接受并返回特殊数据字段。

**重要**  
用 Java 编写的 AWS Lambda 函数接受 [https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-core/src/main/java/com/amazonaws/services/lambda/runtime/RequestHandler.java](https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-core/src/main/java/com/amazonaws/services/lambda/runtime/RequestHandler.java) 或 [https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-core/src/main/java/com/amazonaws/services/lambda/runtime/RequestStreamHandler.java](https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-core/src/main/java/com/amazonaws/services/lambda/runtime/RequestStreamHandler.java) 处理程序接口。但是，为了支持 S3 批量操作请求和响应格式，AWS Lambda 需要 `RequestStreamHandler` 接口来对请求和响应进行自定义序列化和反序列化。此接口允许 Lambda 将 InputStream 和 OutputStream 传递给 Java `handleRequest` 方法。  
将 Lambda 函数与 S3 批量操作结合使用时，请务必使用 `RequestStreamHandler` 接口。如果您使用 `RequestHandler` 接口，则批处理任务会失败，并在完成报告中显示“Lambda 负载中返回无效的 JSON”。  
有关更多信息，请参阅《*AWS Lambda 用户指南*》中的[处理程序接口](https://docs.aws.amazon.com//lambda/latest/dg/java-handler.html#java-handler-interfaces)。

### IAM 权限示例
<a name="batch-ops-invoke-lambda-custom-functions-iam"></a>

以下是将 Lambda 函数与 S3 批量操作结合使用所需的 IAM 权限示例。

**Example – S3 批量操作信任策略**  
以下是您可用于批量操作 IAM 角色的信任策略示例。当您创建任务并授予批量操作代入 IAM 角色的权限时，会指定此 IAM 角色。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "batchoperations.s3.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

**Example – Lambda IAM 策略**  
以下是提供 S3 批量操作权限以调用 Lambda 函数和读取输入清单的 IAM 策略示例。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "BatchOperationsLambdaPolicy",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:PutObject",
                "lambda:InvokeFunction"
            ],
            "Resource": "*"
        }
    ]
}
```

### 示例请求和响应
<a name="batch-ops-invoke-lambda-custom-functions-request"></a>

本节内容提供 Lambda 函数的请求和响应示例。

**Example 请求**  
以下是 Lambda 函数的请求的 JSON 示例。  

```
{
    "invocationSchemaVersion": "1.0",
    "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
    "job": {
        "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce"
    },
    "tasks": [
        {
            "taskId": "dGFza2lkZ29lc2hlcmUK",
            "s3Key": "customerImage1.jpg",
            "s3VersionId": "1",
            "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:amzn-s3-demo-bucket1"
        }
    ]
}
```

**Example 响应**  
以下是 Lambda 函数的响应的 JSON 示例。  

```
{
  "invocationSchemaVersion": "1.0",
  "treatMissingKeysAs" : "PermanentFailure",
  "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
  "results": [
    {
      "taskId": "dGFza2lkZ29lc2hlcmUK",
      "resultCode": "Succeeded",
      "resultString": "[\"Mary Major", \"John Stiles\"]"
    }
  ]
}
```

### S3 批量操作的 Lambda 函数示例
<a name="batch-ops-invoke-lambda-custom-functions-example"></a>

以下示例中，Python Lambda 从受版本控制的对象中移除删除标记。

如示例所示，S3 批量操作中的键采用 URL 编码。要将 Amazon S3 与其他 AWS 服务结合使用，请务必对从 S3 批量操作传递的键进行 URL 解码。

```
import logging
from urllib import parse
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel("INFO")

s3 = boto3.client("s3")


def lambda_handler(event, context):
    """
    Removes a delete marker from the specified versioned object.

    :param event: The S3 batch event that contains the ID of the delete marker
                  to remove.
    :param context: Context about the event.
    :return: A result structure that Amazon S3 uses to interpret the result of the
             operation. When the result code is TemporaryFailure, S3 retries the
             operation.
    """
    # Parse job parameters from Amazon S3 batch operations
    invocation_id = event["invocationId"]
    invocation_schema_version = event["invocationSchemaVersion"]

    results = []
    result_code = None
    result_string = None

    task = event["tasks"][0]
    task_id = task["taskId"]

    try:
        obj_key = parse.unquote_plus(task["s3Key"], encoding="utf-8")
        obj_version_id = task["s3VersionId"]
        bucket_name = task["s3BucketArn"].split(":")[-1]

        logger.info(
            "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key
        )

        try:
            # If this call does not raise an error, the object version is not a delete
            # marker and should not be deleted.
            response = s3.head_object(
                Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id
            )
            result_code = "PermanentFailure"
            result_string = (
                f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker."
            )

            logger.debug(response)
            logger.warning(result_string)
        except ClientError as error:
            delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get(
                "x-amz-delete-marker", "false"
            )
            if delete_marker == "true":
                logger.info(
                    "Object %s, version %s is a delete marker.", obj_key, obj_version_id
                )
                try:
                    s3.delete_object(
                        Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id
                    )
                    result_code = "Succeeded"
                    result_string = (
                        f"Successfully removed delete marker "
                        f"{obj_version_id} from object {obj_key}."
                    )
                    logger.info(result_string)
                except ClientError as error:
                    # Mark request timeout as a temporary failure so it will be retried.
                    if error.response["Error"]["Code"] == "RequestTimeout":
                        result_code = "TemporaryFailure"
                        result_string = (
                            f"Attempt to remove delete marker from  "
                            f"object {obj_key} timed out."
                        )
                        logger.info(result_string)
                    else:
                        raise
            else:
                raise ValueError(
                    f"The x-amz-delete-marker header is either not "
                    f"present or is not 'true'."
                )
    except Exception as error:
        # Mark all other exceptions as permanent failures.
        result_code = "PermanentFailure"
        result_string = str(error)
        logger.exception(error)
    finally:
        results.append(
            {
                "taskId": task_id,
                "resultCode": result_code,
                "resultString": result_string,
            }
        )
    return {
        "invocationSchemaVersion": invocation_schema_version,
        "treatMissingKeysAs": "PermanentFailure",
        "invocationId": invocation_id,
        "results": results,
    }
```

## 创建调用 Lambda 函数的 S3 批量操作任务
<a name="batch-ops-invoke-lambda-create-job"></a>

创建 S3 批量操作任务以调用 Lambda 函数时，必须提供以下信息：
+ Lambda 函数的 ARN（可能包含函数别名或特定版本号）
+ 具有调用此函数的权限的 IAM 角色
+ 操作参数 `LambdaInvokeFunction`

有关创建 S3 批量操作任务的更多信息，请参阅 [创建 S3 批量操作任务](batch-ops-create-job.md) 和 [S3 分批操作支持的操作](batch-ops-operations.md)。

以下示例使用 AWS CLI 创建用于调用 Lambda 函数的 S3 批量操作任务。要使用此示例，请将 *`user input placeholders`* 替换为您自己的信息。

```
aws s3control create-job
    --account-id account-id
    --operation  '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:region:account-id:function:LambdaFunctionName" } }'
    --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::amzn-s3-demo-manifest-bucket","ETag":"ManifestETag"}}'
    --report '{"Bucket":"arn:aws:s3:::amzn-s3-demo-bucket","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}'
    --priority 2
    --role-arn arn:aws:iam::account-id:role/BatchOperationsRole
    --region region
    --description "Lambda Function"
```

## 在 Lambda 清单中提供任务级信息
<a name="storing-task-level-information-in-lambda"></a>

当您将 AWS Lambda 函数与 S3 批量操作结合使用时，对于所运行的各个任务或键，可能会需要附带额外的数据。例如，您可能希望同时提供源对象键和新的对象键。然后，您的 Lambda 函数可以将源键以新名称复制到新 S3 存储桶。默认情况下，通过批量操作，可以仅在任务的输入清单中指定目标存储桶以及源键的列表。下面的示例介绍了如何在清单中包含额外的数据，以便运行更复杂的 Lambda 函数。

要在 S3 批量操作清单中指定各个键的参数以在 Lambda 函数代码中使用，请使用以下 URL 编码的 JSON 格式。`key` 字段采用类似于 Amazon S3 对象键的方式传递到您的 Lambda 函数。但是，Lambda 函数可以解释此字段来包含其它值或多个键，如下示例所示。

**注意**  
清单中 `key` 字段的最大字符数为 1024。

**Example – 清单将“Amazon S3 密钥”替换为 JSON 字符串**  
必须向 S3 批量操作提供 URL 编码的版本。  

```
amzn-s3-demo-bucket,{"origKey": "object1key", "newKey": "newObject1Key"}
amzn-s3-demo-bucket,{"origKey": "object2key", "newKey": "newObject2Key"}
amzn-s3-demo-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}
```

**Example – URL 编码的清单**  
必须向 S3 批量操作提供此 URL 编码的版本。非 URL 编码的版本不起作用。  

```
amzn-s3-demo-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D
amzn-s3-demo-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D
amzn-s3-demo-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D
```

**Example – 具有清单格式的 Lambda 函数将结果写入任务报告**  
这个 URL 编码的清单示例包含以竖线分隔的对象键，供以下 Lambda 函数进行解析。  

```
amzn-s3-demo-bucket,object1key%7Clower
amzn-s3-demo-bucket,object2key%7Cupper
amzn-s3-demo-bucket,object3key%7Creverse
amzn-s3-demo-bucket,object4key%7Cdelete
```
此 Lambda 函数显示了如何解析编码到 S3 批量操作清单中的以竖线分隔的任务。该任务指示应用于指定对象的修订操作。  

```
import logging
from urllib import parse
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel("INFO")

s3 = boto3.resource("s3")


def lambda_handler(event, context):
    """
    Applies the specified revision to the specified object.

    :param event: The Amazon S3 batch event that contains the ID of the object to
                  revise and the revision type to apply.
    :param context: Context about the event.
    :return: A result structure that Amazon S3 uses to interpret the result of the
             operation.
    """
    # Parse job parameters from Amazon S3 batch operations
    invocation_id = event["invocationId"]
    invocation_schema_version = event["invocationSchemaVersion"]

    results = []
    result_code = None
    result_string = None

    task = event["tasks"][0]
    task_id = task["taskId"]
    # The revision type is packed with the object key as a pipe-delimited string.
    obj_key, revision = parse.unquote_plus(task["s3Key"], encoding="utf-8").split("|")
    bucket_name = task["s3BucketArn"].split(":")[-1]

    logger.info("Got task: apply revision %s to %s.", revision, obj_key)

    try:
        stanza_obj = s3.Bucket(bucket_name).Object(obj_key)
        stanza = stanza_obj.get()["Body"].read().decode("utf-8")
        if revision == "lower":
            stanza = stanza.lower()
        elif revision == "upper":
            stanza = stanza.upper()
        elif revision == "reverse":
            stanza = stanza[::-1]
        elif revision == "delete":
            pass
        else:
            raise TypeError(f"Can't handle revision type '{revision}'.")

        if revision == "delete":
            stanza_obj.delete()
            result_string = f"Deleted stanza {stanza_obj.key}."
        else:
            stanza_obj.put(Body=bytes(stanza, "utf-8"))
            result_string = (
                f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}."
            )

        logger.info(result_string)
        result_code = "Succeeded"
    except ClientError as error:
        if error.response["Error"]["Code"] == "NoSuchKey":
            result_code = "Succeeded"
            result_string = (
                f"Stanza {obj_key} not found, assuming it was deleted "
                f"in an earlier revision."
            )
            logger.info(result_string)
        else:
            result_code = "PermanentFailure"
            result_string = (
                f"Got exception when applying revision type '{revision}' "
                f"to {obj_key}: {error}."
            )
            logger.exception(result_string)
    finally:
        results.append(
            {
                "taskId": task_id,
                "resultCode": result_code,
                "resultString": result_string,
            }
        )
    return {
        "invocationSchemaVersion": invocation_schema_version,
        "treatMissingKeysAs": "PermanentFailure",
        "invocationId": invocation_id,
        "results": results,
    }
```

## S3 批量操作教程
<a name="batch-ops-tutorials-lambda"></a>

以下教程提供了使用 Lambda 的一些批量操作任务的完整端到端过程。在本教程中，您将了解如何将批量操作设置为调用 Lambda 函数，来对存储在 S3 源存储桶中的视频进行批量转码。Lambda 函数调用 AWS Elemental MediaConvert 来对视频进行转码。
+ [教程：使用 S3 批量操作对视频进行批量转码](tutorial-s3-batchops-lambda-mediaconvert-video.md)