

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Lambda 函数预处理数据
<a name="lambda-preprocessing"></a>

**注意**  
2023 年 9 月 12 日之后，如果您尚未使用 Kinesis Data Analytics for SQL，则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息，请参阅[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

如果您的流中的数据需要格式转换、转换、丰富或过滤，则可以使用 AWS Lambda 函数对数据进行预处理。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。

在以下情况下，Lambda 函数有助于预处理记录：
+ 将其他格式 (例如 KPL 或 GZIP) 的记录转换为 Kinesis Data Analytics 可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。
+ 将数据扩展为聚合或异常检测等操作更易访问的格式。例如，如果多个数据值存储在同一字符串中，您可以将数据展开为多个分开的列。
+ 利用其他 Amazon 服务进行数据扩充，例如外推或错误更正。
+ 将复杂的字符串转换应用于记录字段。
+ 用于整理数据的数据筛选。

## 使用 Lambda 函数预处理记录
<a name="lambda-preprocessing-use"></a>

在创建 Kinesis Data Analytics 应用程序时，您可在 **连接到源 **页面上启用 Lambda 预处理。

**使用 Lambda 函数在 Kinesis Data Analytics 应用程序中预处理记录**

1. [登录 AWS 管理控制台 并打开适用于 Apache Flink 的托管服务控制台，网址为 /kinesisanalyt https://console.aws.amazon.com ics。](https://console.aws.amazon.com/kinesisanalytics)

1. 在应用程序的 **连接到源 ** 页面上，在 **使用 预处理记录 AWS Lambda**部分选择 **已启用。**

1. 如需使用已创建的 Lambda 函数，请在 **Lambda 函数 **下拉列表中选择该函数。

1. 如需通过某个 Lambda 预处理模板创建新的 Lambda 函数，请从下拉列表中选择该模板。然后，选择 **View <template name> in Lambda (在 Lambda 中查看 <模板名称>)** 以编辑该函数。

1. 如需新建 Lambda 函数，请选择 **新建**。*有关创建 Lambda 函数的信息，请参阅开发人员指南中的[创建 HelloWorld Lambda 函数和浏览控制台](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html)。AWS Lambda *

1. 选择要使用的 Lambda 函数的版本。要使用最新版本，请选择 **\$1LATEST**。

在选择或创建 Lambda 函数以预处理记录时，将在执行应用程序 SQL 代码或应用程序通过记录生成架构之前预处理记录。

## Lambda 预处理权限
<a name="lambda-preprocessing-policy"></a>

使用 Lambda 进行预处理，应用程序的 IAM 角色需要具有以下权限策略：

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Lambda 预处理指标
<a name="lambda-preprocessing-metrics"></a>

您可以使用 Amazon CloudWatch 来监控 Lambda 调用的次数、处理的字节数、成功和失败次数等。[有关 Kinesis Data Analytics Lambda 预处理发出的 CloudWatch 指标的信息，请参阅亚马逊 Kinesis Analytics 指标。](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)

## AWS Lambda 与 Kinesis 制作人库配合使用
<a name="lambda-preprocessing-deaggregation"></a>

[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)（KPL）将较小的用户格式化记录聚合为较大的记录（最大为 1 MB），以更好地利用 Amazon Kinesis Data Streams 吞吐量。用于 Java 的 Kinesis 客户端库 (KCL) 支持取消聚合这些记录。但是，当你用作直播的使用者时，必须使用 AWS Lambda 特殊模块来解聚记录。

要获取必要的项目代码和说明，请参阅 [Kinesis Producer 库解聚模块](https://github.com/awslabs/kinesis-deaggregation)以获取相关信息。 AWS Lambda GitHub您可以使用此项目中的组件在 Java、Node.js 和 Python AWS Lambda 中处理 KPL 序列化数据。您也可以将这些组件用在[多语言 KCL 应用程序](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java)中。

## 数据预处理事件输入数据 Model/Record 响应模型
<a name="lambda-preprocessing-data-model"></a>

要预处理记录，您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

### 事件输入数据模型
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics 持续从您的 Kinesis 数据流或 Firehose 传输流读取数据。对于检索的每一批记录，此服务管理如何将每个批次传送到您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中，您对列表进行迭代，并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。

您的预处理函数的输入模型会稍有不同，具体取决于是从 Kinesis 数据流还是 Firehose 传输流接收数据。

如果源是 Firehose 传输流，则事件输入数据模型如下所示：

**Kinesis Data Firehose 请求数据模型**


| 字段 | 说明 | 
| --- | --- | 
| 字段 | 说明 | 
| --- | --- | 
| 字段 | 说明 | 
| --- | --- | 
| invocationId | Lambda 调用 ID (随机 GUID)。 | 
| applicationArn | Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN) | 
| streamArn | 传输流 ARN | 
| 记录 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 记录 ID (随机 GUID) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 编码的源记录负载 | 
| approximateArrivalTimestamp | 传输流记录大致到达时间 | 

以下示例显示来自 Firehose 传输流的输入：

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

如果源是 Kinesis 数据流，事件输入数据模型如下所示：

**Kinesis 流请求数据模型**


| 字段 | 说明 | 
| --- | --- | 
| 字段 | 说明 | 
| --- | --- | 
| 字段 | 说明 | 
| --- | --- | 
| invocationId | Lambda 调用 ID (随机 GUID)。 | 
| applicationArn | Kinesis Data Analytics 应用程序 ARN | 
| streamArn | 传输流 ARN | 
| 记录 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 基于 Kinesis 记录序列号的记录 ID | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| 数据 | Base64 编码的源记录负载 | 
| sequenceNumber | 从 Kinesis 流记录中得到的序列号 | 
| partitionKey | 从 Kinesis 流记录中得到的分区键 | 
| shardId | 从 Kinesis 流记录中得到的 ShardId | 
| approximateArrivalTimestamp | 传输流记录大致到达时间 | 

以下示例显示来自 Kinesis 数据流的输入：

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### 记录响应模型
<a name="lambda-preprocessing-response-model"></a>

必须返回从 Lambda 预处理函数（含记录 IDs）返回并发送到 Lambda 函数的所有记录。这些记录必须包含以下参数，否则，Kinesis Data Analytics 将拒绝这些记录，并将其视为失败的数据预处理。可对记录的数据负载部分进行转换，以满足预处理要求。

**响应数据模型**


| 字段 | 说明 | 
| --- | --- | 
| 记录 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 在调用期间，记录 ID 从 Kinesis Data Analytics 传送到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配，将被视为数据预处理失败。 | 
| result | 记录的数据转换的状态。可能的值包括：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | 转换后的数据负载（使用 base64 编码之后）。如果应用程序提取数据格式为 JSON，则每个数据负载可能包含多个 JSON 文档。或者，如果应用程序提取数据格式为 CSV，则每个数据负载可能包含多个 CSV 行 (在每一行中指定行分隔符)。Kinesis Data Analytics 服务将成功分析并处理同一数据负载中的多个 JSON 文档或 CSV 行数据。 | 

以下示例显示来自 Lambda 函数的输出：

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## 常见的数据预处理失败情况
<a name="lambda-preprocessing-failures"></a>

以下是预处理失败的常见原因。
+ 并非批次中发送到 Lambda 函数的所有记录（有记录 IDs）都会返回到 Kinesis Data Analytics 服务。
+ 响应中缺少记录 ID、状态或数据负载字段。对于 `Dropped` 或 `ProcessingFailed` 记录，数据负载字段是可选的。
+ Lambda 函数的超时时间不足以预处理数据。
+ Lambda 函数的响应时间超出了 AWS Lambda 服务施加的响应限制。

如果数据预处理失败，Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用，直到成功为止。您可以监控以下 CloudWatch 指标以深入了解故障。
+ Kinesis Data Analytics 应用程序 `MillisBehindLatest`：指示应用程序从流式传输源中读取时的滞后时间。
+ Kinesis Data Analytics `InputPreprocessing` CloudWatch 应用程序指标：表示成功和失败的数量以及其他统计数据。有关更多信息，请参阅 [Amazon Kinesis Analytics 指标](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)。
+ AWS Lambda 函数 CloudWatch 指标和日志。