Spark:比Hadoop更強(qiáng)大的分布式數(shù)據(jù)計(jì)算項(xiàng)目
Spark是一個(gè)由加州大學(xué)伯克利分校(UC Berkeley AMP)開(kāi)發(fā)的一個(gè)分布式數(shù)據(jù)快速分析項(xiàng)目。它的核心技術(shù)是彈性分布式數(shù)據(jù)集(Resilient distributed datasets),提供了比Hadoop更加豐富的MapReduce模型,可以快速在內(nèi)存中對(duì)數(shù)據(jù)集進(jìn)行多次迭代,來(lái)支持復(fù)雜的數(shù)據(jù)挖掘算法和圖計(jì)算算法。
Spark使用Scala開(kāi)發(fā),使用Mesos作為底層的調(diào)度框架,可以和hadoop和Ec2緊密集成,直接讀取hdfs或S3的文件進(jìn)行計(jì)算并把結(jié)果寫回hdfs或S3,是Hadoop和Amazon云計(jì)算生態(tài)圈的一部分。Spark是一個(gè)小巧玲瓏的項(xiàng)目,項(xiàng)目的core部分的代碼只有63個(gè)Scala文件,充分體現(xiàn)了精簡(jiǎn)之美。
Spark之依賴
Map Reduce模型:作為一個(gè)分布式計(jì)算框架,Spark采用了MapReduce模型。在它身上,Google的Map Reduce和Hadoop的痕跡很重,很明顯,它并非一個(gè)大的創(chuàng)新,而是微創(chuàng)新。在基礎(chǔ)理念不變的前提下,它借鑒,模仿并依賴了先輩,加入了一點(diǎn)改進(jìn),極大的提升了MapReduce的效率。
函數(shù)式編程:Spark由Scala寫就,而支持的語(yǔ)言亦是Scala。其原因之一就是Scala支持函數(shù)式編程。這一來(lái)造就了Spark的代碼簡(jiǎn)潔,二來(lái)使得基于Spark開(kāi)發(fā)的程序,也特別的簡(jiǎn)潔。一次完整的MapReduce,Hadoop中需要?jiǎng)?chuàng)建一個(gè)Mapper類和Reduce類,而Spark只需要?jiǎng)?chuàng)建相應(yīng)的一個(gè)map函數(shù)和reduce函數(shù)即可,代碼量大大降低。
Mesos:Spark將分布式運(yùn)行的需要考慮的事情,都交給了Mesos,自己不Care,這也是它代碼能夠精簡(jiǎn)的原因之一。
HDFS和S3:Spark支持2種分布式存儲(chǔ)系統(tǒng):HDFS和S3。應(yīng)該算是目前最主流的兩種了。對(duì)文件系統(tǒng)的讀取和寫入功能是Spark自己提供的,借助Mesos分布式實(shí)現(xiàn)。
Spark與Hadoop的對(duì)比
Spark的中間數(shù)據(jù)放到內(nèi)存中,對(duì)于迭代運(yùn)算效率更高。Spark更適合于迭代運(yùn)算比較多的ML和DM運(yùn)算。因?yàn)樵赟park里面,有RDD的抽象概念。
Spark比Hadoop更通用。
Spark提供的數(shù)據(jù)集操作類型有很多種,不像Hadoop只提供了Map和Reduce兩種操作。比如map,filter,flatMap,sample,groupByKey,reduceByKey,union,join,cogroup,mapValues,sort,partionBy等多種操作類型,Spark把這些操作稱為Transformations。同時(shí)還提供Count,collect,reduce,lookup,save等多種actions操作。
這些多種多樣的數(shù)據(jù)集操作類型,給給開(kāi)發(fā)上層應(yīng)用的用戶提供了方便。各個(gè)處理節(jié)點(diǎn)之間的通信模型不再像Hadoop那樣就是唯一的Data Shuffle一種模式。用戶可以命名,物化,控制中間結(jié)果的存儲(chǔ)、分區(qū)等??梢哉f(shuō)編程模型比Hadoop更靈活。
不過(guò)由于RDD的特性,Spark不適用那種異步細(xì)粒度更新?tīng)顟B(tài)的應(yīng)用,例如web服務(wù)的存儲(chǔ)或者是增量的web爬蟲和索引。就是對(duì)于那種增量修改的應(yīng)用模型不適合。
容錯(cuò)性。在分布式數(shù)據(jù)集計(jì)算時(shí)通過(guò)checkpoint來(lái)實(shí)現(xiàn)容錯(cuò),而checkpoint有兩種方式,一個(gè)是checkpoint data,一個(gè)是logging the updates。用戶可以控制采用哪種方式來(lái)實(shí)現(xiàn)容錯(cuò)。
可用性。Spark通過(guò)提供豐富的Scala, Java,Python API及交互式Shell來(lái)提高可用性。
Spark與Hadoop的結(jié)合
Spark可以直接對(duì)HDFS進(jìn)行數(shù)據(jù)的讀寫,同樣支持Spark on YARN。Spark可以與MapReduce運(yùn)行于同集群中,共享存儲(chǔ)資源與計(jì)算,數(shù)據(jù)倉(cāng)庫(kù)Shark實(shí)現(xiàn)上借用Hive,幾乎與Hive完全兼容。
Spark的核心概念
Resilient Distributed Dataset (RDD)彈性分布數(shù)據(jù)集
RDD是Spark的最基本抽象,是對(duì)分布式內(nèi)存的抽象使用,實(shí)現(xiàn)了以操作本地集合的方式來(lái)操作分布式數(shù)據(jù)集的抽象實(shí)現(xiàn)。RDD是Spark最核心的東西,它表示已被分區(qū),不可變的并能夠被并行操作的數(shù)據(jù)集合,不同的數(shù)據(jù)集格式對(duì)應(yīng)不同的RDD實(shí)現(xiàn)。RDD必須是可序列化的。RDD可以cache到內(nèi)存中,每次對(duì)RDD數(shù)據(jù)集的操作之后的結(jié)果,都可以存放到內(nèi)存中,下一個(gè)操作可以直接從內(nèi)存中輸入,省去了MapReduce大量的磁盤IO操作。這對(duì)于迭代運(yùn)算比較常見(jiàn)的機(jī)器學(xué)習(xí)算法, 交互式數(shù)據(jù)挖掘來(lái)說(shuō),效率提升比較大。
RDD的特點(diǎn):
它是在集群節(jié)點(diǎn)上的不可變的、已分區(qū)的集合對(duì)象。
通過(guò)并行轉(zhuǎn)換的方式來(lái)創(chuàng)建如(map, filter, join, etc)。
失敗自動(dòng)重建。
可以控制存儲(chǔ)級(jí)別(內(nèi)存、磁盤等)來(lái)進(jìn)行重用。
必須是可序列化的。
是靜態(tài)類型的。
RDD的好處:
RDD只能從持久存儲(chǔ)或通過(guò)Transformations操作產(chǎn)生,相比于分布式共享內(nèi)存(DSM)可以更高效實(shí)現(xiàn)容錯(cuò),對(duì)于丟失部分?jǐn)?shù)據(jù)分區(qū)只需根據(jù)它的lineage就可重新計(jì)算出來(lái),而不需要做特定的Checkpoint。
RDD的不變性,可以實(shí)現(xiàn)類Hadoop MapReduce的推測(cè)式執(zhí)行。
RDD的數(shù)據(jù)分區(qū)特性,可以通過(guò)數(shù)據(jù)的本地性來(lái)提高性能,這與Hadoop MapReduce是一樣的。
RDD都是可序列化的,在內(nèi)存不足時(shí)可自動(dòng)降級(jí)為磁盤存儲(chǔ),把RDD存儲(chǔ)于磁盤上,這時(shí)性能會(huì)有大的下降但不會(huì)差于現(xiàn)在的MapReduce。
RDD的存儲(chǔ)與分區(qū):
用戶可以選擇不同的存儲(chǔ)級(jí)別存儲(chǔ)RDD以便重用。
當(dāng)前RDD默認(rèn)是存儲(chǔ)于內(nèi)存,但當(dāng)內(nèi)存不足時(shí),RDD會(huì)spill到disk。
RDD在需要進(jìn)行分區(qū)把數(shù)據(jù)分布于集群中時(shí)會(huì)根據(jù)每條記錄Key進(jìn)行分區(qū)(如Hash 分區(qū)),以此保證兩個(gè)數(shù)據(jù)集在Join時(shí)能高效。
RDD的內(nèi)部表示:
分區(qū)列表(數(shù)據(jù)塊列表)
計(jì)算每個(gè)分片的函數(shù)(根據(jù)父RDD計(jì)算出此RDD)
對(duì)父RDD的依賴列表
對(duì)key-value RDD的Partitioner【可選】
每個(gè)數(shù)據(jù)分片的預(yù)定義地址列表(如HDFS上的數(shù)據(jù)塊的地址)【可選】
RDD的存儲(chǔ)級(jí)別:RDD根據(jù)useDisk、useMemory、deserialized、replication四個(gè)參數(shù)的組合提供了11種存儲(chǔ)級(jí)別。RDD定義了各種操作,不同類型的數(shù)據(jù)由不同的RDD類抽象表示,不同的操作也由RDD進(jìn)行抽實(shí)現(xiàn)。
RDD有兩種創(chuàng)建方式:
從Hadoop文件系統(tǒng)(或與Hadoop兼容的其它存儲(chǔ)系統(tǒng))輸入(例如HDFS)創(chuàng)建。
從父RDD轉(zhuǎn)換得到新RDD。
Spark On Mesos
Spark支持Local調(diào)用和Mesos集群兩種模式,在Spark上開(kāi)發(fā)算法程序,可以在本地模式調(diào)試成功后,直接改用Mesos集群運(yùn)行,除了文件的保存位置需要考慮以外,算法理論上不需要做任何修改。Spark的本地模式支持多線程,有一定的單機(jī)并發(fā)處理能力。但是不算很強(qiáng)勁。本地模式可以保存結(jié)果在本地或者分布式文件系統(tǒng),而Mesos模式一定需要保存在分布式或者共享文件系統(tǒng)。
為了在Mesos框架上運(yùn)行,安裝Mesos的規(guī)范和設(shè)計(jì),Spark實(shí)現(xiàn)兩個(gè)類,一個(gè)是SparkScheduler,在Spark中類名是MesosScheduler;一個(gè)是SparkExecutor,在Spark中類名是Executor。有了這兩個(gè)類,Spark就可以通過(guò)Mesos進(jìn)行分布式的計(jì)算。Spark會(huì)將RDD和MapReduce函數(shù),進(jìn)行一次轉(zhuǎn)換,變成標(biāo)準(zhǔn)的Job和一系列的Task。提交給SparkScheduler,SparkScheduler會(huì)把Task提交給Mesos Master,由Master分配給不同的Slave,最終由Slave中的Spark Executor,將分配到的Task一一執(zhí)行,并且返回,組成新的RDD,或者直接寫入到分布式文件系統(tǒng)。

