Hive SQL
Hive是基于Hadoop的数据库,底层数据存储采用HDFS,提供HQL(Hive SQL)查询功能。
Hive将SQL语句转换为MapReduce任务运行。
Hive用于做海量离线数据统计分析:
- 不支持记录级别的增删改操作,可以将查询结果导出到文件
- 查询延时很严重,因为 MapReduce任务的启动过程消耗很长时间,所以不能用在交互查询系统中。
- 不支持事务,因为没有增删改操作,所以主要用来做 OLAP(联机分析处理),而不是 OLTP(联机事务处理)。
配置
Hive命令行回显结果不显示表名:
hive> set hive.resultset.use.unique.column.names=false;
定义数据库
CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
[COMMENT 'database comment']
[WITH DBPROPERTIES (property_name=property_value, ...)];
DROP DATABASE|SCHEMA [IF EXISTS] database_name [RESTRICT|CASCADE];
CASCADE
删除数据库前先删除表格,默认情况RESTRICT
如果数据库非空则删除操作失败。
定义数据表
CREATE TABLE tablename (
col_name DATA_TYPE [constraints] [COMMENT col_comment], ...)
[COMMENT 'table comment'] -- 表格说明
[PARTITIONED BY (col_name data_type,...)] -- 分片: 必须声明为(col type)格式
ROW FORMAT DELIMITED -- 分隔符文件
FIELDS TERMINATED BY '\t' -- 分隔符
-- LINES TERMINATED BY '\n'
STORED AS TEXTFILE -- 存储格式:默认TEXTFILE,其他PARQUET、AVRO、ORC、JSONFILE等
TBLPROPERTIES("skip.header.line.count" = "1",...);
DROP TABLE tablename [purge];
==分片的列单独声明==,不应在列定义中重复声明;STORED AS TEXTFILE
应该放在其他格式声明之后。
数据表属性声明:
skip.header.line.count=1
:使用Hive查询时,跳过数据文件中的首行(包含表头的情况,列名不要包含特殊字符,如#
,空格等)。
对于具有表头的原始数据,可以去除表头后再导入HDFS(从而与Spark读取处理方式兼容)。
tail -n +2 data.csv > data_nohead.csv
hdfs dfs -put data_nohead.csv /datasets/data.csv
对于较大数据量,使用
tail
处理时间较长(需要复制数据)。
查看表的定义
DESC table_name; -- DESCRIBE
SHOW CREATE TABLE table_name;
# STORED AS TEXTFILE <=> STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
# OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
# LOCATION 'hdfs://CLUSTER_ADDR/hive/warehouse/DBNAME.db/TABLE_FILE'
从定义中可看到表的底层数据在HDFS上的存储位置。
自动获取数源的定义
使用Pandas可加载数据文件并自动推导数据类型定义,然后生成数据表的定义。
table_schema = pd.io.sql.get_schema(df.drop(columns=parition_cols), 'table_name')\
.replace('"', '`')\
.replace('TEXT', 'STRING')\
.replace('INTEGER', 'INT')
# print(table_schema)
# CREATE TABLE `table_name` (
# `field1` STRING,
# `field2` INT
# )
转换后的SQL语句语法不完全与HQL兼容,例如需要将标识符范围限定符"
(PosgreSQL)替换为ˋ
(MySQL);某些类型关键字转换(如TEXT
转换为STRING
)。此外,为了支持设置分片列声明,可提前将分片列从源数据中分离出来并单独生成类型声明。
part_schema = pd.io.sql.get_schema(df[parition_cols], 'part_name')\
.replace('"', '`')\
.replace('TEXT', 'STRING')
part_schema = ''.join(part_schema.split('\n')[1:-1])
基于上述结果,可继续添加Hive表格属性限定声明(如分片、分隔符、存储格式等),从而生成Hive表格定义。
table_schema = '\n'.join(table_schema, f"PARTITIONED BY ({part_schema})",
"ROW FORMAT DELIMITED", "STORED AS TEXTFILE", ...)
如果使用Spark加载数据文件为表格,类似地,能基于自动推导生成表格定义(需要自行生成SQL语句)。
def spark_get_schema(df, tablename):
ts = []
ts.append(f'CREATE TABLE `{tablename}` (')
tts = []
for x in df.schema:
tts.append('`{0}` {1}'.format(x.name, x.dataType.typeName()))
ts.append(',\n'.join(tts))
ts.append(')')
table_schema = '\n'.join(ts)
return table_schema
修改表定义
ALTER TABLE table_name RENAME TO new_table_name; # 修改表名
ALTER TABLE invites REPLACE COLUMNS (foo INT, bar STRING, ...);
修改表的Schema
,不在参数声明中的列将被丢弃。
修改表属性
ALTER TABLE pokemon SET TBLPROPERTIES ("skip.header.line.count"="1");
数据类型
基本类型
分类 | 数据类型 |
---|---|
逻辑类型 | BOOLEAN |
整数 | TINYINT,SMALLINT,INT,BIGINT |
浮点数 | FLOAT,DOUBLE |
高精度浮点数 | DECIMAL,DECIMAL(precision, scale) |
字符文本 | STRING,VARCHAR,CHAR |
时间日期 | TIMESTAMP,DATE |
原始数据 | BINARY |
导入数据
将本地文件导入Hive数据仓库:
LOAD DATA [LOCAL] INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
LOAD DATA [LOCAL] INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
LOCAL
:从本地文件系统上传(省略则从HDFS中查找文件并移动到hive目录下);OVERWRITE
覆盖当前已有数据(省略则==追加==到当前数据);如果是已定义分区的数据表,则必须指定PARTITION
指定要写入数据的分区(分区字段将被写入设置的值,而非源数据中的值)。
不会验证数据的
schema
;
读取数据
使用SQLSELECT
语句进行查询,结果将输出至标准输出。
查询的列名可使用正则表达式模糊匹配。
保存数据
使用HiveINSERT
命令将查询结果写入Hive表、HDFS或本地存储。
INSERT OVERWRITE [LOCAL] DIRECTORY 'directory1'
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...;
-- row_format
:DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
[COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char]
[LINES TERMINATED BY char]
[NULL DEFINED AS char]
INSERT OVERWRITE DIRECTORY '/tmp/hdfs' SQL_STATEMENT; -- to HDFS
INSERT OVERWRITE TABLE events; -- to hive table
INSERT OVERWRITE LOCAL DIRECTORY '/tmp' SQL_STATEMENT; -- to local storage
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
基于分区的查询
计算
在查询语句使用TRANSFORM
和USING
,可以调用外部程序(例如使用Python脚本)处理数据。
import sys
import datetime
for line in sys.stdin: # 从标准输入读取一行数据
line = line.strip()
userid, movieid, rating, unixtime = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([userid, movieid, rating, str(weekday)]) # 输出结果到标准输出
add FILE weekday_mapper.py;
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (userid, movieid, rating, unixtime)
USING 'python weekday_mapper.py' -- 运行Python程序处理每一行记录
AS (userid, movieid, rating, weekday)
FROM u_data;
GettingStarted - Apache Hive - Apache Software Foundation
Tutorial - Apache Hive - Apache Software Foundation
LanguageManual DDL - Apache Hive - Apache Software Foundation
LanguageManual DML - Apache Hive - Apache Software Foundation