

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Timestream 中使用批量加载 LiveAnalytics
<a name="batch-load"></a>

使用 Amazon Timestream 的*批量加载* LiveAnalytics，您可以将存储在亚马逊 S3 中的 CSV 文件批量提取到 Timestream 中。借助这项新功能，您 LiveAnalytics 无需依赖其他工具或编写自定义代码，即可在 Timestream 中保存数据。您可以使用批量加载，以回填具有灵活等待时间的数据，例如无需立即用于查询或分析的数据。

您可以使用、和 AWS 管理控制台 AWS CLI，创建批量加载任务 AWS SDKs。有关更多信息，请参阅 [使用控制台进行批量加载](batch-load-using-console.md)、[将批量加载与 AWS CLI](batch-load-using-cli.md) 和 [将批量加载与 AWS SDKs](batch-load-using-sdk.md)。

除了批量加载外，您还可以使用 WriteRecords API 操作同时写入多条记录。有关使用哪一个的指南，请参阅[在 WriteRecords API 操作和批量加载之间进行选择](writes.writes-or-batch-load.md)。

**Topics**
+ [Timestream 中的批量加载概念](batch-load-concepts.md)
+ [批量加载先决条件](batch-load-prerequisites.md)
+ [批量加载最佳实践](batch-load-best-practices.md)
+ [准备批量加载数据文件](batch-load-preparing-data-file.md)
+ [批量加载的数据模型映射](batch-load-data-model-mappings.md)
+ [使用控制台进行批量加载](batch-load-using-console.md)
+ [将批量加载与 AWS CLI](batch-load-using-cli.md)
+ [将批量加载与 AWS SDKs](batch-load-using-sdk.md)
+ [使用批量加载错误报告](batch-load-using-error-reports.md)

# Timestream 中的批量加载概念
<a name="batch-load-concepts"></a>

查看以下概念，以深入了解批量加载功能。

**批量加载任务**：用于在 Amazon Timestream 中定义源数据和目标的任务。创建批量加载任务时，您可以指定其他配置，例如数据模型。您可以通过 AWS 管理控制台 AWS CLI、和创建批量加载任务 AWS SDKs。

