達(dá)觀數(shù)據(jù)是為企業(yè)提供大數(shù)據(jù)處理、個(gè)性化推薦系統(tǒng)服務(wù)的知名公司,在應(yīng)對海量數(shù)據(jù)處理時(shí),積累了大量實(shí)戰(zhàn)經(jīng)驗(yàn)。其中達(dá)觀數(shù)據(jù)在面對大量的數(shù)據(jù)交互和消息處理時(shí),使用了稱為DPIO的設(shè)計(jì)思路進(jìn)行快速、穩(wěn)定、可靠的消息數(shù)據(jù)傳遞機(jī)制,本文分享了達(dá)觀數(shù)據(jù)在應(yīng)對大規(guī)模消息數(shù)據(jù)處理時(shí)所開發(fā)的通訊中間件DPIO的設(shè)計(jì)思路和處理經(jīng)驗(yàn)。
一、數(shù)據(jù)通訊進(jìn)程模型
我們在設(shè)計(jì)達(dá)觀數(shù)據(jù)的消息數(shù)據(jù)處理機(jī)制時(shí),首先充分借鑒了ZeroMQ和ProxyIO的設(shè)計(jì)思想。ZeroMQ提供了一種底層的網(wǎng)絡(luò)通訊框架,提供了基本的RoundRobin負(fù)載均衡算法,性能優(yōu)越,而ProxyIO是雅虎的網(wǎng)絡(luò)通訊中間件,承載了雅虎內(nèi)部大量計(jì)算節(jié)點(diǎn)間的實(shí)時(shí)消息處理。但是ZeroMQ沒有實(shí)現(xiàn)基于節(jié)點(diǎn)健康狀態(tài)的最快響應(yīng)算法,并且ZeroMQ和ProxyIO對節(jié)點(diǎn)的狀態(tài)管理,連接管理,負(fù)載均衡調(diào)度等也需要各應(yīng)用自己來實(shí)現(xiàn)。
達(dá)觀科技在借鑒兩種設(shè)計(jì)思路的基礎(chǔ)上,從進(jìn)程模型、服務(wù)架構(gòu)、線程模型、通訊協(xié)議、負(fù)載均衡、雪崩處理、連接管理、消息流程、狀態(tài)監(jiān)控等各方面進(jìn)行了開拓,開發(fā)了DPIO(達(dá)觀ProxyIO的簡寫,下文統(tǒng)稱DPIO),確保系統(tǒng)高性能處理相關(guān)數(shù)據(jù)。
在DPIO的整個(gè)通訊框架體系中,采用集中管理、統(tǒng)一監(jiān)控策略管理節(jié)點(diǎn)提供服務(wù),節(jié)點(diǎn)間直接進(jìn)行交互,并不依賴統(tǒng)一的管理節(jié)點(diǎn)(桂洪冠)。幾種節(jié)點(diǎn)間通過http或者tcp協(xié)議進(jìn)行消息傳遞、配置更新、狀態(tài)跟蹤等通訊行為。集群將不同應(yīng)用的服務(wù)抽象成組的概念,相同應(yīng)用的服務(wù)啟動(dòng)時(shí)加入的相同的組。每個(gè)通訊組有兩種端點(diǎn)client和server。應(yīng)用啟動(dòng)時(shí)通過配置決定自己是client端點(diǎn)還是server端點(diǎn),在一個(gè)組內(nèi),每個(gè)應(yīng)用只能有一個(gè)身份;不同組沒要求。
- 監(jiān)控節(jié)點(diǎn),顧名思義即提供系統(tǒng)監(jiān)控服務(wù)的,用來給系統(tǒng)管理員查看集群中節(jié)點(diǎn)的服務(wù)狀態(tài)及負(fù)載情況,系統(tǒng)對監(jiān)控節(jié)點(diǎn)并無實(shí)時(shí)性及穩(wěn)定性要求,在本模型中是單點(diǎn)系統(tǒng)。
- 在上圖的架構(gòu)中把管理節(jié)點(diǎn)設(shè)計(jì)成雙master結(jié)構(gòu),參考zookeeper集群管理思路,多個(gè)master通過一定算法分別服務(wù)于集群中一部分節(jié)點(diǎn),相對于另外的服務(wù)節(jié)點(diǎn)則為備份管理節(jié)點(diǎn),他們通過內(nèi)部通訊同步數(shù)據(jù),每個(gè)管理節(jié)點(diǎn)都有一個(gè)web服務(wù)為監(jiān)控節(jié)點(diǎn)提供服務(wù)節(jié)點(diǎn)的狀態(tài)數(shù)據(jù)。
- 服務(wù)節(jié)點(diǎn)即是下文要談的代理服務(wù),根據(jù)服務(wù)對象不同分為應(yīng)用端代理和服務(wù)端代理。集群中的服務(wù)節(jié)點(diǎn)根據(jù)提供服務(wù)的不同分為多個(gè)組,每個(gè)代理啟動(dòng)都需要注冊到相應(yīng)的組中,然后提供服務(wù)。
二、DPIO消息傳遞邏輯架構(gòu)
DPIO服務(wù)節(jié)點(diǎn)內(nèi)/間的通訊及消息傳遞模型見下圖:
- clientHost和serverHost間使用socketa
pi進(jìn)行tcp通訊,相同主機(jī)內(nèi)部的多個(gè)進(jìn)程間使用共享內(nèi)存?zhèn)鬟f消息內(nèi)容,client和clientproxy、server和serverproxy之間通過domain socket進(jìn)行事件通知;在socket連接的一方收到對端的事件通知后,從共享內(nèi)存中獲取消息內(nèi)容。 - clientproxy/serverproxy啟動(dòng)時(shí)綁定到host的一個(gè)端口響應(yīng)應(yīng)用api的連接,在連接到來時(shí)將該api對應(yīng)的共享內(nèi)存初始化,將偏移地址告訴給應(yīng)用。clientproxy和serverproxy中分別維護(hù)了一個(gè)到應(yīng)用api的連接句柄隊(duì)列,并通過io復(fù)用技術(shù)監(jiān)聽這些連接上的讀寫事件。
- serverproxy在啟動(dòng)時(shí)通過socket綁定到服務(wù)器的一個(gè)端口,并以server身份注冊到一個(gè)group監(jiān)聽該端口的連接事件,當(dāng)事件到達(dá)時(shí)回調(diào)注冊的事件處理函數(shù)響應(yīng)事件。
- 在serverproxy內(nèi)部通過不同的thread分別管理從本地應(yīng)用建立的連接和從clientproxy建立的連接。thread的個(gè)數(shù)在啟動(dòng)proxy時(shí)由用戶指定,默認(rèn)是分別1個(gè)。每個(gè)clientproxy啟動(dòng)時(shí)會(huì)以client身份注冊到一個(gè)group,并建立到同組的所有serverproxy的連接,clientproxy內(nèi)部包含了連接的自管理能力及failover的處理(將在下面連接管理部分描述)。 DPIO實(shí)現(xiàn)了負(fù)載均衡,路由選擇和透明代理的功能。
三、線程模型
DPIO的線程模型:
App epoll thread檢測從api來的請求信息,并將請求信息轉(zhuǎn)發(fā)到待處理隊(duì)列中。從已處理隊(duì)列中獲取應(yīng)答包,并將處理結(jié)果轉(zhuǎn)發(fā)給api。
Io epoll thread檢測從遠(yuǎn)端的proxy來的可寫事件,并將請求包轉(zhuǎn)發(fā)到遠(yuǎn)端的proxy。檢測從遠(yuǎn)端的proxy的可讀事件,并將應(yīng)答包放在已處理隊(duì)列中。
Monitor thread檢測DPIO的工作狀態(tài)請求,將DPIO的工作狀態(tài)返回。并將決定Io epoll thread和app epoll thread的負(fù)載均衡(桂洪冠)。
四、通信協(xié)議
- Api與DPIO通信協(xié)議
- 共享內(nèi)存存儲消息格式
字段 | 含義 | 長度 |
protocol len | 協(xié)議包的總長度 | 4bytes |
protocol head len | 協(xié)議頭的長度 | 1byte |
Version_protocol_id | 協(xié)議的版本號和協(xié)議號 | 1byte |
Flag | 消息標(biāo)志,標(biāo)志路由模式,是否記錄來源地址,有二級路由,所以這個(gè)字段一定要
Eg,末位表示要記錄src,倒數(shù)第二位表示按roundrobin路由,倒數(shù)第3位表示按消息頭路由,xxx |
1byte |
Proxy | 來源/目的 proxy | 2bytes |
Api | 來源/目的 api | 2bytes |
ApiTtl | 協(xié)議包的發(fā)送時(shí)間 | 2Bytes |
ClientTtl | 消息存活的時(shí)間,后面添加,增加路由策略,選擇app_server | 2Bytes |
ClientProcessTime | 客戶端處理所用時(shí)間 | 2Bytes |
ServerTtl | 消息存活的時(shí)間,后面添加,增加路由策略,選擇app_client | 2Bytes |
timeout | 協(xié)議包的超時(shí)時(shí)間 | 2 byte |
Sid | 消息序列號 | 4bytes |
protocol body len | Body長度 | 4bytes |
protocol body | 消息體 | Size |
- 請求協(xié)議包
字段 | 含義 | 長度 |
protocol head len | 協(xié)議頭的長度 | 1byte |
Version_protocol_id | 協(xié)議的版本號和協(xié)議號 | 1byte |
Flag | 消息標(biāo)志,標(biāo)志路由模式,是否記錄來源地址,有二級路由,所以這個(gè)字段一定要
Eg,末位表示要記錄src,倒數(shù)第二位表示按roundrobin路由,倒數(shù)第3位表示按消息頭路由,xxx |
1byte |
ApiTtl | 協(xié)議包的發(fā)送時(shí)間 | 2bytes |
Timeout | 協(xié)議包的超時(shí)時(shí)間 | 2bytes |
Api | 來源/目的 api | 2bytes |
Sid | 消息序列號 | 4byte |
Begin_offset | 協(xié)議包的起始偏移 | 4bytes |
len | 協(xié)議包長度 | 4bytes |
- 響應(yīng)協(xié)議包
字段 | 含義 | 長度 |
protocol head len | 協(xié)議頭的長度 | 1byte |
Version_protocol_id | 協(xié)議的版本號和協(xié)議號 | 1byte |
Flag | 消息標(biāo)志,標(biāo)志路由模式,是否記錄來源地址,有二級路由,所以這個(gè)字段一定要
Eg,末位表示要記錄src,倒數(shù)第二位表示按roundrobin路由,倒數(shù)第3位表示按消息頭路由,xxx |
1byte |
Result | 處理結(jié)果 | 1byte |
sid | 消息序列號 | 4bytes |
begin_offset | 協(xié)議包的起始偏移 | 4bytes |
len | 協(xié)議包長度 | 4bytes |
- Proxy與監(jiān)控中心的監(jiān)控信息
- 請求協(xié)議包
字段 | 含義 | 長度 |
protocol len | 協(xié)議包的總長度 | 4bytes |
protocol head len | 協(xié)議頭的長度 | 4bytes |
Version | 協(xié)議的版本號 | 4bytes |
protocol id | 協(xié)議的協(xié)議號 | 4bytess |
status_version | 當(dāng)前狀態(tài)版本 | 4bytes |
Proxy_identify_len | 該proxy標(biāo)識長度 | 4bytess |
Proxy_identify | 該proxy 標(biāo)識 | 4bytes |
protocol body | 消息體 | Size |
- 應(yīng)答包
字段 | 含義 | 長度 |
protocol len | 協(xié)議包的總長度 | 4bytes |
protocol head len | 協(xié)議頭的長度 | 4bytes |
Version | 協(xié)議的版本號 | 4bytes |
protocol id | 協(xié)議的協(xié)議號 | 4bytess |
protocol body len | Body長度 | 4bytes |
protocol body | 消息體 | Size |
五、負(fù)載均衡
DPIO的負(fù)載均衡基于最快響應(yīng)法
DPIO將所有的統(tǒng)計(jì)信息更新到監(jiān)控中心,監(jiān)控中心通過處理所有的節(jié)點(diǎn)的狀態(tài)信息,統(tǒng)一負(fù)責(zé)負(fù)載均衡。
DPIO從監(jiān)控中心獲取所有連接的負(fù)載均衡策略。
每個(gè)連接知道只需知道自己的處理能力。
以上圖為例,有三個(gè)proxy server處理程序。處理能力分別為50、30、20
一次epoll過程能夠同時(shí)探測多個(gè)連接的可寫事件。
假設(shè):三個(gè)proxy server的屬于同一epoll thread,且三個(gè)proxy server假設(shè)都處理能力無限大。
限制:如果剛開始時(shí)待處理隊(duì)列的數(shù)據(jù)包個(gè)數(shù)為100個(gè),多次發(fā)送輪回后proxy server A≥proxy server B≥proxy server C, 每個(gè)發(fā)送的最多發(fā)送協(xié)議包數(shù)為待處理隊(duì)列協(xié)議包個(gè)數(shù) * 該連接所占權(quán)重
六、雪崩處理
大型在線服務(wù),特別是對于時(shí)延敏感的服務(wù),當(dāng)系統(tǒng)外部請求超過系統(tǒng)服務(wù)能力,而沒有適當(dāng)?shù)倪^載保護(hù)措施時(shí),當(dāng)系統(tǒng)累計(jì)的超時(shí)請求達(dá)到一定規(guī)模,將可能導(dǎo)致系統(tǒng)緩沖區(qū)隊(duì)列溢出,后端服務(wù)資源耗盡,最終像雪崩一樣形成惡性循環(huán)。這時(shí)系統(tǒng)處理的每個(gè)請求都因?yàn)槌瑫r(shí)而無效,系統(tǒng)對外呈現(xiàn)的服務(wù)能力為0,且這種情況下不能自動(dòng)恢復(fù)。
我們的解決策略是對協(xié)議包進(jìn)行生命周期管理,現(xiàn)在協(xié)議包進(jìn)出待處理隊(duì)列和已處理隊(duì)列時(shí)進(jìn)行超時(shí)檢測和超時(shí)處理(超時(shí)則丟棄)。
proxy client:
當(dāng)app epoll thread將協(xié)議包放入待處理隊(duì)列時(shí),會(huì)將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間,當(dāng)前時(shí)間戳來判斷該協(xié)議包是否已經(jīng)超時(shí)。
當(dāng)app epoll thread將協(xié)議包從已處理隊(duì)列中移除時(shí),會(huì)將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間,已經(jīng)當(dāng)前時(shí)間戳來判斷該協(xié)議包是否已經(jīng)超時(shí)。
當(dāng)Io epoll thread將協(xié)議包從待處理隊(duì)列中移除時(shí),會(huì)將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間,當(dāng)前時(shí)間戳,該連接的協(xié)議包的平均處理時(shí)間移除。
當(dāng)io epoll thread將協(xié)議包放入已處理隊(duì)列時(shí),會(huì)將將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間,已經(jīng)當(dāng)前時(shí)間戳來判斷該協(xié)議包是否已經(jīng)超時(shí)。
proxy server:
當(dāng)App epoll thread將協(xié)議包從待處理隊(duì)列中移除時(shí),會(huì)將該協(xié)議包在客戶端的處理時(shí)間、該協(xié)議包的超時(shí)時(shí)間、該協(xié)議包的proxy server接收時(shí)間戳、當(dāng)前時(shí)間戳來判斷該協(xié)議包是否已超時(shí)。
當(dāng)app epoll thread將協(xié)議包放入已處理隊(duì)列時(shí),會(huì)將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間,已經(jīng)當(dāng)前時(shí)間戳來判斷該協(xié)議包是否已經(jīng)超時(shí)。
當(dāng)io epoll thread將協(xié)議包從已處理隊(duì)列中移除時(shí),會(huì)將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間,已經(jīng)當(dāng)前時(shí)間戳來判斷該協(xié)議包是否已經(jīng)超時(shí)。
當(dāng)io epoll thread將協(xié)議包放入待處理隊(duì)列時(shí),會(huì)將該協(xié)議包的發(fā)送時(shí)間、該協(xié)議包的超時(shí)時(shí)間來判斷該協(xié)議包是否已超時(shí)。
七、連接管理
紅黑樹:
紅黑樹:保存所有連接的最近的讀/寫時(shí)間戳。
當(dāng)epoll_wait時(shí),首先從紅黑樹中獲取oldest的時(shí)間戳,并將當(dāng)前時(shí)間戳與oldest時(shí)間戳的時(shí)間差作為epoll_wait的超時(shí)時(shí)間,當(dāng)連接中有可讀/寫事件發(fā)送時(shí),首先從紅黑樹中刪除該節(jié)點(diǎn),當(dāng)可讀/寫事件處理完畢后,再將節(jié)點(diǎn)插入到紅黑樹中,當(dāng)處理完所有連接的可讀/寫事件時(shí),再從紅黑樹中依次從移除時(shí)間戳小于當(dāng)前時(shí)間戳的連接,并觸發(fā)該連接的timeout事件。
八、消息處理流程
- apiclient通過調(diào)用api的接口,將消息傳給
- api接受消息體,從共享內(nèi)存中申請內(nèi)存,填寫消息頭size(協(xié)議總長度)、Offset (協(xié)議版本號和協(xié)議號)、Headsize (協(xié)議頭的總長度)、flag(路由策略),ApiTtl (協(xié)議包的發(fā)送時(shí)間)、timeout (協(xié)議包的超時(shí)時(shí)間)、sid(序列號),size(消息體長度)字段,封裝成協(xié)議包,將協(xié)議包寫入共享內(nèi)存。
- api通過socket發(fā)送請求給proxy。
- app epoll thread通過檢測api的可讀事件,接受請求。通過解析請求內(nèi)容,獲取請求協(xié)議包所在的共享內(nèi)存的偏移、請求協(xié)議包的長度和api連接index加入到處理隊(duì)列。
- proxy client的io epoll thread通過檢測對端DPIO連接的可寫事件,從發(fā)送隊(duì)列中獲取請求包,將api的index加入到協(xié)議包的api index字段。
- proxy client的io epoll thread從共享內(nèi)存中讀取協(xié)議包,釋放由請求包中所標(biāo)識的內(nèi)存空間。
- proxy server的io epoll thread通過檢測對端DPIO的可讀事件,接受請求。
- proxy server的io epoll thread從共享內(nèi)存中申請空間,將proxy的index加入到協(xié)議包的proxy index字段。將請求內(nèi)存寫入到申請的空間中。
- proxy server的io epoll thread 將協(xié)議包在共享內(nèi)存的偏移和協(xié)議包的長度加入的待處理隊(duì)列中。
- app epoll thread從待處理隊(duì)列中獲取請求包,將協(xié)議包轉(zhuǎn)發(fā)給相應(yīng)的api進(jìn)行處理。
- api通過檢測DPIO的可讀事件,解析請求內(nèi)容。
- api通過解析請求內(nèi)容,獲取請求協(xié)議包在共享內(nèi)存中的偏移和請求協(xié)議包的長度。從共享內(nèi)存中讀取請求內(nèi)容,并釋放相應(yīng)空間。
- api將請求協(xié)議包返回給應(yīng)用層進(jìn)行處理。
- 應(yīng)用層將應(yīng)答包傳給api。
- Api從共享內(nèi)存中申請空間,將應(yīng)答包寫入到共享內(nèi)存中。
- Api將應(yīng)答包在共享內(nèi)存中的偏移和應(yīng)答包的大小寫入到共享內(nèi)存中。
- App epoll thread通過檢測可讀事件,將應(yīng)答包寫入到已處理隊(duì)列中。
- proxy server的Io epoll thread通過檢測對端的DPIO的可寫事件,將已處理隊(duì)列中獲取應(yīng)答包。
- proxy server的Io epoll thread從共享內(nèi)存中讀取應(yīng)答包。
- Proxy client的Io epoll thread檢測可讀事件,讀取應(yīng)答包。
- Proxy client的Io epoll thread通過解析應(yīng)答包,從共享內(nèi)存中申請空間,將應(yīng)答包寫入到申請的內(nèi)存中。
- Proxy client的Io epoll thread將應(yīng)答包移入到已處理隊(duì)列。
- App epoll thread通過檢測api的可寫事件,將已處理隊(duì)列中獲取應(yīng)答包。
- App epoll thread發(fā)送應(yīng)答包。
- Api通過檢測可讀事件,獲取應(yīng)答包,通過解析應(yīng)到包,獲取應(yīng)答包在共享內(nèi)存中的偏移和應(yīng)到的大小,從共享內(nèi)存中讀取應(yīng)到包。
- Api將應(yīng)答包返回給應(yīng)用端。
九、狀態(tài)監(jiān)控
連接池中存在:當(dāng)前可用連接個(gè)數(shù)
連接池中再分別獲取每個(gè)連接的狀態(tài)
每個(gè)可用連接分別維護(hù)以下信息:
連接處理的數(shù)據(jù)包個(gè)數(shù)、連接send失敗次數(shù)、連接協(xié)議包的平均處理時(shí)間。
連接的連接狀態(tài)(當(dāng)重連失敗達(dá)到一定次數(shù)時(shí),定義為連接失敗)。
連接的重連次數(shù)、連接的超時(shí)次數(shù)。
當(dāng)監(jiān)控線程accept到client的連接時(shí),解析請求內(nèi)容,然后調(diào)用連接池對象的statistics方法,連接池對象首先寫入自己的統(tǒng)計(jì)信息,然后分別調(diào)用每個(gè)連接的statistics方法,每個(gè)連接分別填寫自己的統(tǒng)計(jì)信息。
十、全文總結(jié)
達(dá)觀數(shù)據(jù)在處理大規(guī)模數(shù)據(jù)方面有多年的技術(shù)積累,DPIO是達(dá)觀在處理大數(shù)據(jù)通訊時(shí)的一些經(jīng)驗(yàn),和感興趣的朋友們分享。未來達(dá)觀數(shù)據(jù)將不斷分享更多的技術(shù)經(jīng)驗(yàn),與大家交流與合作。