spark基础知识之sparkRDD四

spark基础知识之sparkRDD四

2023年6月29日发(作者:)

spark基础知识之sparkRDD四 spark基础知识之 spark RDD <四>本期内容::基于⼯作集的应⽤抽象内幕解密思考精通了RDD,学习Spark的时间⼤⼤缩短。解决问题能⼒⼤⼤提⾼,彻底把精⼒聚集在RDD的理解上,SparkStreaming、SparkSQL、SparkML底层封装的都是RDD。RDD是spark的基⽯,1)RDD提供了通⽤的抽象2)现在Spark有5个⼦框架SparkStreaming、SparkSQL、SparkML、GraphX、SparkR,可以根据⾃⼰从事的领域如医疗等建模后建⽴另外的库。所有顶级spark⾼⼿:1解决bug,性能调优。包括框架的BUG及对框架的修改。2.拿spark就是做修改的,以配合⾃⼰从事的具体业务1.的mr是基于数据集的处理基于⼯作集&基于数据集的共同特征:位置感知,容错,负载均衡,基于数据集的处理⼯作⽅式是从物理存储设备上加载数据,操作数据,写⼊物理存储设备。spark也是mr的⼀种⽅式,只是更细致更⾼效。其实基于数据集的⽅式也是⼀张有向⽆环图。但与基于⼯作集不同。基于数据集的⽅式每次都从物理存储读取数据操作数据然后写回物理设备。hadoop的mr的劣势、不适⽤的场景:1.含有⼤量迭代。2.交互式查询。重点是:基于数据流的⽅式不能复⽤曾经的结果或中间计算结果。假设有数千⼈并发数据仓库,假设100⼈的查询完全相同,那么每个⼈都需要重新查询。Spark就可以避免,因为可以复⽤。spark会对结果进⾏重⽤假如有⼀千⼈查询同⼀个数据仓库spark的话,如果第⼀个⼈计算过的步骤,其他⼈都可以复⽤。RDD是基于⼯作集的,除了有共同特点外,还增加了resillientDistributedDatasetRDD弹性:1:⾃动的进⾏内存和磁盘数据存储的切换2.基于lineage的⾼效容错失败会⾃动进⾏特定次数的重试如果失败会⾃动进⾏特定次数的重试⽽且重试时只会试算失败的分⽚。oint和persist,是效率和容错的延伸。6.数据调度弹性:DAGTASK和资源管理⽆关7.数据分⽚的⾼度弹性计算过程中有很多数据碎⽚,那么Partition就会⾮常⼩,每个Partition都会由⼀个线程处理,就会降低处理效率。这时就要考虑把⼩⽂件合并成⼀个⼤⽂件。另外⼀个⽅⾯,如果内存不多,⽽每个Partition⽐较⼤(数据Block⼤),就要考虑变成更⼩的分⽚,Sparke有更多的处理批次但不会出现OOM。所以说根据数据分⽚的⼤⼩来提⾼并⾏度或降低并⾏度也是Spark⾼度弹性的表现。同时需要指出的是,不管是提⾼并⾏度还是降低并⾏度,仍具有数据本地性。当然,提⾼并⾏度还是降低度⾏度都是⼈⼯通过代码来调整的。假设有⼀百万个数据分⽚,每个数据分⽚都⾮常⼩(1K或10KB),如果要把数据分⽚调整为⼀万个,如果使⽤repartition,就需要Shuffle。从中的repartition⽅法可以看到内部调⽤的是coalesce,传⼊的参数是shuffle并设置为true。源码如下:defrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{coalesce(numPartitions,shuffle=true)}所以,如果要把很多⼩的数据分⽚合并成⼤的数据分⽚的话千万不要直接调⽤repartition,⽽要调⽤coalesce,coalesce默认的shuffle为false。coalesce的源码如下:/***ReturnanewRDDthatisreducedinto`numPartitions`partitions.**Thisresultsinanarrowdependency,ofrom1000partitions*to100partitions,therewillnotbeashuffle,insteadeachofthe100*newpartitionswillclaim10ofthecurrentpartitions.**However,ifyou'redoingadrasticcoalesce,artitions=1,*thismayresultinyourcomputationtakingplaceonfewernodesthan*youlike(einthecafnumPartitions=1).Toavoidthis,*youcanpassshuffle=lladdashufflestep,butmeansthe*currentupstreampartitionswillbeexecutedinparallel(perwhatever*thecurrentpartitioningis).**Note:Withshuffle=true,youcanactuallycoalescetoalargernumber*usefulifyouhaveasmallnumberofpartitions,*say100,g*coalesce(1000,shuffle=true)willresultin1000partitionswiththe*datadistributedusingahashpartitioner.*/defcoalesce(numPartitions:Int,shuffle:Boolean=false)(implicitord:Ordering[T]=null):RDD[T]=withScope{if(shuffle){/**Distributeselementsevenlyacrossoutputpartitions,startingfromarandompartition.*/valdistributePartition=(index:Int,items:Iterator[T])=>{varposition=(newRandom(index)).nextInt(numPartitions){t=>//hPartitioner//on=position+1(position,t)}}:Iterator[(Int,T)] //includeashufflestepsothatourupstreamtasksarestilldistributednewCoalescedRDD(newShuffledRDD[Int,T,T](mapPartitionsWithIndex(distributePartition),newHashPartitioner(numPartitions)),numPartitions).values}else{newCoalescedRDD(this,numPartitions)}}如果要把⼀万个数据分⽚变成⼀百万个的话,可以⽤shuffle,也可以不⽤shuffle。RDD允许⽤户在执⾏多个查询时显式地将⼯作集缓存在内存中,如果有其他⼈执⾏同样的查询的话就可以存⼊⼯作集,这就极⼤地提⾼了效率。特别是在数据仓库中,如果有⼀千⼈做同样的查询,第⼀个⼈在查询之后,每⼆个⼈查询时就可以直接从缓存中取结果。退⼀步说,如果⼀千⼈执⾏的查询只有前10个步骤是⼀样的,如果第⼀个⼈计算完成后,后⾯的⼈的前10步就不需要再计算了。这就极⼤地提升了查询速度。如果是Hadoop的话⼀千个⼈执⾏同样的查询,就需要重复计算⼀千次。缓存随时可以清理掉,如果内存或磁盘不⾜就需要根据优先度将不常使⽤的缓存内容清理掉。RDD的cache是直接放在内存中的。RDD的cache通过checkpoint来清除。但checkpoint是重量级的。SparkStreaming经常进⾏Checkpoint,原因是经常要⽤到以前的内容。假设要统计⼀段时间的内容,那就需要以前的数据。如果Spark的⼀个Stage中有⼀千个步骤的话,默认只会产⽣⼀次结果。如果是HadoopMR就会产⽣999次中间结果,如果数据量很⼤的话,内存和磁盘都可能存不下。Spark本⾝就是RDD的内容,RDD是只读分区的集合。RDD是数据集合,可以简单理解为List或Array。⽤⼀句话概括:RDD是分布式函数式的抽象。基于RDD的编程⼀般都是通过⾼阶函数的⽅式,原因是函数⾥传函数要对当前的Map函数作⽤的数据集进⾏记录的明细化操作。Spark的每⼀步操作都是对RDD进⾏操作,⽽RDD是只读分区的集合。由于每⼀次操作都是只读的,⽽操作会改变数据,那么产⽣中间结果怎么办?=>不能⽴即计算。这就是lazy:不⽤时不算,⽤时才计算,所以不会产⽣中间结果RDD的核⼼之⼀就是它的Lazy,因为这不计算,开始时只是对数据处理进⾏标记⽽已。例如WordCount中的map、flatmap其实并不计算数据,只是对数据操作的标记⽽已。flatMap的源码如下:/***ReturnanewRDDbyfirstapplyingafunctiontoallelementsofthis*RDD,andthenflatteningtheresults.*/defflatMap[U:ClassTag](f:T=>TraversableOnce[U]):RDD[U]=withScope{valcleanF=(f)newMapPartitionsRDD[U,T](this,(context,pid,iter)=>p(cleanF))}可以看出在flatMap中创建了⼀个MapPartitionsRDD,但第⼀个参数是this,这个this是指它依赖的⽗RDD。每次创建新的RDD都会把⽗RDD作为第⼀个参数传⼊。⽽这只是对数据处理的标记⽽已。这让我们联想到⼀种操作:f(x)=x+2x=y+1y=z+3执⾏时只是函数展开。所以RDD每次构建对象都依赖于⽗RDD,最后就是函数展开。所以如果⼀个Stage中有⼀千个步骤的话,不会产⽣999次中间结果。以前有⼈说spark不适合⼤规模计算,当时确实有其道理,主要有两点原因:1)Spark⼀直基于内存迭代会消耗⼤量内存。例如如⼀千个步骤虽然不产⽣中间结果,但如果要复⽤别⼈的结果时就需要⼿动persist或cache,这确实⾮常消耗内存。2)主要是因为shuffle机制。但现在Shuffle⽀持很多种机制,如hashShuffle、sortBasedShuffle、钨丝计划等,⽽且现在Shuffle只是⼀个接⼝,⼀个插件,可以⾃定义,所以可以应对任意规模的数据处理。Spark1.2以前确实有规模的限制,是由其Shuffle机制导致的,但现在没有了。现在⽣产环境spark最低要1.3版本,因为Spark1.3引⼊了dataframe,这是⾥程碑式的。推荐使⽤Spark1.6由于RDD是只读的,为了应对计算模型,RDD⼜是lazy级别的。每次操作都会产⽣RDD,每次构建新的RDD都是把⽗RDD作为第⼀个参数传⼊,这就构成了⼀个链条。在最后Action时才触发,这就构成了⼀个从后往前回溯的过程,其实就是函数展开的过程。由于这种从后往前的回溯机制,Spark的容错的开销会⾮常低。常规容错⽅式:1)数据检查点checkpoint2)记录数据的更新数据检查点⾮常致命。数据检查点的⼯作⽅式是通过数据中⼼的⽹络连接不同的机器,每次操作时都要复制整个数据集,就相当于每次都有⼀个拷贝,拷贝是要通过⽹络的,⽽⽹络带宽就是分布式的瓶颈。同时因为要拷贝,就需要重组资源,这也是对性能的⾮常⼤的消耗。记录数据的更新:每次数据变化时作记录,不需要拷贝,但1)⽐较复杂,⽽且每次更新数据需要权限,容易失控。2)耗性能。spark的RDD就是记录数据更新的⽅式,但为何⾼效?1)RDD是不可变的⽽且是lazy的。由于RDD是不可变的,所以每次操作时就要产⽣新的RDD,新的RDD将⽗RDD作为第⼀个参数传⼊,所以不存在全局修改的问题,控制难度就有极⼤的下降。计算时每次都是从后往前回溯,不会产⽣中间结果。在此基础上还有计算链条,出错可以从中间开始恢复。恢复点要么是checkpoint要么是前⼀个stage的结果(因为Stage结束时会⾃动写磁盘)2)如果每次对数据进⾏很⼩的修都要记录,那代价很⼤。RDD是粗粒度的操作:原因是为了效率为了简化。粗粒度就是每次操作时作⽤的都是所有的数据集合。细粒度代价太⼤。对RDD的具体的数据的改变操作都是粗粒度的。--RDD的写操作是粗粒度的。但RDD的读操作既可以是粗粒度的⼜可以是细粒度的。RDD的粗粒度的写操作限制了RDD的应⽤场景。例如⽹络爬⾍就不适合sparkRDD。但现实中处理场景⼤部分都是粗粒度的。特别是⽀持数据并⾏批处理的应⽤,例如,图计算,数据挖掘,都是在很多记录上进⾏相应操作,都是粗粒度的表现。RDD不适合做细粒度和异步更新的应⽤。如果想让Spark直接操作的数据或者操作HBASE数据就需要复写RDD。RDD的数据分⽚上运⾏的计算逻辑都是⼀样的。对于每个计算逻辑都有计算函数computedefcompute(split:Partition,context:TaskContext):Iterator[T]所有的RDD操作返回的都是⼀个迭代器。如Map/flatMap等。这样的好处:假设⽤SparkSQL提取到了数据,产⽣了新的RDD,机器学习去访问这个RDD,但根本不需要知道这是来⾃于SparkSQL。这就可以让所有的框架⽆缝集成。结果就是机器学习可以直接调⽤SparkSQL,流处理也可以⽤机器学习进⾏训练。因为⽆论是什么操作返回的都是Iterator。所以就可以⽤hasNext来看看有没有下⼀个元素,然后通过Next读取下⼀个元素。Next具体怎么读取下⼀个元素和具体RDD实现有关。Iterator的部分源码(or):traitIterator[+A]extendsTraversableOnce[A]{self=>defseq:Iterator[A]=this/**Testswhetherthisiteratorcanprovideanotherelement.**@return`true`ifasubsequentcallto`next`willyieldanelement,*`false`otherwise.*@noteReuse:$preservesIterator*/defhasNext:Boolean/**Producesthenextelementofthisiterator.**@returnthenextelementofthisiterator,if`hasNext`is`true`,*undefinedbehaviorotherwise.*@noteReuse:$preservesIterator*/defnext():A/**Testswhetherthisiteratorisempty.**@return`true`ifhasNextisfalse,`false`otherwise.*@noteReuse:$preservesIterator*/defisEmpty:Boolean=!hasNext⽆论是什么操作,返回的结果都是Iterator接⼝,⾯向接⼝编程时能不能操作SparkSQL/RDD的⼦类的⽅法?=>Spark可以,不可。原因是有从Java的⾓度讲,⾯向接⼝编程不能调⽤⼦类的⽅法,但如果是有运⾏时的⽀持,会指向具体的⼦类,这样就可以调⽤⼦类的⽅法。SparkStreaming可以调⽤ML的⼦类进⾏训练,RDD本⾝是个abstractclass,与机器学习等的算法⽆关。但由于有了可以通过runtime把实例赋值给RDD,这样就可以操作了。如果开发了⼀个⾃⼰领域的⼦框架,例如⾦融领域,这个⼦框架就可以直接在代码中调⽤机器学习,调⽤图计算进⾏风险预测、个性化分析、⾏为模式分析等,也可以调⽤SparkSQL⽤于数据挖掘。同时机器学习也可以调⽤⾦融框架。⼜例如开发⼀个电商框架,那么⽤户⽀付时⼜可以直接调⽤⾦融框架。就是说每增加⼀个功能就会让所有的功能都增强。每提出⼀个新的框架都可以使⽤其他所有的功能。这是核聚变级别的。Spark的所有⼦框架都是基于RDD的,只不过是⼦类⽽已。下⾯再看⼀下类中的preferredLocation的源码:finaldefpreferredLocations(split:Partition):Seq[String]={(_.getPreferredLocations(split)).getOrElse{getPreferredLocations(split)}}分布式⼤数据计算时优先考虑数据不动代码动,由于有了preferredLocation,可以说spark不仅可以处理⼤数据,Spark可以处理⼀切数据。可以处理数据的数据,可以处理普通⽂件系统上的数据,可以在上运⾏,也可以在Windows上运⾏,可以运⾏⼀切⽂件格式。由于preferredLocation,所以每次计算都符合完美的数据本地性。Spark要做⼀体化多元化的数据通⽤处理框架,兼容⼀切⽂件系统⼀切操作系统⼀切⽂件格式。Spark计算更快,算⼦更丰富,使⽤更简单,⼀统数据处理天下。IBM在2016年6⽉16⽇宣布承诺⼤⼒推进ApacheSpark项⽬,并称该项⽬为:在以数据为主导的,未来⼗年最为重要的新的开源项⽬。这⼀承诺的核⼼是将Spark嵌⼊IBM业内领先的分析和商务平台,并将Spark作为⼀项服务,在IBMBluemix平台上提供给客户。IBM还将投⼊超过3500名研究和开发⼈员在全球⼗余个实验室开展与Spark相关的项⽬,并将为Spark开源⽣态系统⽆偿提供突破性的机器学习技术——IBMSystemML,同时,IBM还将培养超过100万名Spark数据科学家和数据⼯程师。因为Spark是运⾏在JVM上的,⼀切能运⾏在JVM上的数据Spark都能处理。只有⼀点:spark替代不了实时事务处理。如银⾏转帐等,因为Spark反应还不够快,⽽且实时事务性处理控制难度⽐较⼤。Spark完全可以做实时处理。SparkStreaming可以达到1ms内的响应速度(官⽅200ms)。spark要统⼀数据计算领域除了实时事务性处理。下⾯再看⼀下中的dependencies的源码:/***GetthelistofdependenciesofthisRDD,takingintoaccountwhetherthe*RDDischeckpointedornot.*/finaldefdependencies:Seq[Dependency[_]]={(r=>List(newOneToOneDependency(r))).getOrElse{if(dependencies_==null){dependencies_=getDependencies}dependencies_}}后⾯的RDD对前⾯的RDD都有依赖,所以容错性⾮常好。下⾯再看⼀下中的partitions的源码:/***GetthearrayofpartitionsofthisRDD,takingintoaccountwhetherthe*RDDischeckpointedornot.*/finaldefpartitions:Array[Partition]={(_.partitions).getOrElse{if(partitions_==null){partitions_=getPartitions}partitions_}}进⾏下⼀步操作时可以改变并⾏度。并⾏度是弹性的⼀部分。RDD的缺陷:不⽀持细粒度的更新操作和增量迭代计算(如⽹络爬⾍)增量迭代时每次可能只迭代⼀部分数据,但RDD是粗粒度的,⽆法考虑是不是只是⼀部分数据。Jstorm⽀持增量迭代计算,是⽤Java的⽅式重构的Storm(由阿⾥开发)真相给你⾃由,⼈的⼀切痛苦都源于不了解真相。所以必须了解Spark的真相,在⼯作时才能⾃由。

发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1687988002a64242.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信