**导入目标**：Timestream 中的目标数据库和表。有关创建数据库和表的信息，请参阅[创建数据库](console_timestream.md#console_timestream.db.using-console)和[创建表](console_timestream.md#console_timestream.table.using-console)。

**数据来源**：存储在 S3 存储桶中的源 CSV 文件。有关准备数据文件的更多信息，请参阅[准备批量加载数据文件](batch-load-preparing-data-file.md)。有关 S3 定价的信息，请参阅 [Amazon S3 定价](https://aws.amazon.com/s3/pricing/)。

**批量加载错误报告**：存储有关批量加载任务错误信息的报告。您可以定义批量加载错误报告的 S3 位置，这是批量加载任务的一部分。有关报告中信息的信息，请参阅[使用批量加载错误报告](batch-load-using-error-reports.md)。

**数据模型映射**-时间、维度和度量的批量加载映射，从 S3 位置的数据源到 LiveAnalytics 表的目标时间流。有关更多信息，请参阅 [批量加载的数据模型映射](batch-load-data-model-mappings.md)。

# 批量加载先决条件
<a name="batch-load-prerequisites"></a>

这是使用批量加载的先决条件列表。有关最佳实践，请参阅[批量加载最佳实践](batch-load-best-practices.md)。
+ 批量加载源数据以带有标题的 CSV 格式存储在 Amazon S3 中。
+ 对于每个 Amazon S3 源存储桶，您必须在附加策略中具备以下权限：

  ```
  "s3:GetObject",
  "s3:GetBucketAcl"
  "s3:ListBucket"
  ```

  同样，对于写入报告的每个 Amazon S3 输出存储桶，您必须在附加策略中具备以下权限：

  ```
  "s3:PutObject",
  "s3:GetBucketAcl"
  ```

  例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Action": [
                  "s3:GetObject",
                  "s3:GetBucketAcl",
                  "s3:ListBucket"
              ],
              "Resource": [
                  "arn:aws:s3:::amzn-s3-demo-source-bucket1\u201d",
                  "arn:aws:s3:::amzn-s3-demo-source-bucket2\u201d"
              ],
              "Effect": "Allow"
          },
          {
              "Action": [
                  "s3:PutObject",
                  "s3:GetBucketAcl"
              ],
              "Resource": [
                  "arn:aws:s3:::amzn-s3-demo-destination-bucket\u201d"
              ],
              "Effect": "Allow"
          }
      ]
  }
  ```

------
+ timestream for 通过将数据模型中提供的信息映射到 CSV 标头来 LiveAnalytics 解析 CSV。数据必须包含一个代表时间戳的列、至少一个维度列以及至少一个度量列。
+ 用于批量加载的 S3 存储桶必须与用于批量加载的 LiveAnalytics 表的 Timestream 位于同一区域，并且来自同一账户。
+ `timestamp` 列必须是长数据类型，表示自 Unix epoch 以来的时间。例如，时间戳 `2021-03-25T08:45:21Z` 将表示为 `1616661921`。Timestream 支持秒、毫秒、微秒和纳秒作为时间戳的精度单位。使用查询语言时，可通过 `to_unixtime` 等函数在不同格式之间进行转换。有关更多信息，请参阅[日期/时间函数](date-time-functions.md)。
+ Timestream 支持字符串数据类型用于维度值。Timestream 支持长整型、双精度、字符串和布尔值数据类型用于度量列。

有关批量加载限制和限额，请参阅[批量加载](ts-limits.md#limits.batch-load)。

# 批量加载最佳实践
<a name="batch-load-best-practices"></a>

当满足以下条件和建议时，批量加载效果最佳（高吞吐量）：

1. 提交用于摄取的 CSV 文件应保持较小大小，具体而言，文件大小应控制在 100MB 至 1GB 之间，以提高摄取过程的并行处理能力和速度。

1. 在批量加载过程中，避免将数据同时提取到同一个表中（例如使用 WriteRecords API 操作或计划查询）。这可能会导致节流，且导致批量加载任务失败。

1. 在批量加载任务运行期间，请勿向用于批量加载的 S3 存储桶添加、修改或删除文件。

1. 请勿删除或撤销表或源的权限，也请勿报告已计划或正在进行批量加载任务的 S3 存储桶。

1. 摄取具有高基数维度值集的数据时，请按照 [对多度量记录进行分区的建议](data-modeling.md#data-modeling-multi-measure-partitioning) 中的指南操作。

1. 确保通过提交小文件测试数据的正确性。无论数据是否正确，提交到批量加载的任何数据都将产生费用。有关定价的更多信息，请参阅 [Amazon Timestream 定价](https://aws.amazon.com/timestream/pricing/)。

1. 除非 `ActiveMagneticStorePartitions` 低于 250，否则不要恢复批量加载任务。作业可能会受到限制并导致失败。对于同一数据库，如果同时提交多个作业，应减少作业数量。

以下是控制台最佳实践：

1. 仅使用[生成器](batch-load-using-console.md#batch-load-using-visual-builder)进行更简单的数据建模，即多度量记录仅使用一个度量名称。

1. 对于更复杂的数据建模，请使用 JSON。例如，在使用多度量记录时，如果涉及多个度量名称，请使用 JSON。

有关 LiveAnalytics 最佳实践的更多信息 Timestream，请参阅[最佳实践](best-practices.md)。

# 准备批量加载数据文件
<a name="batch-load-preparing-data-file"></a>

源数据文件采用分隔符分隔值的形式。通常使用逗号分隔值（CSV）这一更具体的术语。有效的列分隔符包括逗号和竖线。记录之间以换行符分隔。文件必须存储在 Amazon S3 中。当创建新的批量加载任务时，源数据的位置由文件的 ARN 指定。文件包含标题。其中一列代表时间戳。至少还有一列代表度量。

用于批量加载的 S3 存储桶必须与用于批量加载的 LiveAnalytics 表的时间流位于同一个区域中。批量加载任务提交后，请勿向用于批量加载的 S3 存储桶添加或移除文件。有关使用 S3 存储桶的更多信息，请参阅 [Amazon S3 入门](https://docs.aws.amazon.com/AmazonS3/latest/userguide/GetStartedWithS3.html)。

**注意**  
某些应用程序（例如 Excel）生成的 CSV 文件可能包含与预期编码冲突的字节顺序标记（BOM）。引用带有 BOM 的 CSV 文件的 LiveAnalytics 批量加载任务的时间流在以编程方式处理时会引发错误。为避免此情况，您可以删除 BOM，这是一种不可见字符。  
例如，您可以使用 Notepad\$1\$1 等应用程序保存文件，这些应用程序允许您指定新的编码。您还可以使用编程方式：读取文件首行，移除该行中的字符，然后将新值写入文件的首行。  
从 Excel 保存时，有多种 CSV 选项可供选择。使用不同的 CSV 选项进行保存可能有助于避免出现上述问题。但您应该检查结果，因为编码的更改可能会影响某些字符。

## CSV 格式参数
<a name="batch-load-data-file-options"></a>

当需要表示被格式参数保留的值时，应使用转义字符。例如，如果引号字符是双引号，要在数据中表示双引号，请在双引号前添加转义字符。

有关在创建批量加载任务时何时指定这些参数的信息，请参阅[创建批量加载任务](batch-load-using-console.md#console_timestream.create-batch-load.using-console)。


| 参数 | 选项 | 
| --- | --- | 
| 列分隔符 | （逗号（“,”）\$1 竖线（“\$1”）\$1 分号（“;”）\$1 制表符（“/t”）\$1 空格（“ ”）） | 
| 转义字符 | none | 
| 引证字符 | 控制台：（双引号（"）\$1 单引号（'）） | 
| Null 值 | 空格（“ ”） | 
| 删除空格 | 控制台：（否 \$1 是） | 

# 批量加载的数据模型映射
<a name="batch-load-data-model-mappings"></a>

以下内容讨论数据模型映射的架构，并给出具体示例。

## 数据模型映射架构
<a name="batch-load-data-model-mappings-schema"></a>

调用 `DescribeBatchLoadTask` 返回的 `CreateBatchLoadTask` 请求语法和 `BatchLoadTaskDescription` 对象包含 `DataModelConfiguration` 对象，该对象包含用于批量加载的 `DataModel`。`DataModel`定义了从以 CSV 格式存储在 S3 位置的源数据到 LiveAnalytics 数据库和表的目标时间流的映射。

该`TimeColumn`字段表示要映射到 Timestream 中目标表`time`列的值的 LiveAnalytics源数据的位置。`TimeUnit` 指定 `TimeColumn` 的单位，可以是 `MILLISECONDS`、`SECONDS`、`MICROSECONDS` 或 `NANOSECONDS` 其中之一。还存在用于维度和度量的映射关系。维度映射由源列和目标字段组成。

有关更多信息，请参阅 [DimensionMapping](https://docs.aws.amazon.com/timestream/latest/developerguide/API_DimensionMapping)。度量的映射包含两个选项：`MixedMeasureMappings` 和 `MultiMeasureMappings`。

总而言之，a `DataModel` 包含从 S3 位置的数据源到以下 LiveAnalytics 表的目标时间流的映射。
+ 时间
+ Dimensions
+ 度量

如果可能，我们建议您将测量数据映射到 Timestream 中的多度量记录中。 LiveAnalytics有关多度量记录优势的信息，请参阅[多度量记录](writes.md#writes.writing-data-multi-measure)。

如果源数据中的多个度量存储在一行中，则可以将这些多个度量映射到 Timestream 中的多度量记录以供使用 LiveAnalytics 。`MultiMeasureMappings`如果存在必须映射到单度量记录的值，可使用 `MixedMeasureMappings`。

`MixedMeasureMappings` 和 `MultiMeasureMappings` 都包括 `MultiMeasureAttributeMappings`。无论是否需要单度量记录，均支持多度量记录。

如果 Timestream 中只需要多度量目标记录 LiveAnalytics，则可以在以下结构中定义度量映射。

```
CreateBatchLoadTask
    MeasureNameColumn
    MultiMeasureMappings
        TargetMultiMeasureName
        MultiMeasureAttributeMappings array