Transformations & Actions
對(duì)于RDD可以有兩種計(jì)算方式:轉(zhuǎn)換(返回值還是一個(gè)RDD)與操作(返回值不是一個(gè)RDD)。
轉(zhuǎn)換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說(shuō)從一個(gè)RDD轉(zhuǎn)換生成另一個(gè)RDD的操作不是馬上執(zhí)行,Spark在遇到Transformations操作時(shí)只會(huì)記錄需要這樣的操作,并不會(huì)去執(zhí)行,需要等到有Actions操作的時(shí)候才會(huì)真正啟動(dòng)計(jì)算過(guò)程進(jìn)行計(jì)算。
操作(Actions) (如:count, collect, save等),Actions操作會(huì)返回結(jié)果或把RDD數(shù)據(jù)寫到存儲(chǔ)系統(tǒng)中。Actions是觸發(fā)Spark啟動(dòng)計(jì)算的動(dòng)因。
它們本質(zhì)區(qū)別是:Transformation返回值還是一個(gè)RDD。它使用了鏈?zhǔn)秸{(diào)用的設(shè)計(jì)模式,對(duì)一個(gè)RDD進(jìn)行計(jì)算后,變換成另外一個(gè)RDD,然后這個(gè)RDD又可以進(jìn)行另外一次轉(zhuǎn)換。這個(gè)過(guò)程是分布式的。Action返回值不是一個(gè)RDD。它要么是一個(gè)Scala的普通集合,要么是一個(gè)值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統(tǒng)中。關(guān)于這兩個(gè)動(dòng)作,在Spark開(kāi)發(fā)指南中會(huì)有就進(jìn)一步的詳細(xì)介紹,它們是基于Spark開(kāi)發(fā)的核心。這里將Spark的官方ppt中的一張圖略作改造,闡明一下兩種動(dòng)作的區(qū)別。

Lineage(血統(tǒng))
利用內(nèi)存加快數(shù)據(jù)加載,在眾多的其它的In-Memory類數(shù)據(jù)庫(kù)或Cache類系統(tǒng)中也有實(shí)現(xiàn),Spark的主要區(qū)別在于它處理分布式運(yùn)算環(huán)境下的數(shù)據(jù)容錯(cuò)性(節(jié)點(diǎn)實(shí)效/數(shù)據(jù)丟失)問(wèn)題時(shí)采用的方案。為了保證RDD中數(shù)據(jù)的魯棒性,RDD數(shù)據(jù)集通過(guò)所謂的血統(tǒng)關(guān)系(Lineage)記住了它是如何從其它RDD中演變過(guò)來(lái)的。相比其它系統(tǒng)的細(xì)顆粒度的內(nèi)存數(shù)據(jù)更新級(jí)別的備份或者LOG機(jī)制,RDD的Lineage記錄的是粗顆粒度的特定數(shù)據(jù)轉(zhuǎn)換(Transformation)操作(filter, map, join etc.)行為。當(dāng)這個(gè)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),它可以通過(guò)Lineage獲取足夠的信息來(lái)重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運(yùn)用場(chǎng)合,但同時(shí)相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來(lái)了性能的提升。
RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來(lái)解決數(shù)據(jù)容錯(cuò)的高效性。
Narrow Dependencies是指父RDD的每一個(gè)分區(qū)最多被一個(gè)子RDD的分區(qū)所用,表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)或多個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū),也就是說(shuō)一個(gè)父RDD的一個(gè)分區(qū)不可能對(duì)應(yīng)一個(gè)子RDD的多個(gè)分區(qū)。
Wide Dependencies是指子RDD的分區(qū)依賴于父RDD的多個(gè)分區(qū)或所有分區(qū),也就是說(shuō)存在一個(gè)父RDD的一個(gè)分區(qū)對(duì)應(yīng)一個(gè)子RDD的多個(gè)分區(qū)。對(duì)與Wide Dependencies,這種計(jì)算的輸入和輸出在不同的節(jié)點(diǎn)上,lineage方法對(duì)與輸入節(jié)點(diǎn)完好,而輸出節(jié)點(diǎn)宕機(jī)時(shí),通過(guò)重新計(jì)算,這種情況下,這種方法容錯(cuò)是有效的,否則無(wú)效,因?yàn)闊o(wú)法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統(tǒng)的意思),Narrow Dependencies對(duì)于數(shù)據(jù)的重算開(kāi)銷要遠(yuǎn)小于Wide Dependencies的數(shù)據(jù)重算開(kāi)銷。
在RDD計(jì)算,通過(guò)checkpint進(jìn)行容錯(cuò),做checkpoint有兩種方式,一個(gè)是checkpoint data,一個(gè)是logging the updates。用戶可以控制采用哪種方式來(lái)實(shí)現(xiàn)容錯(cuò),默認(rèn)是logging the updates方式,通過(guò)記錄跟蹤所有生成RDD的轉(zhuǎn)換(transformations)也就是記錄每個(gè)RDD的lineage(血統(tǒng))來(lái)重新計(jì)算生成丟失的分區(qū)數(shù)據(jù)。
Spark的Shuffle過(guò)程介紹
Shuffle Writer
Spark豐富了任務(wù)類型,有些任務(wù)之間數(shù)據(jù)流轉(zhuǎn)不需要通過(guò)Shuffle,但是有些任務(wù)之間還是需要通過(guò)Shuffle來(lái)傳遞數(shù)據(jù),比如wide dependency的group by key。
Spark中需要Shuffle輸出的Map任務(wù)會(huì)為每個(gè)Reduce創(chuàng)建對(duì)應(yīng)的bucket,Map產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partitioner得到對(duì)應(yīng)的bucketId,然后填充到相應(yīng)的bucket中去。每個(gè)Map的輸出結(jié)果可能包含所有的Reduce所需要的數(shù)據(jù),所以每個(gè)Map會(huì)創(chuàng)建R個(gè)bucket(R是reduce的個(gè)數(shù)),M個(gè)Map總共會(huì)創(chuàng)建M*R個(gè)bucket。
Map創(chuàng)建的bucket其實(shí)對(duì)應(yīng)磁盤上的一個(gè)文件,Map的結(jié)果寫到每個(gè)bucket中其實(shí)就是寫到那個(gè)磁盤文件中,這個(gè)文件也被稱為blockFile,是Disk Block Manager管理器通過(guò)文件名的Hash值對(duì)應(yīng)到本地目錄的子目錄中創(chuàng)建的。每個(gè)Map要在節(jié)點(diǎn)上創(chuàng)建R個(gè)磁盤文件用于結(jié)果輸出,Map的結(jié)果是直接輸出到磁盤文件上的,100KB的內(nèi)存緩沖是用來(lái)創(chuàng)建Fast Buffered OutputStream輸出流。這種方式一個(gè)問(wèn)題就是Shuffle文件過(guò)多。

