Amazon Kinesis - Amazon Timestream

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Kinesis

您可以将数据从 Kinesis Data Streams 发送到 Timestream LiveAnalytics ,以便使用 Apache Flink 托管服务的 Timestream 示例数据连接器。有关更多信息,适用于 Apache Flink 的亚马逊托管服务请参阅 Apache Flink。

使用 EventBridge 管道将 Kinesis 数据发送到 Timestream

你可以使用 Pip EventBridge es 将数据从 Kinesis 流发送到 Amazon Time LiveAnalytics stream 作为表格。

Pipes 旨在在支持的源和目标之间 point-to-point进行集成,并支持高级转换和扩展。Pipes 减少了开发事件驱动架构时对专业知识和集成代码的需求。要设置管道,请选择源、添加可选筛选、定义可选富集,然后为事件数据选择目标。

源向管道发送事件, EventBridge 管道过滤匹配的事件并将其路由到目标。

这种集成使您能够利用时间序列数据分析功能 Timestream的强大功能,同时简化您的数据摄取管道。

将 Pip EventBridge es 与配合使用 Timestream 具有以下好处:

  • 实时数据摄取:将数据从 Kinesis 直接流式传输到 Timestream LiveAnalytics,从而实现实时分析和监控。

  • 无缝集成:利用 EventBridge 管道管理数据流,无需复杂的自定义集成。

  • 增强的筛选和转换:在 Kinesis 记录存储之前对其进行筛选或转换 Timestream ,以满足您的特定数据处理要求。

  • 可扩展性:利用内置的并行和批处理功能,处理高吞吐量数据流,确保高效的数据处理。

配置

要设置 Pi EventBridge pe 以将数据从 Kinesis 流式传输到 Timestream,请按照以下步骤操作:

  1. 创建 Kinesis 流

    确保您有一个活跃的 Kinesis 数据流,您要从中提取数据。

  2. 创建 Timestream 数据库和表

    设置存储数据的 Timestream 数据库和表。

  3. 配置 EventBridge 管道:

    • 来源:选择你的 Kinesis 直播作为来源。

    • 目标:选择 Timestream 作为目标。

    • 批处理设置:定义批处理窗口和批处理大小,以优化数据处理并减少延迟。

重要

设置管道时,我们建议通过摄取一些记录来测试所有配置的正确性。请注意,成功创建管道并不能保证管道的正确性,也不能保证数据流畅无误。可能存在运行时错误,例如表不正确、动态路径参数不正确或应用映射后的 Timestream 记录无效,这些错误将在实际数据流过管道时被发现。

以下配置决定了数据摄取的速率:

  • BatchSize:将发送到 Timestream 的批次的最大大小。 LiveAnalytics射程:0-100。建议将此值保持为 100 以获得最大吞吐量。

  • MaximumBatchingWindowInSeconds:在将批次发送到 Timestream 作为 LiveAnalytics 目标 batchSize 之前,等待填充的最长时间。根据传入事件的速率,此配置将决定摄取的延迟,建议将此值保持 Timestream 在 < 10 秒,以保持近乎实时地向其发送数据。

  • ParallelizationFactor:每个分片中要同时处理的批次数。建议使用最大值 10 来获得最大吞吐量和近乎实时的摄取。

    如果您的直播被多个目标读取,请使用增强的扇出功能为您的管道提供专门的使用者,以实现高吞吐量。有关更多信息,请参阅使用Kinesis Data Streams 用户指南 Kinesis Data Streams API中的开发增强型扇出消费者

注意

可以实现的最大吞吐量受每个账户并发管道执行的限制。

以下配置可确保防止数据丢失:

  • DeadLetterConfig:建议始终进行配置, DeadLetterConfig 以避免 LiveAnalytics 由于用户错误而无法将事件提取到 Timestream 时丢失任何数据。

使用以下配置设置优化管道的性能,这有助于防止记录导致速度减慢或阻塞。

  • MaximumRecordAgeInSeconds: 超过此日期的记录将不会被处理,而是会直接移至DLQ。我们建议将此值设置为不高于目标 Timestream 表配置的内存存储保留期。

  • MaximumRetryAttempts:记录发送到 DeadLetterQueue之前重试记录的次数。建议将其配置为 10。这应该能够帮助解决任何暂时性问题,对于持续存在的问题,记录将被移至直播的其余部分 DeadLetterQueue 并解除封锁。

  • OnPartialBatchItemFailure:对于支持部分批处理的来源,我们建议您启用此功能并将其配置为 AUTOMATIC _,以便在删除/发送到之前BISECT对失败的记录进行额外重试。DLQ

配置示例

以下是如何配置 Pi EventBridge pe 以将数据从 Kinesis 流传输到表的 Timestream 示例:

例 IAM 的政策更新 Timestream
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "timestream:WriteRecords" ], "Resource": [ "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table" ] }, { "Effect": "Allow", "Action": [ "timestream:DescribeEndpoints" ], "Resource": "*" } ] }
例 Kinesis 直播配置
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
例 Timestream 目标配置
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }

事件转换

EventBridge 管道允许您在数据到达之前对其进行转换 Timestream。您可以定义转换规则来修改传入的 Kinesis 记录,例如更改字段名称。

假设您的 Kinesis 直播中包含温度和湿度数据。在将这些字段插入之前,您可以使用 EventBridge 转换来重命名这些字段 Timestream。

最佳实践

批处理和缓冲

  • 配置批处理窗口和大小,以便在写入延迟和处理效率之间取得平衡。

  • 在处理之前,使用批处理窗口积累足够的数据,从而减少频繁的小批量处理的开销。

并行处理

利用该ParallelizationFactor设置来提高并发度,特别是对于高吞吐量流。这样可以确保可以同时处理来自每个分片的多个批次。

数据转换

利用 Pip EventBridge es 的转换功能对记录进行筛选和增强,然后再将其存储到其中 Timestream。这有助于使数据与您的分析要求保持一致。

安全性

  • 确保用于 Pip EventBridge es 的IAM角色具有读取 Kinesis 和写入所需的权限 Timestream。

  • 使用加密和访问控制措施来保护传输中的数据和静态数据。

调试失败

  • 自动禁用管道

    如果目标不存在或存在权限问题,则管道将在大约 2 小时后自动禁用

  • 限制

    管道能够自动退缩并重试,直到油门减小。

  • 启用日志

    我们建议您启用ERROR级别的日志并包含执行数据,以便更深入地了解失败。如果出现任何故障,这些日志将包含request/response sent/received来自 Timestream。这可以帮助您了解相关的错误,并在修复错误后根据需要重新处理记录。

监控

我们建议您设置以下警报,以检测数据流的任何问题:

  • 来源中记录的最大保存期限

    • GetRecords.IteratorAgeMilliseconds

  • 管道中的故障指标

    • ExecutionFailed

    • TargetStageFailed

  • Timestream 写入API错误

    • UserErrors

有关其他监控指标,请参阅《EventBridge 用户指南》 EventBridge中的监控