```

**注意**  
建议尽量使用 `MultiMeasureMappings`。

如果 Timestream 中需要单度量目标记录 LiveAnalytics，则可以在以下结构中定义度量映射。

```
CreateBatchLoadTask
    MeasureNameColumn
    MixedMeasureMappings array
        MixedMeasureMapping
            MeasureName
            MeasureValueType
            SourceColumn
            TargetMeasureName
            MultiMeasureAttributeMappings array
```

使用 `MultiMeasureMappings` 时，`MultiMeasureAttributeMappings` 数组始终为必填项。使用 `MixedMeasureMappings` 数组时，如果 `MeasureValueType` 是给定 `MixedMeasureMapping` 的 `MULTI`，则对于 `MixedMeasureMapping`，`MultiMeasureAttributeMappings` 是必填项。否则，`MeasureValueType` 表示单度量记录的度量类型。

无论哪种方式，都有 `MultiMeasureAttributeMapping` 的数组可用。您可以按如下方式定义每个 `MultiMeasureAttributeMapping` 中多度量记录的映射：

`SourceColumn`  
位于 Amazon S3 中的源数据列。

`TargetMultiMeasureAttributeName`  
目标表中目标多度量名称的名称。未提供 `MeasureNameColumn` 时需要此输入。如果已提供 `MeasureNameColumn`，则该列中的值会用作多度量名称。

`MeasureValueType`  
`DOUBLE`、`BIGINT`、`BOOLEAN`、`VARCHAR` 或 `TIMESTAMP` 其中之一。

## 使用 `MultiMeasureMappings` 示例的数据模型映射
<a name="batch-load-data-model-mappings-example-multi"></a>

此示例演示映射到多度量记录的首选方法，将每个度量值存储在专用的列中。您可以通过[示例 CSV](samples/batch-load-sample-file.csv.zip) 下载示例 CSV。该示例具有以下标题，可映射到表的 Timestream 中的目标列。 LiveAnalytics 
+ `time`
+ `measure_name`
+ `region`
+ `location`
+ `hostname`
+ `memory_utilization`
+ `cpu_utilization`

确定 CSV 文件中的 `time` 和 `measure_name` 列。在这种情况下，它们直接映射到同名 LiveAnalytics 表列的时间流。
+ `time` 映射到 `time`
+ `measure_name` 映射到 `measure_name`（或您选择的值）

使用 API 时，您可以在 `TimeColumn` 字段中指定 `time` 以及支持的时间单位值，例如 `TimeUnit` 字段中的 `MILLISECONDS`。这些对应于控制台中**源列名称**和**时间戳时间输入**。您可以使用 `MeasureNameColumn` 密钥定义的 `measure_name`，对记录进行分组或分区。

在示例中，`region`、`location` 和 `hostname` 都是维度。维度映射到 `DimensionMapping` 对象的数组中。

对于度量，该值`TargetMultiMeasureAttributeName`将变为 “时间流” LiveAnalytics 表中的一列。您可以保留相同的名称，如本例中所示。或者，您可以指定新名称。`MeasureValueType` 是 `DOUBLE`、`BIGINT`、`BOOLEAN`、`VARCHAR` 或 `TIMESTAMP` 其中之一。

```
{
  "TimeColumn": "time",
  "TimeUnit": "MILLISECONDS",
  "DimensionMappings": [
    {
      "SourceColumn": "region",
      "DestinationColumn": "region"
    },
    {
      "SourceColumn": "location",
      "DestinationColumn": "location"
    },
    {
      "SourceColumn": "hostname",
      "DestinationColumn": "hostname"
    }
  ],
  "MeasureNameColumn": "measure_name",
  "MultiMeasureMappings": {
    "MultiMeasureAttributeMappings": [
      {
        "SourceColumn": "memory_utilization",
        "TargetMultiMeasureAttributeName": "memory_utilization",
        "MeasureValueType": "DOUBLE"
      },
      {
        "SourceColumn": "cpu_utilization",
        "TargetMultiMeasureAttributeName": "cpu_utilization",
        "MeasureValueType": "DOUBLE"
      }
    ]
  }
}
```

![\[Visual builder interface showing column mappings for timestream data attributes and types.\]](http://docs.aws.amazon.com/zh_cn/timestream/latest/developerguide/images/column-mapping.jpg)


## 使用 `MixedMeasureMappings` 示例的数据模型映射
<a name="batch-load-data-model-mappings-example-mixed"></a>

我们建议您仅在需要映射到 Timestream 中的单度记录时才使用此方法。 LiveAnalytics

# 使用控制台进行批量加载
<a name="batch-load-using-console"></a>

以下是使用 AWS 管理控制台进行批量加载的步骤。您可以通过[示例 CSV](samples/batch-load-sample-file.csv.zip) 下载示例 CSV。

**Topics**
+ [访问批量加载](#console_timestream.access-batch-load.using-console)
+ [创建批量加载任务](#console_timestream.create-batch-load.using-console)
+ [恢复批量加载任务](#console_timestream.resume-batch-load.using-console)
+ [使用可视化生成器](#batch-load-using-visual-builder)

## 访问批量加载
<a name="console_timestream.access-batch-load.using-console"></a>

按照以下步骤，使用 AWS 管理控制台访问批量加载。

1. 打开 [Amazon Timestream 控制台](https://console.aws.amazon.com/timestream)。

1. 在导航窗格中，选择**管理工具**，然后选择**批量加载任务**。

1. 在此处，您可以查看批量加载任务列表，并深入研究特定任务以获取更多详细信息。您还可以创建和恢复任务。

## 创建批量加载任务
<a name="console_timestream.create-batch-load.using-console"></a>

按照以下步骤，使用 AWS 管理控制台创建批量加载任务。

1. 打开 [Amazon Timestream 控制台](https://console.aws.amazon.com/timestream)。

1. 在导航窗格中，选择**管理工具**，然后选择**批量加载任务**。

1. 选择**创建批量加载任务**。

1. 在**导入目标**中，选择以下选项。
   + **目标数据库**：选择在 [创建数据库](console_timestream.md#console_timestream.db.using-console) 中创建的数据库名称。
   + **目标表**：选择在 [创建表](console_timestream.md#console_timestream.table.using-console) 中创建的表名称。

   如有必要，可使用**创建新表**按钮，从此面板添加表。

1. 从**数据来源**中的**数据来源 S3 位置**，选择存储源数据的 S3 存储桶。使用 “**浏览 S3**” 按钮查看活动 AWS 账户有权访问的 S3 资源，或者输入 S3 位置 URL。数据来源必须位于同一区域中。

1. 在**文件格式设置**（可展开部分）中，可使用默认设置解析输入数据。您也可以选择**高级设置**。从此处，您可以选择 **CSV 格式参数**，然后选择参数以解析输入数据。有关这些参数的信息，请参阅[CSV 格式参数](batch-load-preparing-data-file.md#batch-load-data-file-options)。

1. 在**配置数据模型映射**中，配置数据模型。有关其他数据模型指南，请参阅[批量加载的数据模型映射](batch-load-data-model-mappings.md)
   + 在**数据模型映射**中，选择**映射配置输入**，然后选择以下选项之一。
     + **可视化生成器**-要直观地映射数据，请选择**TargetMultiMeasureName**或**MeasureNameColumn**。然后从**可视化生成器**中，映射列。

       当选择单个 CSV 文件作为数据来源时，可视化生成器会自动检测并加载数据来源文件中的源列标题。选择要创建映射的属性及数据类型。

       有关使用可视化生成器的信息，请参阅[使用可视化生成器](#batch-load-using-visual-builder)。
     + **JSON 编辑器**：用于配置数据模型的自由格式 JSON 编辑器。如果您熟悉 Timestream， LiveAnalytics 并且想要构建高级数据模型映射，请选择此选项。
     + **来自 S3 的 JSON 文件**：选择存储在 S3 中的 JSON 模型文件。如果您已配置数据模型并希望将其重复用于其他批量加载，请选择此选项。

1. 在**错误日志报告**的**错误日志 S3 位置**中，选择用于报告错误的 S3 位置。有关如何使用此报告的信息，请参阅[使用批量加载错误报告](batch-load-using-error-reports.md)。

1. 对于**加密密钥类型**，选择下列选项之一。
   + **Amazon S3 托管密钥（SSE-S3）**：Amazon S3 创建、管理和使用的加密密钥。
   + **AWS KMS key (SSE-KMS)**-受 AWS Key Management Service (AWS KMS) 保护的加密密钥。

1. 选择**下一步**。

1. 在**审核和创建页面**上，根据需要审核并编辑设置。
**注意**  
批量加载任务创建后，无法更改批量加载任务设置。任务完成时间将根据导入的数据量而有所不同。

1. 选择**创建批量加载任务**。

## 恢复批量加载任务
<a name="console_timestream.resume-batch-load.using-console"></a>

当选择状态为“进度已停止”且仍可恢复的批量加载任务时，系统会提示您恢复该任务。查看这些任务的详细信息时，还会显示一个带有**恢复任务**按钮的横幅。可恢复任务设有“恢复截止日期”。该日期到期后，任务将无法恢复。

## 使用可视化生成器
<a name="batch-load-using-visual-builder"></a>

您可以使用可视化生成器将存储在 S3 存储桶中的一个或多个 CSV 文件源数据列映射到 LiveAnalytics 表的 Timestream 中的目标列。

**注意**  
您的角色需要文件的 `SelectObjectContent` 权限。否则，您将需要手动添加或删除列。

### 自动加载源列模式
<a name="batch-load-using-visual-builder-auto-load"></a>

如果您只指定一个存储桶，Timestream for LiveAnalytics 可以自动扫描源 CSV 文件中的列名。如果没有现有的映射，则可以选择**导入源列**。

1. 从**映射配置输入设置**中选择**可视化生成器**选项，设置时间戳时间输入。`Milliseconds` 为默认设置。

1. 点击**加载源列**按钮，以导入源数据文件中的列标题。该表将使用数据来源文件中的源列标题名称进行填充。

1. 为每个源列选择**目标表列名**、**Timestream 属性类型**和**数据类型**。

   有关这些列和可能值的详细信息，请参阅[映射字段](#batch-load-using-visual-builder-mapping-fields)。

1. 使用该 drag-to-fill功能一次性设置多列的值。

### 手动添加源列
<a name="batch-load-using-visual-builder-manually-add"></a>

如果您使用的是存储桶或 CSV 前缀，而非单个 CSV，则可以使用**添加列映射**和**删除列映射**按钮，通过可视化编辑器添加和删除列映射。还有一个用于重置映射的按钮。

### 映射字段
<a name="batch-load-using-visual-builder-mapping-fields"></a>
+ **源列名称**：源文件中表示要导入的度量的列名称。当您使用**导**入源列时，Timestream LiveAnalytics 可以自动填充此值。
+ **目标表列名**：可选输入，用于指示目标表中度量的列名。
+ **Timestream 属性类型**：指定源列中数据的属性类型，例如 `DIMENSION`。
  + **TIMESTAMP**：指定何时收集度量。
  + **MULTI**：表示多个度量。
  + **DIMENSION**：时间序列元数据。
  + **MEASURE\$1NAME**：对于单度量记录，这是度量名称。
+ **数据类型**：Timestream 列的类型，例如 `BOOLEAN`。
  + **BIGINT**：64 位整数。
  + **BOOLEAN**：逻辑的两个真值：true 和 false。
  + **DOUBLE**：64 位可变精度数值。
  + **TIMESTAMP**：使用 UTC 纳秒级精度的时点实例，用于跟踪自 Unix epoch 以来的时间。

# 将批量加载与 AWS CLI
<a name="batch-load-using-cli"></a>

**设置**

要开始使用批量加载，请执行以下步骤。

1. 按照中的说明 AWS CLI 进行安装[访问 Amazon Timestream 以使用 LiveAnalytics AWS CLI](Tools.CLI.md)。

1. 运行以下命令以验证 Timestream CLI 命令是否已更新。验证 create-batch-load-task是否在列表中。

   `aws timestream-write help`

1. 按照 [准备批量加载数据文件](batch-load-preparing-data-file.md) 的说明准备数据来源。

1. 按照 [访问 Amazon Timestream 以使用 LiveAnalytics AWS CLI](Tools.CLI.md) 的说明创建数据库和表。

1. 为报告输出创建 S3 存储桶。存储桶必须位于同一区域。有关存储桶的更多信息，请参阅[创建、配置和使用 Amazon S3 存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-buckets-s3.html)。

1. 创建批量加载任务。要查看步骤，请参阅[创建批量加载任务](#batch-load-using-cli-create-task)。

1. 确认任务的状态。要查看步骤，请参阅[描述批量加载任务](#batch-load-using-cli-describe-task)。

## 创建批量加载任务
<a name="batch-load-using-cli-create-task"></a>

您可以使用 `create-batch-load-task` 命令创建批量加载任务。使用 CLI 创建批量加载任务时，可使用 JSON 参数 `cli-input-json`，该参数允许您将参数聚合到单个 JSON 片段中。您还可以使用其他多个参数（包括 `data-model-configuration`、`data-source-configuration`、`report-configuration`、`target-database-name` 和 `target-table-name`）将这些细节拆分开。

有关示例，请参阅[创建批量加载任务示例](#batch-load-using-cli-example)。

## 描述批量加载任务
<a name="batch-load-using-cli-describe-task"></a>

您可以按以下方式检索批量加载任务描述。

```
aws timestream-write describe-batch-load-task --task-id <value>
```

以下是示例响应。

```
{
    "BatchLoadTaskDescription": {
        "TaskId": "<TaskId>",
        "DataSourceConfiguration": {
            "DataSourceS3Configuration": {
                "BucketName": "test-batch-load-west-2",
                "ObjectKeyPrefix": "sample.csv"
            },
            "CsvConfiguration": {},
            "DataFormat": "CSV"
        },
        "ProgressReport": {
            "RecordsProcessed": 2,
            "RecordsIngested": 0,
            "FileParseFailures": 0,
            "RecordIngestionFailures": 2,
            "FileFailures": 0,
            "BytesIngested": 119
        },
        "ReportConfiguration": {
            "ReportS3Configuration": {
                "BucketName": "test-batch-load-west-2",
                "ObjectKeyPrefix": "<ObjectKeyPrefix>",
                "EncryptionOption": "SSE_S3"
            }
        },
        "DataModelConfiguration": {
            "DataModel": {
                "TimeColumn": "timestamp",
                "TimeUnit": "SECONDS",
                "DimensionMappings": [
                    {
                        "SourceColumn": "vehicle",
                        "DestinationColumn": "vehicle"
                    },
                    {
                        "SourceColumn": "registration",
                        "DestinationColumn": "license"
                    }
                ],
                "MultiMeasureMappings": {
                    "TargetMultiMeasureName": "test",
                    "MultiMeasureAttributeMappings": [
                        {
                            "SourceColumn": "wgt",
                            "TargetMultiMeasureAttributeName": "weight",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "SourceColumn": "spd",
                            "TargetMultiMeasureAttributeName": "speed",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "SourceColumn": "fuel",
                            "TargetMultiMeasureAttributeName": "fuel",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "SourceColumn": "miles",
                            "TargetMultiMeasureAttributeName": "miles",
                            "MeasureValueType": "DOUBLE"
                        }
                    ]
                }
            }
        },
        "TargetDatabaseName": "BatchLoadExampleDatabase",
        "TargetTableName": "BatchLoadExampleTable",
        "TaskStatus": "FAILED",
        "RecordVersion": 1,
        "CreationTime": 1677167593.266,
        "LastUpdatedTime": 1677167602.38
    }
}
```

## 列出批量加载任务
<a name="batch-load-using-cli-list-tasks"></a>

您可以按以下方式列出批量加载任务。

```
aws timestream-write list-batch-load-tasks
```

输出如下所示。

```
{
    "BatchLoadTasks": [
        {
            "TaskId": "<TaskId>",
            "TaskStatus": "FAILED",
            "DatabaseName": "BatchLoadExampleDatabase",
            "TableName": "BatchLoadExampleTable",
            "CreationTime": 1677167593.266,
            "LastUpdatedTime": 1677167602.38
        }
    ]
}
```

## 恢复批量加载任务
<a name="batch-load-using-cli-resume-task"></a>

您可以按以下方式恢复批量加载任务。

```
aws timestream-write resume-batch-load-task --task-id <value>
```

响应可能表示成功，也可能包含错误信息。

## 创建批量加载任务示例
<a name="batch-load-using-cli-example"></a>

**Example**  

1. 为名为的 LiveAnalytics 数据库`BatchLoad`和名为的表创建时间流。`BatchLoadTest`验证并根据需要调整 `MemoryStoreRetentionPeriodInHours` 和 `MagneticStoreRetentionPeriodInDays` 的值。

   ```
   aws timestream-write create-database --database-name BatchLoad \
   
   aws timestream-write create-table --database-name BatchLoad \
   --table-name BatchLoadTest \
   --retention-properties "{\"MemoryStoreRetentionPeriodInHours\": 12, \"MagneticStoreRetentionPeriodInDays\": 100}"
   ```

1. 使用控制台创建 S3 存储桶，并将 `sample.csv` 文件复制到该位置。您可以通过[示例 CSV](samples/batch-load-sample-file.csv.zip) 下载示例 CSV。

1. 使用控制台为 Timestream 创建 S3 存储桶 LiveAnalytics ，以便在批量加载任务完成时出现错误时编写报告。

1. 创建批量加载任务。请务必*\$1REPORT\$1BUCKET*使用您在前面步骤中创建的存储桶替换*\$1INPUT\$1BUCKET*和。

   ```
   aws timestream-write create-batch-load-task \
   --data-model-configuration "{\
               \"DataModel\": {\
                 \"TimeColumn\": \"timestamp\",\
                 \"TimeUnit\": \"SECONDS\",\
                 \"DimensionMappings\": [\
                   {\
                     \"SourceColumn\": \"vehicle\"\
                   },\
                   {\
                     \"SourceColumn\": \"registration\",\
                     \"DestinationColumn\": \"license\"\
                   }\
                 ],
                 \"MultiMeasureMappings\": {\
                   \"TargetMultiMeasureName\": \"mva_measure_name\",\
                   \"MultiMeasureAttributeMappings\": [\
                     {\
                       \"SourceColumn\": \"wgt\",\
                       \"TargetMultiMeasureAttributeName\": \"weight\",\
                       \"MeasureValueType\": \"DOUBLE\"\
                     },\
                     {\
                       \"SourceColumn\": \"spd\",\
                       \"TargetMultiMeasureAttributeName\": \"speed\",\
                       \"MeasureValueType\": \"DOUBLE\"\
                     },\
                     {\
                       \"SourceColumn\": \"fuel_consumption\",\
                       \"TargetMultiMeasureAttributeName\": \"fuel\",\
                       \"MeasureValueType\": \"DOUBLE\"\
                     },\
                     {\
                       \"SourceColumn\": \"miles\",\
                       \"MeasureValueType\": \"BIGINT\"\
                     }\
                   ]\
                 }\
               }\
             }" \
   --data-source-configuration "{
               \"DataSourceS3Configuration\": {\
                 \"BucketName\": \"$INPUT_BUCKET\",\
                 \"ObjectKeyPrefix\": \"$INPUT_OBJECT_KEY_PREFIX\"
               },\
               \"DataFormat\": \"CSV\"\
             }" \
   --report-configuration "{\
               \"ReportS3Configuration\": {\
                 \"BucketName\": \"$REPORT_BUCKET\",\
                 \"EncryptionOption\": \"SSE_S3\"\
               }\
             }" \
   --target-database-name BatchLoad \
   --target-table-name BatchLoadTest
   ```

   上述命令返回以下输出。

   ```
   {
       "TaskId": "TaskId "
   }
   ```

1. 检查任务的进度。请务必*\$1TASK\$1ID*使用在上一步中返回的任务 ID 进行替换。

   ```
   aws timestream-write describe-batch-load-task --task-id $TASK_ID 
   ```
**输出示例**  

```
{
    "BatchLoadTaskDescription": {
        "ProgressReport": {
            "BytesIngested": 1024,
            "RecordsIngested": 2,
            "FileFailures": 0,
            "RecordIngestionFailures": 0,
            "RecordsProcessed": 2,
            "FileParseFailures": 0
        },
        "DataModelConfiguration": {
            "DataModel": {
                "DimensionMappings": [
                    {
                        "SourceColumn": "vehicle",
                        "DestinationColumn": "vehicle"
                    },
                    {
                        "SourceColumn": "registration",
                        "DestinationColumn": "license"
                    }
                ],
                "TimeUnit": "SECONDS",
                "TimeColumn": "timestamp",
                "MultiMeasureMappings": {
                    "MultiMeasureAttributeMappings": [
                        {
                            "TargetMultiMeasureAttributeName": "weight",
                            "SourceColumn": "wgt",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "TargetMultiMeasureAttributeName": "speed",
                            "SourceColumn": "spd",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "TargetMultiMeasureAttributeName": "fuel",
                            "SourceColumn": "fuel_consumption",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "TargetMultiMeasureAttributeName": "miles",
                            "SourceColumn": "miles",
                            "MeasureValueType": "DOUBLE"
                        }
                    ],
                    "TargetMultiMeasureName": "mva_measure_name"
                }
            }
        },
        "TargetDatabaseName": "BatchLoad",
        "CreationTime": 1672960381.735,
        "TaskStatus": "SUCCEEDED",
        "RecordVersion": 1,
        "TaskId": "TaskId ",
        "TargetTableName": "BatchLoadTest",
        "ReportConfiguration": {
            "ReportS3Configuration": {
                "EncryptionOption": "SSE_S3",
                "ObjectKeyPrefix": "ObjectKeyPrefix ",
                "BucketName": "amzn-s3-demo-bucket"
            }
        },
        "DataSourceConfiguration": {
            "DataSourceS3Configuration": {
                "ObjectKeyPrefix": "sample.csv",
                "BucketName": "amzn-s3-demo-source-bucket"
            },
            "DataFormat": "CSV",
            "CsvConfiguration": {}
        },
        "LastUpdatedTime": 1672960387.334
    }
}
```

# 将批量加载与 AWS SDKs
<a name="batch-load-using-sdk"></a>

有关如何使用创建、描述和列出批量加载任务的示例 AWS SDKs，请参阅[创建批量加载任务](code-samples.create-batch-load.md)[描述批量加载任务](code-samples.describe-batch-load.md)、[列出批量加载任务](code-samples.list-batch-load-tasks.md)、和[恢复批量加载任务](code-samples.resume-batch-load-task.md)。

# 使用批量加载错误报告
<a name="batch-load-using-error-reports"></a>

批量加载任务具有以下状态值之一：
+ `CREATED`（**已创建**）：任务已创建。
+ `IN_PROGRESS`（**进行中**）：任务正在进行中。
+ `FAILED`（**失败**）：任务已完成。但检测到一个或多个错误。
+ `SUCCEEDED`（**已完成**）：任务已完成，未出现任何错误。
+ `PROGRESS_STOPPED`（**进度已停止**）：任务已停止但尚未完成。您可以尝试恢复任务。
+ `PENDING_RESUME`（**待恢复**）：任务处于待恢复状态。

当出现错误时，会在为此定义的 S3 存储桶中创建错误日志报告。不同数组中的错误可分类为 taskErrors 或 fileErrors。以下是错误报告示例。

```
{
    "taskId": "9367BE28418C5EF902676482220B631C",
    "taskErrors": [],
    "fileErrors": [
        {
            "fileName": "example.csv",
            "errors": [
                {
                    "reason": "The record timestamp is outside the time range of the data ingestion window.",
                    "lineRanges": [
                        [
                            2,
                            3
                        ]
                    ]
                }
            ]
        }
    ]
}
```