針對(duì)上述Shuffle過(guò)程產(chǎn)生的文件過(guò)多問(wèn)題,Spark有另外一種改進(jìn)的Shuffle過(guò)程:consolidation Shuffle,以期顯著減少Shuffle文件的數(shù)量。在consolidation Shuffle中每個(gè)bucket并非對(duì)應(yīng)一個(gè)文件,而是對(duì)應(yīng)文件中的一個(gè)segment部分。Job的map在某個(gè)節(jié)點(diǎn)上第一次執(zhí)行,為每個(gè)reduce創(chuàng)建bucket對(duì)應(yīng)的輸出文件,把這些文件組織成ShuffleFileGroup,當(dāng)這次map執(zhí)行完之后,這個(gè)ShuffleFileGroup可以釋放為下次循環(huán)利用;當(dāng)又有map在這個(gè)節(jié)點(diǎn)上執(zhí)行時(shí),不需要?jiǎng)?chuàng)建新的bucket文件,而是在上次的ShuffleFileGroup中取得已經(jīng)創(chuàng)建的文件繼續(xù)追加寫一個(gè)segment;當(dāng)前次map還沒(méi)執(zhí)行完,ShuffleFileGroup還沒(méi)有釋放,這時(shí)如果有新的map在這個(gè)節(jié)點(diǎn)上執(zhí)行,無(wú)法循環(huán)利用這個(gè)ShuffleFileGroup,而是只能創(chuàng)建新的bucket文件組成新的ShuffleFileGroup來(lái)寫輸出。

