近十年來,隨著Hadoop生態(tài)系統(tǒng)的不斷完善,Hadoop早已成為大數(shù)據(jù)事實上的行業(yè)標(biāo)準(zhǔn)之一。面對當(dāng)今互聯(lián)網(wǎng)產(chǎn)生的巨大的TB甚至PB級原始數(shù)據(jù),利用基于Hadoop的數(shù)據(jù)倉庫解決方案Hive早已是Hadoop的熱點應(yīng)用之一。達(dá)觀數(shù)據(jù)團(tuán)隊長期致力于研究和積累Hadoop系統(tǒng)的技術(shù)和經(jīng)驗,并構(gòu)建起了分布式存儲、分析、挖掘以及應(yīng)用的整套大數(shù)據(jù)處理平臺。本文將從Hive的原理、架構(gòu)及優(yōu)化等方面來分享Hive的一些心得和使用經(jīng)驗,希望對大家有所收貨。
1. Hive基本原理
Hadoop是一個流行的開源框架,用來存儲和處理商用硬件上的大規(guī)模數(shù)據(jù)集。對于HDFS上的海量日志而言,編寫Mapreduce程序代碼對于類似數(shù)據(jù)倉庫的需求來說總是顯得相對于難以維護(hù)和重用,Hive作為一種基于Hadoop的數(shù)據(jù)倉庫解決方案應(yīng)運而生,并得到了廣泛應(yīng)用。
Hive是基于Hadoop的數(shù)據(jù)倉庫平臺,由Facebook貢獻(xiàn),其支持類似SQL的結(jié)構(gòu)化查詢功能。Facebook設(shè)計開發(fā)Hive的初衷就是讓那些熟悉sql編程方式的人也可以更好的利用hadoop,hive可以讓數(shù)據(jù)分析人員只關(guān)注于具體業(yè)務(wù)模型,而不需要深入了解Map/Reduce的編程細(xì)節(jié),但是這并不意味著使用hive不需要了解和學(xué)習(xí)Map/Reduce編程模型和hadoop,復(fù)雜的業(yè)務(wù)需求和模型總是存在的,對于Hive分析人員來說,深入了解Hadoop和Hive的原理和Mapreduce模型,對于優(yōu)化查詢總有益處。
以下先以一個簡單的例子說明利用hadoop Map/Reduce程序和Hive實現(xiàn)hadoop word count的例子。
圖1:mapreduce和hive分別實現(xiàn)count
通過以上可以看出,hive優(yōu)點:成本低,可以通過類sql語句快速實現(xiàn)簡單或復(fù)雜的MapReduce統(tǒng)計。
借助于Hadoop和HDFS的大數(shù)據(jù)存儲能力,數(shù)據(jù)仍然存儲于Hadoop的HDFS中,Hive提供了一種類SQL的查詢語言:HiveQL(HQL),對數(shù)據(jù)進(jìn)行管理和分析,開發(fā)人員可以近乎sql的方式來實現(xiàn)邏輯,從而加快應(yīng)用開發(fā)效率。(關(guān)于Hadoop、hdfs的更多知識請參考hadoop官網(wǎng)及hadoop權(quán)威指南)
HQL經(jīng)過解析和編譯,最終會生成基于Hadoop平臺的Map Reduce任務(wù),Hadoop通過執(zhí)行這些任務(wù)來完成HQL的執(zhí)行。
1.1?? Hive組件
Hive的組件總體上可以分為以下幾個部分:用戶接口(UI)、驅(qū)動、編譯器、元數(shù)據(jù)(Hive系統(tǒng)參數(shù)數(shù)據(jù))和執(zhí)行引擎。
圖2:Hive執(zhí)行流程圖
1)????? 對外的接口UI包括以下幾種:命令行CLI,Web界面、JDBC/ODBC接口;
2)????? 驅(qū)動:接收用戶提交的查詢HQL;
3)???? 編譯器:解析查詢語句,執(zhí)行語法分析,生成執(zhí)行計劃;
4)???? 元數(shù)據(jù)Metadata:存放系統(tǒng)的表、分區(qū)、列、列類型等所有信息,以及對應(yīng)的HDFS文件信息等;
5)???? 執(zhí)行引擎:執(zhí)行執(zhí)行計劃,執(zhí)行計劃是一個有向無環(huán)圖,執(zhí)行引擎按照各個任務(wù)的依賴關(guān)系選擇執(zhí)行任務(wù)(Job)。
需要注意的是,元數(shù)據(jù)庫一般是通過關(guān)系型數(shù)據(jù)庫MySQL來存儲。元數(shù)據(jù)維護(hù)了庫信息、表信息、列信息等所有內(nèi)容,例如表T包含哪些列,各列的類型等等。因此元數(shù)據(jù)庫十分重要,需要定期備份以及支持查詢的擴(kuò)展性。
讀時驗證機(jī)制
與傳統(tǒng)數(shù)據(jù)庫對表數(shù)據(jù)進(jìn)行寫時嚴(yán)重不同,Hive對數(shù)據(jù)的驗證方式為讀時模式,即只有在讀表數(shù)據(jù)的時候,hive才檢查解析具體的字段、shema等,從而保證了大數(shù)據(jù)量的快速加載。
既然hive采用的讀時驗證機(jī)制,那么 如果表schema與表文件內(nèi)容不匹配,會發(fā)生什么呢?
答案是hive會盡其所能的去讀數(shù)據(jù)。如果schema中表有10個字段,而文件記錄卻只有3個字段,那么其中7個字段將為null;如果某些字段類型定位為數(shù)值類型,但是記錄中卻為非數(shù)值字符串,這些字段也將會被轉(zhuǎn)換為null。簡而言之,hive會努力catch讀數(shù)據(jù)時遇到的錯誤,并努力返回。
既然Hive表數(shù)據(jù)存儲在HDFS中且Hive采用的是讀時驗證方式,定義完表的schema會自動生成表數(shù)據(jù)的HDFS目錄,且我們可以以任何可能的方式來加載表數(shù)據(jù)或者利用HDFS API將數(shù)據(jù)寫入文件,同理,當(dāng)我們?nèi)粜枰獙ive數(shù)據(jù)寫入其他庫(如oracle),也可以直接通過api讀取數(shù)據(jù)再寫入目標(biāo)庫。
在實際生產(chǎn)環(huán)境中,當(dāng)需要數(shù)據(jù)倉庫之間的遷移時,就可以直接利用api將源庫的數(shù)據(jù)直接寫入hive庫的表文件中,包括淘寶開源的datax數(shù)據(jù)交換系統(tǒng)都采用類似的方式來交換跨庫數(shù)據(jù)。
再次注意,加載或者寫入的數(shù)據(jù)內(nèi)容要和表定義的schema一致,否則將會造成字段或者表為空。
1.2?? Hive數(shù)據(jù)模型
從數(shù)據(jù)倉庫的角度看,Hive是建立在Hadoop上的數(shù)據(jù)倉庫基礎(chǔ)架構(gòu),可以方便的ETL操作。Hive沒有專門的數(shù)據(jù)存儲格式,也沒有為數(shù)據(jù)建立索引,用于可以非常自由的組織Hive中的表,只需要在創(chuàng)建表的時候定義好表的schema即可。Hive中包含4中數(shù)據(jù)模型:Tabel、ExternalTable、Partition、Bucket。
圖3:hive數(shù)據(jù)模型
- Table:類似與傳統(tǒng)數(shù)據(jù)庫中的Table,每一個Table在Hive中都有一個相應(yīng)的目錄來存儲數(shù)據(jù)。例如:一個表t,它在HDFS中的路徑為:/user/hive/warehouse/t。
- Partition:類似于傳統(tǒng)數(shù)據(jù)庫中劃分列的索引。在Hive中,表中的一個Partition對應(yīng)于表下的一個目錄,所有的Partition數(shù)據(jù)都存儲在對應(yīng)的目錄中。例如:t表中包含ds和city兩個Partition,則對應(yīng)于ds=2014,city=beijing的HDFS子目錄為:/user/hive/warehouse/t/ds=2014/city=Beijing;
需要注意的是,分區(qū)列是表的偽列,表數(shù)據(jù)文件中并不存在這個分區(qū)列的數(shù)據(jù)。
- Buckets:對指定列計算的hash,根據(jù)hash值切分?jǐn)?shù)據(jù),目的是為了便于并行,每一個Buckets對應(yīng)一個文件。將user列分?jǐn)?shù)至32個Bucket上,首先對user列的值計算hash,比如,對應(yīng)hash=0的HDFS目錄為:/user/hive/warehouse/t/ds=2014/city=Beijing/part-00000;對應(yīng)hash=20的目錄為:/user/hive/warehouse/t/ds=2014/city=Beijing/part-00020。
- External Table指向已存在HDFS中的數(shù)據(jù),可創(chuàng)建Partition。Managed Table創(chuàng)建和數(shù)據(jù)加載過程,可以用統(tǒng)一語句實現(xiàn),實際數(shù)據(jù)被轉(zhuǎn)移到數(shù)據(jù)倉庫目錄中,之后對數(shù)據(jù)的訪問將會直接在數(shù)據(jù)倉庫的目錄中完成。刪除表時,表中的數(shù)據(jù)和元數(shù)據(jù)都會刪除。External Table只有一個過程,因為加載數(shù)據(jù)和創(chuàng)建表是同時完成。數(shù)據(jù)是存儲在Location后面指定的HDFS路徑中的,并不會移動到數(shù)據(jù)倉庫中。
1.3?? Hive翻譯成MapReduce Job
Hive編譯器將HQL代碼轉(zhuǎn)換成一組操作符(operator),操作符是Hive的最小操作單元,每個操作符代表了一種HDFS操作或者M(jìn)apReduce作業(yè)。Hive中的操作符包括:
表:Hive執(zhí)行常用的操作符列表
操作符 | 描述 |
TableScanOperator | 掃描hive表數(shù)據(jù) |
ReduceSinkOperator | 創(chuàng)建將發(fā)送到Reducer端的<Key,Value>對 |
JoinOperator | Join兩份數(shù)據(jù) |
SelectOperator | 選擇輸出列 |
FileSinkOperator | 建立結(jié)果數(shù)據(jù),輸出至文件 |
FilterOperator | 過濾輸入數(shù)據(jù) |
GroupByOperator | Group By 語句 |
MapJoinOperator | Mapjoin |
LimitOperator | Limit語句 |
UnionOperator | Union語句 |
對于MapReduce操作單元,Hive通過ExecMapper和ExecReducer執(zhí)行MapReduce任務(wù)。
對于Hive語句:
INSERT OVERWRITE TABLE read_log_tmp
SELECT a.userid,a.bookid,b.author,b.categoryid
FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;
其執(zhí)行計劃為:
圖4:reduce端join的任務(wù)執(zhí)行流程
1.4?? 與一般SQL的區(qū)別
Hive 視圖與一般數(shù)據(jù)庫視圖
Hive視圖與一般數(shù)據(jù)庫視圖作用角色相同,都是基于數(shù)據(jù)規(guī)??s減或者基于安全機(jī)制下的某些條件查詢下的數(shù)據(jù)子集。Hive視圖只支持邏輯視圖,不支持物化視圖,即每次對視圖的查詢hive都將執(zhí)行查詢?nèi)蝿?wù),因此視圖不會帶來性能上的提升。作為Hive查詢優(yōu)化的一部分,對視圖的查詢條件語句和視圖的定義查詢條件語句將會盡可能的合并成一個條件查詢。
Hive索引與一般數(shù)據(jù)庫索引
相比于傳統(tǒng)數(shù)據(jù)庫,Hive只提供有限的索引功能,通過在某些字段上建立索引來加速某些操作。通常當(dāng)邏輯分區(qū)太多太細(xì),partition無法滿足時,可以考慮建立索引。Hive1.2.1版本目前支持的索引類型有CompactIndexHandler和Bitmap。
CompactIndexHandler 壓縮索引通過將列中相同的值得字段進(jìn)行壓縮從而減小存儲和加快訪問時間。需要注意的是Hive創(chuàng)建壓縮索引時會將索引數(shù)據(jù)也存儲在Hive表中。對于表tb_index (id int, name string) 而言,建立索引后的索引表中默認(rèn)的三列一次為索引列(id)、hdfs文件地址(_bucketname)、偏移量(offset)。特別注意,offset列類型為array<bigint>。
Bitmap 位圖索引作為一種常見的索引,如果索引列只有固定的幾個值,那么就可以采用位圖索引來加速查詢。利用位圖索引可以方便的進(jìn)行AND/OR/XOR等各類計算,Hive0.8版本開始引入位圖索引,位圖索引在大數(shù)據(jù)處理方面的應(yīng)用廣泛,比如可以利用bitmap來計算用戶留存率(索引做與運算,效率遠(yuǎn)好于join的方式)。如果Bitmap索引很稀疏,那么就需要對索引壓縮以節(jié)省存儲空間和加快IO。Hive的Bitmap Handler采用的是EWAH(https://github.com/lemire/javaewah)壓縮方式。
圖5:Hive compact索引及bitmap索引
從Hive索引功能來看,其主要功能就是避免第一輪mr任務(wù)的全表掃描,而改為掃描索引表。如果索引索引表本身很大,其開銷仍然很大,在集群資源充足的情況下,可以忽略使用hive下的索引。
2. Schema設(shè)計
沒有通用的schema,只有合適的schema。在設(shè)計Hive的schema的時候,需要考慮到存儲、業(yè)務(wù)上的高頻查詢造成的開銷等等,設(shè)計適合自己的數(shù)據(jù)模型。
2.1?? 設(shè)置分區(qū)表
對于Hive來說,利用分區(qū)來設(shè)計表總是必要的,分區(qū)提供了一種隔離數(shù)據(jù)和優(yōu)化查詢的便利的方式。特別是面對日益增長的數(shù)據(jù)規(guī)模。設(shè)置符合邏輯的分區(qū)可以避免進(jìn)行全表掃描,只需加載特定某些hdfs目錄的數(shù)據(jù)文件。
設(shè)置分區(qū)時,需要考慮被設(shè)置成分區(qū)的字段,按照時間分區(qū)一般而言就是一個好的方案,其好處在于其是按照不同時間粒度來確定合適大小的數(shù)據(jù)積累量,隨著時間的推移,分區(qū)數(shù)量的增長是均勻的,分區(qū)的大小也是均勻的。達(dá)觀數(shù)據(jù)每日處理大量的用戶日志,對于user_log來說,設(shè)置分區(qū)字段為日期(天)是合理的。如果以userid字段來建立動態(tài)分區(qū),而userid的基數(shù)是非常大的,顯然分區(qū)數(shù)目是會超過hive的默認(rèn)設(shè)置而執(zhí)行失敗。如果相對userid進(jìn)行hash,我們可以以userid進(jìn)行分桶(bucket),根據(jù)userid進(jìn)行hash然后分發(fā)到桶中,相同hash值的userid會分發(fā)到同一個桶中。每個桶對應(yīng)著一個單獨的文件。
2.2?? 避免小文件
雖然分區(qū)有利于隔離數(shù)據(jù)和查詢,設(shè)置過多過細(xì)的分區(qū)也會帶來瓶頸,主要是因為HDFS非常容易存儲大數(shù)據(jù)文件,由于分區(qū)對應(yīng)著hdfs的目錄結(jié)構(gòu),當(dāng)存在過多的分區(qū)時,意味著文件的數(shù)目就越多,過多增長的小文件會給namecode帶來巨大的性能壓力。同時小文件過多會影響JOB的執(zhí)行,hadoop會將一個job轉(zhuǎn)換成多個task,即使對于每個小文件也需要一個task去單獨處理,task作為一個獨立的jvm實例,其開啟和停止的開銷可能會大大超過實際的任務(wù)處理時間。因此,hive表設(shè)計的分區(qū)不應(yīng)該過多過細(xì),每個目錄下的文件足夠大,應(yīng)該是文件系統(tǒng)中塊大小的若干倍。(達(dá)觀數(shù)據(jù) 文輝)
查詢避免生成小文件技巧
既然hive或者說hadoop需要大文件,HQL執(zhí)行語句也需要注意輸入文件和輸出文件的大小,防止生成過多小文件。hive可以通過配置參數(shù)在mr過程中合并小文件。
Map合并小文件:
set mapred.max.split.size=256000000? #每個Map最大輸入大小(單位:字節(jié))
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat? #執(zhí)行Map前進(jìn)行小文件合并
輸出合并:
set hive.merge.mapfiles = true???????????????? #在Map-only的任務(wù)結(jié)束時合并小文件
set hive.merge.mapredfiles= true????? ?????????#在Map-Reduce的任務(wù)結(jié)束時合并小文件
set hive.merge.size.per.task = 256*1000*1000 ???#合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 ?????#當(dāng)輸出文件的平均大小小于該值時,啟動一個獨立的map-reduce任務(wù)進(jìn)行文件merge
create table user_log (user_id int,url string,source_ip string)
partitioned by (dt? string)
clustered by (user_id) into 96 buckets;
我們知道hive輸出最終是mr的輸出,即reducer(或mapper)的輸出,有多少個reducer(mapper)輸出就會生成多少個輸出文件,根據(jù)shuffle/sort的原理,每個文件按照某個值進(jìn)行shuffle后的結(jié)果。我們可通過設(shè)置hive.enforce.bucketing=true來強(qiáng)制將對應(yīng)記錄分發(fā)到正確桶中,或者通過添加cluster by語句以及設(shè)置set mapred.reduce.tasks=96來設(shè)置reducer的數(shù)目,從而保證輸出與schema一致。根據(jù)hive的讀時驗證方式,正確的插入數(shù)據(jù)取決與我們自己,而不能依靠schema。
2.3?? 選擇文件格式
Hive提供的默認(rèn)文件存儲格式有textfile、sequencefile、rcfile等。用戶也可以通過實現(xiàn)接口來自定義輸入輸?shù)奈募袷健?/p>
在實際應(yīng)用中,textfile由于無壓縮,磁盤及解析的開銷都很大,一般很少使用。Sequencefile以鍵值對的形式存儲的二進(jìn)制的格式,其支持針對記錄級別和塊級別的壓縮。rcfile是一種行列結(jié)合的存儲方式(text file和sequencefile都是行表[row table]),其保證同一條記錄在同一個hdfs塊中,塊以列式存儲。一般而言,對于OLTP而言,行表優(yōu)勢大于列表,對于OLAP而言,列表的優(yōu)勢大于行表,特別容易想到當(dāng)做聚合操作時,列表的復(fù)雜度將會比行表小的多,雖然單獨rcfile的列運算不一定總是存在的,但是rcfile的高壓縮率確實減少文件大小,因此實際應(yīng)用中,rcfile總是成為不二的選擇,達(dá)觀數(shù)據(jù)平臺在選擇文件存儲格式時也大量選擇了rcfile方案。
3. 查看執(zhí)行計劃及優(yōu)化
達(dá)觀的數(shù)據(jù)倉庫基于Hive搭建,每日需要處理大量的計算流程,Hive的穩(wěn)定性和性能至關(guān)重要。眾多的任務(wù)需要我們合理的調(diào)節(jié)分配集群資源,合理的配置各參數(shù),合理的優(yōu)化查詢。Hive優(yōu)化包含各個方面,如job個數(shù)優(yōu)化、job的map/reducer個數(shù)優(yōu)化、并行執(zhí)行優(yōu)化等等,本節(jié)將主要從HQL查詢優(yōu)化角度來具體說明
3.1?? Join語句
對于上述的join語句
INSERT OVERWRITE TABLE read_log_tmp
SELECT a.userid,a.bookid,b.author
FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;
explain該查詢語句后:
圖:map端join的執(zhí)行計劃
由于表中數(shù)據(jù)為空,對于小數(shù)據(jù)量,hive會自動采取map join的方式來優(yōu)化join,從mapreduce的編程模型來看,實現(xiàn)join的方式主要有map端join、reduce端join。Map端join利用hadoop 分布式緩存技術(shù)通過將小表變換成hashtable文件分發(fā)到各個task,map大表時可以直接判斷hashtable來完成join,注意小表的hashtable是放在內(nèi)存中的,在內(nèi)存中作匹配,因此map join是一種非??斓膉oin方式,也是一種常見的優(yōu)化方式。如果小表夠小,那么就可以以map join的方式來完成join完成。Hive通過設(shè)置hive.auto.convert.join=true(默認(rèn)值)來自動完成map join的優(yōu)化,而無需顯示指示map join。缺省情況下map join的優(yōu)化是打開的。
Reduce端join需要reducer來完成join過程,對于上述join代碼,reduce 端join的mr流程如下,
圖:reduce端join的mapreduce過程
相比于map join, reduce 端join無法再map過程中過濾任何記錄,只能將join的兩張表的所有數(shù)據(jù)按照join key進(jìn)行shuffle/sort,并按照join key的hash值將<key,value>對分發(fā)到特定的reducer。Reducer對于所有的鍵值對執(zhí)行join操作,例如0號(bookid的hash值為0)reducer收到的鍵值對如下,其中T1、T2表示記錄的來源表,起到標(biāo)識作用:
圖:reduce端join的reducer join
Reducer端join無法避免的reduce截斷以及傳輸?shù)拇罅繑?shù)據(jù)都會給集群網(wǎng)絡(luò)帶來壓力,從上圖可以看出所有hash(bookid) % reducer_number等于0的key-value對都會通過shuffle被分發(fā)到0號reducer,如果分到0號reducer的記錄數(shù)目遠(yuǎn)大于其他reducer的記錄數(shù)目,顯然0號的reducer的數(shù)據(jù)處理量將會遠(yuǎn)大于其他reducer,因此處理時間也會遠(yuǎn)大于其他reducer,甚至?xí)韮?nèi)存等其他問題,這就是數(shù)據(jù)傾斜問題。對于join造成的數(shù)據(jù)傾斜問題我們可以通過設(shè)置參數(shù)set Hive.optimize.skewjoin=true,讓hive自己嘗試解決join過程中產(chǎn)生的傾斜問題。
3.2?? Group by語句
我們對user_read_log表按userid goup by語句來繼續(xù)探討數(shù)據(jù)傾斜問題,首先我們explain group by語句:
explain select userid,count(*) from user_read_log group by userid。
圖:goup by的執(zhí)行計劃
Group by的執(zhí)行計劃按照userid的hash值分發(fā)記錄,同時在map端也做了本地reduce,group by的shuffle過程是按照hash(userid)來分發(fā)的,實際應(yīng)用中日志中很多用戶都是未注冊用戶或者未登錄,userid字段為空的記錄數(shù)遠(yuǎn)大于userid不為空的記錄數(shù),當(dāng)所有的空userid記錄都分發(fā)到特定某一個reducer后,也會帶來嚴(yán)重的數(shù)據(jù)傾斜問題。造成數(shù)據(jù)傾斜的主要原因在于分發(fā)到某個或某幾個reducer的數(shù)據(jù)量遠(yuǎn)大于其他reducer的數(shù)據(jù)量。
對于group by造成的數(shù)據(jù)傾斜問題,我們可以通過設(shè)置參數(shù)
set hive.map.aggr=true (開啟map端combiner);
set hive.groupby.skewindata=true;
這個參數(shù)的作用是做Reduce操作的時候,拿到的key并不是所有相同值給同一個Reduce,而是隨機(jī)分發(fā),然后Reduce做聚合,做完之后再做一輪MR,拿前面聚合過的數(shù)據(jù)再算結(jié)果。雖然多了一輪MR任務(wù),但是可以有效的減少數(shù)據(jù)傾斜問題可能帶來的危險。
正確的設(shè)置Hive參數(shù)可以在某種程度上避免的數(shù)據(jù)傾斜問題,合適的查詢語句也可以避免數(shù)據(jù)傾斜問題。要盡早的過濾數(shù)據(jù)和裁剪數(shù)據(jù),減少后續(xù)處理的數(shù)據(jù)量,使得join key的數(shù)據(jù)分布較為均勻,將空字段隨機(jī)賦予值,這樣既可以均勻分發(fā)傾斜的數(shù)據(jù):
select userid,name from user_info a
join (
select case when userid is null then cast(rand(47)*100000 as int)
else userid
from user_read_log
) b on a.userid = b.userid
如果用戶在定義schema的時候就已經(jīng)預(yù)料到表數(shù)據(jù)可能會存在嚴(yán)重的數(shù)據(jù)傾斜問題,Hive自0.10.0引入了skew table的概念,如建表語句
CREATE TABLE user_read_log (userid int,bookid, …)
SKEWED BY (userid) ON (null) [STORED AS DIRECTORIES];
需要注意的是,skew table只是將傾斜特別嚴(yán)重的列的分開存儲為不同的文件,每個制定的傾斜值制定為一個文件或者目錄,因此在查詢的時候可以通過過濾傾斜值來避免數(shù)據(jù)傾斜問題:
select userid,name from user_info a
join (
select userid from user_read_log where pt=’2015’ and userid is not null
) b on a.userid = b.userid
可以看出,如果不加過濾條件,傾斜問題還是會存在,通過對skew table加過濾條件的好處是避免了mapper的表掃描過濾操作。
3.3?? Join的物理優(yōu)化
Hive內(nèi)部實現(xiàn)了MapJoinResolver(處理MapJoin)、SkewJoinResolver(處理傾斜join)、CommonJoinResolver
(處理普通Join)等類來實現(xiàn)join的查詢物理優(yōu)化(/org/apache/hadoop/hive/ql/optimizer/physical)。
CommonJoinResolver類負(fù)責(zé)將普通Join轉(zhuǎn)換成MapJoin,Hive通過這個類來實現(xiàn)mapjoin的自動優(yōu)化。對于表A和表B的join查詢,會產(chǎn)生3個分支:
- 以表A作為大表進(jìn)行Mapjoin;
- 以表A作為大表進(jìn)行Mapjoin;
- Map-reduce join
由于不知道輸入數(shù)據(jù)規(guī)模,因此編譯時并不會決定走那個分支,而是在運行時判斷走那個分支。需要注意的是要像完成上述自動轉(zhuǎn)換,需要將hive.auto.convert.join.noconditionaltask設(shè)置為true(默認(rèn)值),同時可以手工控制轉(zhuǎn)載進(jìn)內(nèi)存的小表的大?。╤ive.auto.convert.join.noconditionaltask.size)。
MapJoinResolver 類負(fù)責(zé)迭代各個mr任務(wù),檢查每個任務(wù)是否存在map join操作,如果有,會將local map work轉(zhuǎn)換成local map join work。
SkewJoinResolver類負(fù)責(zé)迭代有join操作的reducer任務(wù),一旦單個reducer產(chǎn)生了傾斜,那么就會將傾斜值得數(shù)據(jù)寫入hdfs,然后用一個新的map join的任務(wù)來處理傾斜值的計算。雖然多了一輪mr任務(wù),但是由于采用的map join,效率也是很高的。良好的mr模式和執(zhí)行流程總是至關(guān)重要的。
4. 窗口分析函數(shù)
Hive提供了豐富了數(shù)學(xué)統(tǒng)計函數(shù),同時也提供了用戶自定義函數(shù)的接口,用戶可以自定義UDF、UDAF、UDTF Hive 0.11版本開始提供窗口和分析函數(shù)(Windowing and Analytics Functions),包括LEAD、LAG、FIRST_VALUE、LAST_VALUE、RANK、ROW_NUMBER、PERCENT_RANK、CUBE、ROLLUP等。
窗口函數(shù)是深受數(shù)據(jù)分析人員的喜愛,利用窗口函數(shù)可以方便的實現(xiàn)復(fù)雜的數(shù)據(jù)統(tǒng)計分析需求,oracle、db2、postgresql等數(shù)據(jù)庫中也提供了window function的功能。窗口函數(shù)與聚合函數(shù)一樣,都是對表子集的操作,從結(jié)果上看,區(qū)別在于窗口函數(shù)的結(jié)果不會聚合,原有的每行記錄依然會存在。
窗口函數(shù)的典型分析應(yīng)用包括:
- 按分區(qū)聚合(排序,top n問題)
- 行間計算(時間序列分析)
- 關(guān)聯(lián)計算(購物籃分析)
我們以一個簡單的行間計算的例子說明窗口函數(shù)的應(yīng)用(關(guān)于其他函數(shù)的具體說明,請參考hive文檔)。用戶閱讀行為的統(tǒng)計分析需要從點擊書籍行為中歸納統(tǒng)計出來,用戶在時間點T1點擊了章節(jié)A,在時間點T2點擊了章節(jié)B,在時間點T3點擊了章節(jié)C 。用戶瀏覽日志結(jié)構(gòu)如下表所示。
USER_ID | BOOK_ID | CHAPTER_ID | LOG_TIME |
1001 | 2001 | 40001 | 1443016010 |
1001 | 2001 | 40004 | 1443016012 |
1001 | 2001 | 40005 | 1443016310 |
通過對連續(xù)的用戶點擊日志分析,通過Hive提供的窗口分析函數(shù)可以計算出用戶各章節(jié)的閱讀時間。按照USER_ID、BOOKID構(gòu)建窗口,并按照LOG_TIME排序,對窗口的每一條記錄取相對下一條記錄的LOG_TIME減去當(dāng)前記錄的LOG_TIME即為當(dāng)前記錄章節(jié)的閱讀時間。
SELECT
Userid, bookid, chapterid, end_time – start_time as read_time
FROM
(
SELECT userid, bookid, chapterid, log_time as start_time,
lead(log_time,1,null) over(partition by userid, bookid order by log_time) as end_time
FROM user_read_log where pt=’2015–12–01’
) a;
通過上述查詢既可以找出2015-12-01日所有用戶對每一章節(jié)的閱讀時間。感謝窗口函數(shù),否則hive將束手無策。只能通過開發(fā)mr代碼或者實現(xiàn)udaf來實現(xiàn)上述功能。
窗口分析函數(shù)關(guān)鍵在于定義的窗口數(shù)據(jù)集及其對窗口的操作,通過over(窗口定義語句)來定義窗口。日常分析和實際應(yīng)用中,經(jīng)常會有窗口分析應(yīng)用的場景,例如基于分區(qū)的排序、集合、統(tǒng)計等復(fù)雜操作。例如我們需要統(tǒng)計每個用戶閱讀時間最多的3本書:
圖:行間計算示意圖及代碼
對上述語句explain后的結(jié)果:
圖:行間計算的執(zhí)行計劃
窗口函數(shù)使得Hive的具備了完整的數(shù)據(jù)分析功能,在實際的應(yīng)用環(huán)境中,達(dá)觀數(shù)據(jù)分析團(tuán)隊大量使用hive窗口分析函數(shù)來實現(xiàn)較為復(fù)雜的邏輯,提高開發(fā)和迭代效率。
5. 總結(jié)和展望
本文在介紹Hive的原理和架構(gòu)的基礎(chǔ)上,分享了達(dá)觀團(tuán)隊在Hive上的部分使用經(jīng)驗。Hive仍然處在不斷的發(fā)展之中,將HQL理解成Mapreduce程序、理解Hadoop的核心能力是更好的使用和優(yōu)化Hive的根本。
技術(shù)的發(fā)展日新月異,隨著spark的日益完善和流行,hive社區(qū)正考慮將spark作為hive的執(zhí)行引擎之一。Spark是一種基于rdd(彈性數(shù)據(jù)集)的內(nèi)存分布式并行處理框架,內(nèi)部集成了Spark SQL模塊來實現(xiàn)對結(jié)構(gòu)化數(shù)據(jù)的SQL功能。相比于Hadoop將大量的中間結(jié)果寫入HDFS,Spark避免了中間結(jié)果的持久化,速度更快且更有利于迭代計算。 達(dá)觀數(shù)據(jù)團(tuán)隊也將緊跟技術(shù)發(fā)展潮流,結(jié)合自身的業(yè)務(wù)需求,采取合理的框架架構(gòu),提升系統(tǒng)的處理能力。(達(dá)觀數(shù)據(jù)聯(lián)合創(chuàng)始人 文輝)
6. 參考資料:
- Hive wiki:https://cwiki.apache.org/confluence/display/Hive/Home
- Hive Design Docs:https://cwiki.apache.org/confluence/display/Hive/DesignDocs
- Hadoop: The Definitive Guide (3rd Edition)
- Programming Hive
- Analytical Queries with Hive:http://www.slideshare.net/Hadoop_Summit/analytical-queries-with-hive