

如需與 Amazon Timestream for LiveAnalytics 類似的功能，請考慮使用 Amazon Timestream for InfluxDB。它提供簡化的資料擷取和單一位數毫秒查詢回應時間，以進行即時分析。[在這裡](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)進一步了解。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon Kinesis
<a name="Kinesis"></a>

## 使用 Amazon Managed Service for Apache Flink
<a name="kinesis-via-flink"></a>

您可以使用 Managed Service for Apache Flink 的範例 Timestream 資料連接器，將資料從 Kinesis Data Streams 傳送至 Timestream for LiveAnalytics。如需詳細資訊，請參閱 [Amazon Managed Service for Apache Flink](ApacheFlink.md) for Apache Flink。

## 使用 EventBridge 管道將 Kinesis 資料傳送至 Timestream
<a name="Kinesis-via-pipes"></a>

您可以使用 EventBridge 管道將資料從 Kinesis 串流傳送至 Amazon Timestream for LiveAnalytics 資料表。

管道用於支援的來源和目標之間的點對點整合，並支援進階轉換和擴充。管道可減少開發事件驅動型架構時對專業知識和整合程式碼的需求。若要設定管道，您可以選擇來源、新增可選篩選、定義可選的擴充，以及選擇事件資料的目標。

![\[來源會將事件傳送至 EventBridge 管道，這會篩選相符的事件並將其路由至目標。\]](http://docs.aws.amazon.com/zh_tw/timestream/latest/developerguide/images/pipes-overview_shared_architecture.png)


此整合可讓您利用 Timestream時間序列資料分析功能，同時簡化資料擷取管道。

將 EventBridge 管道與 搭配使用 Timestream 可提供下列優點：
+ 即時資料擷取：將資料從 Kinesis 直接串流到 Timestream for LiveAnalytics，實現即時分析和監控。
+ 無縫整合：利用 EventBridge 管道來管理資料流程，而不需要複雜的自訂整合。
+ 增強型篩選和轉換：在 Kinesis 記錄存放於 之前對其進行篩選或轉換 Timestream ，以符合您的特定資料處理需求。
+ 可擴展性：使用內建平行處理和批次處理功能處理高輸送量資料串流，並確保有效率的資料處理。

### Configuration
<a name="Kinesis-via-pipes-config"></a>

若要設定 EventBridge 管道以將資料從 Kinesis 串流至 Timestream，請依照下列步驟執行：

1. 建立 Kinesis 串流

   確定您具有要從中擷取資料的作用中 Kinesis 資料串流。

1. 建立 Timestream 資料庫和資料表

   設定要存放資料的 Timestream 資料庫和資料表。

1. 設定 EventBridge 管道：
   + 來源：選取 Kinesis 串流做為來源。
   + 目標：選擇 Timestream 作為目標。
   + 批次處理設定：定義批次處理時段和批次大小，以最佳化資料處理並減少延遲。

**重要**  
設定管道時，建議您透過擷取一些記錄來測試所有組態的正確性。請注意，成功建立管道並不保證管道正確，而且資料會順利流動。當實際資料流經管道時，可能會發現執行時間錯誤，例如不正確的資料表、不正確的動態路徑參數，或套用映射後的無效 Timestream 記錄。

下列組態會決定擷取資料的速率：
+ BatchSize：將傳送至 Timestream for LiveAnalytics 的批次大小上限。範圍：0 - 100。建議將此值保留為 100，以取得最大輸送量。
+ MaximumBatchingWindowInSeconds：在批次傳送至 Timestream for LiveAnalytics 目標之前，等待填滿 batchSize 的時間上限。根據傳入事件的速率，此組態將決定擷取的延遲，建議將此值保留 < 10 秒，以近乎即時 Timestream 地將資料傳送至 。
+ ParallelizationFactor：每個碎片要同時處理的批次數。建議使用最大值 10，以取得最大輸送量和近乎即時的擷取。

  如果您的串流由多個目標讀取，請使用增強型廣發功能為您的管道提供專用消費者，以實現高輸送量。如需詳細資訊，請參閱*Kinesis Data Streams 《 使用者指南*》中的[使用 Kinesis Data Streams API 開發增強型廣發消費者](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html)。

**注意**  
可達到的最大輸送量取決於每個帳戶的[並行管道執行](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-quota.html#eb-pipes-limits)。

下列組態可確保防止資料遺失：
+ DeadLetterConfig：建議一律設定 DeadLetterConfig，以避免因使用者錯誤而無法將事件擷取至 Timestream for LiveAnalytics 的情況遺失任何資料。

使用下列組態設定最佳化管道的效能，這有助於防止記錄造成速度變慢或阻塞。
+ MaximumRecordAgeInSeconds：不會處理早於此值的記錄，且會直接移至 DLQ。我們建議將此值設定為不高於目標 Timestream 資料表的已設定記憶體存放區保留期間。
+ MaximumRetryAttempts：在記錄傳送至 DeadLetterQueue 之前，記錄的重試嘗試次數。建議將此設定為 10。這應該能夠協助解決任何暫時性問題，對於持久性問題，記錄將移至 DeadLetterQueue 並解除封鎖其餘的串流。
+ OnPartialBatchItemFailure：對於支援部分批次處理的來源，我們建議您啟用此功能，並將其設定為 AUTOMATIC\$1BISECT，以便在捨棄/傳送至 DLQ 之前額外重試失敗的記錄。

#### 組態範例
<a name="Kinesis-via-pipes-config-example"></a>

以下是如何設定 EventBridge 管道將資料從 Kinesis 串流串流到 Timestream 資料表的範例：

**Example IAM 的政策更新 Timestream**    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "timestream:WriteRecords"
            ],
            "Resource": [
                "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "timestream:DescribeEndpoints"
            ],
            "Resource": "*"
        }
    ]
}
```

**Example Kinesis 串流組態**  <a name="kinesis-stream-config.example"></a>

```
{
  "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream",
  "SourceParameters": {
    "KinesisStreamParameters": {
        "BatchSize": 100,
        "DeadLetterConfig": {
            "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue"
        },
       "MaximumBatchingWindowInSeconds": 5,
        "MaximumRecordAgeInSeconds": 1800,
        "MaximumRetryAttempts": 10,
        "StartingPosition": "LATEST",
       "OnPartialBatchItemFailure": "AUTOMATIC_BISECT"
    }
  }
}
```

**Example Timestream 目標組態**  <a name="kinesis-stream-config.example"></a>

```
{
    "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table",
    "TargetParameters": {
        "TimestreamParameters": {
            "DimensionMappings": [
                {
                    "DimensionName": "sensor_id",
                    "DimensionValue": "$.data.device_id",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "DimensionName": "sensor_type",
                    "DimensionValue": "$.data.sensor_type",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "DimensionName": "sensor_location",
                    "DimensionValue": "$.data.sensor_loc",
                    "DimensionValueType": "VARCHAR"
                }
            ],
            "MultiMeasureMappings": [
                {
                    "MultiMeasureName": "readings",
                    "MultiMeasureAttributeMappings": [
                        {
                            "MultiMeasureAttributeName": "temperature",
                            "MeasureValue": "$.data.temperature",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "MultiMeasureAttributeName": "humidity",
                            "MeasureValue": "$.data.humidity",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "MultiMeasureAttributeName": "pressure",
                            "MeasureValue": "$.data.pressure",
                            "MeasureValueType": "DOUBLE"
                        }
                    ]
                }
            ],
            "SingleMeasureMappings": [],
            "TimeFieldType": "TIMESTAMP_FORMAT",
            "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
            "TimeValue": "$.data.time",
            "VersionValue": "$.approximateArrivalTimestamp"
        }
    }
}
```



### 事件轉換
<a name="Kinesis-via-pipes-trans"></a>

EventBridge 管道可讓您在資料到達之前進行轉換 Timestream。您可以定義轉換規則來修改傳入 Kinesis 的記錄，例如變更欄位名稱。

假設您的 Kinesis 串流包含溫度和濕度資料。您可以使用 EventBridge 轉換來重新命名這些欄位，然後再將其插入 Timestream。

### 最佳實務
<a name="Kinesis-via-pipes-best"></a>

**批次處理和緩衝**
+ 設定批次處理時段和大小，以平衡寫入延遲和處理效率。
+ 使用批次處理時段在處理之前累積足夠的資料，減少頻繁的小型批次的額外負荷。

**平行處理**

利用 **ParallelizationFactor** 設定來增加並行，尤其是高輸送量串流。這可確保每個碎片的多個批次可以同時處理。

**資料轉換**

利用 EventBridge 管道的轉換功能，在將記錄存放在其中之前篩選和增強記錄 Timestream。這有助於使資料與您的分析需求保持一致。

**安全性**
+ 確保用於 EventBridge 管道的 IAM 角色具有讀取 Kinesis 和寫入的必要許可 Timestream。
+ 使用加密和存取控制措施來保護傳輸中和靜態資料。

### 偵錯失敗
<a name="Kinesis-via-pipes-debug"></a>
+ **自動停用管道**

  如果目標不存在或有許可問題，管道將在約 2 小時內自動停用
+ **限流**

  管道能夠自動關閉並重試，直到調節減少為止。
+ **啟用日誌**

  我們建議您在錯誤層級啟用日誌，並包含執行資料，以深入了解失敗。發生任何失敗時，這些日誌將包含傳送/接收的請求/回應 Timestream。這可協助您了解相關的錯誤，並視需要在修正後重新處理記錄。

### 監控
<a name="Kinesis-via-pipes-monitor"></a>

我們建議您設定下列警示，以偵測資料流程的任何問題：
+ 來源中記錄的最長存留期
  + `GetRecords.IteratorAgeMilliseconds`
+ 管道中的失敗指標
  + `ExecutionFailed`
  + `TargetStageFailed`
+ Timestream 寫入 API 錯誤
  + `UserErrors`

如需其他監控指標，請參閱*EventBridge 《 使用者指南*》中的[監控 EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-monitoring.html#eb-metrics)。