比如一個(gè)Job有3個(gè)Map和2個(gè)reduce:(1) 如果此時(shí)集群有3個(gè)節(jié)點(diǎn)有空槽,每個(gè)節(jié)點(diǎn)空閑了一個(gè)core,則3個(gè)Map會(huì)調(diào)度到這3個(gè)節(jié)點(diǎn)上執(zhí)行,每個(gè)Map都會(huì)創(chuàng)建2個(gè)Shuffle文件,總共創(chuàng)建6個(gè)Shuffle文件;(2) 如果此時(shí)集群有2個(gè)節(jié)點(diǎn)有空槽,每個(gè)節(jié)點(diǎn)空閑了一個(gè)core,則2個(gè)Map先調(diào)度到這2個(gè)節(jié)點(diǎn)上執(zhí)行,每個(gè)Map都會(huì)創(chuàng)建2個(gè)Shuffle文件,然后其中一個(gè)節(jié)點(diǎn)執(zhí)行完Map之后又調(diào)度執(zhí)行另一個(gè)Map,則這個(gè)Map不會(huì)創(chuàng)建新的Shuffle文件,而是把結(jié)果輸出追加到之前Map創(chuàng)建的Shuffle文件中;總共創(chuàng)建4個(gè)Shuffle文件;(3) 如果此時(shí)集群有2個(gè)節(jié)點(diǎn)有空槽,一個(gè)節(jié)點(diǎn)有2個(gè)空core一個(gè)節(jié)點(diǎn)有1個(gè)空core,則一個(gè)節(jié)點(diǎn)調(diào)度2個(gè)Map一個(gè)節(jié)點(diǎn)調(diào)度1個(gè)Map,調(diào)度2個(gè)Map的節(jié)點(diǎn)上,一個(gè)Map創(chuàng)建了Shuffle文件,后面的Map還是會(huì)創(chuàng)建新的Shuffle文件,因?yàn)樯弦粋€(gè)Map還正在寫,它創(chuàng)建的ShuffleFileGroup還沒(méi)有釋放;總共創(chuàng)建6個(gè)Shuffle文件。
Shuffle Fetcher
Reduce去拖Map的輸出數(shù)據(jù),Spark提供了兩套不同的拉取數(shù)據(jù)框架:通過(guò)socket連接去取數(shù)據(jù);使用netty框架去取數(shù)據(jù)。
每個(gè)節(jié)點(diǎn)的Executor會(huì)創(chuàng)建一個(gè)BlockManager,其中會(huì)創(chuàng)建一個(gè)BlockManagerWorker用于響應(yīng)請(qǐng)求。當(dāng)Reduce的GET_BLOCK的請(qǐng)求過(guò)來(lái)時(shí),讀取本地文件將這個(gè)blockId的數(shù)據(jù)返回給Reduce。如果使用的是Netty框架,BlockManager會(huì)創(chuàng)建ShuffleSender用于發(fā)送Shuffle數(shù)據(jù)。并不是所有的數(shù)據(jù)都是通過(guò)網(wǎng)絡(luò)讀取,對(duì)于在本節(jié)點(diǎn)的Map數(shù)據(jù),Reduce直接去磁盤上讀取而不再通過(guò)網(wǎng)絡(luò)框架。
Reduce拖過(guò)來(lái)數(shù)據(jù)之后以什么方式存儲(chǔ)呢?Spark Map輸出的數(shù)據(jù)沒(méi)有經(jīng)過(guò)排序,Spark Shuffle過(guò)來(lái)的數(shù)據(jù)也不會(huì)進(jìn)行排序,Spark認(rèn)為Shuffle過(guò)程中的排序不是必須的,并不是所有類型的Reduce需要的數(shù)據(jù)都需要排序,強(qiáng)制地進(jìn)行排序只會(huì)增加Shuffle的負(fù)擔(dān)。Reduce拖過(guò)來(lái)的數(shù)據(jù)會(huì)放在一個(gè)HashMap中,HashMap中存儲(chǔ)的也是<key, value>對(duì),key是Map輸出的key,Map輸出對(duì)應(yīng)這個(gè)key的所有value組成HashMap的value。Spark將Shuffle取過(guò)來(lái)的每一個(gè)<key, value>對(duì)插入或者更新到HashMap中,來(lái)一個(gè)處理一個(gè)。HashMap全部放在內(nèi)存中。
Shuffle取過(guò)來(lái)的數(shù)據(jù)全部存放在內(nèi)存中,對(duì)于數(shù)據(jù)量比較小或者已經(jīng)在Map端做過(guò)合并處理的Shuffle數(shù)據(jù),占用內(nèi)存空間不會(huì)太大,但是對(duì)于比如group by key這樣的操作,Reduce需要得到key對(duì)應(yīng)的所有value,并將這些value組一個(gè)數(shù)組放在內(nèi)存中,這樣當(dāng)數(shù)據(jù)量較大時(shí),就需要較多內(nèi)存。
當(dāng)內(nèi)存不夠時(shí),要不就失敗,要不就用老辦法把內(nèi)存中的數(shù)據(jù)移到磁盤上放著。Spark意識(shí)到在處理數(shù)據(jù)規(guī)模遠(yuǎn)遠(yuǎn)大于內(nèi)存空間時(shí)所帶來(lái)的不足,引入了一個(gè)具有外部排序的方案。Shuffle過(guò)來(lái)的數(shù)據(jù)先放在內(nèi)存中,當(dāng)內(nèi)存中存儲(chǔ)的<key, value>對(duì)超過(guò)1000并且內(nèi)存使用超過(guò)70%時(shí),判斷節(jié)點(diǎn)上可用內(nèi)存如果還足夠,則把內(nèi)存緩沖區(qū)大小翻倍,如果可用內(nèi)存不再夠了,則把內(nèi)存中的<key, value>對(duì)排序然后寫到磁盤文件中。最后把內(nèi)存緩沖區(qū)中的數(shù)據(jù)排序之后和那些磁盤文件組成一個(gè)最小堆,每次從最小堆中讀取最小的數(shù)據(jù),這個(gè)和MapReduce中的merge過(guò)程類似。
MapReduce和Spark的Shuffle過(guò)程對(duì)比

