亚马逊 Kinesis 直播作为 Pipes 的来源 EventBridge - Amazon EventBridge

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊 Kinesis 直播作为 Pipes 的来源 EventBridge

你可以使用 Pi EventBridge pes 接收 Kinesis 数据流中的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,可以选择特定于 Kinesis 的设置。 EventBridge 当将数据发送到目标时,管道会保持数据流中记录的顺序。

Kinesis 数据流是一组分区。每个分片包含一系列数据记录。使用者 是一种处理 Kinesis 数据流中的数据的应用程序。您可以将 Pi EventBridge pe 映射到共享吞吐量使用者(标准迭代器),或者映射到具有增强扇出功能的专用吞吐量使用器。

对于标准迭代器, EventBridge 使用HTTP协议轮询您的 Kinesis 流中的每个分片以获取记录。管道与分片的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量,您可以创建具有增强扇出功能的数据流使用者。流使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。如果您有许多应用程序读取相同的数据,或者您正在重新处理具有大记录的流,则专用吞吐量可以提供帮助。Kinesis 将记录推高到 /2 以上。 EventBridge HTTP有关 Kinesis 数据流的信息,请参阅读取 Amazon Kinesis Data Streams 中的数据

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon P EventBridge ipes 中的事件筛选

[ { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ]

轮询和批处理流

EventBridge 以每秒四次的基本速率对你的 Kinesis 直播中的碎片进行民意调查,寻找记录。当有记录可用时, EventBridge 处理事件并等待结果。如果处理成功,则 EventBridge 恢复轮询直到收到更多记录。

默认情况下,只要有记录,就会 EventBridge 调用你的管道。如果从源 EventBridge 读取的批次中只有一条记录,则只处理一个事件。为避免处理数量较少的记录,您可以配置批处理时段,让管道缓冲最多五分钟的记录。在处理事件之前,会 EventBridge 继续从源读取记录,直到它收集了完整批次、批处理窗口到期或批处理达到 6 MB 的有效载荷限制。

您还可以通过将来自各个分区的多个批次并行处理来增加并发性。 EventBridge 每个分片中最多可以同时处理 10 个批次。如果增加每个分片的并发批次数, EventBridge 仍可确保分区键级别的按顺序处理。

配置 ParallelizationFactor 设置,同时处理 Kinesis 或 DynamoDB 数据流一个分片中的多个管道执行。您可以通过从 1(默认)到 10 的并行化系数指定从分片 EventBridge 轮询的并发批次数。例如,当您设置为 ParallelizationFactor 2 时,最多可以有 200 个并发 EventBridge Pipe 执行来处理 100 个 Kinesis 数据分片。这有助于在数据量不稳定并且 IteratorAge 较高时纵向扩展处理吞吐量。请注意,如果使用 Kinesis 聚合,并行化因子将不起作用。

轮询和流的起始位置

请注意,管道创建和更新期间的流源轮询最终将是一致的。

  • 在管道创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在管道更新源轮询配置期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

这意味着,如果您指定 LATEST 作为流的起始位置,在创建或更新管道期间,管道可能会错过发送的事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZONAT_TIMESTAMP

报告批处理项目失败

在 EventBridge 使用和处理来自源的流数据时,默认情况下,它会检查批次的最高序号,但前提是批处理完全成功时。为避免重新处理失败批次中已成功处理的消息,您可以配置富集或目标,返回对象来指示哪些消息处理成功、哪些失败。这称为部分批处理响应。

有关更多信息,请参阅 部分批处理故障

成功和失败的条件

如果您返回以下任意一项,则将批次 EventBridge 视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果您返回以下任意一项,则会将批次 EventBridge 视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

EventBridge 根据您的重试策略重试失败。