

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# AWS源版本 2 的 Security Lake 查询 (OCSF 1.1.0)
<a name="subscriber-query-examples2"></a>

以下部分提供了有关从 Security Lake 中查询数据的指导，并包括源版本 2 原生支持的AWSAWS源代码的一些查询示例。这些查询旨在检索特定数据AWS 区域。示例使用的是 us-east-1，即美国东部（弗吉尼亚州北部）。此外，示例查询使用 `LIMIT 25` 参数，最多返回 25 条记录。您可以省略该参数或根据自己的偏好进行调整。有关更多示例，请参阅 [Amazon Security Lake OCSF 查询 GitHub 目录](https://github.com/awslabs/aws-security-analytics-bootstrap/tree/main/AWSSecurityAnalyticsBootstrap/amazon_security_lake_queries)。

您可以查询 Security Lake 存储在AWS Lake Formation数据库和表中的数据。您还可以在 Security Lake 控制台、API 或AWS CLI中创建第三方订阅用户。第三方订阅用户还可以从您指定的来源查询 Lake Formation 数据。

Lake Formation 数据湖管理员必须向查询数据的 IAM 身份授予相关数据库和表的 `SELECT` 权限。订阅用户也必须是在 Security Lake 中创建的，然后才能查询数据。有关如何创建具有查询权限的订阅用户的更多信息，请参阅[管理 Security Lake 订阅用户的查询访问权限](subscriber-query-access.md)。

以下查询包括基于时间的过滤器，`eventDay`用于确保您的查询在配置的保留设置范围内。有关更多信息，请参阅 [Querying data with retention settings](subscriber-query-examples.md#security-lake-retention-setting-query-data)。

例如，如果超过 60 天的数据已过期，则您的查询应包含时间限制，以防止访问过期的数据。对于 60 天的保留期，请在查询中加入以下子句：

```
...
WHERE time_dt > DATE_ADD('day', -59, CURRENT_TIMESTAMP)
...
```

该条款使用 59 天（而不是 60 天）来避免 Amazon S3 和 Apache Iceberg 之间出现任何数据或时间重叠。

## 日志源表
<a name="log-source-table"></a>

查询 Security Lake 数据时，您必须将数据所在的 Lake Formation 表的名称包含在内。

```
SELECT *
FROM "amazon_security_lake_glue_db_DB_Region"."amazon_security_lake_table_DB_Region_SECURITY_LAKE_TABLE"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

日志源表的常见值包括以下内容：
+ `cloud_trail_mgmt_2_0`—AWS CloudTrail管理活动
+ `lambda_execution_2_0`— Lambda CloudTrail 的数据事件
+ `s3_data_2_0`— S3 CloudTrail 的数据事件
+ `route53_2_0` – Amazon Route 53 Resolver 查询日志
+ `sh_findings_2_0`—AWS Security Hub CSPM调查结果
+ `vpc_flow_2_0` – Amazon Virtual Private Cloud (Amazon VPC) 流日志
+ `eks_audit_2_0`— 亚马逊 Elastic Kubernetes Service（亚马逊 EKS）审计日志
+ `waf_2_0`—AWS WAF v2 日志

**示例：表中所有来自 us-east `sh_findings_2_0` -1 区域的 Security Hub CSPM 调查结果**

```
SELECT *
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
    WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

## 数据库区域
<a name="database-region"></a>

查询 Security Lake 数据时，您必须将要从中查询数据的数据库区域名称包含在内。有关当前提供 Security Lake 的数据库区域的完整列表，请参阅 [Amazon Security Lake 端点](https://docs.aws.amazon.com/general/latest/gr/securitylake.html)。

**示例：列出来自来源 IP 的亚马逊 Virtual Private Cloud 活动**

以下示例列出了在（2023 年 3 月 1 日）之后{{20230301}}（2023 年 3 月 1 日）记录的{{vpc\_flow\_2\_0}}来自源 IP {{192.0.2.1}} 的所有 Amazon VPC 活动{{us-west-2}}`DB_Region`。

```
SELECT * 
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
    WHERE time_dt > TIMESTAMP '2023-03-01' 
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time_dt desc
LIMIT 25
```

## 分区日期
<a name="partition-date"></a>

通过对数据进行分区，您可以限制每次查询所扫描的数据量，从而提高性能并降低成本。与 Security Lake 1.0 相比，Security Lake 2.0 中的分区工作 Security Lake 现在通过`time_dt``region`、和`accountid`实现分区。而 Security Lake 1.0 通过`eventDay``region`、和`accountid`参数实现了分区。

查询`time_dt`将自动生成来自 S3 的日期分区，并且可以像 Athena 中任何基于时间的字段一样进行查询。

以下是使用` time_dt`分区查询 2023 年 3 月 1 日之后的日志的查询示例：

```
SELECT *
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt > TIMESTAMP '2023-03-01'
AND src_endpoint.ip = '192.0.2.1'
ORDER BY time desc
LIMIT 25
```

`time_dt` 的常见值包括以下内容：

**过去 1 年内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '1' YEAR`

**过去 1 个月内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '1' MONTH`

**过去 30 天内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '30' DAY`

**过去 12 个小时内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '12' HOUR`

**过去 5 分钟内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '5' MINUTE`

**7-14 天前发生的事件**  
`WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '14' DAY AND CURRENT_TIMESTAMP - INTERVAL '7' DAY`

**在特定日期当天或之后发生的事件**  
`WHERE time_dt >= TIMESTAMP '2023-03-01'`

**示例：表中列出了 2023 年 3 月 1 日当天或之后来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay >= '{{20230301}}'
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT {{25}}
```

**示例：表中列出了过去 30 天内来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > cast(date_format(current_timestamp - INTERVAL '{{30}}' day, '%Y%m%d%H') as varchar) 
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT {{25 }}
```

## 查询安全湖观测数据
<a name="querying-observables-examples"></a>

Observables 是 Security Lake 2.0 现已推出的一项新功能。可观察对象是一个枢轴元素，其中包含在事件中许多地方发现的相关信息。通过查询可观察数据，用户可以从其数据集中获得高级安全见解。

通过查询可观察对象中的特定元素，您可以将数据集限制为诸如特定用户名、资源 UIDs IPs、哈希值和其他 IOC 类型信息之类的内容

这是一个使用 observables 数组查询包含 IP 值 “172.01.02.03” 的 VPC Flow 和 Route53 表中的日志的示例查询

```
WITH a AS 
    (SELECT 
    time_dt,
    observable.name,
    observable.value
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0",
    UNNEST(observables) AS t(observable)
    WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
    AND observable.value='172.01.02.03'
    AND observable.name='src_endpoint.ip'),
b as 
    (SELECT 
    time_dt,
    observable.name,
    observable.value
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0",
    UNNEST(observables) AS t(observable)
    WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
    AND observable.value='172.01.02.03'
    AND observable.name='src_endpoint.ip')
SELECT * FROM a
LEFT JOIN b ON a.value=b.value and a.name=b.name
LIMIT 25
```

## Amazon EKS 审核日志的安全湖查询示例
<a name="example-queries-eks-sourceversion2"></a>

Amazon EKS 日志跟踪控制平面活动直接从 Amazon EKS 控制平面向您的账户提供审计和诊断 CloudWatch 日志。这些日志可让您轻松地保护和运行您的集群。订阅者可以查询 EKS 日志以了解以下类型的信息。

以下是AWS源版本 2 的 Amazon EKS 审核日志的一些查询示例：

**过去 7 天内对特定 URL 的请求**

```
SELECT 
    time_dt,
    actor.user.name,
    http_request.url.path,
    activity_name
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND activity_name = 'get'
and http_request.url.path = '/apis/coordination.k8s.io/v1/'
LIMIT 25
```

**更新过去 7 天来自 “10.0.97.167” 的请求**

```
SELECT 
    activity_name,
    time_dt,
    api.request,
    http_request.url.path,
    src_endpoint.ip,
    resources
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND src_endpoint.ip = '10.0.97.167'
AND activity_name = 'Update'
LIMIT 25
```

**过去 7 天内与资源 “kube-controller-manager” 关联的请求和响应**

```
SELECT 
    activity_name,
    time_dt,
    api.request,
    api.response,
    resource.name
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0",
UNNEST(resources) AS t(resource)
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND resource.name = 'kube-controller-manager'
LIMIT 25
```