

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 教程：使用 cqlsh 将数据加载到 Amazon Keyspaces
<a name="bulk-upload"></a>

本教程将指导您完成使用 `cqlsh COPY FROM` 命令将数据从 Apache Cassandra 迁移到 Amazon Keyspaces 的过程。`cqlsh COPY FROM` 命令对于出于学术或测试目的快速轻松地将小型数据集上传到 Amazon Keyspaces 而言非常有用。有关如何迁移生产工作负载的更多信息，请参阅[离线迁移过程：Apache Cassandra 到 Amazon Keyspaces](migrating-offline.md)。在本教程中，您将完成以下步骤：

先决条件-使用证书设置 AWS 账户，为证书创建 JKS 信任存储文件，然后配置为连接`cqlsh`到 Amazon Keyspaces。

1. **创建源 CSV 和目标表** - 准备一个 CSV 文件作为源数据，然后在 Amazon Keyspaces 中创建目标键空间和表。

1. **准备数据** - 随机采样 CSV 文件中的数据，并对其进行分析，以确定平均行大小和最大行大小。

1. **设置吞吐容量**-根据数据大小和所需的加载时间计算所需的写入容量单位 (WCUs)，并配置表的预配置容量。

1. **配置 cqlsh 参数** - 确定诸如 `INGESTRATE`、`NUMPROCESSES`、`MAXBATCHSIZE` 和 `CHUNKSIZE` 之类的 `cqlsh COPY FROM` 参数的最佳值，以均匀分配工作负载。

1. **运行 `cqlsh COPY FROM` 命令** - 运行 `cqlsh COPY FROM` 命令，将 CSV 文件中的数据上传到 Amazon Keyspaces 表，并监控进度。

疑难解答 - 解决数据上传过程中出现的常见问题，例如无效请求、解析器错误、容量错误和 cqlsh 错误。

**Topics**
+ [先决条件：在使用 `cqlsh COPY FROM` 上传数据之前需要完成的步骤](bulk-upload-prequs.md)
+ [第 1 步：为数据上传创建源 CSV 文件和目标表](bulk-upload-source.md)
+ [第 2 步：为成功上传数据做好源数据准备](bulk-upload-prepare-data.md)
+ [步骤 3：为表设置吞吐容量](bulk-upload-capacity.md)
+ [步骤 4：配置 `cqlsh COPY FROM` 设置](bulk-upload-config.md)
+ [第 5 步：运行 `cqlsh COPY FROM` 命令将 CSV 文件中的数据上传到目标表](bulk-upload-run.md)
+ [问题排查](bulk-upload-troubleshooting.md)

# 先决条件：在使用 `cqlsh COPY FROM` 上传数据之前需要完成的步骤
<a name="bulk-upload-prequs"></a>

在开始本教程之前，您必须完成以下任务：

