跳至主要內容

PySpark_3

Znyoung大数据PySpark

day03 PySpark课程笔记

今日内容:

  • 1- Spark框架与PySpark交互流程 (理解)
  • 2- Spark-Submit相关参数说明 (记录)
  • 3- RDD的基本介绍 (理解)
  • 4- 如何构建RDD(操作)

1. Spark框架与PySpark交互流程

  • 将Spark程序提交到Spark集群采用Client部署模式:
image-20220830221040127.png
image-20220830221040127.png
1- 首先, 在提交的节点上启动一个Driver的程序
2- Driver程序一旦启动后, 执行Main函数,首先要先创建SparkContext对象(底层是基于PY4J, 识别python代码 将其映射为Java代码)
3- 连接Spark集群主节点, 根据提交资源配置要求, 向Master申请资源,用于启动Executor
4-Master接收到资源申请后, 根据申请资源要求, 来进行资源分配工作, 底层分配逻辑为FIFO(先进先出), 当分配好了之后, 等待Driver拉取资源信息列表     
	executor1:  node1  2GB + 2CPU     
	executor2:  node3  2GB + 2CPU
5.接收到资源后, 连接对应的worker占用资源启动executor (反向注册通知)
6- Driver开始处理任务:       
	6.1 首先会加载所有的RDD的算子(API), 基于算子(API)之间的依赖关系, 形成DAG执行流程图, 并且划分stage阶段,并且还会确认每个阶段要运行多少个线程,每个线程最终由那个executor来执行(任务分配)      
	6.2 Driver程序通知对应的executor程序, 来执行具体的任务      
	6.3 executor在接收到任务后, 启动线程, 开始执行具体的任务工作: executor在执行的时候, 由于RDD代码中存在大量的python的函数, 而executor是一个JVM程序, 无法执行, 此时会调用python解释器来执行这些内容, 返回给executor结果即可      
	6.4 executor在运行的过程中, 如果发现最终的结果需要返回给Driver, 那么就直接返回给Driver, 如果不需要返回的直接输出, 那么就直接输出处理即可      
	6.5 Driver程序监听这个executor执行的状态信息, 当executor全部都执行完成后, Driver认为任务运行完成了

7- 当任务执行完成后, Driver执行 sc.stop()  通知Master任务执行完成, Master回收资源
  • PySpark程序提交Spark集群:  部署方式Cluster
1- 首先客户端将PySpark任务提交到Spark集群的Master节点上
2- Master收到任务后, 首先会根据任务信息中, 提供的Driver的资源信息, 随机找一台worker节点(找一个相对不忙,有资源)启动Driver程序 
3- Driver程序一旦启动后, 执行Main函数,首先要先创建SparkContext对象(底层是基于PY4J, 识别python代码 将其映射为Java代码), 同时启动后Driver程序也会和Master建立心跳
4- Driver程序根据任务信息中关于executor的资源配置要求, 向Master申请资源,用于启动Executor
5- Master接收到资源申请后, 根据申请资源要求, 来进行资源分配工作, 底层分配逻辑为FIFO(先进先出), 当分配好了之后, 等待Driver拉取资源信息列表     
	executor1:  node1  2GB + 2CPU     
	executor2:  node3  2GB + 2CPU
6. Driver接收到资源后, 连接对应的worker占用资源启动executor (反向注册通知)
7- Driver开始处理任务:       
	7.1 首先会加载所有的RDD的算子(API), 基于算子(API)之间的依赖关系, 形成DAG执行流程图, 并且划分stage阶段,并且还会确认每个阶段要运行多少个线程,每个线程最终由那个executor来执行(任务分配)      
	7.2 Driver程序通知对应的executor程序, 来执行具体的任务      
	7.3 executor在接收到任务后, 启动线程, 开始执行具体的任务工作: executor在执行的时候, 由于RDD代码中存在大量的python的函数, 而executor是一个JVM程序, 无法执行, 此时会调用python解释器来执行这些内容, 返回给executor结果即可      
	7.4 executor在运行的过程中, 如果发现最终的结果需要返回给Driver, 那么就直接返回给Driver, 如果不需要返回的直接输出, 那么就直接输出处理即可      
	7.5 Driver程序监听这个executor执行的状态信息, 当executor全部都执行完成后, Driver认为任务运行完成了

8- 当任务执行完成后, Driver执行 sc.stop()  通知Master任务执行完成, Master回收资源
  • PySpark程序提交Yarn集群: 部署方式Client
