

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 示例：转换 DateTime 值
<a name="app-string-datetime-manipulation"></a>

Amazon Kinesis Data Analytics 支持将列转换为时间戳。例如，除了 `ROWTIME` 列以外，建议您将自己的时间戳用作 `GROUP BY` 子句中另一个基于时间的窗口。Kinesis Data Analytics 提供用于处理日期和时间字段的操作和 SQL 函数。
+ **日期和时间运算符** - 您可以对日期、时间和间隔数据类型执行算术运算。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink* SQL 参考中的[日期、时间戳和间隔运算符](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-timestamp-interval.html)。

   
+ **SQL 函数** - 其中包括以下各项。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[日期和时间函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)。
  + `EXTRACT()` - 从日期、时间、时间戳或间隔表达式中提取一个字段。
  + `CURRENT_TIME` - 返回执行查询的时间 (UTC)。
  + `CURRENT_DATE` - 返回执行查询的日期 (UTC)。
  + `CURRENT_TIMESTAMP` - 返回执行查询的时间戳 (UTC)。
  + `LOCALTIME` - 返回 Kinesis Data Analytics 运行环境所定义执行查询的当前时间 (UTC)。
  + `LOCALTIMESTAMP` - 返回 Kinesis Data Analytics 运行环境定义的当前时间戳 (UTC)。

     
+ **SQL 扩展** - 其中包括以下各项。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[日期和时间函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)以及[日期时间转换函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-datetime-conversion-functions.html)。
  + `CURRENT_ROW_TIMESTAMP` - 为流中的每一行返回一个新的时间戳。
  + `TSDIFF` - 返回两个时间戳的差异 (以毫秒为单位)。
  + `CHAR_TO_DATE` - 将字符串转换为日期。
  + `CHAR_TO_TIME` - 将字符串转换为时间。
  + `CHAR_TO_TIMESTAMP` - 将字符串转换为时间戳。
  + `DATE_TO_CHAR` - 将日期转换为字符串。
  + `TIME_TO_CHAR` - 将时间转换为字符串。
  + `TIMESTAMP_TO_CHAR` - 将时间戳转换为字符串。

上面大多数 SQL 函数都采用一种格式来转换列。格式是灵活多变的。例如，您可以指定采用格式 `yyyy-MM-dd hh:mm:ss` 将输入字符串 `2009-09-16 03:15:24` 转换为时间戳。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[字符到时间戳 (Sys)](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html)。

## 示例：转换日期
<a name="examples-transforming-dates"></a>

在本示例中，您将以下记录写入到 Amazon Kinesis 数据流中。

```
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
...
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 流作为流式传输源。发现过程读取流式传输源中的示例记录，并推断出具有两个列 (`EVENT_TIME` 和 `TICKER`) 的如下应用程序内部架构。

![\[控制台屏幕截图，显示具有事件时间和股票代码列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_datetime_convert_0.png)


然后，将该应用程序代码与 SQL 函数结合使用，以多种方式转换 `EVENT_TIME` 时间戳字段。随后将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图，显示应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_datetime_convert_1.png)




### 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-dates-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充事件时间和股票代码记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。

1. 运行以下 Python 代码以便用示例数据填充流。此简单代码会不断将具有随机股票代号和当前时间戳的记录写入流中。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

### 步骤 2：创建 Amazon Kinesis Data Analytics 应用程序
<a name="examples-transforming-dates-2"></a>

按如下方式创建应用程序：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择以创建 IAM 角色。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

   1. 选择 **编辑架构**。将 **EVENT\$1TIME** 列的 **列类型** 更改为 `TIMESTAMP`。

   1. 选择 **保存架构并更新流示例**。在控制台保存架构后，选择 **退出**。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果，如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          TICKER VARCHAR(4), 
          event_time TIMESTAMP, 
          five_minutes_before TIMESTAMP, 
          event_unix_timestamp BIGINT,
          event_timestamp_as_char VARCHAR(50),
          event_second INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      
      SELECT STREAM 
          TICKER, 
          EVENT_TIME,
          EVENT_TIME - INTERVAL '5' MINUTE,
          UNIX_TIMESTAMP(EVENT_TIME),
          TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
          EXTRACT(SECOND FROM EVENT_TIME) 
      FROM "SOURCE_SQL_STREAM_001"
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。