数仓生态圈辅助工具
资料
Day02_数仓生态圈辅助工具
知识点01:课程内容大纲与学习目标
#课程内容大纲
1、大数据分析交互平台Hue
介绍、功能、架构原理
Hue的使用(操作HDFS Hive)
2、数据迁移同步工具Sqoop
介绍、工作机制、原理
数据导入
全量数据
增量数据
条件部分
Hive HCatalog API
数据导出
全量导出
增量导出
3、工作流调度工具Oozie
工作流概念
Oozie介绍、架构
Oozie工作流类型
Oozie使用案例
#学习目标
了解Hue的功能、使用
重点掌握Sqoop功能、使用
了解oozie的功能、使用(azkban,airflow)
#辅助工具是帮助我们在开发中更方便、更迅速的从事某些任务。同类可替代的产品众多。
#希望大家能够具备一种意识:当我需要的时候 应该如何拾起来这个工具去帮我干活 如果换其他同类型产品如何快速切换使用
知识点02:Hue介绍、功能与架构原理
- CloudraManger
Cloudra公司研发的集成化管理大数据的一套框架,CloudraManger可以让运维通过鼠标点击和简单的配置就能快速搭建大数据开发环境
- Hue介绍
Hue是安装在CloudraManger框架中的一个软件
HUE=Hadoop User Experience
Hue是一个开源的Apache Hadoop UI系统,由Cloudera Desktop演化而来,最后Cloudera公司将其贡献给Apache基金会的Hadoop社区,它是基于Python Web框架Django实现的。
通过使用Hue,可以在浏览器端的Web控制台上与Hadoop集群进行交互,来分析处理数据,例如操作HDFS上的数据,运行MapReduce Job,执行Hive的SQL语句,浏览HBase数据库等等。
HUE对我们的作用:
1:可以通过写HiveSQL ---》idea或者Datgrip
2:可以对HDFS上的文件或者目录进行增删改查 --->HDFS命令 或者9870页面
3:可以oozie进行调度
- Hue功能
- Hue架构原理
Hue是一个友好的界面集成框架,可以集成各种大量的大数据体系软件框架,通过一个界面就可以做到查看以及执行所有的框架。
Hue提供的这些功能相比Hadoop生态各组件提供的界面更加友好,但是一些需要debug的场景可能还是要使用原生系统才能更加深入的找到错误的原因。
知识点03:Hue安装与Web UI页面
- Hue安装
- 官方下载源码包、手动编译安装
最大的挑战在于软件之间的兼容性问题。
- 使用CM集群在线安装
- Hue Web UI页面
- 从CM集群页面进入 http://hadoop01:7180/cmf 用户名密码:admin
- 浏览器直接进入 http://hadoop02:8889/hue 用户名密码:hue
知识点04:Hue操作HDFS
- 进入HDFS管理页面
- 新建文件、文件夹
- 上传、下载文件
- 查看文件内容
- 在线实时编辑文件内容
- 删除文件
- 修改文件权限
知识点05:Hue操作Hive
- 进入Hive面板
- SQL编写、执行
知识点06:ETL到底如何理解?
- ETL定义
百科定义:ETL(Extract-Transform-Load)是将数据从来源端经过抽取(extract)、转换(transform)、==加载(load)==至目的端的过程。
ETL较常用在数据仓库中,是将原始数据经过抽取(Extract)、清洗转换(Transform)之后加载(Load)到数据仓库的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据。
- 数据抽取
确定数据源,需要确定从哪些源系统进行数据抽取;
定义数据接口,对每个源文件及系统的每个字段进行详细说明;
确定数据抽取的方法:是主动抽取还是由源系统推送?
是增量抽取还是全量抽取?是按照每日抽取还是按照每月抽取?
常见的抽取源系统:
#1、从RDBMS抽取数据
通常OLTP系统采用RDBMS存储业务操作数据,从RDBMS抽取操作型数据是最多一种数据抽取方式。
数据从RDBMS抽取后通常会先以文件的方式存储到分布式文件系统中(例如HDFS),方便ETL程序读取原始数据。也有的是将抽取后的数据直接存储到数据仓库中,采用第二种方法需要提前在数据仓库创建与原始数据相同结构的数据仓库模型。
#2、从日志文件抽取
OLTP系统通过日志系统将用户的操作日志、系统日志等存储在OLTP服务器上,由专门的采集程序从服务器上采集日志文件信息。
#3、从数据流接口抽取
OLTP系统提供对外输出数据的接口(比如telnet),采集系统与该接口对接,从数据流接口抽取需要的数据。
- 数据转换
数据转换也叫做数据清洗转换。是将采集过来的原始数据(通常原始数据存在一定的脏数据)清洗(过虑)掉不符合要求的脏数据,并且根据数据仓库的要求对数据格式进行转换。
通常经过数据清洗转换后是符合数据仓库要求的数据。
#具体包括
剔除错误:明显和需求无关的数据进行剔除处理
空值处理:可捕获字段空值,进行加载或替换为其他含义数据
数据标准:统一标准字段、统一字段类型定义
数据拆分:依据业务需求做数据拆分,如身份证号,拆分区划、出生日期、性别等
数据验证:时间规则、业务规则、自定义规则
数据转换:格式转换或者内容转换
数据关联:关联其他数据或数学,保障数据完整性
- 数据加载
数据加载就是清洗转换后的数据存储到数据仓库中,数据加载的方式包括:全量加载、增量加载。
#全量加载:
全量加载相当于覆盖加载的方式,每个加载都会覆盖原始数据将数据全部加载到数据仓库。此类加载方式通常用于维度数据。
对于数据源中不经常变化的数据,采用全量加载(行政区域表)
#增量加载:
增量加载按照一定的计划(通常是时间计划)逐步的一批一批的将数据加载到数据仓库,此类加载方式通常用于OLTP的业务操作数据。
对数据源中每天,每月。。都会发生变化的数据一般采用增量加载
- ETL的概念
- ETL
按其字面含义理解就是按照E-T-L这个顺序流程进行处理:先抽取、然后转换、完成后加载到目标中。
在ETL架构中,数据的流向是从源数据流到ETL工具,ETL工具是一个单独的数据处理引擎,一般会在单独的硬件服务器上,实现所有数据转化的工作,然后将数据加载到目标数据仓库中。
如果要增加整个ETL过程的效率,则只能增强ETL工具服务器的配置,优化系统处理流程(一般可调的东西非常少)。
知识点07:Apache Sqoop介绍、工作机制
- Sqoop介绍
sqoop是apache旗下一款“Hadoop和关系数据库服务器之间传送数据”的工具。
导入数据:MySQL,Oracle导入数据到Hadoop的HDFS、HIVE、HBASE等数据存储系统;
导出数据:从Hadoop的HDFS、HIVE中导出数据到关系数据库mysql等。
- Sqoop工作机制
Sqoop工作机制是将导入或导出命令翻译成mapreduce程序来实现。
在翻译出的mapreduce中主要是对inputformat和outputformat进行定制。
- sqoop安装
你的hive安装在哪一台主机,你的sqoop就要装在哪一台主机,该操作是以node1为例
1、在node1中,上传sqoop的安装包到/export/software目录
2、在node1中,解压sqoop安装包到/export/server目录
tar -xvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /export/server/
3、在node1中,对解压后的sqoop目录进行重命名
cd /export/server/
mv sqoop-1.4.7.bin__hadoop-2.6.0 sqoop-1.4.7
4、在node1中,修改sqoop的配置文件,
cd /export/server/sqoop-1.4.7/conf
mv sqoop-env-template.sh sqoop-env.sh
修改sqoop-env.sh 文件,设置以下内容
export HADOOP_COMMON_HOME=/export/server/hadoop-3.1.4
export HADOOP_MAPRED_HOME=/export/server/hadoop-3.1.4
export HIVE_HOME=/export/server/hive-3.1.2
5、在node1中,加入mysql的jdbc驱动包和hive的执行包
cp /export/server/hive-3.1.2/lib/mysql-connector-java-5.1.32-bin.jar /export/server/sqoop-1.4.7/lib/
cp /export/servers/hive-3.1.2/lib/hive-exec-3.1.2.jar /export/servers/sqoop-1.4.7/lib/
6、在node1,node2、node3中,配置环境变量(手敲,不然宕机)
vim /etc/profile
添加以下内容
export SQOOP_HOME=/export/server/sqoop-1.4.7
export PATH=:$SQOOP_HOME/bin:$PATH
# HCatelog
export HCAT_HOME=/export/server/hive-3.1.2/hcatalog
export PATH=$PTAH:$HCAT_HOME/bin
添加完之后一定要保存退出,执行以下命令
source /etc/profile
7、在node1中,测试sqoop
sqoop list-databases \
--connect jdbc:mysql://node1:3306/ \
--username root --password hadoop
- sqoop测试
测试1:主机名写hadoop01
sqoop list-databases \
--connect jdbc:mysql://hadoop01:3306/ \
--username root --password 123456
测试2:主机名写localhost
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 123456
#也可以这么写 \表示命令未完待续 下一行还有命令参数 否则遇到回车换行就会自动提交执行
sqoop list-databases \
--connect jdbc:mysql://localhost:3306/ \
--username root \
--password 123456
知识点08:增量数据、全量数据
- 全量数据(Full data)
就是全部数据,所有数据。如对于表来说,就是表中的所有数据。
- 增量数据(Incremental data)
就是上次操作之后至今产生的新数据。
- 数据子集
也叫做部分数据。整体当中的一部分。
知识点09:Sqoop数据导入至HDFS
- 测试数据准备
- 全量导入MySQL数据到HDFS
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp \
--target-dir /sqoop/result1 \
--delete-target-dir \
--m 1
#解释
sqoop将MySQL中userdb数据库中emp表数据全部导入到hdfs的/sqoop/result1目录下,使用一个mapstak
这里没有指定导入之后的分隔符,默认导入到HDFS之后分隔符是逗号
-- ##############非新零售主机使用命令####################
sqoop import \
--connect jdbc:mysql://192.168.88.100:3306/userdb \
--username root \
--password 123456 \
--table emp \
--target-dir /sqoop/result1 \
--delete-target-dir \
--m 1
- 指定分隔符
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/result2 \
--delete-target-dir \
--fields-terminated-by '\t' \
--table emp \
--m 1
-- ##############非新零售主机使用命令####################
sqoop import \
--connect jdbc:mysql://192.168.88.100:3306/userdb \
--username root \
--password 111111 \
--target-dir /sqoop/result2 \
--delete-target-dir \
--fields-terminated-by '\t' \
--table emp \
--m 1
- 指定任务并行度(maptask个数)
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/result3 \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by id \
--table emp \
--m 2
#解释使用两个maptask将MySQL中userdb数据库中emp表数据全部导入到hdfs的/sqoop/result3目录下
#split-by 一般是数字类型,一般都是将主键作为切分条件
#SELECT MIN(`id`), MAX(`id`) FROM `emp` ---》5 -1 = 4 / 2 = 2
第一个maptask 1 2
第二个maptask 3 4 5
-- ##############非新零售主机使用命令####################
sqoop import \
--connect jdbc:mysql://192.168.88.100:3306/userdb \
--username root \
--password 111111 \
--target-dir /sqoop/result3 \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by id \
--table emp \
--m 2
#下面这个命令是错误的 没有指定切割的判断依据
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/result3 \
--delete-target-dir \
--fields-terminated-by '\t' \
--table emp \
--m 2
知识点10:Sqoop数据导入至Hive
- 测试准备
-- Hive中创建测试使用的数据库
drop database if exists test cascade ;
create database if not exists test;
- 方式1
1、先复制表结构到hive中再导入数据,将关系型数据的表结构复制到hive中
sqoop create-hive-table \
--connect jdbc:mysql://hadoop01:3306/userdb \
--username root \
--password 123456 \
--table emp_add \
--hive-table test.emp_add_sp
其中:
--table emp_add为mysql中的数据库userdb中的表。
--hive-table emp_add_sp 为hive中新建的表名称。
从关系数据库导入文件到hive中
sqoop import \
--connect jdbc:mysql://hadoop01:3306/userdb \
--username root \
--password 123456 \
--table emp_add \
--hive-table test.emp_add_sp \
--hive-import \
--m 1
- 方式2:直接导入数据(包括建表)
-- 1、使用hive默认分隔符
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp_conn \
--hive-import \
--hive-database test \
--m 1
#导入之后文件默认的分隔符是'\001'
-- 2、使用指定分隔符
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp_conn \
--hive-import \
--hive-database test \
--fields-terminated-by '\t' \
--m 1
-- ##############非新零售主机使用命令####################
sqoop import \
--connect jdbc:mysql://192.168.88.100:3306/userdb \
--username root \
--password 111111 \
--table emp_conn \
--hive-import \
--hive-database test \
--m 1
知识点11:Sqoop数据导入至Hive--HCatalog API
- sqoop API 原生方式
所谓sqoop原生的方式指的是sqoop自带的参数完成的数据导入。
但是有什么不好的地方呢?请看下面案例
-- 手动在hive中建一张表
create table test.emp_hive
(
id int,
name string,
deg string,
salary int,
dept string
)
row format delimited fields terminated by '\t'
stored as orc;
--注意,这里指定了表的文件存储格式为ORC。
--从存储效率来说,ORC格式胜于默认的textfile格式。
sqoop import \
--connect jdbc:mysql://hadoop01:3306/userdb \
--username root \
--password 123456 \
--table emp \
--fields-terminated-by '\t' \
--hive-database test \
--hive-table emp_hive \
-m 1
执行之后,可以发现虽然针对表emp_hive的sqoop任务成功,但是Hive表中却没有数据。
- HCatalog API方式
Apache HCatalog是基于Apache Hadoop之上的数据表和存储管理服务。
包括:
- 提供一个共享的模式和数据类型的机制。
- 抽象出表,使用户不必关心他们的数据怎么存储,底层什么格式。
- 提供可操作的跨数据处理工具,如Pig,MapReduce,Streaming,和Hive。
sqoop的官网也做了相关的描述说明,使用HCatalog支持ORC等数据格式。
- sqoop原生API和 HCatalog区别
#数据格式支持(这是实际中使用HCatalog的主要原因,否则还是原生的灵活一些)
Sqoop方式支持的数据格式较少;
HCatalog支持的数据格式多,包括RCFile, ORCFile, CSV, JSON和SequenceFile等格式。
#数据覆盖
Sqoop方式允许数据覆盖,HCatalog不允许数据覆盖,每次都只是追加。
#字段名匹配
Sqoop方式比较随意,不要求源表和目标表字段相同(字段名称和个数都可以不相同),它抽取的方式是将字段按顺序插入,比如目标表有3个字段,源表有一个字段,它会将数据插入到Hive表的第一个字段,其余字段为NULL。
但是HCatalog不同,源表和目标表字段名需要相同,字段个数可以不相等,如果字段名不同,抽取数据的时候会报NullPointerException错误。HCatalog抽取数据时,会将字段对应到相同字段名的字段上,哪怕字段个数不相等。
行存储和列存储
1、行存储
1)表数据在硬盘上以行为单位,一行的数据是连续存储在一起 ,select * from A 查询效率高。
2)行存储代表:TextFile、SequenceFile
create table test.emp_hive
(
id int,
name string,
deg string,
salary int,
dept string
)
row format delimited fields terminated by '\t'
stored as textfile;
2、列存储
1)表数据在硬盘上以列为单位,一列的数据是连续存储在一起 ,select 字段 from A 查询效率高。
2)列存储代表:ORC、Parquet
create table test.emp_hive
(
id int,
name string,
deg string,
salary int,
dept string
)
row format delimited fields terminated by '\t'
stored as orc;
3、ORC格式插入数据的步骤
1)准备数据:log.dat 18.2M
2)创建普通表,存储格式是TEXTFILE
create temporary table log_text (
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE ;
3)给普通表插入数据
load data local inpath '/root/log.data' into table log_text;
4)创建ORC存储的表
create table log_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc ;
5)从普通表查询数据插入到orc表
insert into table log_orc select * from log_text;
知识点12:Sqoop数据导入--条件部分导入
在实际开发中,有时候我们从RDBMS(MySQL)中导入数据时,不需要将数据全部导入,而只需要导入满足条件的数据,则需要进行条件导入
- query查询
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/result5 \
--query 'select id,name,deg from emp WHERE id>1203 and $CONDITIONS' \
--fields-terminated-by '\001' \
--m 1
知识点13:Sqoop数据导入--增量导入
1、全量导入,表所有数据全部导入
2、条件导入,只要满足查询条件的就导入
3、增量导入,之前已经导入过一次,下一次只导入新增加或者修改的数据
方式1-使用sqoop自带的参数实现增量导入
方式2-使用用户自定义条件来实现增量导入(使用该方式比较多)
增量导入的难点:
因为你之前已经导入多一次,下一次导入时一定要判断哪些数据是已经导入过的,则不要导入,哪些数据是新增加的或者修改的,则需要导入
- 方式一:sqoop自带参数实现
设计思路:对某一列值进行判断,只要大于上一次的值就会导入。
所谓的增量实现,肯定需要一个判断的依据,上次到哪里了,这次从哪里开始。
- append模式
- 要求:必须有一列自增的值,按照自增的int值进行判断
- 特点:只能导入增加的数据,无法导入更新的数据
#第一次,全量导入-首先执行以下指令先将我们之前的数据导入
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/appendresult \
--table emp --m 1
#查看生成的数据文件,发现数据已经导入到hdfs中.
#模拟新增加数据,然后在mysql的emp中插入2条数据:
insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1206', 'allen', 'admin', '30000', 'tp');
insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1207', 'woon', 'admin', '40000', 'tp');
#第二次导入,执行如下的指令,实现增量的导入:
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp --m 1 \
--target-dir /sqoop/appendresult \
--incremental append \
--check-column id \
--last-value 1205
#解释:
--incremental append :新导入的数据会单独创建一个文件,不会和原来的数据进行合并,只能追加
--check-column id :通过id来判断是否新增
--last-value 1205 :上一次导入到id为1205的数据了,这一次从 >1205开始导入
####如果想实现sqoop自动维护增量记录 可以使用sqoop job作业来实现
21/10/09 15:03:37 INFO tool.ImportTool: --incremental append
21/10/09 15:03:37 INFO tool.ImportTool: --check-column id
21/10/09 15:03:37 INFO tool.ImportTool: --last-value 1207
21/10/09 15:03:37 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
并且还可以结合sqoop job作业,实现sqoop自动记录维护last-value值,详细可以参考课程资料。
- lastmodifield模式
- 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
- 特点:既导入新增的数据也导入更新的数据
# 首先我们要在mysql中创建一个customer表,指定一个时间戳字段
create table userdb.customertest(
id int,name varchar(20),
last_mod timestamp default current_timestamp on update current_timestamp
);
#此处的时间戳设置为在数据的产生和更新时都会发生改变.
#插入如下记录:
insert into userdb.customertest(id,name) values(1,'neil');
insert into userdb.customertest(id,name) values(2,'jack');
insert into userdb.customertest(id,name) values(3,'martin');
insert into userdb.customertest(id,name) values(4,'tony');
insert into userdb.customertest(id,name) values(5,'eric');
#第一次全量导入:此时执行sqoop指令将数据导入hdfs:
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/lastmodifiedresult \
--table customertest --m 1
#再次插入一条数据进入customertest表
insert into customertest(id,name) values(6,'james');
#更新一条已有的数据,这条数据的时间戳会更新为我们更新数据时的系统时间.
update customertest set name = 'NEIL' where id = 1;
#第二次导入:执行如下指令,把id字段作为merge-key:
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table customertest \
--target-dir /sqoop/lastmodifiedresult \
--incremental lastmodified \
--check-column last_mod \
--last-value '2022-06-10 11:22:48 \
--m 1 \
--merge-key id
#解释:
--incremental lastmodified 第二次导入数据和第一次导入的数据合并,重新执行MR,将MR执行的 覆盖原来的数据
--last-value "2022-06-10 10:59:36" 把大于等于这个日期的数据导入
--merge-key id 如果第一次导入的数据和第二次导入的数据id相同,则合并
#由于merge-key这种模式是进行了一次完整的mapreduce操作,
#因此最终我们在lastmodifiedresult文件夹下可以发现id=1的name已经得到修改,同时新增了id=6的数据
- 方式二:用户条件过滤实现(不用敲)
- 通过where对字段进行过滤
-- 导入从某一个时间点的数据
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--query "select * from customertest where last_mod >'2022-06-10 10:59:36' and \$CONDITIONS" \
--fields-terminated-by '\001' \
--hcatalog-database test \
--hcatalog-table customertest \
-m 1
-- 导入上一天数据,指定时间区间
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--query "select * from customertest where last_mod >= '2022-06-09 00:00:00' and last_mod <= '2022-06-09 23:59:59' and \$CONDITIONS" \
--fields-terminated-by '\001' \
--hcatalog-database test \
--hcatalog-table customertest \
-m 1
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--query "select * from emp where id>1203 and \$CONDITIONS" \
--fields-terminated-by '\001' \
--hcatalog-database test \
--hcatalog-table emp_hive \
-m 1
知识点14:Sqoop数据导出
sqoop导出操作最大的特点是,目标表需要自己手动提前创建。
1、在大数据中,数据的导出一般发生在大数据分析的最后一个阶段
2、将分析后的指标导入一些通用的数据库系统中,用于进一步使用
3、导入到MySQL时,需要在MySQL中提前创建表
- 全量数据导出
#step1:MySQL中建表
mysql> use userdb;
mysql> create table employee (
id int not null primary key,
name varchar(20),
deg varchar(20),
salary int,
dept varchar(10));
#step2:从HDFS导出数据到MySQL
sqoop export \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table employee \
--export-dir /sqoop/result1/
#解释:将HDFS/sqoop/result1/目录下的文件内容,导出到mysql中userdb数据库下的employee表
#step3:从Hive导出数据到MySQL
#首先清空MySQL表数据
truncate table employee;
sqoop export \
--connect "jdbc:mysql://192.168.88.80:3306/userdb? useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table employee \
--hcatalog-database test \
--hcatalog-table emp_hive \
--input-fields-terminated-by '\t' \
-m 1
#解释: 将Hive中test数据库下的emp_hive表导出到MySQL的userdb数据库中employee表
#--input-fields-terminated-by '\t' :表示sqoop去hive的表目录下读取文件时,使用'\t'对文件进行切割,如果hive文件分隔符是'\001',则该参数不用指定
#注意,如果Hive中的表底层是使用ORC格式存储的,那么必须使用hcatalog API进行操作。
- 增量数据导出
- updateonly:只增量导出更新的数据
- allowerinsert:既导出更新的数据,也导出新增的数据
- updateonly模式
#在HDFS文件系统中/sqoop/updateonly/目录的下创建一个文件updateonly_1.txt
hadoop fs -mkdir -p /sqoop/updateonly/
hadoop fs -put updateonly_1.txt /sqoop/updateonly/
1201,gopal,manager,50000
1202,manisha,preader,50000
1203,kalil,php dev,30000
#手动创建mysql中的目标表
mysql> USE userdb;
mysql> CREATE TABLE updateonly (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(20),
deg VARCHAR(20),
salary INT);
#先执行全部导出操作:
sqoop export \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table updateonly \
--export-dir /sqoop/updateonly/updateonly_1.txt
#新增一个文件updateonly_2.txt:修改了前三条数据并且新增了一条记录
1201,gopal,manager,1212
1202,manisha,preader,1313
1203,kalil,php dev,1414
1204,allen,java,1515
hadoop fs -put updateonly_2.txt /sqoop/updateonly/
#执行更新导出:
sqoop export \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table updateonly \
--export-dir /sqoop/updateonly/updateonly_2.txt \
--update-key id \
--update-mode updateonly
#解释:
--update-key id 根据id这列来判断id两次导出的数据是否是同一条数据,如果是则更新
--update-mode updateonly 导出时,只导出第一次和第二次的id都有的数据,进行更新,不会导出HDFS中新增加的数据
- allowinsert模式
#手动创建mysql中的目标表
mysql> USE userdb;
mysql> CREATE TABLE allowinsert (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(20),
deg VARCHAR(20),
salary INT);
#先执行全部导出操作
sqoop export \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table allowinsert \
--export-dir /sqoop/updateonly/updateonly_1.txt
#执行更新导出
sqoop export \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root --password 123456 \
--table allowinsert \
--export-dir /sqoop/updateonly/updateonly_2.txt \
--update-key id \
--update-mode allowinsert
知识点15:工作流介绍
- 工作流概念
工作流(Workflow),指“业务过程的部分或整体在计算机应用环境下的自动化”。是对工作流程及其各操作步骤之间业务规则的抽象、概括描述。
工作流解决的主要问题是:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息或者任务。
一个完整的数据分析系统通常都是由多个前后依赖的模块组合构成的:数据采集、数据预处理、数据分析、数据展示等。各个模块单元之间存在时间先后依赖关系,且存在着周期性重复。
核心概念:依赖执行 周期重复执行
- 工作流实现方式
- 自己开发实现调度工具
- 使用第三方调度软件
知识点16:Apache Oozie介绍、架构
- 工作流调度
1、在大数据的实际开发中,我们数据的整个处理和分析过程是周期型的工作,比如每隔一天都需要讲整个流程执行一遍
2、所以我们需要一个工作流调度框架,帮我们在合适的时间重复执行这个分析任务
- oozie介绍
Oozie是一个用来管理 Hadoop生态圈job的工作流调度系统。由Cloudera公司贡献给Apache。
Cloudra产品: Cloudera Manager、Cloudera Hadoop(CDH)、Oozie
Oozie是运行于Java servlet容器上的一个java web应用。
Oozie的目的是按照DAG(有向无环图)调度一系列的Map/Reduce或者Hive等任务。Oozie 工作流由hPDL(Hadoop Process Definition Language)定义(这是一种XML流程定义语言)。
适用场景包括:
需要按顺序进行一系列任务;
需要并行处理的任务;
需要定时、周期触发的任务;
可视化作业流运行过程;
运行结果或异常的通报。
- oozie架构
#Oozie Client
提供命令行、java api、rest等方式,对Oozie的工作流流程的提交、启动、运行等操作;
#Oozie WebApp
即 Oozie Server,本质是一个java应用。可以使用内置的web容器,也可以使用外置的web容器;
#Hadoop Cluster
底层执行Oozie编排流程的各个hadoop生态圈组件;
知识点17:Oozie工作流类型
workflow 普通工作流 没有定时和条件触发功能。
coordinator 定时工作流 可以设置执行周期和频率
bundle 批处理工作流 一次可以提交执行多个coordinator
知识点18:Oozie使用案例
- Oozie的操作步骤-WorkFlow
- 1、开启Oozie服务
- 2、打开Oozie页面
- 3、创建WorkFlow工作流,并命名
- 4、让oozie调度hive脚本
- 5、找到你的hive脚本
- 5.1 在Linux的/root目录,写一个SQL脚本:hive_test.sql
create database myhive;
use myhive;
create table if not exists stu(id int ,name string)
row format delimited fields terminated by '\t';
insert into stu values (1,"zhangsan");
insert into stu values (2,"lisi");
- 5.1 在本地设置脚本权限为为777
chmod 777 /root/hive_test.sh
- 5.2 将脚本上传到HDFS的/user/hue/oozie_test目录
hadoop fs -put /root/hive_test.sh /user/hue/oozie_test
hadoop fs -chmod 777 /user/hue/oozie_test/hive_test.sh
- 6、保存工作流
- 7、点击执行
- 8、查看进度
- 9、以后如何找到该工作流?
- Oozie的操作步骤-定时计划
- 1、打开定时计划页面
- 2、进行设置
- 3、保存并执行
- Oozie的操作步骤-多个定时计划批量执行(Bundle)
- WorkFlow 、计划、Bundle之间的关系
1、WorkFlow是一次性的工作流
2、计划是让WorkFlow可以定时的多次执行
3、Bundle是包含多个计划,可以让多个计划进行批量管理
结论: Bundle包含计划, 计划包含WorkFlow, WorkFlow包含命令和脚本
- 新增oozie程序
- 提交任务
- 修复异常