image-20220901194943754.png
image-20220901194943754.png
1- 首先, 在提交的节点上启动一个Driver的程序
2- Driver程序一旦启动后, 执行Main函数,首先要先创建SparkContext对象(底层是基于PY4J, 识别python代码 将其映射为Java代码)
3- 连接Yarn集群主节点, 根据提交资源配置要求,向Yarn提交了一个任务, 此任务的目的帮助申请资源, 并且启动executor
4- Yarn接收到任务信息后, 首先会先随机的在某一台nodemanager节点上, 启动ApplicationMaster, 当applicationMaster启动完成后, 会Yarn建立心跳机制,告知Yarn, 已经启动成功了
5- AppMaster接下来的就要进行资源申请工作, 将需要申请的资源通过心跳的方式发送给RM, RM收到资源信息, 就会调用Yarn的资源调度器来进行资源的分配工作, 分配好后等待AppMaster的拉取即可     
	executor1:  node1  2GB + 2CPU     
	executor2:  node3  2GB + 2CPU
6.AppMaster不断的基于心跳询问Yarn主节点是否已经准备好资源列表, 一旦发现准备好了, 立即获取,然后根据对应的资源信息连接对应的nodemanager占用资源启动executor (反向注册通知给Driver程序)
7- Driver开始处理任务:       
	7.1 首先会加载所有的RDD的算子(API), 基于算子(API)之间的依赖关系, 形成DAG执行流程图, 并且划分stage阶段,并且还会确认每个阶段要运行多少个线程,每个线程最终由那个executor来执行(任务分配)      
	7.2 Driver程序通知对应的executor程序, 来执行具体的任务      
	7.3 executor在接收到任务后, 启动线程, 开始执行具体的任务工作: executor在执行的时候, 由于RDD代码中存在大量的python的函数, 而executor是一个JVM程序, 无法执行, 此时会调用python解释器来执行这些内容, 返回给executor结果即可      
	7.4 executor在运行的过程中, 如果发现最终的结果需要返回给Driver, 那么就直接返回给Driver, 如果不需要返回的直接输出, 那么就直接输出处理即可      
	7.5 Driver程序监听这个executor执行的状态信息, 当executor全部都执行完成后, Driver认为任务运行完成了

8- 当任务执行完成后, Driver执行 sc.stop()  此时各个executor通知AppMaster任务已经执行完成,AppMaster通知Yarn主节点, 任务运行完成, Yarn通知AppMaster退出程序, 同时回收资源


核心: On Yarn Client模式中  Driver程序将资源申请工作, 交给了AppMaster  Driver仅负责任务的管理工作
  • PySpark程序提交Yarn集群: 部署方式cluster模式
image-20220901200339520.png
image-20220901200339520.png
1- 首先, 连接Yarn集群的主节点提交任务
2- 当YARN主节点接收到任务后, 首先会先随机的选择一台nodemanager来启动AppMaster(Driver)程序, 当AppMaster(Driver)程序启动后, 会和Yarn的主节点建立心跳机制, 报告已经启动完成了

3- AppMaster(Driver)程序一旦启动后, 执行Main函数,首先要先创建SparkContext对象(底层是基于PY4J, 识别python代码 将其映射为Java代码)
4- AppMaster(Driver)接下来的就要进行资源申请工作, 将需要申请的资源通过心跳的方式发送给RM, RM收到资源信息, 就会调用Yarn的资源调度器来进行资源的分配工作, 分配好后等待AppMaster(Driver)的拉取即可     
	executor1:  node1  2GB + 2CPU     
	executor2:  node3  2GB + 2CPU
5.AppMaster(Driver)不断的基于心跳询问Yarn主节点是否已经准备好资源列表, 一旦发现准备好了, 立即获取,然后根据对应的资源信息连接对应的nodemanager占用资源启动executor (反向注册通知给AppMaster(Driver)程序)
7- AppMaster(Driver)开始处理任务:       
	7.1 首先会加载所有的RDD的算子(API), 基于算子(API)之间的依赖关系, 形成DAG执行流程图, 并且划分stage阶段,并且还会确认每个阶段要运行多少个线程,每个线程最终由那个executor来执行(任务分配)      
	7.2 AppMaster(Driver)程序通知对应的executor程序, 来执行具体的任务      
	7.3 executor在接收到任务后, 启动线程, 开始执行具体的任务工作: executor在执行的时候, 由于RDD代码中存在大量的python的函数, 而executor是一个JVM程序, 无法执行, 此时会调用python解释器来执行这些内容, 返回给executor结果即可      
	7.4 executor在运行的过程中, 如果发现最终的结果需要返回给AppMaster(Driver), 那么就直接返回给AppMaster(Driver), 如果不需要返回的直接输出, 那么就直接输出处理即可      
	7.5 AppMaster(Driver)程序监听这个executor执行的状态信息, 当executor全部都执行完成后, Driver认为任务运行完成了

8- 当任务执行完成后, AppMaster(Driver)执行 sc.stop()  AppMaster(Driver)通知Yarn主节点, 任务运行完成, Yarn通知AppMaster(Driver)退出程序, 同时回收资源


