

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 内置时间序列功能
<a name="timeseries-specific-constructs"></a>

Timestream for LiveAnalytics 提供了内置的时间序列功能，可将时间序列数据视为一流的概念。

内置的时间序列功能可分为两类：视图和函数。

可在下方阅读有关每个构造的说明。

**Topics**
+ [时间序列视图](timeseries-specific-constructs.views.md)
+ [时间序列函数](timeseries-specific-constructs.functions.md)

# 时间序列视图
<a name="timeseries-specific-constructs.views"></a>

Timestream for LiveAnalytics 支持以下用于将数据转换为`timeseries`数据类型的函数：

**Topics**
+ [CREATE\$1TIME\$1SERIES](#timeseries-specific-constructs.views.CREATE_TIME_SERIES)
+ [UNNEST](#timeseries-specific-constructs.views.UNNEST)

## CREATE\$1TIME\$1SERIES
<a name="timeseries-specific-constructs.views.CREATE_TIME_SERIES"></a>

 **CREATE\$1TIME\$1SERIES** 是一个聚合函数，接收时间序列的所有原始测量值（时间和度量值），并返回时间序列数据类型。此函数的语法如下：

```
CREATE_TIME_SERIES(time, measure_value::<data_type>)
```

 其中，`<data_type>` 是度量值的数据类型，可以是 bigint、boolean、double 或 varchar 其中之一。第二个参数不能为 null。

考虑存储在名为 **metrics** 的表中 EC2 实例的 CPU 利用率，如下所示：


| 时间 | region | az | vpc | instance\$1id | measure\$1name | measure\$1value::double | 
| --- | --- | --- | --- | --- | --- | --- | 
|  2019-12-04 19:00:00.000000000  |  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef0  |  cpu\$1utilization  |  35.0  | 
|  2019-12-04 19:00:01.000000000  |  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef0  |  cpu\$1utilization  |  38.2  | 
|  2019-12-04 19:00:02.000000000  |  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef0  |  cpu\$1utilization  |  45.3  | 
|  2019-12-04 19:00:00.000000000  |  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef1  |  cpu\$1utilization  |  54.1  | 
|  2019-12-04 19:00:01.000000000  |  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef1  |  cpu\$1utilization  |  42.5  | 
|  2019-12-04 19:00:02.000000000  |  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef1  |  cpu\$1utilization  |  33.7  | 

运行查询：

```
SELECT region, az, vpc, instance_id, CREATE_TIME_SERIES(time, measure_value::double) as cpu_utilization FROM metrics
    WHERE measure_name=’cpu_utilization’
    GROUP BY region, az, vpc, instance_id
```

将返回所有以 `cpu_utilization` 作为度量值的序列。本例中包含两个序列：


| region | az | vpc | instance\$1id | cpu\$1utilization | 
| --- | --- | --- | --- | --- | 
|  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef0  |  [\$1time: 2019-12-04 19:00:00.000000000, measure\$1value::double: 35.0\$1, \$1time: 2019-12-04 19:00:01.000000000, measure\$1value::double: 38.2\$1, \$1time: 2019-12-04 19:00:02.000000000, measure\$1value::double: 45.3\$1]  | 
|  us-east-1  |  us-east-1d  |  vpc-1a2b3c4d  |  i-1234567890abcdef1  |  [\$1time: 2019-12-04 19:00:00.000000000, measure\$1value::double: 35.1\$1, \$1time: 2019-12-04 19:00:01.000000000, measure\$1value::double: 38.5\$1, \$1time: 2019-12-04 19:00:02.000000000, measure\$1value::double: 45.7\$1]  | 

## UNNEST
<a name="timeseries-specific-constructs.views.UNNEST"></a>

 `UNNEST` 是表函数，可让您将 `timeseries` 数据转换为平面模型。语法如下：

 `UNNEST` 将 `timeseries` 转换为两列，即 `time` 和 `value`。也可以在 UNNEST 中使用别名，如下所示：

```
UNNEST(timeseries) AS <alias_name> (time_alias, value_alias)
```

其中 `<alias_name>` 是平面表的别名，`time_alias` 是 `time` 列的别名，`value_alias` 是 `value` 列的别名。

例如，假设实例集中的某些 EC2 实例配置为每 5 秒发送一次指标，其余实例则每 15 秒发送一次指标。此时您需要获取过去 6 小时内所有实例的平均指标数，且要求以 10 秒为粒度进行统计。要获取此数据，请使用 **CREATE\$1TIME\$1SERIES** 将指标转换为时间序列模型。然后，可使用 **INTERPOLATE\$1LINEAR**，并以 10 秒为粒度获取缺失值。下一步，使用 **UNNEST** 将数据转换回平面模型，然后使用 **AVG** 获取所有实例的平均指标数。

```
WITH interpolated_timeseries AS (
    SELECT region, az, vpc, instance_id,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(time, measure_value::double),
                SEQUENCE(ago(6h), now(), 10s)) AS interpolated_cpu_utilization
    FROM timestreamdb.metrics 
    WHERE measure_name= ‘cpu_utilization’ AND time >= ago(6h)
    GROUP BY region, az, vpc, instance_id
)
SELECT region, az, vpc, instance_id, avg(t.cpu_util)
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_cpu_utilization) AS t (time, cpu_util)
GROUP BY region, az, vpc, instance_id
```

 上述查询演示如何使用带别名的 **UNNEST**。以下是相同查询的示例，但未使用 **UNNEST** 的别名：

```
WITH interpolated_timeseries AS (
    SELECT region, az, vpc, instance_id,
        INTERPOLATE_LINEAR(
            CREATE_TIME_SERIES(time, measure_value::double),
                SEQUENCE(ago(6h), now(), 10s)) AS interpolated_cpu_utilization
    FROM timestreamdb.metrics 
    WHERE measure_name= ‘cpu_utilization’ AND time >= ago(6h)
    GROUP BY region, az, vpc, instance_id
)
SELECT region, az, vpc, instance_id, avg(value)
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_cpu_utilization)
GROUP BY region, az, vpc, instance_id
```

# 时间序列函数
<a name="timeseries-specific-constructs.functions"></a>

Amazon Timestream for LiveAnalytics 支持时间序列函数，例如导数、积分和相关性以及其他函数，以便从您的时间序列数据中获得更深入的见解。本部分提供每个函数的用法说明及示例查询。选择以下主题以了解更多信息。

**Topics**
+ [插值函数](timeseries-specific-constructs.functions.interpolation.md)
+ [导数函数](timeseries-specific-constructs.functions.derivatives.md)
+ [积分函数](timeseries-specific-constructs.functions.integrals.md)
+ [相关性函数](timeseries-specific-constructs.functions.correlation.md)
+ [筛选和归约函数](timeseries-specific-constructs.functions.filter-reduce.md)

# 插值函数
<a name="timeseries-specific-constructs.functions.interpolation"></a>

如果时间序列数据在某些时间点存在事件缺失值，可通过插值法估计这些缺失事件的值。Amazon Timestream 支持四种插值变体：线性插值、三次样条插值、末次观测值结转（LOCF）插值以及常数插值。本节提供 LiveAnalytics插值函数的 Timestream 的用法信息以及示例查询。



## 使用情况信息
<a name="w2aab7c59c13c13c11b7"></a>


| 函数 | 输出数据类型 | 说明 | 
| --- | --- | --- | 
|  `interpolate_linear(timeseries, array[timestamp])`  |  时间序列  |  使用[线性插值](https://wikipedia.org/wiki/Linear_interpolation)填充缺失数据。  | 
|  `interpolate_linear(timeseries, timestamp)`  |  double  |  使用[线性插值](https://wikipedia.org/wiki/Linear_interpolation)填充缺失数据。  | 
|  `interpolate_spline_cubic(timeseries, array[timestamp])`  |  时间序列  |  使用[三次样条插值](https://wikiversity.org/wiki/Cubic_Spline_Interpolation#:~:text=Cubic%20spline%20interpolation%20is%20a,Lagrange%20polynomial%20and%20Newton%20polynomial.)填充缺失数据。  | 
|  `interpolate_spline_cubic(timeseries, timestamp)`  |  double  |  使用[三次样条插值](https://wikiversity.org/wiki/Cubic_Spline_Interpolation#:~:text=Cubic%20spline%20interpolation%20is%20a,Lagrange%20polynomial%20and%20Newton%20polynomial.)填充缺失数据。  | 
|  `interpolate_locf(timeseries, array[timestamp])`  |  时间序列  |  使用上次采样值填充缺失数据。  | 
|  `interpolate_locf(timeseries, timestamp)`  |  double  |  使用上次采样值填充缺失数据。  | 
|  `interpolate_fill(timeseries, array[timestamp], double)`  |  时间序列  |  使用常量值填充缺失数据。  | 
|  `interpolate_fill(timeseries, timestamp, double)`  |  double  |  使用常量值填充缺失数据。  | 

## 查询示例
<a name="w2aab7c59c13c13c11b9"></a>

**Example**  
计算过去 2 小时内特定 EC2 主机的 CPU 平均利用率，按 30 秒间隔进行分箱，并使用线性插值填补缺失值：  

```
WITH binned_timeseries AS (
SELECT hostname, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::double), 2) AS avg_cpu_utilization
FROM "sampleDB".DevOps
WHERE measure_name = 'cpu_utilization'
    AND hostname = 'host-Hovjv'
    AND time > ago(2h)
GROUP BY hostname, BIN(time, 30s)
), interpolated_timeseries AS (
SELECT hostname,
    INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(binned_timestamp, avg_cpu_utilization),
            SEQUENCE(min(binned_timestamp), max(binned_timestamp), 15s)) AS interpolated_avg_cpu_utilization
FROM binned_timeseries
GROUP BY hostname
)
SELECT time, ROUND(value, 2) AS interpolated_cpu
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_cpu_utilization)
```

**Example**  
计算过去 2 小时内特定 EC2 主机的 CPU 平均利用率，按 30 秒间隔进行分箱，并使用基于末次观测值结转的插值填补缺失值：  

```
WITH binned_timeseries AS (
SELECT hostname, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::double), 2) AS avg_cpu_utilization
FROM "sampleDB".DevOps
WHERE measure_name = 'cpu_utilization'
    AND hostname = 'host-Hovjv'
    AND time > ago(2h)
GROUP BY hostname, BIN(time, 30s)
), interpolated_timeseries AS (
SELECT hostname,
    INTERPOLATE_LOCF(
        CREATE_TIME_SERIES(binned_timestamp, avg_cpu_utilization),
            SEQUENCE(min(binned_timestamp), max(binned_timestamp), 15s)) AS interpolated_avg_cpu_utilization
FROM binned_timeseries
GROUP BY hostname
)
SELECT time, ROUND(value, 2) AS interpolated_cpu
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_cpu_utilization)
```

# 导数函数
<a name="timeseries-specific-constructs.functions.derivatives"></a>

导数用于计算特定指标的变化率，并可用于主动响应事件。例如，假设您计算过去 5 分钟内 EC2 实例 CPU 利用率的导数，并注意到出现明显的正导数。这可能表明对工作负载的需求有所增加，因此您可能需要启动更多的 EC2 实例，以更好地处理您的工作负载。

Amazon Timestream 支持两种导数函数的变体。本节提供 LiveAnalytics 导数函数的时间流的用法信息以及示例查询。



## 使用情况信息
<a name="w2aab7c59c13c13c13b9"></a>


| 函数 | 输出数据类型 | 说明 | 
| --- | --- | --- | 
|  `derivative_linear(timeseries, interval)`  |  时间序列  |  计算 `timeseries` 中每个点对指定 `interval` 的[导数](https://wikipedia.org/wiki/Derivative)。  | 
|  `non_negative_derivative_linear(timeseries, interval)`  |  时间序列  |  与 `derivative_linear(timeseries, interval)` 相同，但仅返回正值。  | 

## 查询示例
<a name="w2aab7c59c13c13c13c11"></a>

**Example**  
计算过去 1 小时内 CPU 利用率每 5 分钟的变化率：  

```
SELECT DERIVATIVE_LINEAR(CREATE_TIME_SERIES(time, measure_value::double), 5m) AS result 
FROM “sampleDB”.DevOps 
WHERE measure_name = 'cpu_utilization' 
AND hostname = 'host-Hovjv' and time > ago(1h) 
GROUP BY hostname, measure_name
```

**Example**  
计算由一个或多个微服务产生的错误增长率：  

```
WITH binned_view as (
    SELECT bin(time, 5m) as binned_timestamp, ROUND(AVG(measure_value::double), 2) as value            
    FROM “sampleDB”.DevOps  
    WHERE micro_service = 'jwt'  
    AND time > ago(1h) 
    AND measure_name = 'service_error'
    GROUP BY bin(time, 5m)
)
SELECT non_negative_derivative_linear(CREATE_TIME_SERIES(binned_timestamp, value), 1m) as rateOfErrorIncrease
FROM binned_view
```

# 积分函数
<a name="timeseries-specific-constructs.functions.integrals"></a>

您可以使用积分计算时间序列事件中单位时间内曲线下的面积。例如，假设您正在跟踪应用程序每单位时间接收到的请求量。在此场景中，您可以使用积分函数计算特定时间段内每指定间隔处理的请求总数。

Amazon Timestream 支持一种积分函数的变体。本节提供 LiveAnalytics 积分函数的 Timestream 的用法信息以及示例查询。



## 使用情况信息
<a name="w2aab7c59c13c13c15b9"></a>


| 函数 | 输出数据类型 | 说明 | 
| --- | --- | --- | 
|  `integral_trapezoidal(timeseries(double))` `integral_trapezoidal(timeseries(double), interval day to second)` `integral_trapezoidal(timeseries(bigint))` `integral_trapezoidal(timeseries(bigint), interval day to second)` `integral_trapezoidal(timeseries(integer), interval day to second)` `integral_trapezoidal(timeseries(integer))`  |  double  |  使用[梯形法则](https://wikipedia.org/wiki/Trapezoidal_rule),根据 `timeseries` 提供的指定 `interval day to second`，近似计算[积分](https://wikipedia.org/wiki/Integral)。天到秒间隔参数是可选的，默认值为 `1s`。有关间隔的更多信息，请参阅[间隔和持续时间](date-time-functions.md#date-time-functions-interval-duration)。  | 

## 查询示例
<a name="w2aab7c59c13c13c15c11"></a>

**Example**  
计算过去一小时内特定主机每五分钟处理的请求总量：  

```
SELECT INTEGRAL_TRAPEZOIDAL(CREATE_TIME_SERIES(time, measure_value::double), 5m) AS result FROM sample.DevOps 
WHERE measure_name = 'request' 
AND hostname = 'host-Hovjv' 
AND time > ago (1h) 
GROUP BY hostname, measure_name
```

# 相关性函数
<a name="timeseries-specific-constructs.functions.correlation"></a>

对于两个长度相近的时间序列，相关性函数可提供相关系数，该系数揭示两个时间序列随时间推移的趋势关系。相关系数的范围为 `-1.0` 到 `1.0`。`-1.0` 表示两个时间序列以相同速率呈相反方向变化。而 `1.0` 则表示两个时间序列以相同速率呈相同方向变化。值为 `0` 两个时间序列之间不存在相关性。例如，如果油价上涨，某石油公司的股价随之上涨，则油价上涨趋势与该石油公司股价上涨趋势之间将呈现正相关系数。高正相关系数表明两种价格以相似的速率变化。同样，债券价格与债券收益率之间的相关系数为负值，表明这两个值随时间推移呈现相反的趋势。

Amazon Timestream 支持两种相关性函数的变体。本节提供 LiveAnalytics 关联函数的时间流的用法信息以及示例查询。



## 使用情况信息
<a name="w2aab7c59c13c13c19c11"></a>


| 函数 | 输出数据类型 | 说明 | 
| --- | --- | --- | 
|  `correlate_pearson(timeseries, timeseries)`  |  double  |  计算两个 `timeseries` 的[皮尔逊相关系数](https://wikipedia.org/wiki/Pearson_correlation_coefficient)。时间序列必须具有相同的时间戳。  | 
|  `correlate_spearman(timeseries, timeseries)`  |  double  |  计算两个 `timeseries` 的[斯皮尔曼相关系数](https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient)。时间序列必须具有相同的时间戳。  | 

## 查询示例
<a name="w2aab7c59c13c13c19c13"></a>

**Example**  

```
WITH cte_1 AS (
    SELECT INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, measure_value::double), 
        SEQUENCE(min(time), max(time), 10m)) AS result 
    FROM sample.DevOps 
    WHERE measure_name = 'cpu_utilization' 
    AND hostname = 'host-Hovjv' AND time > ago(1h) 
    GROUP BY hostname, measure_name
), 
cte_2 AS (
    SELECT INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, measure_value::double), 
        SEQUENCE(min(time), max(time), 10m)) AS result 
    FROM sample.DevOps 
    WHERE measure_name = 'cpu_utilization' 
    AND hostname = 'host-Hovjv' AND time > ago(1h) 
    GROUP BY hostname, measure_name
) 
SELECT correlate_pearson(cte_1.result, cte_2.result) AS result 
FROM cte_1, cte_2
```

# 筛选和归约函数
<a name="timeseries-specific-constructs.functions.filter-reduce"></a>

Amazon Timestream 支持对时间序列数据执行筛选和归约操作的函数。本节提供 LiveAnalytics 筛选和缩减函数的时间流的用法信息，以及示例查询。



## 使用情况信息
<a name="w2aab7c59c13c13c23b7"></a>


| 函数 | 输出数据类型 | 说明 | 
| --- | --- | --- | 
|  `filter(timeseries(T), function(T, Boolean))`  |  timeseries(T)  |  根据输入的时间序列构造时间序列，其中使用的值是传递的 `function` 返回的 `true`。  | 
|  `reduce(timeseries(T), initialState S, inputFunction(S, T, S), outputFunction(S, R))`  |  R  |  返回从时间序列中减去的单个值。`inputFunction` 将按顺序对时间序列中的每个元素进行调用。除获取当前元素以外，inputFunction 还会获取当前状态（初始为 `initialState`）并返回新状态。将调用 `outputFunction`，以将最终状态转换为结果值。`outputFunction` 可以是恒等函数。  | 

## 查询示例
<a name="w2aab7c59c13c13c23b9"></a>

**Example**  
构造主机的 CPU 利用率时间序列，并筛选测量值大于 70 的数据点：  

```
WITH time_series_view AS (
    SELECT INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, ROUND(measure_value::double,2)), 
            SEQUENCE(ago(15m), ago(1m), 10s)) AS cpu_user
    FROM sample.DevOps
    WHERE hostname = 'host-Hovjv' and measure_name = 'cpu_utilization'
        AND time > ago(30m)
    GROUP BY hostname
)
SELECT FILTER(cpu_user, x -> x.value > 70.0) AS cpu_above_threshold
from time_series_view
```

**Example**  
构造主机的 CPU 利用率时间序列，并计算测量值的平方和：  

```
WITH time_series_view AS (
    SELECT INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, ROUND(measure_value::double,2)), 
            SEQUENCE(ago(15m), ago(1m), 10s)) AS cpu_user
    FROM sample.DevOps
    WHERE hostname = 'host-Hovjv' and measure_name = 'cpu_utilization'
        AND time > ago(30m)
    GROUP BY hostname
)
SELECT REDUCE(cpu_user,
    DOUBLE '0.0',
    (s, x) -> x.value * x.value + s,
    s -> s)
from time_series_view
```

**Example**  
构造主机的 CPU 利用率时间序列，并确定超过 CPU 阈值的样本所占比例：  

```
WITH time_series_view AS (
    SELECT INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, ROUND(measure_value::double,2)), 
            SEQUENCE(ago(15m), ago(1m), 10s)) AS cpu_user
    FROM sample.DevOps
    WHERE hostname = 'host-Hovjv' and measure_name = 'cpu_utilization'
        AND time > ago(30m)
    GROUP BY hostname
)
SELECT ROUND(
    REDUCE(cpu_user, 
      -- initial state 
      CAST(ROW(0, 0) AS ROW(count_high BIGINT, count_total BIGINT)),
      -- function to count the total points and points above a certain threshold
      (s, x) -> CAST(ROW(s.count_high + IF(x.value > 70.0, 1, 0), s.count_total + 1) AS ROW(count_high BIGINT, count_total BIGINT)),
      -- output function converting the counts to fraction above threshold
      s -> IF(s.count_total = 0, NULL, CAST(s.count_high AS DOUBLE) / s.count_total)), 
    4) AS fraction_cpu_above_threshold
from time_series_view
```