

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Lambda 處理來自 Amazon Kinesis Data Streams 的記錄
<a name="with-kinesis"></a>

您可以使用 Lambda 函數來處理 [Amazon Kinesis 資料串流](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)中的記錄。您可以將 Lambda 函數對應至 Kinesis Data Streams 共用輸送量取用程式 (標準迭代程式)，或對應至具有[增強散發](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html)功能的專用輸送量取用程式。對於標準迭代程式，Lambda 會使用 HTTP 協定輪詢 Kinesis 串流中的每個碎片以尋找記錄。事件來源映射會與碎片的其他取用者共用讀取傳輸量。

 如需 Kinesis 資料串流的詳細資訊，請參閱[從 Amazon Kinesis Data Streams 讀取資料](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html)。

**注意**  
Kinesis 會收取每個碎片的費用，以及從串流讀取資料的增強型散發。如需定價的詳細資訊，請參閱 [Amazon Kinesis 定價](https://aws.amazon.com/kinesis/data-streams/pricing)。

## 輪詢和批次處理串流
<a name="kinesis-polling-and-batching"></a>

Lambda 會從資料串流讀取記錄並透過包含串流記錄的事件[同步](invocation-sync.md)調用函數。Lambda 會讀取批次中的記錄並調用函數，以處理來自該批次的記錄。每個批次包含來自單一碎片/資料串流的記錄。

您的 Lambda 函數是適用於資料串流的取用者應用程式。其會從每個碎片中一次處理一個批次的記錄。您可以將 Lambda 函數對應至共用輸送量取用程式 (標準迭代程式)，或對應至具有增強散發功能的專用輸送量取用程式。
+ **標準迭代器：**Lambda 會輪詢 Kinesis 串流中的每個碎片，其記錄的基本速率為每秒一次。有更多記錄可用時，Lambda 會持續處理批次，直到函數掌握串流資訊。事件來源映射會與碎片的其他取用者共用讀取傳輸量。
+ **增強散發功能：**若要將延遲降至最低，並將讀取傳輸量增至最大，請建立具有[增強散發功能](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)的資料串流取用者。增強散發取用者會取得每個碎片的專用連線，其不會影響其他從串流讀取的應用程式。串流取用者會使用 HTTP/2 來減少延遲，方法為透過長期連線將記錄推送至 Lambda，以及透過壓縮請求標頭。您可以使用 Kinesis [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) API，建立串流取用者。

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

您應該會看到下列輸出：

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

若要增加函數處理記錄的速度，[請將碎片新增至您的資料串流](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards)。Lambda 會依序在每個碎片中處理記錄。如果您的函數傳回錯誤，則會停止處理碎片中的其他記錄。碎片越多，要一次處理的批次越多，這會降低錯誤對並行的影響。

如果您的函數無法擴展至可處理並行批次的總數，您可以[請求增加配額](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html)，或為您的函數[保留並行](configuration-concurrency.md)。

Lambda 預設會在記錄可用時立即調用函數。如果 Lambda 從事件來源中讀取的批次只有一筆記錄，Lambda 只會傳送一筆記錄至函數。為避免調用具有少量記錄的函數，您可設定*批次間隔*，請求事件來源緩衝記錄最長達五分鐘。調用函數之前，Lambda 會繼續從事件來源中讀取記錄，直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。如需更多詳細資訊，請參閱 [批次處理行為](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

**警告**  
Lambda 事件來源映射至少會處理每個事件一次，而且可能會重複處理記錄。為避免與重複事件相關的潛在問題，強烈建議您讓函數程式碼具有等冪性。如需詳細資訊，請參閱 AWS 知識中心中的[如何讓 Lambda 函數具有冪等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Lambda 在傳送下一批進行處理前不會等待任何已設定的[擴充](lambda-extensions.md)完成。換句話說，當 Lambda 處理下一批記錄時，您的擴充功能可能會繼續執行。如果您違反任何帳戶的 [並行](lambda-concurrency.md) 設定或限制，便可能會產生限流的問題。若要偵測此是否為潛在問題，請監控您的函數，並確認您看到的 [並行指標](monitoring-concurrency.md#general-concurrency-metrics) 是否高於事件來源映射的預期值。由於兩次調用之間的時間很短，Lambda 可能會短暫報告比碎片數目更高的並行用量。即使對於沒有延伸項目的 Lambda 函數也可能如此。

設定 [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 設定，同時使用多個 Lambda 調用來處理 Kinesis 資料串流的一個碎片。您可以透過從 1 (預設) 到 10 的並行化因子指定 Lambda 從碎片輪詢的並行批次數。例如，當 `ParallelizationFactor` 設定為 2 時，您最多可以有 200 個並行 Lambda 調用，來處理 100 個 Kinesis 資料碎片 (不過在實務中，`ConcurrentExecutions` 指標可能有不同值)。當資料量急劇波動並且 `IteratorAge` 高時，這有助於縱向擴展處理輸送量。如果增加每個碎片的並行批次數量，則 Lambda 仍會確保在分割區索引鍵層級進行順序處理。

您也可以搭配 Kinesis 彙總使用 `ParallelizationFactor`。事件來源映射的行為取決於您是否使用[增強型散發](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)：
+ **沒有使用增強型散發**：彙總事件內的所有事件都必須具有相同的分割區索引鍵。分割區索引鍵也必須與彙總事件的分割區索引鍵相符。如果彙總事件內的事件具有不同的分割區索引鍵，則 Lambda 無法保證依分割區索引鍵依序處理事件。
+ **使用增強型散發**：首先，Lambda 會將彙總的事件解碼為其個別事件。彙總的事件可以具有與其包含的事件不同的分割區索引鍵。不過，未對應至分割區索引鍵的事件會[遭到捨棄和遺失](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md)。Lambda 不會處理這些事件，也不會將其傳送至設定的失敗目的地。

## 範例事件
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```