

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 擷取 Amazon MSK 和自我管理的 Apache Kafka 事件來源的捨棄批次
<a name="kafka-on-failure"></a>

若要保留失敗的事件來源映射調用記錄，請將目標地新增到函數的事件來源映射中。傳送至該目的地的每筆記錄都是包含失敗調用之中繼資料的 JSON 文件。對於 Amazon S3 目的地，Lambda 還會將整個調用記錄與中繼資料一起傳送。您可以設定任何 Amazon SNS 主題、Amazon SQS 佇列、Amazon S3 儲存貯體或 Kafka 做為目的地。

透過 Amazon S3 目的地，您可以使用 [Amazon S3 事件通知](https://docs.aws.amazon.com/)功能，在物件上傳至您的目的地 S3 儲存貯體時接收通知。您也可以設定 S3 事件通知來調用另一個 Lambda 函數，以對失敗的批次執行自動處理。

執行角色必須具有目的地的許可：
+ **對於 SQS 目的地：**[sqs：SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **對於 SNS 目的地：**[sns：Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **對於 S3 目的地：**[s3：PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 和 [s3：ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **對於 Kafka 目的地：**[kafka-cluster：WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

您可以將 Kafka 主題設定為 Kafka 事件來源映射的失敗時目的地。當 Lambda 在耗盡重試嘗試後或記錄超過最長存留期後無法處理記錄時，Lambda 會將失敗的記錄傳送至指定的 Kafka 主題，以供稍後處理。請參閱[使用 Kafka 主題做為失敗時的目的地](kafka-on-failure-destination.md)。

必須在 Kafka 叢集 VPC 內為失敗時目的地服務部署 VPC 端點。

此外，如果您在目的地上設定 KMS 金鑰，則視目的地類型而定，Lambda 需要下列許可：
+ 如果您已針對 S3 目的地使用自己的 KMS 金鑰啟用加密，則需要 [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html)。如果 KMS 金鑰和 S3 儲存貯體目的地與 Lambda 函數和執行角色位於不同的帳戶中，請將 KMS 金鑰設定為信任執行角色以允許 kms:GenerateDataKey。
+ 如果您已針對 SQS 目的地使用自己的 KMS 金鑰啟用加密，則需要 [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) 和 [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html)。如果 KMS 金鑰和 SQS 佇列目的地與 Lambda 函數和執行角色位於不同的帳戶中，請將 KMS 金鑰設定為信任執行角色，以允許 kms:Decrypt、kms:GenerateDataKey、[kms:DescribeKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html) 以及 [kms:ReEncrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_ReEncrypt.html)。
+ 如果您已針對 SNS 目的地使用自己的 KMS 金鑰啟用加密，則需要 [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) 和 [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html)。如果 KMS 金鑰和 SNS 主題目的地與 Lambda 函數和執行角色位於不同的帳戶中，請將 KMS 金鑰設定為信任執行角色，以允許 kms:Decrypt、kms:GenerateDataKey、[kms:DescribeKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html) 以及 [kms:ReEncrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_ReEncrypt.html)。

## 設定 Kafka 事件來源映射的失敗時目的地
<a name="kafka-onfailure-destination"></a>

若要使用主控台設定失敗時的目的地，請依照下列步驟執行：

1. 開啟 Lambda 主控台中的 [函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇一個函數。

1. 在**函數概觀下**，選擇**新增目的地**。

1. 針對**來源**，請選擇**事件來源映射調用**。

1. 對於**事件來源映射**，請選擇針對此函數設定的事件來源。

1. 對於**條件**，選取**失敗時**。對於事件來源映射調用，這是唯一可接受的條件。

1. 對於**目標類型**，請選擇 Lambda 將調用記錄傳送至的目標類型。

1. 對於**目的地**，請選擇一個資源。

1. 選擇**儲存**。

您也可以使用 AWS CLI設定失敗時的目的地。例如，下列 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令會將具有 SQS 失敗時的目的地的事件來源映射新增至 `MyFunction`：

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

下列 [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令會將 S3 失敗時的目的地新增至與輸入 `uuid` 相關聯的事件來源：

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
```

若要移除目的地，請提供空白字串作為 `destination-config` 參數的引數：

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 目的地的安全最佳實務
<a name="kafka-s3-destination-security"></a>

刪除已設定為目的地的 S3 儲存貯體而不從函數的組態中移除目的地，可能會產生安全風險。如果其他使用者知道目的地儲存貯體的名稱，他們可以在其 AWS 帳戶中重新建立儲存貯體。失敗調用的記錄會被傳送到其儲存貯體，可能公開來自您函數的資料。

**警告**  
為了確保無法將來自函數的調用記錄傳送到另一個 中的 S3 儲存貯體 AWS 帳戶，請將條件新增至函數的執行角色，以限制您帳戶中儲存貯體的`s3:PutObject`許可。

下列範例顯示的 IAM 政策，將函數的 `s3:PutObject` 許可限制為帳戶中的儲存貯體。此政策也為 Lambda 提供了使用 S3 儲存貯體做為目的地所需的 `s3:ListBucket` 許可。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": {{"111122223333"}}
                }
            }
        }
    ]
}
```

若要使用 AWS 管理主控台 或 將許可政策新增至函數的執行角色 AWS CLI，請參閱下列程序中的指示：

------
#### [ Console ]

**若要將許可政策新增至函數的執行角色 (主控台)**

1. 開啟 Lambda 主控台中的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選取您要修改其執行角色的 Lambda 函數。

1. 在**組態**索引標籤中，選擇**許可**。

1. 在**執行角色**索引標籤中，選取函數的**角色名稱**，以開啟角色的 IAM 主控台頁面。

1. 透過下列步驟將許可政策新增至角色：

   1. 在**許可政策**窗格中，選擇**新增許可** ，然後選取**建立內嵌政策**。

   1. 在**政策編輯器**中，選取 **JSON**。

   1. 將您要新增的政策貼入編輯器 (取代現有的 JSON)，然後選擇**下一步**。

   1. 在**政策詳細資訊**下，輸入**政策名稱**。

   1. 選擇**建立政策**。

------
#### [ AWS CLI ]

**若要將許可政策新增至函數的執行角色 (CLI)**

1. 建立具有所需許可的 JSON 政策文件，並將其儲存在本機目錄中。

1. 使用 IAM `put-role-policy` CLI 命令，將許可新增至函數的執行角色。從您儲存 JSON 政策文件的目錄執行下列命令，並將角色名稱、政策名稱和政策文件取代為您自己的值。

   ```
   aws iam put-role-policy \
   --role-name {{my_lambda_role}} \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://{{my_policy.json}}
   ```

------

### SNS 和 SQS 調用記錄範例
<a name="kafka-sns-sqs-destinations"></a>

下列範例顯示 Lambda 針對失敗的 Kafka 事件來源調用而傳送至 SNS 主題或 SQS 佇列目的地。`recordsInfo` 之下的每個索引鍵都包含 Kafka 主題和分割區，以連字號分隔。例如，對於金鑰 `"Topic-0"`，`Topic` 是 Kafka 主題，並且 `0` 是分區。對於每個主題和分區，您可以使用偏移和時間戳記資料來查找原始調用記錄。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            }
        }
    }
}
```

### S3 目的地調用記錄範例
<a name="kafka-s3-destinations"></a>

對於 S3 的目的地，Lambda 會將整個調用記錄與中繼資料一起傳送至目的地。下列範例顯示 Lambda 針對失敗的 Kafka 事件來源調用而傳送至 S3 儲存貯體目的地。除了上一個 SQS 和 SNS 目的地範例中的所有欄位之外，此 `payload` 欄位還包含原始調用記錄做為逸出 JSON 字串。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            }
        }
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

**提示**  
建議您在目的地儲存貯體上啟用 S3 版本控制。