核心: On Yarn cluster模式中  Driver程序 和 AppMaster程序 二合一, 共同资源申请 和 任务管理工作

2. Spark-Submit相关的参数说明(day02)

3. RDD的基本介绍

3.1 什么是RDD

RDD: 弹性的分布式数据集

RDD出现的目的: 用于支持更高效的迭代计算操作

背景:

在早期的计算模型中:  单机模型
	比如: Excel MySQL
	依赖于单个节点性能
	适用于少量数据集统计计算分析处理工作  整个计算也是基于一个进程中, 进行不断的迭代处理操作

当数据体量变大了以后, 单机这种模型可能无法进行处理操作, 此时可以采用分布式计算模型
	核心: 让更多的节点参与计算, 将计算任务进行划分, 将各个部分交给各个节点来执行处理操作, 运行完成后, 将结果进行汇总整合即可(分而治之)
	
	比如: MR SPARK  FLINK  STORM(几乎很少有人用了)....
	
	MapReduce计算模型:  
		在整个计算过程中, 每一个MR都是由 MapTask 和 ReduceTask 组成的
		在计算过程中, 需要将数据从磁盘读取到内存中, 从内存落入到磁盘中, 再次磁盘中读取, 在落入内存 .... 整个计算操作IO是非常大(MR也被称为IO密集型框架) 从而导致执行效率非常低
	
		由于整个MR只有MapTask和ReduceTask, mapTask用于分布式计算操作, reduceTask进行汇总处理, 如果需要进行多次的分布式处理, 多次的聚合处理呢, 对于MR来说, 单个MR无法处理, 必须将多个MR串联在一起, 每一个MR都要重新申请资源 回收资源,大量的时间可能都浪费在资源处理上, 以及IO处理上, 整个迭代计算效率也是非常差的
		
		发现 MR存在的一些问题点: 
			1- 是否可以让中间处理过程尽可能在内存中, 从而提升效率
			2- 是否可以在一个程序中支持多次完成不断的迭代计算
		
		这些解决的问题方案, 最终由Spark来具体处理, Spark的RDD的产生就是为了解决这样的问题

MR的迭代计算过程:

image-20220901210132911.png
image-20220901210132911.png

Spark的迭代计算过程

image-20220901210201663.png
image-20220901210201663.png

3.2 RDD的五大特性(明确知道)

五大特性:

1- RDD是可分区: RDD的分区是一个抽象的分区,  每一个RDD的分区对应就是一个线程 多个分区实现并行计算
2- RDD的计算函数是作用在RDD的每一个分区上的
3- RDD之间存在依赖关系的
4- [可选的] 对于KV类型的RDD 默认每一个分区是基于Hash 分区方案来处理的
5- [可选的] 移动数据不如移动计算, 让计算函数运行在离数据最近的地方

4 如何构建RDD

构建RDD对象的方式主要有二种:

1- 通过 parallelism 并行的方式来构建RDD (程序内部模拟数据)  -- 测试

2- 通过读取外部文件的方式来获取一个RDD对象
image-20220901212253695.png
image-20220901212253695.png

4.1 通过并行的方式来构建RDD

代码演示:

# 演示WordCount实现代码
from pyspark import SparkContext, SparkConf
import os

# 锁定远程的环境版本(固定代码): 写了一定是没问题的, 但是不写可能会报错
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'


if __name__ == '__main__':
    print("演示: RDD的构建方式  方式一 程序内部初始化数据")

    # 1- 创建SparkContext对象
    conf = SparkConf().setAppName('create_rdd').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 2- 构建RDD对象:  方式一
    rdd_init = sc.parallelize(['张三','李四','王五','赵六','田七','周八','李九'])

    print(type(rdd_init))
    print(rdd_init.collect())

    # RDD是可分区,那么此处的RDD是否有分区呢?
    print(rdd_init.getNumPartitions()) # 查看当前这个RDD有多少个分区
    # 那如何查看每个分区的数据内容呢?
    # [['张三', '李四', '王五'], ['赵六', '田七', '周八', '李九']]
    print(rdd_init.glom().collect())

总结:

总结方式一的RDD的分区数量的说明: 
1) 默认情况下, 分区数量取决于 setMaster参数设置, 比如 local[5] 表示有五个线程, 默认就是五个5分区 如果loca[*] 默认与当前节点CPU核数是相同的

2) 支持手动设置数据的分区数量: 
	sc.parallelize(初始化数据, 分区数量)

建议: 手动设置分区数量 尽量不要超过 初始的线程的数量

相关的API:
1) 如何获取分区的数量:  rdd.getNumPartitions()
2) 如何获取每个分区的内容: rdd.glom().collect()
上次编辑于:
贡献者: 麦正阳