

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# CREATE PUMP
<a name="sql-reference-create-pump"></a>

数据泵是 Amazon Kinesis Data Analytics 存储库对象（SQL 标准的扩展），提供连续运行的 INSERT INTO 流 SELECT ... FROM 查询功能，从而使查询的结果能够连续输入到命名流中。

您需要同时为查询和命名流指定列列表（这表示一组源-目标对）。列列表在数据类型方面需要匹配，否则 SQL 验证器将拒绝它们。（它们不必列出目标流中的所有列；您可以为某个列设置数据泵。）

有关更多信息，请参阅 [SELECT 语句](sql-reference-select.md)。

以下代码首先创建并设置架构，然后在此架构中创建两个流：
+ “OrderDataWithCreateTime” 将作为泵的源流。
+ “OrderData” 将用作泵的目标流。

```
CREATE OR REPLACE STREAM "OrderDataWithCreateTime" (
"key_order" VARCHAR(20),
"key_user" VARCHAR(20),
"key_billing_country" VARCHAR(20),
"key_product" VARCHAR(20),
"quantity" VARCHAR(20),
"eur" VARCHAR(20),
"usd" VARCHAR(20))
DESCRIPTION 'Creates origin stream for pump';

CREATE OR REPLACE STREAM "OrderData" (
"key_order" VARCHAR(20),
"key_user" VARCHAR(20),
"country" VARCHAR(20),
"key_product" VARCHAR(20),
"quantity" VARCHAR(20),
"eur" INTEGER,
"usd" INTEGER)
DESCRIPTION 'Creates destination stream for pump';
```

以下代码使用这两个流来创建数据泵。从 “OrderDataWithCreateTime” 中选择数据并插入到 “OrderData” 中。

```
CREATE OR REPLACE PUMP "200-ConditionedOrdersPump" AS
INSERT INTO "OrderData" (
"key_order", "key_user", "country",
"key_product", "quantity", "eur", "usd")
//note that this list matches that of the query
SELECT STREAM
"key_order", "key_user", "key_billing_country",
"key_product", "quantity", "eur", "usd"
//note that this list matches that of the insert statement
FROM "OrderDataWithCreateTime";
```

有关更多详细信息，请参阅 *Amazon Managed Service for Apache Flink Developer Guide* 中的 [In-Application Streams and Pumps](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/streams-pumps.html) 主题。

## 语法
<a name="sqlrf_create_pump_syntax"></a>

```
 CREATE [ OR REPLACE ] PUMP <qualified-pump-name> 
                       [ DESCRIPTION '<string-literal>' ] AS <streaming-insert>
```

其中 streaming-insert 是一个插入语句，例如：

```
INSERT INTO ''stream-name'' SELECT "columns" FROM <source stream>
```