这篇文章主要讲解了“Spark中的RDD到底是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark中的RDD到底是什么”吧!
Spark是开源的分布式计算引擎,基于RDD来构造数据处理流程,并在集群间调度任务,通过分区数据管理机制来划分任务的并行度,并在任务之间交换分区数据,实现分布式的数据处理。
RDD是Spark中最重要的概念,理解了RDD是什么,基本也就理解了一半Spark的内部机密了。
1、RDD基类
RDD是Spark中表示数据集的基类,是可序列号的对象,因此RDD可在Spark节点中复制。RDD定义了数据迭代器来循环读取数据,以及在数据集上定义各类转换操作,生成新的RDD。
RDD的各种算子会触发生成新的RDD。如:
map操作生成MapPartitionsRDD。
filter操作也生成MapPartitionsRDD,filter操作其实是在之前的RDD迭代器上封装了一层filter操作,其实还是第一个迭代器,只不过这个迭代器会抛弃掉一些不满足的记录。
RDD的计算过程是通过compute方法来触发的。
1.1 RDD触发任务
submit过程是提交spark程序到集群,这时候会触发application事件和driver事件等,并通过master节点选择对应的node来创建app和driver,同时在node上执行spark jar包里的main方法。但task的真正执行要等到RDD的compute动作来触发的。
RDD通过compute触发任务,提交FinalStage给Dag执行。如collect(),count()等方法都会触发compute过程,间接提交任务。
RDD.compute()=> finalStage => dag.submitJob()=> submitMissingStage() .
dag.submitJob()=> scheduleImpl.launchTask()=>scheduleBackend => executorBackend=> executor.launchTask()=> executorBackend.taskComplete msg => scheduleBackend.taskCompleted=>dag.stageFinished()=> …
上面是RDD提交任务的大致流程。Compute函数是触发函数,这会导致最后一个RDD被执行,也是finalStage;finalStage调用DAG的submitJob函数提交stage,这里的stage就是finalStage。
Stage是从源头到finalStage串起来的,执行的时候是反向寻找的,这句话要好好体会,这个过程其实就是RDD的秘密了。
我们先看下RDD的经典图例。图中中间的部分Transformation是RDD的计算过程,左边的HDFS示意数据源,右边的HDFS示意RDD的finalStage执行的操作(图中的操作是写入hdfs,当然也可以是print操作等等,就看你怎么写了)。
Stage1和stage2是窄依赖,map和union都是窄依赖;stage3是宽依赖,这里是join操作。窄依赖的意思就是操作只依赖一个stage的数据,宽依赖的意思是依赖于多个stage,对这多个stage的数据要做全连接操作。
1.2、RDD执行示例
RDD通过runJob调用来获得执行,如下:
def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
Sc是SparkContext。
对每个分区执行func操作,返回结果是一个长度等于分区数的Array。
Sc.runJob再调dagScheduler.runJob方法。具体可以看DagScheduler的作业执行步骤,这里先不说,看笔者的专门论述DagScheduler的文章。
1.3、迭代器
RDD实际执行是通过迭代器读取数据的。
RDD是抽象类,定义了几个接口:
分别是getPartitions、compute、getPreferredLocations。RDD数据是分区存储,每一个分区可能分布在申请spark资源的任何位置。这三个接口可以描述RDD的全部信息,其中getPreferredLocations这个方法是和计算本地化有关的,这里我们就先忽略它,不影响我们理解RDD的原理。
override protected def getPartitions: Array[Partition] = {}
override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}
getPartitions方法我们也不用太关注,它的作用是返回一个分区列表,表示这个RDD有几个分区,实际运行的时候RDD的每个分区会被安排到单独的节点上运行,这样来实现分布式计算的。
我们最关心的是compute的方法,这个方法返回一个迭代器,这个迭代器就是这个RDD的split这个分区的数据集。至于这个迭代器的数据是什么,是在compute方法体中写代码来生成的。我们可以定义自己的RDD,只要写代码实现这几个方法就可以了!
自定义RDD有什么好处呢?最大的好处就是可以把自己的数据集纳入到Spark的分布式计算体系中,帮助你实现数据分区,任务分配,和其他RDD执行全连接汇聚操作等。
言归正传,回到compute方法本身。
怎么获得Iterator[T],对ShuffleRDD来说是从BlockManager获取迭代器Iterator[T]。这种迭代器是blockResult,是ShuffleMapTask执行结果的保存格式;另一种就是直接获得iter,这种是ResultTask的执行结果的数据。
第一种情况,看BlockManager能否找到本RDD的partition的BlockResult。看看getOrElseUpdate方法还传递了一个函数作为最后一个入参,如果不存在指定的BlockResult,则返回入参函数来计算得到iter,方法体定义如下:
() => { readCachedBlock = false computeOrReadCheckpoint(partition, context) }
主要就是调用computeOrReadCheckpoint方法计算分区。
def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { compute(split, context) } }
computeOrReadCheckpoint得到Iterator,如果是checkpoint的那么调用第一个父类的iterator方法得到Iterator,这里父类就是CheckpointRDD;否则就是调用compute方法得到Iterator。
所以,RDD的迭代器的实际获取分成两步:
首先,判断是否存在该RDD指定partition的BlockResult,如果存在则将BlockResult作为Iterator结果,此时表示该RDD是shuffleRDD之类。
然后如果上述不满足,则又分两种情况,第一种这是checkpoint的RDD,则调用父RDD的iterator方法(此时父RDD就是CheckpointRDD);否则调用compute方法来获得Iterator。
2、Stage划分
我们知道RDD的提交Spark集群执行是分阶段划分Stage提交的。从最后一个Stage开始,依次循环递归判断是否要调用依赖RDD的Stage,Stage的划分是根据是否要Shuffle作为分界点的。
如果某个RDD的依赖(dep)是ShuffleDependency,则次RDD作为ShuffleMapTask任务提交,否则最后一个RDD作为ResultTask提交。
递归提交Stage,对ShuffleMapTask类型的RDD,会一直递归判断该RDD是否存在前置的ShuffleDependency,如果存在则递归提交前依赖RDD。
整个Spark作业是RDD串接的,如果不存在Shuffle依赖,则提交最后一个RDD,并且只有这一个RDD被提交。在计算最后一个RDD的iterator时,被调用到父RDD的iterator方法,此时父RDD一般都是MapPartitionsRDD。在MapPartitionsRDD中有进一步叙述。
3、RDD子类
RDD含有多个子类,如MapPartitionRDD,HadoopRDD、CoGroupedRDD等等。笔者这里就找几个例子简单说明一下他们的内部逻辑。
3.1 MapPartitionsRDD
MapPartitionsRDD是RDD的子类,前面看到RDD的诸多算子都会生成新的MapPartitionRDD。
MapPartitionsRDD的构造函数需要入参f,它是一个函数抽象类或者叫做泛类。
f: (TaskContext, Int, Iterator[T]) => Iterator[U]
f的入参有三个:
(1) TaskContext:是任务上下文
(2) Int:是分区编码
(3) Iterator[T]是分区迭代器
f的输出也是一个Iterator迭代器。可以看出,f是一个抽象的从一个迭代器生成另一个迭代器的自定义函数。对数据的处理逻辑就是体现在f上。
MapPartitionRDD中触发计算的compute方法定义如下:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
这里的f是MapPartitionRDD的构造函数中传进入的入参,是用户自定义的map函数。这样,通过RDD的map、flatmap等算子和MapPartitionRDD,可以将RDD上的一系列操作不停的串联下去。
3.2 CoalescedRDD
CoalescedRDD将M个分区的RDD重新分成N个分区,形成新的RDD。在计算过程中,会引起Shuffle工程。
首先CoalescedRDD需要一个重新分区算法,将M个分区如何划分到N个分区,这里M>N。重新分区的结果是N的每个分区对应了M的多个分区,用List<Int>来表示,List<Int>中每个Int表示父RDD中M个分区之一的编号。
如果CoalescedRDD没有指定自己的重新分区算法,则用DefaultPartitionCoalescer来做重新分区计算。
CoalescedRDD的compute过程如下:
override def compute(partition: Partition, context: TaskContext): Iterator[T] = { partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition => firstParent[T].iterator(parentPartition, context) } }
partition.parents是指CoalescedRDD的第partition分区所对应的父RDD的分区列表,对分区列表的每个分区,执行:
firstParent[T].iterator(parentPartition, context)
然后得到最终的Iterator[T]。这段应该不难理解。
需要留意的是,这里得到的Iterator[T]最终是要写到Shuffle的,因为CoalescedRDD对应的ShuffleMapTask而不是ResultTask。
对于理解Spark计算流程来说,理解了Shuffle的过程,也就解决了一半的疑惑了。
感谢各位的阅读,以上就是“Spark中的RDD到底是什么”的内容了,经过本文的学习后,相信大家对Spark中的RDD到底是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是云搜网,小编将为大家推送更多相关知识点的文章,欢迎关注!