Spark的資源管理與作業(yè)調(diào)度
Spark對(duì)于資源管理與作業(yè)調(diào)度可以使用本地模式,Standalone(獨(dú)立模式),Apache Mesos及Hadoop YARN來(lái)實(shí)現(xiàn)。Spark on Yarn在Spark0.6時(shí)引用,但真正可用是在現(xiàn)在的branch-0.8版本。Spark on Yarn遵循YARN的官方規(guī)范實(shí)現(xiàn),得益于Spark天生支持多種Scheduler和Executor的良好設(shè)計(jì),對(duì)YARN的支持也就非常容易,Spark on Yarn的大致框架圖。

讓Spark運(yùn)行于YARN上與Hadoop共用集群資源可以提高資源利用率。
編程接口
Spark通過(guò)與編程語(yǔ)言集成的方式暴露RDD的操作,類似于DryadLINQ和FlumeJava,每個(gè)數(shù)據(jù)集都表示為RDD對(duì)象,對(duì)數(shù)據(jù)集的操作就表示成對(duì)RDD對(duì)象的操作。Spark主要的編程語(yǔ)言是Scala,選擇Scala是因?yàn)樗暮?jiǎn)潔性(Scala可以很方便在交互式下使用)和性能(JVM上的靜態(tài)強(qiáng)類型語(yǔ)言)。
Spark和Hadoop MapReduce類似,由Master(類似于MapReduce的Jobtracker)和Workers(Spark的Slave工作節(jié)點(diǎn))組成。用戶編寫的Spark程序被稱為Driver程序,Dirver程序會(huì)連接master并定義了對(duì)各RDD的轉(zhuǎn)換與操作,而對(duì)RDD的轉(zhuǎn)換與操作通過(guò)Scala閉包(字面量函數(shù))來(lái)表示,Scala使用Java對(duì)象來(lái)表示閉包且都是可序列化的,以此把對(duì)RDD的閉包操作發(fā)送到各Workers節(jié)點(diǎn)。 Workers存儲(chǔ)著數(shù)據(jù)分塊和享有集群內(nèi)存,是運(yùn)行在工作節(jié)點(diǎn)上的守護(hù)進(jìn)程,當(dāng)它收到對(duì)RDD的操作時(shí),根據(jù)數(shù)據(jù)分片信息進(jìn)行本地化數(shù)據(jù)操作,生成新的數(shù)據(jù)分片、返回結(jié)果或把RDD寫入存儲(chǔ)系統(tǒng)。

