Hive
资料
数据仓库介绍
数据仓库的基本概念
数据仓库,英文名称为Data Warehouse,可简写为DW或DWH。数据仓库顾名思义,是一个很大的数据存储集合,出于企业的分析性报告和决策支持目的而创建,对多样的业务数据进行筛选与整合。它为企业提供一定的BI(商业智能)能力,指导业务流程改进、监视时间、成本、质量以及控制。 数据仓库的输入方是各种各样的数据源,最终的输出用于企业的数据分析、数据挖掘、数据报表等方向。
数据仓库的主要特征
数据仓库是面向主题的(Subject-Oriented )、集成的(Integrated)、稳定的(Non-Volatile)和时变的(Time-Variant )数据集合,用以支持管理决策。
- 主题性
- 不同于传统数据库对应于某一个或多个项目,数据仓库根据使用者实际需求,将不同数据源的数据在一个较高的抽象层次上做整合,所有数据都围绕某一主题来组织。
- 这里的主题怎么来理解呢?比如对于城市,“天气湿度分析”就是一个主题,对于淘宝,“用户点击行为分析”就是一个主题。
- 集成性
- 数据仓库中存储的数据是来源于多个数据源的集成,原始数据来自不同的数据源,存储方式各不相同。要整合成为最终的数据集合,需要从数据源经过一系列抽取、清洗、转换的过程。
- 稳定性
- 数据仓库中保存的数据是一系列历史快照,不允许被修改。用户只能通过分析工具进行查询和分析。这里说明一点,数据仓库基本上是不许允许用户进行修改,删除操作的。大多数的场景是用来查询分析数据。
- 时变性
- 数据仓库会定期接收新的集成数据,反应出最新的数据变化。这和稳定特点并不矛盾。
数据仓库和数据库的区别
数据库(面向事务)
数据库是面向交易的处理系统,它是针对具体业务在数据库联机的日常操作,通常对记录进行查询、修改。用户较为关心操作的响应时间、数据的安全性、完整性和并发支持的用户数等问题。传统的数据库系统作为数据管理的主要手段,主要用于操作型处理,也被称为联机事务处理 OLTP(On-Line Transaction Processing)。
数据仓库
数据仓库一般针对某些主题的历史数据进行分析,支持管理决策,又被称为联机分析处理 OLAP(On-Line Analytical Processing)。 首先要明白,数据仓库的出现,并不是要取代数据库。
两者区别
- 数据库是面向事务的设计,数据仓库是面向主题设计的。
- 数据库一般存储业务数据,数据仓库存储的一般是历史数据。
- 数据库设计是尽量避免冗余,一般针对某一业务应用进行设计,比如一张简单的User表,记录用户名、密码等简单数据即可,符合业务应用,但是不符合分析。数据仓库在设计是有意引入冗余,依照分析需求,分析维度、分析指标进行设计。
- 数据库是为捕获数据而设计,数据仓库是为分析数据而设计。
- 以银行业务为例。数据库是事务系统的数据平台,客户在银行做的每笔交易都会写入数据库,被记录下来,这里,可以简单地理解为用数据库记账。
- 数据仓库是分析系统的数据平台,它从事务系统获取数据,并做汇总、加工,为决策者提供决策的依据。比如,某银行某分行一个月发生多少交易,该分行当前存款余额是多少。如果存款又多,消费交易又多,那么该地区就有必要设立ATM了。
- 显然,银行的交易量是巨大的,通常以百万甚至千万次来计算。事务系统是实时的,这就要求时效性,客户存一笔钱需要几十秒是无法忍受的,这就要求数据库只能存储很短一段时间的数据。而分析系统是事后的,它要提供关注时间段内所有的有效数据。这些数据是海量的,汇总计算起来也要慢一些,但是,只要能够提供有效的分析数据就达到目的了。
数据仓库,是在数据库已经大量存在的情况下,为了进一步挖掘数据资源、为了决策需要而产生的,它决不是所谓的“大型数据库”。
数据仓库分层架构
按照数据流入流出的过程,数据仓库架构可分为三层——源数据、数据仓库、数据应用。
数据仓库的数据来源于不同的源数据,并提供多样的数据应用,数据自下而上流入数据仓库后向上层开放应用,而数据仓库只是中间集成化数据管理的一个平台。
- 源数据层(ODS)
- 操作性数据(Operational Data Store) ,是作为数据库到数据仓库的一种过渡,ODS的数据结构一般与数据来源保持一致,而且ODS的数据周期一般比较短。ODS的数据为后一步的数据处理做准备。
- 数据仓库层(DW)
- 数据仓库(Data Warehouse),是数据的归宿,这里保持这所有的从ODS到来的数据,并长期保存,而且这些数据不会被修改,DW层的数据应该是一致的、准确的、干净的数据,即对源系统数据进行了清洗(去除了杂质)后的数据。
- 数据应用层(DA)
- 数据应用(DataApplication),为了特定的应用目的或应用范围,而从数据仓库中独立出来的一部分数据,也可称为部门数据或主题数据,该数据面向应用。如根据报表、专题分析需求而计算生成的数据。
数据仓库之ETL
ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL是将业务系统的数据经过抽取、清洗、转换之后加载到数据仓库的过程,目的是将企业中分散、零乱、标准不统一的数据整合到一起。 ETL是数据仓库的流水线,也可以认为是数据仓库的血液,它维系着数据仓库中数据的新陈代谢,而数据仓库日常的管理和维护工作的大部分精力就是保持ETL的正常和稳定。
Hive基本概念
Hive介绍
什么是Hive
Hive是一个构建在Hadoop上的数据仓库框架。最初,Hive是由Facebook开发,后来移交由Apache软件基金会开发,并作为一个Apache开源项目。
- Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。
- 其本质是将SQL转换为MapReduce的任务进行运算,底层由HDFS来提供数据的存储,说白了hive可以理解为一个将SQL转换为MapReduce的任务的工具,甚至更进一步可以说hive就是一个MapReduce的客户端。
为什么使用Hive
- 直接使用hadoop所面临的问题
- 人员学习成本太高
- 项目周期要求太短
- MapReduce实现复杂查询逻辑开发难度太大
- 为什么使用Hive
- 操作接口采用类SQL语法,提供快速开发的能力
- 避免了去写MapReduce,减少开发人员的学习成本
- 功能扩展很方便
Hive的特点
- Hive最大的特点是通过类SQL来分析大数据,而避免了写MapReduce程序来分析数据,这样使得分析数据更容易。
- 数据是存储在HDFS上的,Hive本身并不提供数据的存储功能,它可以使已经存储的数据结构化。
- Hive是将数据映射成数据库和一张张的表,库和表的元数据信息一般存在关系型数据库上(比如MySQL)。
- 数据存储方面:它能够存储很大的数据集,可以直接访问存储在Apache HDFS或其他数据存储系统(如Apache HBase)中的文件。
- 数据处理方面:因为Hive语句最终会生成MapReduce任务去计算,所以不适用于实时计算的场景,它适用于离线分析。
- Hive除了支持MapReduce计算引擎,还支持Spark和Tez这两种分布式计算引擎
- 数据的存储格式有多种,比如数据源是二进制格式,普通文本格式等等
Hive架构
架构图
基本组成
- 客户端
- Client CLI(hive shell 命令行),JDBC/ODBC(java访问hive),WEBUI(浏览器访问hive)
- 元数据
- Metastore:本质上只是用来存储hive中有哪些数据库,哪些表,表的字段,表所属数据库(默认是default) ,分区,表的数据所在目录等,元数据默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore。
- 驱动器Driver
- 解析器(SQL Parser)
- 将SQL字符转换成抽象语法树AST,这一步一般使用都是第三方工具库完成,比如antlr,对AST进行语法分析,比如表是否存在,字段是否存在,SQL语句是否有误
- 编译器(Physical Plan)
- 将AST编译生成逻辑执行计划
- 优化器(Query Optimizer)
- 对逻辑执行计划进行优化
- 执行器(Execution)
- 把逻辑执行计划转换成可以运行的物理计划,对于Hive来说,就是MR/Spark
- 解析器(SQL Parser)
- 存储和执行
- Hive使用HDFS进行存储,使用MapReduce进行计算
Hive与传统数据库对比
hive具有sql数据库的外表,但应用场景完全不同,hive只适合用来做批量数据统计分析
Hive的安装
Hive的安装方式
**元数据服务(metastore)**作用是:客户端连接metastore服务,metastore再去连接MySQL数据库来存取元数据。有了metastore服务,就可以有多个客户端同时连接,而且这些客户端不需要知道MySQL数据库的用户名和密码,只需要连接metastore 服务即可。
内嵌模式
内嵌模式使用的是内嵌的Derby数据库来存储元数据,也不需要额外起Metastore服务。数据库和Metastore服务都嵌入在主Hive Server进程中。这个是默认的,配置简单,但是一次只能一个客户端连接,适用于用来实验,不适用于生产环境。 解压hive安装包 bin/hive 启动即可使用。 缺点:不同路径启动hive,每一个hive拥有一套自己的元数据,无法共享。
本地模式
本地模式采用外部数据库来存储元数据,目前支持的数据库有:MySQL、Postgres、Oracle、MS SQL Server.在这里我们使用MySQL。 本地模式不需要单独起metastore服务,用的是跟hive在同一个进程里的metastore服务。也就是说当你启动一个hive 服务,里面默认会帮我们启动一个metastore服务。 hive根据hive.metastore.uris 参数值来判断,如果为空,则为本地模式。 缺点是:每启动一次hive服务,都内置启动了一个metastore。
远程模式
远程模式下,需要单独起metastore服务,然后每个客户端都在配置文件里配置连接到该metastore服务。远程模式的metastore服务和hive运行在不同的进程里。 在生产环境中,建议用远程模式来配置Hive Metastore。 在这种情况下,其他依赖hive的软件都可以通过Metastore访问hive。 远程模式下,需要配置hive.metastore.uris 参数来指定metastore服务运行的机器ip和端口,并且需要单独手动启动metastore服务。 hiveserver2是Hive启动了一个server,客户端可以使用JDBC协议,通过IP+ Port的方式对其进行访问,达到并发访问的目的。
Hive的安装
见文档。
Hive的交互方式
- 第一种交互方式:bin/hive
hive
#创建一个数据库
create database mytest;
show databases;
- 第二种交互方式:使用SQL语句或SQL脚本进行交互
#不进入hive的客户端直接执行hive的hql语句
hive -e "create database mytest2"
#写SQL脚本执行(hive.sql)
create database mytest3;
use mytest3;
create table stu(id int,name string);
#通过hive -f执行SQL脚本
hive -f hive.sql
- 第三种交互方式:Beeline Client
hive经过发展,推出了第二代客户端beeline,但是beeline客户端不是直接访问metastore服务的,而是需要单独启动hiveserver2服务。
- 在node1的/export/server/hadoop-3.1.4/etc/hadoop目录下,修改core-site.xml,在该文件中添加以下配置,实现用户代理
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
- 将修改好的core-site.xml文件分发到node2和node3,然后重启Hadoop(stop-all.sh start-all.sh)
- 在hive运行的服务器上,确保已经启动metastore服务和hiveserver2服务,如果没有启动,则执行以下语句
nohup /export/server/hive/bin/hive --service metastore &
nohup /export/server/hive/bin/hive --service hiveserver2 &
- 在node3上使用beeline客户端进行连接访问
/export/server/hive-3.1.2/bin/beeline
- 连接成功之后,出现以下内容,可以在提示符后边输入hive sql命令
Hive一键启动脚本
使用expect模拟脚本启动
Hive数据库和表操作
数据库操作
创建数据库
create database if not exists myhive;
use myhive;
说明:hive的表存放位置模式是由hive-site.xml当中的一个属性指定的
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
创建数据库并指定hdfs存储位置
create database myhive2 location '/myhive2';
查看数据库详细信息
desc database myhive;
删除数据库
删除一个空数据库,如果数据库下面有数据表,那么就会报错
drop database myhive;
强制删除数据库,包含数据库下面的表一起删除
drop database myhive2 cascade;
数据库表操作
创建数据库表语法
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
- CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXISTS 选项来忽略这个异常。
- EXTERNAL关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。
- LIKE允许用户复制现有的表结构,但是不复制数据。
- ROW FORMAT DELIMITED 可用来指定行分隔符
- STORED AS SEQUENCEFILE|TEXTFILE|RCFILE 来指定该表数据的存储格式,hive中,表的默认存储格式为TextFile。
- CLUSTERED BY
- 对于每一个表(table)进行分桶(MapReuce中的分区),桶是更为细粒度的数据范围划分。Hive也是 针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
- LOCATION: 指定表在HDFS上的存储位置。
内部表操作
未被external修饰的是内部表(managed table),内部表又称管理表,内部表数据存储的位置由hive.metastore.warehouse.dir参数决定(默认:/user/hive/warehouse),删除内部表会直接删除元数据(metadata)及存储数据,因此内部表不适合和其他工具共享数据。
Hive建表
create database myhive;
use myhive;
create table stu(id int,name string);
insert into stu values (1,"zhangsan");
select * from stu;
创建表并指定字段之间的分隔符
create table if not exists stu3(id int ,name string) row format delimited fields terminated by '\t';
根据查询结果创建表
create table stu3 as select * from stu2;
根据已经存在的表结构创建表
create table stu4 like stu2;
查询表的类型
desc formatted stu2;
删除表
drop table stu2;
查看数据库和HDFS,发现删除内部表之后,所有的内容全部删除
给内部表加载数据
- 方式一:不要用
insert into stu values (1,'zhangsan');
- 方式二:可以用,不建议
hadoop fs -put stu.txt /user/hive/warehouse/myhive.db/stu
- 方式三:常用
-- 追加:从本地加载,就是将本地文件复制到hdfs的表目录
load data local inpath '/export/data/stu.txt' into table stu;
-- 覆盖:从本地加载,就是将本地文件复制到hdfs的表目录
load data local inpath '/export/data/stu.txt' overwrite into table stu;
-- 追加:从hdfs加载,将文件剪切到HDFS的表目录
load data inpath '/input/hive_data/stu.txt' into table stu;
-- 覆盖:从hdfs加载,将文件剪切到HDFS的表目录
load data inpath '/input/hive_data/stu.txt' overwrite into table stu;
外部表操作
在创建表的时候可以指定external关键字创建外部表,外部表对应的文件存储在location指定的hdfs目录下,向该目录添加新文件的同时,该表也会读取到该文件(当然文件格式必须跟表定义的一致)。 外部表因为是指定其他的hdfs路径的数据加载到表当中来,所以hive表会认为自己不完全独占这份数据,所以删除hive外部表的时候,数据仍然存放在hdfs当中,不会删掉。
- 在HDFS上创建一个文件夹:Covid
- hadoop fs -mkdir -p /Covid
- 将新冠数据上传到HDFS的Covid目录
- 创建三张外部表
create external table covid_a
(
date_val date comment '日期',
county string comment '县',
state string comment '州',
fips string comment '县编码code',
cases int comment '累计确诊病例',
deaths int comment '累计死亡病例'
)
row format delimited fields terminated by ','
location '/Covid'; -- location指定表数据的存放位置
create external table covid_b
(
date_val date comment '日期',
county string comment '县',
state string comment '州',
fips string comment '县编码code',
cases int comment '累计确诊病例',
deaths int comment '累计死亡病例'
)
row format delimited fields terminated by ','
location '/Covid'; -- location指定表数据的存放位置
create external table covid_c
(
date_val date comment '日期',
county string comment '县',
state string comment '州',
fips string comment '县编码code',
cases int comment '累计确诊病例',
deaths int comment '累计死亡病例'
)
row format delimited fields terminated by ','
location '/Covid'; -- location指定表数据的存放位置
- 分别查询三张表的位置
select * from covid_a limit 10;
select * from covid_b limit 10;
select * from covid_c limit 10;
- 删除其中的一张表,使用另外的表查看是否还有数据
drop table covid_a;
select * from covid_b limit 10;
select * from covid_c limit 10;
drop table covid_b;
select * from covid_c limit 10;
复杂类型操作
Array类型
Array是数组类型,Array中存放相同类型的数据
**源数据: ** 说明:name与locations之间制表符分隔,locations中元素之间逗号分隔
zhangsan beijing,shanghai,tianjin,hangzhou
wangwu changchun,chengdu,wuhan,beijing
lisi wuhan,chengdu,shenzhen,beijing
建表语句:
create external table hive_array(name string, work_locations array<string>)
row format delimited fields terminated by '\t'
collection items terminated by ',';
导入数据(从本地导入,同样支持从HDFS导入)
load data local inpath '/export/data/hivedatas/work_locations.txt' overwrite into table hive_array;
常用查询:
-- 查询所有数据
select * from hive_array;
-- 查询work_locations数组中第一个元素
select name, work_locations[0] location from hive_array;
-- 查询location数组中元素的个数
select name, size(work_locations) location_size from hive_array;
-- 查询location数组中包含tianjin的信息
select * from hive_array where array_contains(work_locations,'tianjin');
Map类型
map就是描述key-value数据
源数据: 说明:字段与字段分隔符: “,”;需要map字段之间的分隔符:"#";map内部k-v分隔符:":"
1,zhangsan,father:xiaoming#mother:xiaohuang#brother:xiaoxu,28
2,lisi,father:mayun#mother:huangyi#brother:guanyu,22
3,wangwu,father:wangjianlin#mother:ruhua#sister:jingtian,29
4,mayun,father:mayongzhen#mother:angelababy,26
建表语句
create table hive_map(
id int, name string, members map<string,string>, age int
)
row format delimited
fields terminated by ','
collection items terminated by '#'
map keys terminated by ':';
导入数据
load data local inpath '/export/data/hivedatas/hive_map.txt' overwrite into table hive_map;
常用查询
select * from hive_map;
#根据键找对应的值
select id, name, members['father'] father, members['mother'] mother, age from hive_map;
#获取所有的键
select id, name, map_keys(members) as relation from hive_map;
#获取所有的值
select id, name, map_values(members) as relation from hive_map;
#获取键值对个数
select id,name,size(members) num from hive_map;
#获取有指定key的数据
select * from hive_map where array_contains(map_keys(members), 'brother');
#查找包含brother这个键的数据,并获取brother键对应的值
select id,name, members['brother'] brother from hive_map where array_contains(map_keys(members), 'brother');
Struct类型
源数据: 说明:字段之间#分割,第二个字段之间冒号分割
192.168.1.1#zhangsan:40:男
192.168.1.2#lisi:50:男
192.168.1.3#wangwu:60:女
192.168.1.4#zhaoliu:70:女
建表语句
create table hive_struct(
ip string,
info struct<name:string, age:int,gender:string>
)
row format delimited
fields terminated by '#'
collection items terminated by ':';
导入数据
load data local inpath '/export/data/hivedatas/hive_struct.txt' into table hive_struct;
常用查询
select * from hive_struct;
#根据struct来获取指定的成员的值
select ip, info.name from hive_struct;
分区表
分区不是独立的表模型,要和内部表或者外部表结合: 内部分区表 外部分区表
基本操作
- 在大数据中,最常用的一种思想就是分治,分区表实际就是对应hdfs文件系统上的的独立的文件夹,该文件夹下是该分区所有数据文件。
- 分区可以理解为分类,通过分类把不同类型的数据放到不同的目录下。
- 分类的标准就是分区字段,可以一个,也可以多个。
- 分区表的意义在于优化查询。查询时尽量利用分区字段。如果不使用分区字段,就会全部扫描。
- 在查询是通过where子句查询来指定所需的分区。
- 在hive中,分区就是分文件夹
创建分区表语法
create table score(sid string,cid string, sscore int) partitioned by (month string) row format delimited fields terminated by '\t';
创建一个表带多个分区
create table score2 (sid string,cid string, sscore int) partitioned by (year string,month string,day string) row format delimited fields terminated by '\t';
加载数据到分区表中
load data local inpath '/export/data/score.txt' into table score partition (month='202206');
加载数据到一个多分区的表中去
load data local inpath '/export/data/score.txt' into table score2 partition(year='2022',month='06',day='01');
多分区联合查询使用union all来实现
select * from score where month = '202006' union all select * from score where month = '202007';
查看分区
show partitions score;
添加分区
-- 添加一个分区
alter table score add partition(month='202008');
-- 添加多个分区
alter table score add partition(month='202009') partition(month = '202010');
--注意:添加分区之后就可以在hdfs文件系统当中看到表下面多了一个文件夹
删除分区
alter table score drop partition(month = '202010');
动态分区表
-- ---------------------------动态分区-----------------------
--- ------------------动态分区(一级分区)----------------------------
-- 1、开启动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict; -- 设置为非严格格式
-- 2、模拟数据
/*
1 2022-01-01 zhangsan 80
2 2022-01-01 lisi 70
3 2022-01-01 wangwu 90
1 2022-01-02 zhangsan 90
2 2022-01-02 lisi 65
3 2022-01-02 wangwu 96
1 2022-01-03 zhangsan 91
2 2022-01-03 lisi 66
3 2022-01-03 wangwu 96
*/
-- 3、创建一个中间普通表(该表用来存入原始数据)
create table test1(
id int,
date_val string,
name string,
score int
)
row format delimited fields terminated by '\t';
;
load data local inpath '/export/data/partition.txt' into table test1;
select * from test1;
-- 4、来创建最终的分区表
create table test2(
id int,
name string,
score int
)
partitioned by (day string) -- 这个分区字段的名字随便写,它来决定HDFS上文件夹的名字:day=2022-01-01
row format delimited fields terminated by ',';
-- 5、将中间普通表的数据查询出来插入到最终的分区表,这个过程会执行MapReduce,会完成自动分区的动作
insert overwrite table test2 partition (day)
select id, name, score, date_val -- 这里就是根据select最后一个字段进行分区,date_val必须放在最后
from test1; -- 中间普通表
--- ------------------动态分区(多级分区)----------------------------
-- 2、模拟数据
/*
1 2022-01-01 zhangsan 男 80
2 2022-01-01 lisi 男 70
3 2022-01-01 wangwu 女 90
1 2022-01-02 zhangsan 女 90
2 2022-01-02 lisi 女 65
3 2022-01-02 wangwu 男 96
1 2022-01-03 zhangsan 女 91
2 2022-01-03 lisi 男 66
3 2022-01-03 wangwu 男 96
*/
-- 1、创建普通表
drop table if exists test3;
create table test3(
id int,
date_val string,
name string,
sex string,
score int
)
row format delimited fields terminated by '\t';
;
-- 2、给普通表加载数据
load data local inpath '/export/data/partition2.txt' into table test3;
select * from test3;
-- 3、创建最终的分区表
drop table test4;
create table test4(
id int,
name string,
score int
)
partitioned by (xxx string, yyy string)
row format delimited fields terminated by '\t'
;
-- 4、去普通表查询,将查询后的结果插入到最终的分区表,这个过程会执行MapReduce,会完成自动分区的动作
insert overwrite table test4 partition (xxx,yyy)
select id,name,score,date_val ,sex from test3; -- 这里的分区本质是看select的最后两个字段
分桶表
分桶就是将数据划分到不同的文件,其实就是MapReduce的分区
- Hive的分区表是将数据分到不同的文件夹中,而Hive的分桶表可以理解为将文件夹中的文件再进行细分。
- Hive的分区表,和MR中的分区没啥关系,就是分文件夹。
基本操作
将数据按照指定的字段进行分成多个桶中去,说白了就是将数据按照字段进行划分,可以将数据按照字段划分到多个文件当中去
- 开启hive的桶表功能(如果执行该命令报错,表示这个版本的Hive已经自动开启了分桶功能,则直接进行下一步)
set hive.enforce.bucketing=true;
- 设置reduce的个数
set mapreduce.job.reduces=3; #该参数在Hive2.x版本之后不起作用
- 创建分桶表
create table course (cid string,c_name string,tid string) clustered by(cid) into 3 buckets row format delimited fields terminated by '\t';
- 桶表的数据加载,由于桶表的数据加载通过hdfs dfs -put文件或者通过load data均不好使,只能通过insert overwrite
- 创建普通表,并通过insert overwrite的方式将普通表的数据通过查询的方式加载到桶表当中去。
-- 创建普通表
create table course_common (cid string,c_name string,tid string) row format delimited fields terminated by '\t';
-- 普通表中加载数据
load data local inpath '/export/data/hivedatas/course.txt' into table course_common;
--通过insert overwrite给桶表中加载数据
insert overwrite table course select * from course_common cluster by(cid);
分桶的意义
- 数据抽样
- 有时候当你的数据体量很大时,我需要分析的指标不是一些非常具体的值,而是一些趋势类的分析,这是没有必要对全部的数据进行分析,我们从中抽取一部分数据进行分析,则必须使用分桶表
- 提高多表join的效率
- 当A表和B表join时,如果两张表都是分桶表,而且分桶字段就是join字段,分桶数相同,则相同的分桶数据才会才会进行join,避免产生笛卡尔集
修改表
-- 把表score3修改成score4(重命名)
alter table score3 rename to score4;
-- 1:查询表结构
desc score4;
-- 2:添加列
alter table score4 add columns (mycolx string, myscoy string);
-- 3:查询表结构
desc score4;
-- 4:更新列
alter table score4 change column myscox mysconew int;
-- 5:查询表结构
desc score4;
-- 删除表
drop table score4;
-- 清空表数据(只能清空管理表,也就是内部表)
truncate table score4;
Hive表中的数据导出
将hive表中的数据导出到其他任意目录,例如linux本地磁盘,例如hdfs,例如mysql等等
insert导出
-- 将查询的结果导出到本地
insert overwrite local directory '/export/data/exporthive' select * from score;
-- 将查询的结果格式化导出到本地
insert overwrite local directory '/export/data/exporthive' row format delimited fields terminated by '\t' select * from student;
-- 将查询的结果导出到HDFS上(没有local)
insert overwrite directory '/exporthive' row format delimited fields terminated by '\t' select * from score;
hive shell命令导出
-- 基本语法:(hive -f/-e 执行语句或者脚本 > file)
hive -e "select * from myhive.score;" > /export/data/exporthive/score.txt
export导出到hdfs上
export table score to '/export/exporthive/score';
Hive查询语法
SELECT语句
运算符
比较运算符
- 操作符
操作符 | 支持的数据类型 | 描述 |
---|---|---|
A=B | 基本数据类型 | 如果A等于B则返回TRUE,反之返回FALSE |
A<=>B | 基本数据类型 | 如果A和B都为NULL,则返回TRUE,其他的和等号(=)操作符的结果一致,如果任一为NULL则结果为NULL |
A<>B, A!=B | 基本数据类型 | A或者B为NULL则返回NULL;如果A不等于B,则返回TRUE,反之返回FALSE |
A<B | 基本数据类型 | A或者B为NULL,则返回NULL;如果A小于B,则返回TRUE,反之返回FALSE |
A<=B | 基本数据类型 | A或者B为NULL,则返回NULL;如果A小于等于B,则返回TRUE,反之返回FALSE |
A>B | 基本数据类型 | A或者B为NULL,则返回NULL;如果A大于B,则返回TRUE,反之返回FALSE |
A>=B | 基本数据类型 | A或者B为NULL,则返回NULL;如果A大于等于B,则返回TRUE,反之返回FALSE |
A [NOT] BETWEEN B AND C | 基本数据类型 | 如果A,B或者C任一为NULL,则结果为NULL。如果A的值大于等于B而且小于或等于C,则结果为TRUE,反之为FALSE。如果使用NOT关键字则可达到相反的效果。 |
A IS NULL | 所有数据类型 | 如果A等于NULL,则返回TRUE,反之返回FALSE |
A IS NOT NULL | 所有数据类型 | 如果A不等于NULL,则返回TRUE,反之返回FALSE |
IN(数值1, 数值2) | 所有数据类型 | 使用 IN运算显示列表中的值 |
A [NOT] LIKE B | STRING 类型 | B是一个SQL下的简单正则表达式,如果A与其匹配的话,则返回TRUE;反之返回FALSE。B的表达式说明如下:‘x%’表示A必须以字母‘x’开头,‘%x’表示A必须以字母’x’结尾,而‘%x%’表示A包含有字母’x’,可以位于开头,结尾或者字符串中间。如果使用NOT关键字则可达到相反的效果。 |
A RLIKE B, A REGEXP B | STRING 类型 | B是一个正则表达式,如果A与其匹配,则返回TRUE;反之返回FALSE。匹配使用的是JDK中的正则表达式接口实现的,因为正则也依据其中的规则。例如,正则表达式必须和整个字符串A相匹配,而不是只需与其字符串匹配。 |
- Like和Rlike
- 使用LIKE运算选择类似的值
- 选择条件可以包含字符或数字:
- % 代表零个或多个字符(任意个字符)。
- _ 代表一个字符。
- RLIKE子句是Hive中这个功能的一个扩展,其可以通过Java的正则表达式这个更强大的语言来指定匹配条件。
(1)查找以8开头的所有成绩
select * from score where sscore like '8%';
(2)查找第二个数值为9的所有成绩数据
select * from score where sscore like '_9%';
(3)查找id中含1的所有成绩信息
select * from score where sid rlike '[1]';
逻辑运算符
操作符 | 含义 |
---|---|
AND | 逻辑并 |
OR | 逻辑或 |
NOT | 逻辑否 |
分组
Group By
GROUP BY语句通常会和聚合函数一起使用,按照一个或者多个列队结果进行分组,然后对每个组执行聚合操作。注意使用group by分组之后,select后面的字段只能是分组字段和聚合函数。
-- 计算每个学生的平均分数
select sid ,avg(sscore) from score group by sid;
-- 计算每个学生最高成绩
select sid ,max(sscore) from score group by sid;
Having语句
- Having和Where不同点
- where针对表中的列发挥作用,查询数据;having针对查询结果中的列发挥作用,筛选数据。
- where后面不能写分组函数,而having后面可以使用分组函数。
- having只用于group by分组统计语句。
-- 求每个学生的平均分数
select sid ,avg(sscore) from score group by sid;
-- 求每个学生平均分数大于85的人
select sid ,avg(sscore) avgscore from score group by sid having avgscore > 85;
JOIN语句
Hive的join操作只支持等值连接
内连接(INNER JOIN)
只有进行连接的两个表中都存在与连接条件相匹配的数据才会被保留下来。
select * from teacher t, course c where t.tid = c.tid; #隐式内连接
select * from teacher t inner join course c on t.tid = c.tid; #显式内连接
select * from teacher t join course c on t.tid = c.tid;
左外连接(LEFT OUTER JOIN)、右外连接(RIGHT OUTER JOIN)
JOIN操作符左边表中符合WHERE子句的所有记录将会被返回(左外) JOIN操作符右边表中符合WHERE子句的所有记录将会被返回(右外)
-- 查询老师对应的课程
select * from teacher t left join course c on t.tid = c.tid;
-- 查询老师对应的课程
select * from course t right join teacher c on t.tid = c.tid;
满外连接(FULL OUTER JOIN)
将会返回所有表中符合WHERE语句条件的所有记录。如果任一表的指定字段没有符合条件的值的话,那么就使用NULL值替代
SELECT * FROM teacher t FULL JOIN course c ON t.tid = c.tid ;
多表连接
多表连接查询,查询老师对应的课程,以及对应的分数,对应的学生
select * from teacher t
left join course c
on t.tid = c.tid
left join score s
on s.cid = c.cid
left join student stu
on s.sid = stu.sid;
- 大多数情况下,Hive会对每对JOIN连接对象启动一个MapReduce任务。本例中会首先启动一个MapReduce job对表teacher和表course进行连接操作,然后会再启动一个MapReduce job将第一个MapReduce job的输出和表score;进行连接操作。
排序
Order By——全局排序
Order By:全局排序,一个reduce
- 使用 ORDER BY 子句排序
- ASC(ascend): 升序(默认)
- DESC(descend): 降序
- ORDER BY 子句在SELECT语句的结尾
-- 查询学生的成绩,并按照分数降序排列
select * from student s left join score sc on s.sid=sc.sid order by sc.score desc;
-- 按照分数的平均值排序
select sid ,avg(sscore) avg from score group by sid order by avg;
-- 按照学生id和平均成绩进行排序
select sid ,avg(sscore) avg from score group by sid order by sid,avg;
Sort By——每个MapReduce内部局部排序
每个MapReduce内部进行排序,对全局结果集来说不是排序
--1)设置reduce个数
set mapreduce.job.reduces=3;
--2)查看设置reduce个数
set mapreduce.job.reduces;
--3)查询成绩按照成绩降序排列
select * from score sort by sscore;
--4)将查询结果导入到文件中(按照成绩降序排列)
insert overwrite local directory '/export/data/exporthive/sort' select * from score sort by sscore;
Distribute By——分区排序
类似MR中partition,进行分区,结合sort by使用 Hive要求DISTRIBUTE BY语句要写在SORT BY语句之前
- 对于distribute by进行测试,一定要分配多reduce进行处理,否则无法看到distribute by的效果
- 先按照学生id进行分区,再按照学生成绩进行排序
--1)设置reduce的个数,将我们对应的sid划分到对应的reduce当中去
set mapreduce.job.reduces=7;
--2)通过distribute by进行数据的分区
insert overwrite local directory '/export/data/exporthive/distribute' select * from score distribute by sid sort by sscore;
Cluster By
当distribute by和sort by字段相同时,可以使用cluster by方式 cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
-- 以下两种写法等价
select * from score cluster by sid;
select * from score distribute by sid sort by sid;
Hive函数
Hive的内置函数
数学函数
-- 1、取整函数
-- 语法: round(double a)
-- 返回值: BIGINT
-- 说明:返回double类型的整数值部分(遵循四舍五入)
select round(3.1415926); --3
-- 2、指定精度取整
-- 语法: round(double a, int d)
-- 返回值: DOUBLE
-- 说明:返回指定精度d的double类型
select round(3.1415926,4); --3.1416
-- 3、向下取整
-- 语法: floor(double a)
-- 返回值: BIGINT
-- 说明:返回等于或者小于该double变量的最大的整数
select floor(3.1415926); --3
-- 4、向上取整函数
-- 语法: ceil(double a)
-- 返回值: BIGINT
-- 说明:返回等于或者大于该double变量的最小的整数
select ceil(3.1415926); --4
-- 5、取随机数函数
-- 语法: rand(),rand(int seed)
-- 返回值: double
-- 说明:返回一个0到1范围内的随机数。如果指定种子seed,则会返回固定的随机数
hive> select rand();
--0.5577432776034763
hive> select rand();
--0.6638336467363424
hive> select rand(100);
--0.7220096548596434
hive> select rand(100);
--0.7220096548596434
-- 6、幂运算函数
-- 语法: pow(double a, double p)
-- 返回值: double
-- 说明:返回a的p次幂
select pow(2,4) ; --16
-- 7、绝对值函数
-- 语法: abs(double a) abs(int a)
-- 返回值: double int
-- 说明:返回数值a的绝对值
select abs(-3.9); --3.9
select abs(10.9); --10.9
字符串函数
select length('abcedfg'); --7
select reverse("hello"); --olleh
select concat('hello','world'); --helloworld
-- 字符串连接函数-带分隔符:concat_ws
-- 语法: concat_ws(string SEP, string A, string B…)
-- 返回值: string
-- 说明:返回输入字符串连接后的结果,SEP表示各个字符串间的分隔符
select concat_ws(',','abc','def','gh'); --abc,def,gh
-- 字符串截取
hive> select substr('abcde',3);--cde
hive> select substring('abcde',3);--cde
hive> select substr('abcde',-1);--e
hive> select substr('abcde',3,2);--cd
hive> select substring('abcde',3,2);--cd
hive> select substring('abcde',-2,2);--de
select upper('abSEd'); --ABSED
select ucase('abSEd'); --ABSED
select lower('abSEd'); --absed
select lcase('abSEd'); --absed
select trim(' abc '); --abc
select ltrim(' abc '); --abc
select rtrim(' abc '); -- abc
--正则表达式替换函数
--语法: regexp_replace(string A, string B, string C)
--返回值: string
--说明:将字符串A中的符合java正则表达式B的部分替换为C。注意,在有些情况下要使用转义字符,类似oracle中的regexp_replace函数。
select regexp_replace('foobar', 'oo|ar', ''); --fb
--⭐️URL解析函数:parse_url⭐️
--语法: parse_url(string urlString, string partToExtract [, stringkeyToExtract])
--返回值: string
--说明:返回URL中指定的部分。partToExtract的有效值为:HOST, PATH, QUERY, REF, PROTOCOL, AUTHORITY, FILE, and USERINFO.
hive> select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'HOST');
--facebook.com
hive> select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'PATH');
--/path1/p.php
hive> select parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'QUERY','k1');
--v1
select split('abtcdtef','t'); --["ab","cd","ef"]
日期函数
--获取当前UNIX时间戳函数:unix_timestamp
select unix_timestamp(); --1323309615
--UNIX时间戳转日期函数:from_unixtime
select from_unixtime(1598079966,'yyyy-MM-dd HH:mm:ss'); --2020-08-22 15:06:06
--日期转UNIX时间戳函数:unix_timestamp
select unix_timestamp('2011-12-07 13:01:03');--1323234063
--指定格式日期转UNIX时间戳函数:unix_timestamp
select unix_timestamp('20111207 13:01:03','yyyyMMdd HH:mm:ss');--1323234063
--dateFormat函数
select date_format('2020-1-1 1:1:1', 'yyyy-MM-dd HH:mm:ss');
--日期时间转日期函数:to_date
select to_date('2011-12-08 10:03:01');--2011-12-08
--日期转年函数: year
select year('2011-12-08 10:03:01'); --2011
--日期转月函数: month
select month('2011-12-08 10:03:01'); --12
--日期转天函数: day
select day('2011-12-08 10:03:01'); --8
--日期转hour,minute,second………………
--日期转周函数:weekofyear
select weekofyear('2011-12-08 10:03:01'); --49
--日期比较函数: datediff,返回结束日期减去开始日期的天数
select datediff('2012-12-08','2012-05-09'); --213
--日期增加函数: date_add
select date_add('2012-12-08',10); --2012-12-18
--日期减少函数: date_sub
select date_sub('2012-12-08',10); --2012-11-28
条件函数
--if函数: if
--语法: if(boolean testCondition, T valueTrue, T valueFalseOrNull)
--返回值: T
--说明: 当条件testCondition为TRUE时,返回valueTrue;否则返回valueFalseOrNull
select if(1=2,100,200); --200
select if(1=1,100,200); --100
--条件判断函数:CASE
--语法: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
--返回值: T
--说明:如果a等于b,那么返回c;如果a等于d,那么返回e;否则返回f
select case 100 when 50 then 'tom' when 100 then 'mary' else 'tim' end ;
--mary
select case 200 when 50 then 'tom' when 100 then 'mary'else 'tim' end ;
--tim
select case when 1=2 then 'tom' when 2=2 then 'mary' else'tim' end ;
--mary
select case when 1=1 then 'tom' when 2=2 then 'mary' else'tim' end ;
--tom
select sid,case when sscore>=60 then '及格' when sscore<60 then '不及格' else '其他' end from score;
转换函数
--类似于java中的强转
--公式:
--cast(表达式 as 数据类型)
--cast函数,可以将"20190607"这样类型的时间数据转化成int类型数据。
select cast(12.35 as int);
select cast('20190607' as int)
select cast('2020-12-05' as date);
Hive的行转列
介绍
- 行转列是指多行数据转换为一个列的字段。
- Hive行转列用到的函数:
- concat(str1,str2,...) --字段或字符串拼接
- concat_ws(sep, str1,str2) --以分隔符拼接每个字符串
- collect_set(col)/collect_list(col) --将某字段的值进行去重汇总,产生array类型字段
转换
20 SMITH
30 ALLEN
30 WARD
20 JONES
30 MARTIN
30 BLAKE
10 CLARK
20 SCOTT
10 KING
30 TURNER
20 ADAMS
30 JAMES
20 FORD
10 MILLER
select deptno,concat_ws("|",collect_set(ename)) as ems from emp group by deptno;
--行转列,COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段。
-- COLLECT_LIST(col)是不会去重的,跟java的Set和List类似。
Hive的表生成函数
explode函数
- explode(col):将hive一列中复杂的array或者map结构拆分成多行。
- explode(ARRAY) 数组的每个元素生成一行
- explode(MAP) map中每个key-value对,生成一行,key为一列,value为一列
10 CLARK|KING|MILLER
20 SMITH|JONES|SCOTT|ADAMS|FORD
30 ALLEN|WARD|MARTIN|BLAKE|TURNER|JAMES
- 建表
create table emp2(
deptno int,
names array<string>
)
row format delimited fields terminated by '\t'
collection items terminated by '|';
- 插入数据
- 使用explode查询
select explode(names) as name from emp;
LATERAL VIEW侧视图
- LATERAL VIEW
- 用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
- 解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
- 列转行
select deptno,name from emp2 lateral view explode(names) tmp_tb as name;
Reflect函数
reflect函数可以支持在sql中调用java中的静态方法
使用java.lang.Math当中的Max求两列中最大值
--创建hive表
create table test_reflect(name string,chinese int,english int) row format delimited fields terminated by ',';
--准备数据 test_reflect.txt
张三,80,90
李四,78,87
王五,60,44
赵六,89,57
周七,95,86
--加载数据
load data local inpath '/export/data/hivedatas/test_reflect.txt' into table test_udf;
--使用java.lang.Math当中的Max求两列当中的最大值
select reflect("java.lang.Math","max", chinese, english) from test_udf;
Hive的开窗函数(窗口函数)
窗口函数(一)ROW_NUMBER,RANK,DENSE_RANK
- 加载数据
user1,2018-04-10,1
user1,2018-04-11,5
user1,2018-04-12,7
user1,2018-04-13,3
user1,2018-04-14,2
user1,2018-04-15,4
user1,2018-04-16,4
user2,2018-04-10,2
user2,2018-04-11,3
user2,2018-04-12,5
user2,2018-04-13,6
user2,2018-04-14,3
user2,2018-04-15,9
user2,2018-04-16,7
CREATE TABLE test_window_func1(
userid string,
createtime string, --day
pv INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
-- 加载数据:
load data local inpath '/export/data/hivedatas/test1.txt' into table test_window_func1;
ROW_NUMBER
ROW_NUMBER() 从1开始,按照顺序,生成分组内记录的序列
SELECT
userid,
createtime,
pv,
ROW_NUMBER() OVER(PARTITION BY userid ORDER BY pv desc) AS rn
FROM test_window_func1;
RANK和DENSE_RANK
RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位 DENSE_RANK() 生成数据项在分组中的排名,排名相等会在名次中不会留下空位
SELECT
userid,
createtime,
pv,
RANK() OVER(PARTITION BY userid ORDER BY pv desc) AS rn1,
DENSE_RANK() OVER(PARTITION BY userid ORDER BY pv desc) AS rn2,
ROW_NUMBER() OVER(PARTITION BY userid ORDER BY pv DESC) AS rn3
FROM itcast_t1
WHERE userid = 'user1';
#-----------TOP_N问题----------------
select * from
(
select *,
dense_rank() over (partition by userid order by pv desc) as rk -- 组内的排名序号
from test_window_func1
)t
where
rk <= 3;
#-----------------------------------------
with t1 as(
select *,
dense_rank() over (partition by userid order by pv desc) as rk -- 组内的排名序号
from test_window_func1
)
select * from t1 where rk <= 3;
Hive分析窗口函数——SUM、AVG、MIN、MAX
SUM(结果和ORDER BY相关,默认为升序)
select userid,createtime,pv,
sum(pv) over(partition by userid order by createtime) as pv1
from test_window_func1;
select userid,createtime,pv,
sum(pv) over(partition by userid order by createtime rows between unbounded preceding and current row) as pv2
from test_window_func1;
select userid,createtime,pv,
sum(pv) over(partition by userid) as pv3
from test_window_func1; --如果没有order by排序语句 默认把分组内的所有数据进行sum操作
select userid,createtime,pv,
sum(pv) over(partition by userid order by createtime rows between 3 preceding and current row) as pv4
from test_window_func1;
select userid,createtime,pv,
sum(pv) over(partition by userid order by createtime rows between 3 preceding and 1 following) as pv5
from test_window_func1;
select userid,createtime,pv,
sum(pv) over(partition by userid order by createtime rows between current row and unbounded following) as pv6
from test_window_func1;
--pv1: 分组内从起点到当前行的pv累积,如,11号的pv1=10号的pv+11号的pv, 12号=10号+11号+12号
--pv2: 同pv1
--pv3: 分组内(user1)所有的pv累加
--pv4: 分组内当前行+往前3行,如,11号=10号+11号, 12号=10号+11号+12号,
13号=10号+11号+12号+13号, 14号=11号+12号+13号+14号
--pv5: 分组内当前行+往前3行+往后1行,如,14号=11号+12号+13号+14号+15号=5+7+3+2+4=21
--pv6: 分组内当前行+往后所有行,如,13号=13号+14号+15号+16号=3+2+4+4=13,
14号=14号+15号+16号=2+4+4=10
/*
- 如果不指定rows between,默认为从起点到当前行;
- 如果不指定order by,则将分组内所有值累加;
- 关键是理解rows between含义,也叫做window子句:
- preceding:往前
- following:往后
- current row:当前行
- unbounded:起点
- unbounded preceding 表示从前面的起点
- unbounded following:表示到后面的终点
*/
AVG、MIN、MAX
select userid,createtime,pv,
avg(pv) over(partition by userid order by createtime rows between unbounded preceding and current row) as pv2
from test_window_func1;
select userid,createtime,pv,
max(pv) over(partition by userid order by createtime rows between unbounded preceding and current row) as pv2
from test_window_func1;
select userid,createtime,pv,
min(pv) over(partition by userid order by createtime rows between unbounded preceding and current row) as pv2
from test_window_func1;
Hive自定义函数
概述
Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。 当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。
- 根据用户自定义函数类别分为以下三种:
- UDF(User-Defined-Function)
- 一进一出
- 类似于:lower/upper/reverse
- UDAF(User-Defined Aggregation Function)
- 聚集函数,多进一出
- 类似于:count/max/min
- UDTF(User-Defined Table-Generating Functions)
- 一进多出
- 如lateral view explode()
- UDF(User-Defined-Function)
自定义UDF
- 编程步骤
- 继承org.apache.hadoop.hive.ql.exec.UDF
- 需要实现evaluate函数;evaluate函数支持重载;
- 注意事项
- UDF必须要有返回类型,可以返回null,但是返回类型不能为void;
- UDF中常用Text/LongWritable等类型,不推荐使用java类型;
代码编写
- maven工程,导入jar
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
- 开发java类继承UDF,并重载evaluate方法
public class MyUDF extends UDF {
//17801112345 ---> 178****2345
public String evaluate(String phoneNumStr) {
//匹配手机号是否合法
String regex = "1[35789][0-9]{9}";
boolean flag = phoneNumStr.matches(regex);
if (!flag) {
return null;
} else {
String str1 = phoneNumStr.substring(0, 3);
String str2 = phoneNumStr.substring(7);
return str1 + "****" + str2;
}
}
}
函数使用方式1-临时函数
- 将我们的项目打包,并上传到hive的lib目录下
- 添加jar包并重命名
cd /export/server/hive/lib
mv day19_udf-1.0-SNAPSHOT.jar my_udf.jar
- hive客户端添加jar包
hive> add jar /export/server/hive/lib/my_udf.jar
- 设置函数与我们的自定义函数关联-临时函数
hive> create temporary function my_jiami as 'cn.itcast.udf.MyUDF';
- 使用自定义函数
hive>select my_jiami(phone_num) from test_user;
函数使用方式2-永久函数
1. 把自定义函数的jar上传到hdfs中.
hadoop fs -mkdir /hive_func
hadoop fs -put my_udf.jar /hive_func
2. 创建永久函数
hive> create function my_jiami2 as 'cn.itcast.udf.MyUDF'
using jar 'hdfs://node1:8020/hive_func/my_udf.jar';
3. 验证
hive>select my_jiami2(phone_num) from test_user;
删除函数
-- 删除临时函数
drop temporary function if exists encryptPhoneNumber;
-- 删除永久函数,不会删除HDFS上的jar包
drop function if exists my_lower2;
自定义UDTF
代码实现
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.function.ObjDoubleConsumer;
public class MyUDTF extends GenericUDTF {
private final transient Object[] forwardListObj = new Object[1];
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//设置列名的类型
List<String> fieldNames = new ArrayList<>();
//设置列名
fieldNames.add("column_01");
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>() ;//检查器列表
//设置输出的列的值类型
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] objects) throws HiveException {
//1:获取原始数据
String args = objects[0].toString();
//2:获取数据传入的第二个参数,此处为分隔符
String splitKey = objects[1].toString();
//3.将原始数据按照传入的分隔符进行切分
String[] fields = args.split(splitKey);
//4:遍历切分后的结果,并写出
for (String field : fields) {
//将每一个单词添加值对象数组
forwardListObj[0] = field;
//将对象数组内容写出
forward(forwardListObj);
}
}
@Override
public void close() throws HiveException {
}
}
添加jar包
cd /export/data/hive/lib
mv original-day_10_hive_udtf-1.0-SNAPSHOT.jar my_udtf.jar
hive> add jar /export/server/hive/lib/my_udtf.jar;
创建临时函数与开发后的UDTF代码关联
hive>create temporary function my_udtf as 'cn.itcast.udf.MyUDTF';
使用自定义UDTF函数
hive>select my_udtf("zookeeper,hadoop,hdfs,hive,MapReduce",",") word;
Hive的数据压缩
在实际工作当中,hive当中处理的数据,一般都需要经过压缩,可以使用压缩来节省我们的MR处理的网络带宽
MR支持的压缩编码
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否可切分 |
---|---|---|---|---|
DEFAULT | 无 | DEFAULT | .deflate | 否 |
Gzip | gzip | DEFAULT | .gz | 否 |
bzip2 | bzip2 | bzip2 | .bz2 | 是 |
LZO | lzop | LZO | .lzo | 否 |
LZ4 | 无 | LZ4 | .lz4 | 否 |
Snappy | 无 | Snappy | .snappy | 否 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示
压缩格式 | 对应的编码/解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较:
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
压缩配置参数
要在Hadoop中启用压缩,可以配置如下参数(mapred-site.xml文件中):
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs | |||
(在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, | ||
org.apache.hadoop.io.compress.Lz4Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 | |
mapreduce.map.output.compress | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 使用LZO、LZ4或snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec | org.apache.hadoop.io.compress. DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type | RECORD | reducer输出 | SequenceFile输出使用的压缩类型:NONE和BLOCK |
开启Map输出阶段压缩
开启map输出阶段压缩可以减少job中map和Reduce task间数据传输量。具体配置如下:
--开启hive中间传输数据压缩功能
hive(default)>set hive.exec.compress.intermediate=true;
--开启mapreduce中map输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
--设置mapreduce中map输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
--执行查询语句
select count(1) from score;
开启Reduce输出阶段压缩
当Hive将输出写入到表中时,输出内容同样可以进行压缩。属性hive.exec.compress.output控制着这个功能。用户可能需要保持默认设置文件中的默认值false,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true,来开启输出结果压缩功能。
-- 1)开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
-- 2)开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
-- 3)设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
-- 4)设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
-- 5)测试一下输出结果是否是压缩文件
insert overwrite local directory '/export/data/compress' select * from score distribute by sid sort by sscore desc;
Hive的数据存储格式
Hive支持的存储数的格式主要有:TEXTFILE(行式存储) 、SEQUENCEFILE(行式存储)、ORC(列式存储)、PARQUET(列式存储)。
列式储存和行式储存
**行存储的特点: **查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。 **列存储的特点: **因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。 相比于行式存储,列式存储在分析场景下有着许多优良的特性:
- 分析场景中往往需要读大量行但是少数几个列。在行存模式下,数据按行连续存储,所有列的数据都存储在一个block中,不参与计算的列在IO时也要全部读出,读取操作被严重放大。而列存模式下,只需要读取参与计算的列即可,极大的减低了IO开销,加速了查询。
- 同一列中的数据属于同一类型,压缩效果显著。列存储往往有着高达十倍甚至更高的压缩比,节省了大量的存储空间,降低了存储成本。
- 更高的压缩比意味着更小的数据空间,从磁盘中读取相应数据耗时更短。
- 自由的压缩算法选择。不同列的数据具有不同的数据类型,适用的压缩算法也就不尽相同。可以针对不同列类型,选择最合适的压缩算法。
TEXTFILE和SEQUENCEFILE的存储格式都是基于行存储的;ORC和PARQUET是基于列式存储的。
主流文件存储格式对比实验
从存储文件的压缩比和查询速度两个角度对比。
TextFile
create table log_text (
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE ;
load data local inpath '/export/data/hivedatas/log.data' into table log_text ;
--查看表中数据大小
hadoop fs -du -h /user/hive/warehouse/myhive.db/log_text;
ORC
create table log_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc ;
insert into table log_orc select * from log_text ;
--查看表中数据大小
hadoop fs -du -h /user/hive/warehouse/myhive.db/log_orc;
Parquet
create table log_parquet(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS PARQUET ;
insert into table log_parquet select * from log_text ;
hdoop fs -du -h /user/hive/warehouse/myhive.db/log_parquet;
结论
存储文件的压缩比总结:ORC > Parquet > textFile存储文件的查询速度总结:ORC > TextFile > Parquet
存储和压缩结合
ORC存储方式的压缩
Key | Default | Notes |
---|---|---|
orc.compress | ZLIB | high level compression (one of NONE, ZLIB, SNAPPY) |
orc.compress.size | 262,144 | number of bytes in each compression chunk |
orc.stripe.size | 67,108,864 | number of bytes in each stripe |
orc.row.index.stride | 10,000 | number of rows between index entries (must be >= 1000) |
orc.create.index | true | whether to create row indexes |
orc.bloom.filter.columns | "" | comma separated list of column names for which bloom filter should be created |
orc.bloom.filter.fpp | 0.05 | false positive probability for bloom filter (must >0.0 and <1.0) |
- 创建一个非压缩的ORC存储方式
create table log_orc_none(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="NONE");
insert into table log_orc_none select * from log_text ;
hadoop fs -du -h /user/hive/warehouse/myhive.db/log_orc_none; --7.7M
- 创建一个SNAPPY压缩的ORC存储方式
create table log_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
insert into table log_orc_snappy select * from log_text ;
hadoop fs -du -h /user/hive/warehouse/myhive.db/log_orc_snappy ; --3.8M
- 若使用ORC存储的默认压缩方式,导入后数据大小为2.8M
- 比Snappy压缩的还小。原因是orc存储文件默认采用ZLIB压缩。比snappy压缩的小。
总结
在实际的项目开发当中,hive表的数据存储格式一般选择:orc或parquet。压缩方式一般选择snappy。 ORC+SNAPPY
Hive调优
本地模式
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
- 用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。
set hive.stats.column.autogather=false;
set hive.exec.mode.local.auto=true; --开启本地mr
--设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=51234560;
--设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
- 实操
--1)开启本地模式,并执行查询语句
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from score cluster by sid;
18 rows selected (1.568 seconds)
--2)关闭本地模式,并执行查询语句
hive (default)> set hive.exec.mode.local.auto=false;
hive (default)> select * from score cluster by sid;
18 rows selected (11.865 seconds)
空key处理
空key过滤
有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。
--测试环境准备
create table ori(id bigint, time_val bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
create table nullidtable(id bigint, time_val bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
create table jointable(id bigint, time_valbigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
load data local inpath '/export/data/hivedatas/hive_big_table/*' into table ori;
load data local inpath '/export/data/hivedatas/hive_have_null_id/*' into table nullidtable;
--不过滤
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
结果:
No rows affected (152.135 seconds)
--过滤
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
结果:
No rows affected (141.585 seconds)
空key替换
有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上。
--不随机分布
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;
INSERT OVERWRITE TABLE jointable
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;
No rows affected (41.668 seconds) 52.477
- 这样的后果就是所有为null值的id全部都变成了相同的字符串,及其容易造成数据的倾斜(所有的key相同,相同key的数据会到同一个reduce当中去)
- 为了解决这种情况,我们可以通过hive的rand函数,随记的给每一个为空的id赋上一个随机值,这样就不会造成数据倾斜。
--随机分布
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;
INSERT OVERWRITE TABLE jointable
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;
No rows affected (42.594 seconds)
SQL优化
Map端JOIN
默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。 并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
- 开启Map端聚合参数设置
--(1)是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
--(2)在Map端进行聚合操作的条目数目(阈值)
set hive.groupby.mapaggr.checkinterval = 100000;
--(3)有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
- 当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
Count(distinct)
数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:
--方式一
SELECT count(DISTINCT id) FROM bigtable;
--方式二
SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;
- 虽然会多用一个Job来完成,但在数据量大的情况下,这个绝对是值得的。
避免笛卡尔积
尽量避免笛卡尔积,即避免join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。
并行执行
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。
- 通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; --打开任务并行执行
set hive.exec.parallel.thread.number=16; --同一个sql允许最大并行度,默认为8。
- 当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。
严格模式
Hive提供了一个严格模式,可以防止用户执行那些可能意向不到的不好的影响的查询。
- 通过设置属性hive.mapred.mode值为默认是非严格模式nonstrict 。开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。
set hive.mapred.mode = strict; --开启严格模式
set hive.mapred.mode = nostrict; --开启非严格模式
- 配置文件修改:hive-site.xml
<property>
<name>hive.mapred.mode</name>
<value>strict</value>
</property>
(1)对于分区表,在where语句中必须含有分区字段作为过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。 (2)对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。 (3)限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
存储方式ORC+压缩方式SNAPPY
大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式,存储方式和压缩方式之前已经讲过,这里不再描述。