用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例
以下示例使用 Hive 命令执行将数据导出到 Amazon S3 或 HDFS、将数据导入到 DynamoDB、连接表、查询表等操作。
对 Hive 表执行的操作将引用 DynamoDB 中存储的数据。Hive 命令受到 DynamoDB 表预置的吞吐量设置约束,并且检索的数据包括 DynamoDB 处理 Hive 操作请求时写入到 DynamoDB 表的数据。如果数据检索过程需要很长一段时间,则自 Hive 命令开始执行以来,Hive 命令返回的某些数据可能已在 DynamoDB 中更新。
Hive 命令 DROP TABLE
和 CREATE TABLE
仅对 Hive 中的本地表进行操作,而不会在 DynamoDB 中创建或删除表。如果 Hive 查询引用 DynamoDB 中的表,则在您运行查询之前,该表必须已存在。有关在 DynamoDB 中创建和删除表的更多信息,请参阅 Amazon DynamoDB 开发人员指南中的在 DynamoDB 中处理表。
注意
当您将 Hive 表映射到 Amazon S3 中的某个位置时,请勿将其映射到存储桶的根路径 s3://amzn-s3-demo-bucket,因为这可能会导致 Hive 将数据写入 Amazon S3 时出错。而是将表映射到存储桶的子路径 s3://amzn-s3-demo-bucket/mypath。
从 DynamoDB 中导出数据
可以使用 Hive 从 DynamoDB 中导出数据。
将 DynamoDB 表导出到 Amazon S3 存储桶
-
创建一个引用 DynamoDB 中存储的数据的 Hive 表。然后,您可以调用 INSERT OVERWRITE 命令将数据写入到外部目录。在以下示例中,
s3://amzn-s3-demo-bucket/path/subpath
是 Amazon S3 中的有效路径。调整 CREATE 命令中的列和数据类型来匹配 DynamoDB 中的值。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的存档。CREATE EXTERNAL TABLE
hiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); INSERT OVERWRITE DIRECTORY 's3://amzn-s3-demo-bucket/path/subpath/
' SELECT * FROMhiveTableName
;
使用格式设置将 DynamoDB 表导出到 Amazon S3 存储桶
-
创建引用 Amazon S3 中的位置的外部表。此表在下面显示为 s3_export。在调用 CREATE 期间,为此表指定行格式设置。然后,当您使用 INSERT OVERWRITE 将数据从 DynamoDB 导出到 s3_export 时,数据将以指定的格式写出。在以下示例中,数据以逗号分隔值 (CSV) 的格式写出。
CREATE EXTERNAL TABLE
hiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); CREATE EXTERNAL TABLEs3_export
(a_col string, b_col bigint, c_col array<string>
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://amzn-s3-demo-bucket/path/subpath/
'; INSERT OVERWRITE TABLEs3_export
SELECT * FROMhiveTableName
;
在不指定列映射的情况下将 DynamoDB 表导出到 Amazon S3 存储桶
-
创建一个引用 DynamoDB 中存储的数据的 Hive 表。此例与前面的示例类似,只是不指定列映射。该表必须正好具有类型为
map<string, string>
的一个列。如果您随后在 Amazon S3 中创建EXTERNAL
表,可以调用INSERT OVERWRITE
命令将数据从 DynamoDB 写入到 Amazon S3。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的存档。由于没有列映射,因此您无法查询以此方式导出的表。在 Hive 0.8.1.5 或更高版本(在 Amazon EMR AMI 2.2.x 及其更高版本上受支持)中导出数据而不指定列映射。CREATE EXTERNAL TABLE
hiveTableName
(item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
"); CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/path/subpath/
'; INSERT OVERWRITE TABLEs3TableName
SELECT * FROMhiveTableName
;
使用数据压缩将 DynamoDB 表导出到 Amazon S3 存储桶
-
Hive 提供多个可以在 Hive 会话期间设置的压缩编解码器。这样做会导致导出的数据以指定的格式进行压缩。以下示例使用 Lempel-Ziv-Oberhumer (LZO) 算法压缩导出的文件。
SET hive.exec.compress.output=true; SET io.seqfile.compression.type=BLOCK; SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec; CREATE EXTERNAL TABLE
hiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); CREATE EXTERNAL TABLElzo_compression_table
(line STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/path/subpath/
'; INSERT OVERWRITE TABLElzo_compression_table
SELECT * FROMhiveTableName
;可用的压缩编解码器包括:
-
org.apache.hadoop.io.compress.GzipCodec
-
org.apache.hadoop.io.compress.DefaultCodec
-
com.hadoop.compression.lzo.LzoCodec
-
com.hadoop.compression.lzo.LzopCodec
-
org.apache.hadoop.io.compress.BZip2Codec
-
org.apache.hadoop.io.compress.SnappyCodec
-
将 DynamoDB 表导出到 HDFS
-
使用以下 Hive 命令,其中
hdfs:///directoryName
是有效的 HDFS 路径,而hiveTableName
为 Hive 中引用 DynamoDB 的表。此导出操作比将 DynamoDB 表导出到 Amazon S3 速度快,因为将数据导出到 Amazon S3 时,Hive 0.7.1.1 将 HDFS 用作中间步骤。以下示例还显示了如何将dynamodb.throughput.read.percent
设置为 1.0 以提高读取请求速率。CREATE EXTERNAL TABLE
hiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); SET dynamodb.throughput.read.percent=1.0; INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName
' SELECT * FROMhiveTableName
;您还可以按上面所示的用于导出到 Amazon S3 的方法,使用格式设置和压缩将数据导出到 HDFS。为此,只需将上面示例中的 Amazon S3 目录替换为 HDFS 目录。
在 Hive 中读取不可打印的 UTF-8 字符数据
-
创建表时,您可以使用
STORED AS SEQUENCEFILE
子句在 Hive 中读取和写入不可打印的 UTF-8 字符数据。SequenceFile 是 Hadoop 二进制文件格式;您需要使用 Hadoop 来读取此文件。以下示例显示了如何将数据从 DynamoDB 导出到 Amazon S3 中。可以使用此功能处理不可打印的 UTF-8 编码字符。CREATE EXTERNAL TABLE
hiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); CREATE EXTERNAL TABLEs3_export
(a_col string, b_col bigint, c_col array<string>
) STORED AS SEQUENCEFILE LOCATION 's3://amzn-s3-demo-bucket/path/subpath/
'; INSERT OVERWRITE TABLEs3_export
SELECT * FROMhiveTableName
;
将数据导入到 DynamoDB
使用 Hive 将数据写入到 DynamoDB 中时,应确保写入容量单位数大于集群中的映射器数。例如,在 m1.xlarge EC2 实例上运行的集群在每个实例上生成 8 个映射器。对于具有 10 个实例的集群,这意味着生成 80 个映射器。如果写入容量单位数不大于集群中的映射器数,则 Hive 写入操作可能会占用所有写入吞吐量,或者尝试占用超过预配置值的吞吐量。有关每种 EC2 实例类型生成的映射器数的更多信息,请参阅 配置 Hadoop。
Hadoop 中的映射器数由输入的拆分数控制。如果拆分数过小,写入命令可能无法占用所有可用的写入吞吐量。
如果具有相同键的项目在目标 DynamoDB 表中存在,则将覆盖该项目。如果目标 DynamoDB 表中不存在具有该键的项目,则将插入该项目。
要将数据从 Amazon S3 导入 DynamoDB
-
您可以使用 Amazon EMR(Amazon EMR)和 Hive 将数据从 Amazon S3 写入到 DynamoDB。
CREATE EXTERNAL TABLE
s3_import
(a_col string, b_col bigint, c_col array<string>
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://amzn-s3-demo-bucket/path/subpath/
'; CREATE EXTERNAL TABLEhiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); INSERT OVERWRITE TABLEhiveTableName
SELECT * FROMs3_import
;
在不指定列映射时将表从 Amazon S3 存储桶导入到 DynamoDB 中
-
创建一个引用 Amazon S3 中存储数据的
EXTERNAL
表,该数据是以前从 DynamoDB 中导出的。在导入之前,请确保该表存在于 DynamoDB 中,并且该表与以前导出的 DynamoDB 表具有相同的键架构。此外,该表还必须正好具有类型为map<string, string>
的一个列。如果您随后创建一个链接到 DynamoDB 的 Hive 表,则可以调用INSERT OVERWRITE
命令将数据从 Amazon S3 写入到 DynamoDB 中。由于没有列映射,因此您无法查询以此方式导入的表。在 Hive 0.8.1.5 或更高版本(在 Amazon EMR AMI 2.2.3 及其更高版本上受支持)中可以在不指定列映射时导入数据。CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION '
s3://amzn-s3-demo-bucket/path/subpath/
'; CREATE EXTERNAL TABLEhiveTableName
(item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
"); INSERT OVERWRITE TABLEhiveTableName
SELECT * FROMs3TableName
;
将表从 HDFS 导入到 DynamoDB 中
-
可以使用 Amazon EMR 和 Hive 将数据从 HDFS 写入到 DynamoDB 中。
CREATE EXTERNAL TABLE
hdfs_import
(a_col string, b_col bigint, c_col array<string>
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'hdfs:///directoryName
'; CREATE EXTERNAL TABLEhiveTableName
(col1 string, col2 bigint, col3 array<string>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1
", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays
"); INSERT OVERWRITE TABLEhiveTableName
SELECT * FROMhdfs_import
;
查询 DynamoDB 中的数据
以下示例显示了您可以使用 Amazon EMR 查询 DynamoDB 中存储数据的各种方式。
查找映射列的最大值 (max
)
-
使用如下 Hive 命令。在第一个命令中,CREATE 语句创建了一个引用 DynamoDB 中存储数据的 Hive 表。然后,SELECT 语句使用该表查询 DynamoDB 中存储的数据。以下示例查找给定客户提交的最大订单。
CREATE EXTERNAL TABLE
hive_purchases
(customerId bigint, total_cost double, items_purchased array<String>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases
", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items
"); SELECT max(total_cost) from hive_purchases where customerId = 717;
使用 GROUP BY
子句聚合数据
-
可以使用
GROUP BY
子句收集多条记录的数据。此子句通常与聚合函数 (如 sum、count、min 或 max) 一起使用。以下示例返回提交了三个以上订单的客户的最大订单列表。CREATE EXTERNAL TABLE
hive_purchases
(customerId bigint, total_cost double, items_purchased array<String>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases
", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items
"); SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
连接两个 DynamoDB 表
-
以下示例将两个 Hive 表映射到 DynamoDB 中存储的数据。然后,它对这两个表调用联接。连接在集群上计算并返回。连接不在 DynamoDB 中进行。此示例返回提交了两个以上订单的客户及其购买物的列表。
CREATE EXTERNAL TABLE
hive_purchases
(customerId bigint, total_cost double, items_purchased array<String>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases
", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items
"); CREATE EXTERNAL TABLEhive_customers
(customerId bigint, customerName string, customerAddress array<String>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Customers
", "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address
"); Select c.customerId, c.customerName, count(*) as count from hive_customers c JOIN hive_purchases p ON c.customerId=p.customerId GROUP BY c.customerId, c.customerName HAVING count > 2;
联接来自不同源的两个表
-
在以下示例中,Customer_S3 是加载了 Amazon S3 中存储的 CSV 文件的 Hive 表,而 hive_purchases 是引用了 DynamoDB 中的数据的表。以下示例将 Amazon S3 中以 CSV 文件格式存储的客户数据与 DynamoDB 中存储的订单数据连接在一起,以返回一组数据,这些数据表示名称中包含“Miller”的客户提交的订单。
CREATE EXTERNAL TABLE
hive_purchases
(customerId bigint, total_cost double, items_purchased array<String>
) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases
", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items
"); CREATE EXTERNAL TABLECustomer_S3
(customerId bigint, customerName string, customerAddress array<String>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://amzn-s3-demo-bucket/path/subpath/
'; Select c.customerId, c.customerName, c.customerAddress from Customer_S3 c JOIN hive_purchases p ON c.customerid=p.customerid where c.customerName like '%Miller%';
注意
在上述示例中,为了提高清晰性和完整性,在每个示例中均包括了 CREATE TABLE 语句。针对给定 Hive 表运行多个查询或执行导出操作时,只需在 Hive 会话的开始创建表一次即可。