1. 如果您还没有这样做，请 AWS 账户 按照中的步骤进行注册[设置 AWS Identity and Access Management](accessing.md#SettingUp.IAM)。

1. 按照[创建用于通过编程方式访问 Amazon Keyspaces 的服务特定凭证。](programmatic.credentials.ssc.md)中的步骤创建特定于服务的凭证。

1. 设置 Cassandra 查询语言 Shell (cqlsh) 连接，并按照[使用 `cqlsh` 连接 Amazon Keyspaces](programmatic.cqlsh.md) 中的步骤确认您可以连接到 Amazon Keyspaces。

# 第 1 步：为数据上传创建源 CSV 文件和目标表
<a name="bulk-upload-source"></a>

在本教程中，我们使用名为 `keyspaces_sample_table.csv` 的逗号分隔值 (CSV) 文件作为用于数据迁移的源文件。提供的示例文件包含名为 `book_awards` 的表中的几行数据。

1. 创建源文件。您可以选择以下选项之一：
   + 下载以下存档文件 [samplemigration.zip](samples/samplemigration.zip) 中包含的示例 CSV 文件 (`keyspaces_sample_table.csv`)。解压缩存档文件并记下指向 `keyspaces_sample_table.csv` 的路径。
   + 要使用您自己存储在 Apache Cassandra 数据库中的数据来填充 CSV 文件，您可以使用 `cqlsh` `COPY TO` 语句来填充源 CSV 文件，如以下示例所示。

     ```
     cqlsh localhost 9042 -u "username" -p "password" --execute "COPY mykeyspace.mytable TO 'keyspaces_sample_table.csv' WITH HEADER=true";
     ```

     确保您创建的 CSV 文件符合以下要求：
     + 第一行包含列名称。
     + 源 CSV 文件中的列名称与目标表中的列名称相匹配。
     + 数据用逗号分隔。
     + 所有数据值均为有效的 Amazon Keyspaces 数据类型。请参阅[数据类型](cql.elements.md#cql.data-types)。

1. 在 Amazon Keyspaces 中创建目标键空间和表。

   1. 使用 `cqlsh` 连接到 Amazon Keyspaces，将以下示例中的服务端点、用户名和密码替换为您自己的值。

      ```
      cqlsh cassandra.us-east-1.amazonaws.com 9142 -u "111122223333" -p "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" --ssl
      ```

   1. 使用名称 `catalog` 创建新的键空间，如以下示例所示。

      ```
      CREATE KEYSPACE catalog WITH REPLICATION = {'class': 'SingleRegionStrategy'};
      ```

   1. 当新的键空间可用后，使用以下代码创建目标表 `book_awards`。

      ```
      CREATE TABLE "catalog.book_awards" (
         year int,
         award text,
         rank int, 
         category text,
         book_title text,
         author text, 
         publisher text,
         PRIMARY KEY ((year, award), category, rank)
         );
      ```

   如果 Apache Cassandra 是您的原始数据来源，那么创建带有匹配标题的 Amazon Keyspaces 目标表的一种简单方法是从源表生成 `CREATE TABLE` 语句，如以下语句所示。

   ```
   cqlsh localhost 9042  -u "username" -p "password" --execute "DESCRIBE TABLE mykeyspace.mytable;"
   ```

   然后，在 Amazon Keyspaces 中创建目标表，其列名称和数据类型与 Cassandra 源表中的描述相匹配。

# 第 2 步：为成功上传数据做好源数据准备
<a name="bulk-upload-prepare-data"></a>

为高效传输准备源数据的过程包含两个步骤。第一步，随机化数据。第二步，分析数据以确定相应的 `cqlsh` 参数值和所需的表设置，确保数据上传成功。

**随机化数据**  
`cqlsh COPY FROM` 命令按数据在 CSV 文件中显示的顺序读取和写入数据。如果使用 `cqlsh COPY TO` 命令创建源文件，将在 CSV 中按键排序顺序写入数据。Amazon Keyspaces 在内部使用分区键对数据进行分区。尽管 Amazon Keyspaces 具有内置逻辑来帮助对针对同一分区键的请求进行负载均衡，但如果您随机排列顺序，则可以更快、更高效地加载数据。这是因为您可以利用 Amazon Keyspaces 在写入不同分区时会出现的内置负载均衡功能。

要将写入操作均匀地分布在分区中，您必须随机化源文件中的数据。您可以编写一个应用程序来执行此操作，也可以使用开源工具来执行此操作，比如 [Shuf](https://en.wikipedia.org/wiki/Shuf)。Shuf 在 Linux 发行版、macOS（通过在 [Homebrew](https://brew.sh) 中安装 coreutils）和 Windows [通过使用 Windows Subsystem for Linux (WSL)] 上免费提供。您还需要执行一个额外步骤来防止包含列名称的标题行在此步骤中被随机排序。

要在保留标题的同时随机化源文件，请输入以下代码。

```
tail -n +2 keyspaces_sample_table.csv | shuf -o keyspace.table.csv && (head -1 keyspaces_sample_table.csv && cat keyspace.table.csv ) > keyspace.table.csv1 && mv keyspace.table.csv1 keyspace.table.csv
```

Shuf 将数据重写到名为 `keyspace.table.csv` 的新 CSV 文件中。现在，您可以删除 `keyspaces_sample_table.csv` 文件，您不再需要此文件了。

**分析数据**  
通过分析数据来确定平均行大小和最大行大小。

执行此操作出于以下原因：
+ 平均行大小有助于估算要传输的数据总量。
+ 您需要平均行大小来预置上传数据所需的写入容量。
+ 您可以确保每行的大小小于 1MB，这是 Amazon Keyspaces 中的最大行大小。

**注意**  
此限额指的是行大小，而不是分区大小。与 Apache Cassandra 分区不同，Amazon Keyspaces 分区实际上可以不受大小限制。分区键和聚类列需要额外的元数据存储空间，您必须将其加到行的原始大小中。有关更多信息，请参阅 [估算 Amazon Keyspaces 中的行大小](calculating-row-size.md)。

以下代码使用 [AWK](https://en.wikipedia.org/wiki/AWK) 分析 CSV 文件并打印平均行大小和最大行大小。

```
awk -F, 'BEGIN {samp=10000;max=-1;}{if(NR>1){len=length($0);t+=len;avg=t/NR;max=(len>max ? len : max)}}NR==samp{exit}END{printf("{lines: %d, average: %d bytes, max: %d bytes}\n",NR,avg,max);}' keyspace.table.csv
```

运行此代码将生成以下输出。

```
using 10,000 samples:
{lines: 10000, avg: 123 bytes, max: 225 bytes}
```

在本教程的下一步中，您将使用平均行大小来预置表的写入容量。

# 步骤 3：为表设置吞吐容量
<a name="bulk-upload-capacity"></a>

本教程向您展示了如何调整 cqlsh 以在设定的时间范围内加载数据。由于您提前知道自己要执行多少读取和写入操作，因此可以使用预置容量模式。完成数据传输后，应该将表的容量模式设置为与应用程序的流量模式相匹配。要了解有关容量管理的更多信息，请参阅 [在 Amazon Keyspaces（Apache Cassandra 兼容）中管理无服务器资源](serverless_resource_management.md)。

使用预置容量模式，您可以提前指定要为表预置多少读取和写入容量。写入容量按小时计费，并以写入容量单位 () WCUs 计量。每个 WCU 的写入容量足以支持每秒写入 1KB 数据。加载数据时，写入速率必须低于目标表上设置的最大值 WCUs（参数:`write_capacity_units`）。

默认情况下，您最多可以为一个表预配置 40,000， WCUs 为账户中的 WCUs 所有表配置最多 80,000。如果您需要更多容量，可以在[服务限额](https://console.aws.amazon.com/servicequotas/home#!/services/cassandra/quotas)控制台中请求提高限额。有关限额的更多信息，请参阅[Amazon Keyspaces（Apache Cassandra 兼容）限额](quotas.md)。

**计算刀片 WCUs 所需的平均数量**  
每秒插入 1KB 数据需要 1 个 WCU。如果您的 CSV 文件有 360000 行，并且您想在 1 小时内加载所有数据，则必须每秒写入 100 行（360000 行/60 分/60 秒 = 每秒 100 行）。如果每行包含最多 1 KB 的数据，则要每秒插入 100 行，则必须为表预配置 100 WCUs 行。如果每行有 1.5 KB 的数据，则需要两行 WCUs 才能每秒插入一行。因此，要每秒插入 100 行，必须预置 200 行 WCUs。

要确定每秒需要插入多少 WCUs 行，请将平均行大小（以字节为单位）除以 1024，然后向上舍入到最接近的整数。

例如，如果平均行大小为 3000 字节，则需要三个字节 WCUs 才能每秒插入一行。

```
ROUNDUP(3000 / 1024) = ROUNDUP(2.93) = 3 WCUs
```

**计算数据加载时间和容量**  
既然您已经知道了 CSV 文件中的平均大小和行数，就可以计算出在给定时间内需要加载多少 WCUs 数据，以及使用不同的 WCU 设置在 CSV 文件中加载所有数据所花费的大致时间。

例如，如果文件中的每行为 1 KB，而 CSV 文件中有 1,000,000 行，则要在 1 小时内加载数据，则需要在该小时内为表预置至少 278 WCUs 行。

```
1,000,000 rows * 1 KBs = 1,000,000 KBs
1,000,000 KBs / 3600 seconds =277.8 KBs / second = 278 WCUs
```

**配置预置容量设置**  
您可以在创建表时或使用 `ALTER TABLE` CQL 命令来设置表的写入容量设置。以下是使用 `ALTER TABLE` CQL 语句来更改表的预置容量设置的语法。

```
ALTER TABLE mykeyspace.mytable WITH custom_properties={'capacity_mode':{'throughput_mode': 'PROVISIONED', 'read_capacity_units': 100, 'write_capacity_units': 278}} ; 
```

有关完整的语言参考，请参阅 [ALTER TABLE](cql.ddl.table.md#cql.ddl.table.alter)。

# 步骤 4：配置 `cqlsh COPY FROM` 设置
<a name="bulk-upload-config"></a>

本部分概述如何确定 `cqlsh COPY FROM` 的参数值。`cqlsh COPY FROM` 命令读取您之前准备的 CSV 文件，并使用 CQL 将数据插入到 Amazon Keyspaces 中。该命令将行分开，并将 `INSERT` 操作分配给一组 Worker。每个 Worker 与 Amazon Keyspaces 建立连接并通过该通道发送 `INSERT` 请求。

`cqlsh COPY` 命令没有在 Worker 之间均匀分配工作的内部逻辑。但是，您可以手动对其进行配置，以确保均匀分配工作。首先查看以下关键的 cqlsh 参数：
+ **DELIMITER**：如果您使用逗号以外的分隔符，则可以设置此参数，此参数默认为逗号。
+ **INGESTRATE**：`cqlsh COPY FROM` 每秒尝试处理的目标行数。如果未设置，则默认为 100000。
+ **NUMPROCESSES**：cqlsh 为 `COPY FROM` 任务创建的子 Worker 进程的数量。此设置的最大值为 16，默认值为 `num_cores - 1`，其中 `num_cores` 是运行 cqlsh 的主机上的处理内核数。
+ **MAXBATCHSIZE**：批次大小决定了在单个批次中插入到目标表中的最大行数。如果未设置，cqlsh 将使用插入 20 行的批次。
+ **CHUNKSIZE**：传递给子 Worker 的工作单元的大小。默认情况下，它设置为 5000。
+ **MAXATTEMPTS**：重试失败 Worker 块的最大次数。达到最大尝试次数后，失败记录将写入一个新的 CSV 文件中，您可以在调查失败后再次运行该文件。

`INGESTRATE`根据您配置到目标目标表 WCUs 的数量进行设置。`cqlsh COPY FROM` 命令的 `INGESTRATE` 不是限制，而是目标平均值。这意味着它可以（并且经常）突破您设定的数字。要允许暴增并确保有足够的容量来处理数据加载请求，请将 `INGESTRATE` 设置为表写入容量的 90%。

```
INGESTRATE = WCUs * .90
```

接下来，将 `NUMPROCESSES` 参数设置为比系统上的内核数少一个。要弄清楚系统的内核数，您可以运行以下代码。

```
python -c "import multiprocessing; print(multiprocessing.cpu_count())"
```

在本教程中，我们使用以下值。

```
NUMPROCESSES = 4
```

每个进程都会创建一个 Worker，并且每个 Worker 都会与 Amazon Keyspaces 建立连接。Amazon Keyspaces 在每个连接上每秒可支持最多 3000 个 CQL 请求。这意味着您必须确保每个 Worker 每秒处理的请求少于 3000 个。

与 `INGESTRATE` 一样，Worker 经常会突破您设置的数字，并且不受时钟秒数的限制。因此，考虑到暴增，请将 cqlsh 参数设置为每个 Worker 每秒处理 2500 个请求。要计算分配给 Worker 的工作量，请使用以下准则。
+ `INGESTRATE` 除以 `NUMPROCESSES`。
+ 如果 `INGESTRATE`/`NUMPROCESSES` > 2500，请降低 `INGESTRATE` 以使此公式成立。

```
INGESTRATE / NUMPROCESSES <= 2,500
```

在配置设置以优化示例数据的上传之前，让我们回顾一下 `cqlsh` 默认设置，看看使用它们会如何影响数据上传过程。由于 `cqlsh COPY FROM` 使用 `CHUNKSIZE` 创建工作块（`INSERT` 语句）以分配给 Worker，因此工作不会自动均匀分配。根据 `INGESTRATE` 设置，有些 Worker 可能会处于闲置状态。

要在 Worker 之间均匀分配工作并使每个 Worker 保持每秒 2500 个请求的最佳速率，必须通过更改输入参数来设置 `CHUNKSIZE`、`MAXBATCHSIZE`、和 `INGESTRATE`。要优化数据加载期间的网络流量利用率，请为 `MAXBATCHSIZE` 选择一个接近最大值 30 的值。通过将 `CHUNKSIZE` 更改为 100，将 `MAXBATCHSIZE` 更改为 25，10000 行将均匀分配给四个 Worker（10000/2500 = 4）。

以下代码示例说明了如何执行此操作。

```
INGESTRATE = 10,000
NUMPROCESSES = 4
CHUNKSIZE = 100
MAXBATCHSIZE. = 25
Work Distribution:
Connection 1 / Worker 1 : 2,500 Requests per second
Connection 2 / Worker 2 : 2,500 Requests per second
Connection 3 / Worker 3 : 2,500 Requests per second
Connection 4 / Worker 4 : 2,500 Requests per second
```

总而言之，在设置 `cqlsh COPY FROM` 参数时使用以下公式：
+ **INGESTRATE** = 写入容量单位 \$1 0.90
+ **NUMPROCESSES** = 内核数 - 1（默认设置）
+ **INGESTRATE/NUMPROCESSES** = 2500（这必须是一个真语句。）
+ **MAXBATCHSIZE** = 30（默认为 20。Amazon Keyspaces 最多可接受 30 个批次。）
+ **CHUNKSIZE** = (INGESTRATE/NUMPROCESSES)/MAXBATCHSIZE

现在您已经计算了 `NUMPROCESSES`、`INGESTRATE` 和 `CHUNKSIZE`，接下来可以加载数据。

# 第 5 步：运行 `cqlsh COPY FROM` 命令将 CSV 文件中的数据上传到目标表
<a name="bulk-upload-run"></a>

要运行 `cqlsh COPY FROM` 命令，请完成以下步骤。

1. 使用 cqlsh 连接到 Amazon Keyspaces。

1. 使用以下代码选择键空间。

   ```
   USE catalog;
   ```

1. 将写入一致性设置为 `LOCAL_QUORUM`。为了确保数据的持久性，Amazon Keyspaces 不允许使用其他写入一致性设置。查看以下代码。

   ```
   CONSISTENCY LOCAL_QUORUM;
   ```

1. 使用以下代码示例准备 `cqlsh COPY FROM` 语法。

   ```
   COPY book_awards FROM './keyspace.table.csv' WITH HEADER=true 
   AND INGESTRATE=calculated ingestrate 
   AND NUMPROCESSES=calculated numprocess
   AND MAXBATCHSIZE=20 
   AND CHUNKSIZE=calculated chunksize;
   ```

1. 运行上一步中准备的语句。cqlsh 会回显您配置的所有设置。

   1. 确保设置与您的输入相匹配。请参阅以下示例。

      ```
      Reading options from the command line: {'chunksize': '120', 'header': 'true', 'ingestrate': '36000', 'numprocesses': '15', 'maxbatchsize': '20'}
      Using 15 child processes
      ```

   1. 查看传输的行数和当前的平均速率，如以下示例所示。

      ```
      Processed: 57834 rows; Rate: 6561 rows/s; Avg. rate: 31751 rows/s
      ```

   1. 当 cqlsh 完成数据上传后，查看数据加载统计信息（读取的文件数、运行时和跳过的行数）的摘要，如以下示例所示。

      ```
      15556824 rows imported from 1 files in 8 minutes and 8.321 seconds (0 skipped).
      ```

在本教程的最后一步中，您已将数据上传到 Amazon Keyspaces。

**重要**  
现在您已经传输了数据，接下来调整目标表的容量模式设置，使其与应用程序的常规流量模式相匹配。在更改之前，您的预置容量按小时费率收费。

# 问题排查
<a name="bulk-upload-troubleshooting"></a>

数据上传完成后，检查是否跳过了行。为此，请导航到源 CSV 文件的源目录并搜索具有以下名称的文件。

```
import_yourcsvfilename.err.timestamp.csv
```

cqlsh 将所有跳过的数据行写入具有该名称的文件中。如果文件存在于源目录中且其中包含数据，则说明这些行未上传到 Amazon Keyspaces。要重试这些行，请先检查上传过程中是否遇到任何错误，然后相应地调整数据。要重试这些行，您可以重新运行进程。



**常见错误**  
行未加载的最常见原因是容量错误和解析错误。

**将数据上传到 Amazon Keyspaces 时出现无效请求错误**

在以下示例中，源表包含一个计数器列，该列会生成来自 cqlsh `COPY` 命令的记录的批处理调用。Amazon Keyspaces 不支持记录的批处理调用。

```
Failed to import 10 rows: InvalidRequest - Error from server: code=2200 [Invalid query] message=“Only UNLOGGED Batches are supported at this time.“,  will retry later, attempt 22 of 25
```

要解决此错误， DSBulk 请使用迁移数据。有关更多信息，请参阅 [教程：使用将数据加载到 Amazon Keyspaces DSBulk](dsbulk-upload.md)。

**将数据上传到 Amazon Keyspaces 时出现解析器错误**

以下示例显示了由于 `ParseError` 而跳过的行。

```
Failed to import 1 rows: ParseError - Invalid ... – 
```

要解决此错误，您需要确保要导入的数据与 Amazon Keyspaces 中的表模式相匹配。查看导入文件中是否存在解析错误。您可以尝试通过 `INSERT` 语句来使用单行数据，从而隔离错误。

**将数据上传到 Amazon Keyspaces 时出现容量错误**

```
Failed to import 1 rows: WriteTimeout - Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses]
 message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 2, 'write_type': 'SIMPLE', 'consistency': 
 'LOCAL_QUORUM'}, will retry later, attempt 1 of 100
```

Amazon Keyspaces 使用 `ReadTimeout` 和 `WriteTimeout` 异常指示写入请求何时因吞吐容量不足而失败。为了帮助诊断容量不足的异常，Amazon Keyspaces 在亚马逊上发布了`WriteThrottleEvents``ReadThrottledEvents`指标。 CloudWatch有关更多信息，请参阅 [使用亚马逊监控亚马逊密钥空间 CloudWatch](monitoring-cloudwatch.md)。

**将数据上传到 Amazon Keyspaces 时出现 cqlsh 错误**

要帮助对 cqlsh 错误进行问题排查，请重新运行带有 `--debug` 标志的失败命令。

使用不兼容的 cqlsh 版本时，您会看到以下错误。

```
AttributeError: 'NoneType' object has no attribute 'is_up'
Failed to import 3 rows: AttributeError - 'NoneType' object has no attribute 'is_up',  given up after 1 attempts
```

通过运行以下命令确认已安装正确版本的 cqlsh。

```
cqlsh --version
```

输出应该类似于以下内容。

```
cqlsh 5.0.1
```

如果您使用的是 Windows，请将 `cqlsh` 的所有实例替换为 `cqlsh.bat`。例如，要检查 Windows 中的 cqlsh 版本，请运行以下命令。

```
cqlsh.bat --version
```

cqlsh 客户端从服务器连续收到三个任意类型的错误后，与 Amazon Keyspaces 的连接失败。cqlsh 客户端失败时显示以下消息。

```
Failed to import 1 rows: NoHostAvailable - , will retry later, attempt 3 of 100
```

要解决此错误，您需要确保要导入的数据与 Amazon Keyspaces 中的表模式相匹配。查看导入文件中是否存在解析错误。您可以尝试通过 INSERT 语句来使用单行数据，从而隔离错误。

客户端会自动尝试重新建立连接。