并行处理输入流以增加吞吐量 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

并行处理输入流以增加吞吐量

注意

2023 年 9 月 12 日之后,如果您尚未使用适用于 SQL 的 Kinesis Data Analytics,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息,请参阅限制

Amazon Kinesis Data Analytics 应用程序可以支持多个应用程序内部输入流,以将应用程序扩展到超出单个应用程序内部输入流的吞吐量。有关应用程序内部输入流的更多信息,请参阅 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics:工作原理

几乎在所有情况下,Amazon Kinesis Data Analytics 都会扩展您的应用程序,以处理馈送到您的应用程序的 Kinesis 流或 Firehose 源流的容量。但是,如果您的源流的吞吐量超出单个应用程序内部输入流的吞吐量,您可显式增加您的应用程序使用的应用程序内部输入流的数量。您需使用 InputParallelism 参数执行此操作。

如果 InputParallelism 参数大于 1,则 Amazon Kinesis Data Analytics 在应用程序内部流之间平均拆分源流的分区。例如,如果您的源流具有 50 个分片,并且您已将 InputParallelism 设置为 2,则每个应用程序内部输入流都将收到来自 25 个源流分片的输入。

当您增加应用程序内部流的数量后,您的应用程序必须显式访问每个流中的数据。有关通过代码访问多个应用程序内部流的信息,请参阅在 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流

尽管 Kinesis Data Streams 和 Firehose 流分片都以相同的方式划分到应用程序内部流,但它们在应用程序中的显示方式有所不同:

  • Kinesis 数据流中的记录包含一个 shard_id 字段,可用于指定记录的源分片。

  • 来自 Firehose 传输流的记录不包含用于标识记录源分片或分区的字段。这是因为 Firehose 会将这些信息从您的应用程序中抽象出来。

评估是否增加您的应用程序内部输入流的数量

在大多数情况下,单个应用程序内部输入流可处理单个源流的吞吐量,具体取决于输入流的复杂度和数据大小。要确定是否需要增加应用程序内输入流的数量,您可以在 Amazon CloudWatch 中监控InputBytesMillisBehindLatest指标。

如果 InputBytes 指标大于 100 MB/秒(或您预期该指标将大于此速率),这可能会使 MillisBehindLatest 增大并导致应用程序问题的影响扩大。为解决此问题,我们建议您为应用程序选择以下语言:

如果 MillisBehindLatest 指标具有以下任一特征,则应调高您的应用程序的 InputParallelism 设置:

  • MillisBehindLatest 指标逐渐增大,指示您的应用程序落后于流中的最新数据。

  • MillisBehindLatest 指标始终高于 1000 (1 秒)。

如果满足以下条件,您无需调高您的应用程序的 InputParallelism 设置:

  • MillisBehindLatest 指标逐渐减小,指示您的应用程序跟上了流中的最新数据。

  • MillisBehindLatest 指标小于 1000 (1 秒)。

有关使用的更多信息 CloudWatch,请参阅《CloudWatch 用户指南》

实施多个应用程序内部输入流

如果应用程序是使用 CreateApplication 创建的,您可以设置应用程序内部输入流的数量。您应在使用UpdateApplication创建应用程序之后设置此数量。

注意

你只能使用 Amazon Kinesis Data Analytics API 或 AWS CLI设置 InputParallelism。您不能使用来设置此设置 AWS Management Console。有关设置的信息 AWS CLI,请参阅第 2 步:设置 AWS Command Line Interface (AWS CLI)

设置新应用程序的输入流计数

以下示例说明了如何使用 CreateApplication API 操作将新应用程序的输入流计数设置为 2。

有关 CreateApplication 的更多信息,请参阅CreateApplication

{ "ApplicationCode": "<The SQL code the new application will run on the input stream>", "ApplicationDescription": "<A friendly description for the new application>", "ApplicationName": "<The name for the new application>", "Inputs": [ { "InputId": "ID for the new input stream", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }

设置现有应用程序的输入流计数

以下示例说明了如何使用 UpdateApplication API 操作将现有应用程序的输入流计数设置为 2。

有关 Update_Application 的更多信息,请参阅UpdateApplication

{ "InputUpdates": [ { "InputId": "yourInputId", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }

在 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流

要使用您的应用程序中的多个应用程序内部输入流,您必须从不同的流中显式选择。以下代码示例说明了如何在创建于“入门”教程中的应用程序中查询多个输入流。

在以下示例中,每个源流在组合到名为 的单个应用程序内部流之前先通过 COUNTin_application_stream001 进行了聚合。预先聚合源流有助于确保组合的应用程序内部流可处理来自多个流的流量而不会过载。

注意

要运行此示例并获得来自应用程序内部输入流的结果,请更新源流中的分片数和应用程序中的 InputParallelism 参数。

CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;

前面的代码示例将在 in_application_stream001 中生成类似于下面的输出:

Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.

其他注意事项

在使用多个输入流时,请注意以下事项:

  • 应用程序内部输入流的最大数量为 64。

  • 应用程序内部输入流在应用程序的输入流的分片之间均匀分配。

  • 通过添加应用程序内部流获得的性能改进无法线性扩展。也就是说,使应用程序内部流的数量加倍不会使吞吐量加倍。对于典型行大小,每个应用程序内部流可实现每秒大约 5,000 到 15,000 行的吞吐量。通过将应用程序内部流计数增加到 10,您可以实现每秒 20,000 到 30,000 行的吞吐量。吞吐量流速取决于输入流中的字段的计数、数据类型和数据大小。

  • 某些聚合函数(如 AVG)在应用于分区到不同分片中的输入流时可能生成意外结果。由于您需要在将各个分片组合到聚合流中之前对它们运行聚合操作,因此结果可能向包含更多记录的流加权。

  • 如果您增加了输入流的数量后,应用程序性能仍然很差 (反映在较高的 MillisBehindLatest 指标上),您可能已达到 Kinesis 处理单元 (KPU) 的限制。有关更多信息,请参阅 自动扩展应用程序以提高吞吐量