

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Lambda 中實作有狀態的 DynamoDB 串流處理
<a name="services-ddb-windows"></a>

Lambda 函數可執行持續串流處理應用程式。串流表示持續在應用程式中流動的無限制資料。若要分析此持續更新輸入中的資訊，您可以使用定義的時段來限制包含的記錄。

輪轉時段是定期開啟和關閉的不同時段。依預設，Lambda 調用是無狀態的，您無法在沒有外部資料庫的情況下，將其用於處理多個持續調用的資料。然而，使用輪轉時段，您可以在不同的調用間維護狀態。此狀態包含之前為目前時段處理之訊息的彙總結果。狀態可以是每個分區最多 1 MB。如果超過該大小，則 Lambda 會提前終止時段。

串流中的每個記錄都屬於一個特定時段。Lambda 至少會處理一次每筆記錄，但不保證每筆記錄只會處理一次。在極少數情況下，例如錯誤處理，某些記錄可能會處理多次。第一次時一律會依序處理記錄。如果多次處理記錄，則可能不會按順序處理。

## 彙總與處理
<a name="streams-tumbling-processing"></a>

調用您的使用者管理函數進行彙總，以及處理該彙總的最終結果。Lambda 會彙總時段中接收的所有記錄。您可以在多個批次中接收這些記錄，各自作為單獨的調用。每次調用會收到一個狀態。因此，當使用輪轉時段時，您的 Lambda 函數回應必須包含 `state` 屬性。如果回應不包含 `state` 屬性，Lambda 會將此視為失敗的調用。為了滿足此條件，您的函數可以返回一個 `TimeWindowEventResponse` 物件，它具有下列 JSON 形狀：

**Example `TimeWindowEventResponse` 值**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**注意**  
對於 Java 函數，我們建議使用 `Map<String, String>` 來表示狀態。

在時段結束時，標記 `isFinalInvokeForWindow` 會設定為 `true` 以指示這是最終狀態，並且可隨時進行處理。處理完成後，時段結束並完成最終調用，然後丟棄該狀態。

在時段結束時，Lambda 會針對彙總結果上的動作使用最終處理。您的最終處理將同步調用。成功調用後，您的函數檢查點序號和串流處理將會繼續。如果調用失敗，則您的 Lambda 函數會暫停進一步處理，直至成功調用。

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Configuration
<a name="streams-tumbling-config"></a>

您可以在建立或更新事件來源對映時設定輪轉時段。若要設定輪轉時段，請以秒為單位指定時段 ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds))。下列 example AWS Command Line Interface (AWS CLI) 命令會建立輪轉時段為 120 秒的串流事件來源映射。針對彙總與處理定義的 Lambda 函數命名為 `tumbling-window-example-function`。

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda 根據記錄插入串流的時間，確定輪轉時段邊界。所有記錄都有 Lambda 在邊界確定中使用的近似時間戳記。

輪轉時段彙總不支援重新分區。分區結束後，Lambda 會考慮關閉時段，並且子分區會以新的狀態開始自己的時段。

輪轉時段完全支援現有的重試政策 `maxRetryAttempts` 和 `maxRecordAge`。

**Example Handler.py - 彙總與處理**  
下列 Python 函數示範了如何彙總，然後處理您的最終狀態：  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```