

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Amazon Kinesis
<a name="Kinesis"></a>

## 使用 适用于 Apache Flink 的 Amazon 托管服务
<a name="kinesis-via-flink"></a>

您可以将数据从 Kinesis Data Streams 发送到 Timestream LiveAnalytics ，以便使用 Apache Flink 托管服务的 Timestream 示例数据连接器。有关更多信息，请参阅 Apache Flink 的 [适用于 Apache Flink 的亚马逊托管服务](ApacheFlink.md)。

## 使用 EventBridge 管道将 Kinesis 数据发送到 Timestream
<a name="Kinesis-via-pipes"></a>

你可以使用 Pip EventBridge es 将数据从 Kinesis 流发送到 Amazon Time LiveAnalytics stream 作为表格。

Pipes 旨在在支持的源和目标之间 point-to-point进行集成，并支持高级转换和扩展。在开发事件驱动型架构时，管道可减少对专业知识和集成代码的需求。要设置管道，请选择源、添加可选筛选、定义可选富集，然后为事件数据选择目标。

![\[源向管道发送事件， EventBridge 管道过滤匹配的事件并将其路由到目标。\]](http://docs.aws.amazon.com/zh_cn/timestream/latest/developerguide/images/pipes-overview_shared_architecture.png)


这种集成使您能够利用时间序列数据分析功能 Timestream的强大功能，同时简化您的数据摄取管道。

将 Pip EventBridge es 与配合使用 Timestream 具有以下好处：
+ 实时数据摄取：将数据从 Kinesis 直接流式传输到 Timestream LiveAnalytics，从而实现实时分析和监控。
+ 无缝集成：利用 EventBridge 管道管理数据流，无需复杂的自定义集成。
+ 增强的筛选和转换：在 Kinesis 记录存储之前对其进行筛选或转换 Timestream ，以满足您的特定数据处理要求。
+ 可扩展性：利用内置的并行和批处理功能，处理高吞吐量数据流，并确保高效的数据处理。

### 配置
<a name="Kinesis-via-pipes-config"></a>

要设置 Pi EventBridge pe 以将数据从 Kinesis 流式传输到 Timestream，请按照以下步骤操作：

1. 创建 Kinesis 流

   确保您有活跃的 Kinesis 数据流，可用于摄取数据。

1. 创建 Timestream 数据库和表

   设置存储数据的 Timestream 数据库和表。

1. 配置 EventBridge 管道：
   + 来源：选择 Kinesis 流作为来源。
   + 目标：选择 Timestream 作为目标。
   + 批处理设置：定义批处理窗口及批处理大小，以优化数据处理并减少延迟。

**重要**  
设置管道时，我们建议通过摄取少量记录以测试所有配置是否正确。请注意，成功创建管道并不能保证管道配置正确，也不能保证数据流畅无误。可能存在运行时错误，例如表不正确、动态路径参数不正确或应用映射后的 Timestream 记录无效，这些错误将在实际数据流过管道时被发现。

以下配置决定数据摄取的速率：
+ BatchSize：将发送到 Timestream 的批次的最大大小。 LiveAnalytics范围：0-100。建议将此值保持为 100，以获得最大吞吐量。
+ MaximumBatchingWindowInSeconds：在将批次发送到 Timestream 作为目标之前，等待填充 batchSize 的最长时间。 LiveAnalytics 根据传入事件的速率，此配置将决定摄取的延迟，建议将此值保持 Timestream 在 < 10 秒，以保持近乎实时地向其发送数据。
+ ParallelizationFactor：每个分片中要同时处理的批次数。建议使用最大值 10，以实现最大吞吐量和近乎实时的摄取。

  如果您的数据流被多个目标读取，请使用增强型扇出功能为管道提供专属消费者，从而实现高吞吐量。有关更多信息，请参阅*Kinesis Data Streams 用户*指南中的[使用 Kinesis Data Streams API 开发增强型扇出消费者](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html)。

**注意**  
每个账户可实现的最大吞吐量受限于[并发管道执行数](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-quota.html#eb-pipes-limits)。

以下配置可防止数据丢失：
+ DeadLetterConfig：建议始终进行配置， DeadLetterConfig 以避免 LiveAnalytics 由于用户错误而无法将事件提取到 Timestream 时丢失任何数据。

使用以下配置设置优化管道性能，这有助于防止记录导致性能下降或堵塞。
+ MaximumRecordAgeInSeconds: 超过此日期的记录将不予处理，并将直接移至 DLQ。我们建议将此值设置为不高于目标 Timestream 表配置的内存存储保留期。
+ MaximumRetryAttempts：记录发送到 DeadLetterQueue之前重试记录的次数。建议将此项配置为 10。这应该能够帮助解决任何暂时性问题，对于持续存在的问题，记录将被移至直播的其余部分 DeadLetterQueue 并解除封锁。
+ OnPartialBatchItemFailure：对于支持部分批处理的来源，我们建议您启用此功能并将其配置为 AUTOMATIC\$1BISECT，以便在 DLQ 之前对失败的记录进行额外重试。 dropping/sending 

#### 配置示例
<a name="Kinesis-via-pipes-config-example"></a>

以下是如何配置 Pi EventBridge pe 以将数据从 Kinesis 流传输到表的 Timestream 示例：

**Example IAM 的政策更新 Timestream**    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "timestream:WriteRecords"
            ],
            "Resource": [
                "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "timestream:DescribeEndpoints"
            ],
            "Resource": "*"
        }
    ]
}
```

**Example Kinesis 流配置**  <a name="kinesis-stream-config.example"></a>

```
{
  "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream",
  "SourceParameters": {
    "KinesisStreamParameters": {
        "BatchSize": 100,
        "DeadLetterConfig": {
            "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue"
        },
       "MaximumBatchingWindowInSeconds": 5,
        "MaximumRecordAgeInSeconds": 1800,
        "MaximumRetryAttempts": 10,
        "StartingPosition": "LATEST",
       "OnPartialBatchItemFailure": "AUTOMATIC_BISECT"
    }
  }
}
```

**Example Timestream 目标配置**  <a name="kinesis-stream-config.example"></a>

```
{
    "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table",
    "TargetParameters": {
        "TimestreamParameters": {
            "DimensionMappings": [
                {
                    "DimensionName": "sensor_id",
                    "DimensionValue": "$.data.device_id",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "DimensionName": "sensor_type",
                    "DimensionValue": "$.data.sensor_type",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "DimensionName": "sensor_location",
                    "DimensionValue": "$.data.sensor_loc",
                    "DimensionValueType": "VARCHAR"
                }
            ],
            "MultiMeasureMappings": [
                {
                    "MultiMeasureName": "readings",
                    "MultiMeasureAttributeMappings": [
                        {
                            "MultiMeasureAttributeName": "temperature",
                            "MeasureValue": "$.data.temperature",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "MultiMeasureAttributeName": "humidity",
                            "MeasureValue": "$.data.humidity",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "MultiMeasureAttributeName": "pressure",
                            "MeasureValue": "$.data.pressure",
                            "MeasureValueType": "DOUBLE"
                        }
                    ]
                }
            ],
            "SingleMeasureMappings": [],
            "TimeFieldType": "TIMESTAMP_FORMAT",
            "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
            "TimeValue": "$.data.time",
            "VersionValue": "$.approximateArrivalTimestamp"
        }
    }
}
```



### 事件转换
<a name="Kinesis-via-pipes-trans"></a>

EventBridge 管道允许您在数据到达之前对其进行转换 Timestream。您可以定义转换规则来修改传入的 Kinesis 记录，例如更改字段名称。

假设您的 Kinesis 直播包含温度和湿度数据。在将这些字段插入之前，您可以使用 EventBridge 转换来重命名这些字段 Timestream。

### 最佳实践
<a name="Kinesis-via-pipes-best"></a>

**批处理和缓冲**
+ 配置批处理窗口和大小，以在写入延迟和处理效率之间取得平衡。
+ 使用批处理窗口在处理前积累足够的数据，从而减少频繁进行小规模批处理的开销。

**并行处理**

利用该**ParallelizationFactor**设置来提高并发度，特别是对于高吞吐量流。这确保每个分片的多批次数据能够同时进行处理。

**数据转换**

利用 Pip EventBridge es 的转换功能在记录存储之前对其进行筛选和增强 Timestream。这有助于确保数据与分析要求保持一致。

**安全性**
+ 确保用于 Pip EventBridge es 的 IAM 角色具有读取 Kinesis 和写入所需的权限 Timestream。
+ 使用加密和访问控制措施，以保护传输中数据和静态数据。

### 调试失败
<a name="Kinesis-via-pipes-debug"></a>
+ **自动禁用管道**

  如果目标不存在或存在权限问题，管道将在约 2 小时后自动禁用
+ **限制**

  管道具备自动后退并重试的能力，直至节流有所减弱。
+ **启用日志**

  建议您启用错误级别的日志，并包含执行数据，以便更深入地分析失败原因。如果出现任何故障，这些日志将包含 request/response 发送/接收自。 Timestream这有助于您了解相关的错误，并在修复错误后重新处理记录（如有需要）。

### 监控
<a name="Kinesis-via-pipes-monitor"></a>

建议您设置以下方面的警报，以检测数据流的任何问题：
+ 源中记录的最长存留时间
  + `GetRecords.IteratorAgeMilliseconds`
+ 管道中的故障指标
  + `ExecutionFailed`
  + `TargetStageFailed`
+ Timestream 写入 API 错误
  + `UserErrors`

有关其他监控指标，请参阅《EventBridge 用户指南》**中的[监控 EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-monitoring.html#eb-metrics)。