Scala:Spark使用Scala開(kāi)發(fā),默認(rèn)使用Scala作為編程語(yǔ)言。編寫Spark程序比編寫Hadoop MapReduce程序要簡(jiǎn)單的多,SparK提供了Spark-Shell,可以在Spark-Shell測(cè)試程序。寫SparK程序的一般步驟就是創(chuàng)建或使用(SparkContext)實(shí)例,使用SparkContext創(chuàng)建RDD,然后就是對(duì)RDD進(jìn)行操作。
Java:Spark支持Java編程,但對(duì)于使用Java就沒(méi)有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因?yàn)槎际荍VM上的語(yǔ)言,Scala與Java可以互操作,Java編程接口其實(shí)就是對(duì)Scala的封裝。如:
Python:現(xiàn)在Spark也提供了Python編程接口,Spark使用py4j來(lái)實(shí)現(xiàn)python與java的互操作,從而實(shí)現(xiàn)使用python編寫Spark程序。Spark也同樣提供了pyspark,一個(gè)Spark的python shell,可以以交互式的方式使用Python編寫Spark程序。
Spark生態(tài)系統(tǒng)
Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎(chǔ)上提供和Hive一樣的H iveQL命令接口,為了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來(lái)實(shí)現(xiàn)query Parsing和 Logic Plan generation,最后的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。通過(guò)配置Shark參數(shù),Shark可以自動(dòng)在內(nèi)存中緩存特定的RDD,實(shí)現(xiàn)數(shù)據(jù)重用,進(jìn)而加快特定數(shù)據(jù)集的檢索。同時(shí),Shark通過(guò)UDF用戶自定義函數(shù)實(shí)現(xiàn)特定的數(shù)據(jù)分析學(xué)習(xí)算法,使得SQL數(shù)據(jù)查詢和運(yùn)算分析能結(jié)合在一起,最大化RDD的重復(fù)使用。
Spark streaming: 構(gòu)建在Spark上處理Stream數(shù)據(jù)的框架,基本的原理是將Stream數(shù)據(jù)分成小的時(shí)間片斷(幾秒),以類似batch批量處理的方式來(lái)處理這小部分?jǐn)?shù)據(jù)。Spark Streaming構(gòu)建在Spark上,一方面是因?yàn)镾park的低延遲執(zhí)行引擎(100ms+)可以用于實(shí)時(shí)計(jì)算,另一方面相比基于Record的其它處理框架(如Storm),RDD數(shù)據(jù)集更容易做高效的容錯(cuò)處理。此外小批量處理的方式使得它可以同時(shí)兼容批量和實(shí)時(shí)數(shù)據(jù)處理的邏輯和算法。方便了一些需要?dú)v史數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)聯(lián)合分析的特定應(yīng)用場(chǎng)合。
Bagel: Pregel on Spark,可以用Spark進(jìn)行圖計(jì)算,這是個(gè)非常有用的小項(xiàng)目。Bagel自帶了一個(gè)例子,實(shí)現(xiàn)了Google的PageRank算法。
Spark的適用場(chǎng)景
Spark是基于內(nèi)存的迭代計(jì)算框架,適用于需要多次操作特定數(shù)據(jù)集的應(yīng)用場(chǎng)合。需要反復(fù)操作的次數(shù)越多,所需讀取的數(shù)據(jù)量越大,受益越大,數(shù)據(jù)量小但是計(jì)算密集度較大的場(chǎng)合,受益就相對(duì)較小
由于RDD的特性,Spark不適用那種異步細(xì)粒度更新?tīng)顟B(tài)的應(yīng)用,例如web服務(wù)的存儲(chǔ)或者是增量的web爬蟲和索引。就是對(duì)于那種增量修改的應(yīng)用模型不適合。
總的來(lái)說(shuō)Spark的適用面比較廣泛且比較通用。
在業(yè)界的使用
Spark項(xiàng)目在2009年啟動(dòng),2010年開(kāi)源, 現(xiàn)在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。
騰訊?廣點(diǎn)通是最早使用Spark的應(yīng)用之一。騰訊大數(shù)據(jù)精準(zhǔn)推薦借助Spark快速迭代的優(yōu)勢(shì),圍繞“數(shù)據(jù)+算法+系統(tǒng)”這套技術(shù)方案,實(shí)現(xiàn)了在“數(shù)據(jù)實(shí)時(shí)采集、算法實(shí)時(shí)訓(xùn)練、系統(tǒng)實(shí)時(shí)預(yù)測(cè)”的全流程實(shí)時(shí)并行高維算法,最終成功應(yīng)用于廣點(diǎn)通pCTR投放系統(tǒng)上,支持每天上百億的請(qǐng)求量。
基于日志數(shù)據(jù)的快速查詢系統(tǒng)業(yè)務(wù)構(gòu)建于Spark之上的Shark,利用其快速查詢以及內(nèi)存表等優(yōu)勢(shì),承擔(dān)了日志數(shù)據(jù)的即席查詢工作。在性能方面,普遍比Hive高2-10倍,如果使用內(nèi)存表的功能,性能將會(huì)比Hive快百倍。Yahoo?Yahoo將Spark用在Audience Expansion中的應(yīng)用。Audience Expansion是廣告中尋找目標(biāo)用戶的一種方法:首先廣告者提供一些觀看了廣告并且購(gòu)買產(chǎn)品的樣本客戶,據(jù)此進(jìn)行學(xué)習(xí),尋找更多可能轉(zhuǎn)化的用戶,對(duì)他們定向廣告。Yahoo采用的算法是logisticregression。同時(shí)由于有些SQL負(fù)載需要更高的服務(wù)質(zhì)量,又加入了專門跑Shark的大內(nèi)存集群,用于取代商業(yè)BI/OLAP工具,承擔(dān)報(bào)表/儀表盤和交互式/即席查詢,同時(shí)與桌面BI工具對(duì)接。目前在Yahoo部署的Spark集群有112臺(tái)節(jié)點(diǎn),9.2TB內(nèi)存。
淘寶?阿里搜索和廣告業(yè)務(wù),最初使用Mahout或者自己寫的MR來(lái)解決復(fù)雜的機(jī)器學(xué)習(xí),導(dǎo)致效率低而且代碼不易維護(hù)。淘寶技術(shù)團(tuán)隊(duì)使用了Spark來(lái)解決多次迭代的機(jī)器學(xué)習(xí)算法、高計(jì)算復(fù)雜度的算法等。將Spark運(yùn)用于淘寶的推薦相關(guān)算法上,同時(shí)還利用Graphx解決了許多生產(chǎn)問(wèn)題,包括以下計(jì)算場(chǎng)景:基于度分布的中樞節(jié)點(diǎn)發(fā)現(xiàn)、基于最大連通圖的社區(qū)發(fā)現(xiàn)、基于三角形計(jì)數(shù)的關(guān)系衡量、基于隨機(jī)游走的用戶屬性傳播等。
優(yōu)酷土豆?優(yōu)酷土豆在使用Hadoop集群的突出問(wèn)題主要包括:第一是商業(yè)智能BI方面,分析師提交任務(wù)之后需要等待很久才得到結(jié)果;第二就是大數(shù)據(jù)量計(jì)算,比如進(jìn)行一些模擬廣告投放之時(shí),計(jì)算量非常大的同時(shí)對(duì)效率要求也比較高,最后就是機(jī)器學(xué)習(xí)和圖計(jì)算的迭代運(yùn)算也是需要耗費(fèi)大量資源且速度很慢。
最終發(fā)現(xiàn)這些應(yīng)用場(chǎng)景并不適合在MapReduce里面去處理。通過(guò)對(duì)比,發(fā)現(xiàn)Spark性能比MapReduce提升很多。首先,交互查詢響應(yīng)快,性能比Hadoop提高若干倍;模擬廣告投放計(jì)算效率高、延遲小(同hadoop比延遲至少降低一個(gè)數(shù)量級(jí));機(jī)器學(xué)習(xí)、圖計(jì)算等迭代計(jì)算,大大減少了網(wǎng)絡(luò)傳輸、數(shù)據(jù)落地等,極大的提高的計(jì)算性能。目前Spark已經(jīng)廣泛使用在優(yōu)酷土豆的視頻推薦(圖計(jì)算)、廣告業(yè)務(wù)等。京東?應(yīng)用于京東云海項(xiàng)目,集成MQ和Kafka, 基于Spark Streaming進(jìn)行實(shí)時(shí)計(jì)算,輸出到HBase
網(wǎng)易?在網(wǎng)易大數(shù)據(jù)平臺(tái)中,數(shù)據(jù)存儲(chǔ)在HDFS之后,提供Hive的數(shù)據(jù)倉(cāng)庫(kù)計(jì)算和查詢,要提高數(shù)據(jù)處理的性能并達(dá)到實(shí)時(shí)級(jí)別,網(wǎng)易公司采用的是Impala和Shark結(jié)合的混合實(shí)時(shí)技術(shù)。Cloudera Impala是基于Hadoop的實(shí)時(shí)檢索引擎開(kāi)源項(xiàng)目,其效率比Hive提高3-90倍,其本質(zhì)是Google Dremel的模仿,但在SQL功能上青出于藍(lán)勝于藍(lán)。Shark是基于Spark的SQL實(shí)現(xiàn),Shark可以比 Hive 快40倍(其論文所描述), 如果執(zhí)行機(jī)器學(xué)習(xí)程序,可以快 25倍,并完全和Hive兼容
百度?據(jù)說(shuō)百度是國(guó)內(nèi)規(guī)模最大的Spark集群的運(yùn)營(yíng)者——實(shí)際生產(chǎn)環(huán)境,最大單集群規(guī)模1300臺(tái)(包含數(shù)萬(wàn)核心和上百TB內(nèi)存),公司內(nèi)部同時(shí)還運(yùn)行著大量的小型Spark集群。
當(dāng)前百度的Spark集群由上千臺(tái)物理主機(jī)(數(shù)萬(wàn)Cores,上百TBMemory)組成,日提交App在數(shù)百,已應(yīng)用于鳳巢、大搜索、直達(dá)號(hào)、百度大數(shù)據(jù)等業(yè)務(wù)。之以選擇Spark,甄鵬總結(jié)了三個(gè)原因:快速高效、API 友好易用和組件豐富。同時(shí)百度開(kāi)放云還提供Spark集群計(jì)算服務(wù),BMR中的Spark同樣隨用隨起,集群空閑即銷毀,幫助用戶節(jié)省預(yù)算。此外,集群創(chuàng)建可以在3到5分鐘內(nèi)完成,包含了完整的Spark+HDFS+YARN堆棧。同時(shí),BMR也提供Long Running模式,并有多種套餐可選。大眾點(diǎn)評(píng)?2013年在建立了公司主要的大數(shù)據(jù)架構(gòu)后,他們上線了HBase的應(yīng)用,并引入Spark/Shark以提高Ad Hoc Query的執(zhí)行時(shí)間,并調(diào)研分布式日志收集系統(tǒng),來(lái)取代手工腳本做日志導(dǎo)入。