Hive SQL

Hive是基于Hadoop的数据库,底层数据存储采用HDFS,提供HQL(Hive SQL)查询功能。

Hive将SQL语句转换为MapReduce任务运行。

Hive用于做海量离线数据统计分析:

  • 不支持记录级别的增删改操作,可以将查询结果导出到文件
  • 查询延时很严重,因为 MapReduce任务的启动过程消耗很长时间,所以不能用在交互查询系统中。
  • 不支持事务,因为没有增删改操作,所以主要用来做 OLAP(联机分析处理),而不是 OLTP(联机事务处理)。

配置

Hive命令行回显结果不显示表名:

hive> set hive.resultset.use.unique.column.names=false;

定义数据库

CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
  [COMMENT 'database comment']
  [WITH DBPROPERTIES (property_name=property_value, ...)];
DROP DATABASE|SCHEMA [IF EXISTS] database_name [RESTRICT|CASCADE];

CASCADE删除数据库前先删除表格,默认情况RESTRICT如果数据库非空则删除操作失败。

定义数据表

CREATE TABLE tablename (
   col_name DATA_TYPE [constraints] [COMMENT col_comment], ...)
   [COMMENT 'table comment']                   -- 表格说明
   [PARTITIONED BY (col_name data_type,...)]   -- 分片: 必须声明为(col type)格式
   ROW FORMAT DELIMITED      -- 分隔符文件
   FIELDS TERMINATED BY '\t' -- 分隔符
   -- LINES TERMINATED BY '\n' 
   STORED AS TEXTFILE        -- 存储格式:默认TEXTFILE,其他PARQUET、AVRO、ORC、JSONFILE等
   TBLPROPERTIES("skip.header.line.count" = "1",...);  
DROP TABLE tablename [purge];

==分片的列单独声明==,不应在列定义中重复声明;STORED AS TEXTFILE应该放在其他格式声明之后。

数据表属性声明:

  • skip.header.line.count=1:使用Hive查询时,跳过数据文件中的首行(包含表头的情况,列名不要包含特殊字符,如#,空格等)。

对于具有表头的原始数据,可以去除表头后再导入HDFS(从而与Spark读取处理方式兼容)。

tail -n +2 data.csv > data_nohead.csv
hdfs dfs -put data_nohead.csv /datasets/data.csv

对于较大数据量,使用tail处理时间较长(需要复制数据)。

查看表的定义
DESC table_name;  -- DESCRIBE
SHOW CREATE TABLE table_name;
# STORED AS TEXTFILE <=> STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
# OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
# LOCATION 'hdfs://CLUSTER_ADDR/hive/warehouse/DBNAME.db/TABLE_FILE'

从定义中可看到表的底层数据在HDFS上的存储位置。

自动获取数源的定义

使用Pandas可加载数据文件并自动推导数据类型定义,然后生成数据表的定义。

table_schema = pd.io.sql.get_schema(df.drop(columns=parition_cols), 'table_name')\
                        .replace('"', '`')\
                        .replace('TEXT', 'STRING')\
                        .replace('INTEGER', 'INT')
# print(table_schema)
# CREATE TABLE `table_name` (
# `field1` STRING,
# `field2` INT
# )

转换后的SQL语句语法不完全与HQL兼容,例如需要将标识符范围限定符"(PosgreSQL)替换为ˋ(MySQL);某些类型关键字转换(如TEXT转换为STRING)。此外,为了支持设置分片列声明,可提前将分片列从源数据中分离出来并单独生成类型声明。

part_schema = pd.io.sql.get_schema(df[parition_cols], 'part_name')\
                       .replace('"', '`')\
                       .replace('TEXT', 'STRING')
part_schema = ''.join(part_schema.split('\n')[1:-1])     

基于上述结果,可继续添加Hive表格属性限定声明(如分片、分隔符、存储格式等),从而生成Hive表格定义。

table_schema = '\n'.join(table_schema, f"PARTITIONED BY ({part_schema})", 
                         "ROW FORMAT DELIMITED", "STORED AS TEXTFILE", ...)

如果使用Spark加载数据文件为表格,类似地,能基于自动推导生成表格定义(需要自行生成SQL语句)。

def spark_get_schema(df, tablename):
  ts = []
  ts.append(f'CREATE TABLE `{tablename}` (')
  tts = []
  for x in df.schema: 
    tts.append('`{0}` {1}'.format(x.name, x.dataType.typeName()))
  ts.append(',\n'.join(tts))
  ts.append(')')
  table_schema = '\n'.join(ts)
  return table_schema
修改表定义
ALTER TABLE table_name RENAME TO new_table_name;  # 修改表名
ALTER TABLE invites REPLACE COLUMNS (foo INT, bar STRING, ...);

修改表的Schema,不在参数声明中的列将被丢弃。

修改表属性
ALTER TABLE pokemon SET TBLPROPERTIES ("skip.header.line.count"="1");

数据类型

基本类型
分类数据类型
逻辑类型BOOLEAN
整数TINYINT,SMALLINT,INT,BIGINT
浮点数FLOAT,DOUBLE
高精度浮点数DECIMAL,DECIMAL(precision, scale)
字符文本STRING,VARCHAR,CHAR
时间日期TIMESTAMP,DATE
原始数据BINARY

导入数据

将本地文件导入Hive数据仓库:

LOAD DATA [LOCAL] INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
LOAD DATA [LOCAL] INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');

LOCAL:从本地文件系统上传(省略则从HDFS中查找文件并移动到hive目录下);OVERWRITE覆盖当前已有数据(省略则==追加==到当前数据);如果是已定义分区的数据表,则必须指定PARTITION指定要写入数据的分区(分区字段将被写入设置的值,而非源数据中的值)。

不会验证数据的schema

读取数据

使用SQLSELECT语句进行查询,结果将输出至标准输出。

查询的列名可使用正则表达式模糊匹配。

保存数据

使用HiveINSERT命令将查询结果写入Hive表、HDFS或本地存储。

INSERT OVERWRITE [LOCAL] DIRECTORY 'directory1'
  [ROW FORMAT row_format] [STORED AS file_format]
  SELECT ... FROM ...;
-- row_format
:DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] 
		[COLLECTION ITEMS TERMINATED BY char]
        [MAP KEYS TERMINATED BY char] 
        [LINES TERMINATED BY char]
        [NULL DEFINED AS char]
INSERT OVERWRITE DIRECTORY '/tmp/hdfs' SQL_STATEMENT;   -- to HDFS
INSERT OVERWRITE TABLE events;                          -- to hive table
INSERT OVERWRITE LOCAL DIRECTORY '/tmp' SQL_STATEMENT;  -- to local storage

Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

基于分区的查询

计算

在查询语句使用TRANSFORMUSING,可以调用外部程序(例如使用Python脚本)处理数据。

import sys
import datetime
for line in sys.stdin:   # 从标准输入读取一行数据
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print '\t'.join([userid, movieid, rating, str(weekday)])  # 输出结果到标准输出
add FILE weekday_mapper.py;
INSERT OVERWRITE TABLE u_data_new
   SELECT
     TRANSFORM (userid, movieid, rating, unixtime)
     USING 'python weekday_mapper.py'  -- 运行Python程序处理每一行记录
     AS (userid, movieid, rating, weekday)
   FROM u_data;

GettingStarted - Apache Hive - Apache Software Foundation

Tutorial - Apache Hive - Apache Software Foundation

LanguageManual DDL - Apache Hive - Apache Software Foundation

LanguageManual DML - Apache Hive - Apache Software Foundation