OpenRaft 在交易撮合引擎中的應(yīng)用
前言
由于工作需要,一直對原子多播應(yīng)用有非常濃厚的興趣。通過一段時間的技術(shù)選型。我們非常幸運的得到了?Openraft[1]?實操分享?Databend[2]?社區(qū)的熱心支持。我也想通過我們的實際工作,對?Openraft?的未來應(yīng)用盡一些微薄之力。Openraft?是一個 Raft 的改進版(包括優(yōu)化選舉沖突[3], 解決網(wǎng)絡(luò)抖動對?leadership?的影響),?它在?Databend[4]?中為 db, table 等元數(shù)據(jù)提供分布式存儲和強一致讀寫, 為databend 的云端數(shù)據(jù)庫集群提供事務(wù)性保證.我的實踐的上一篇文章反應(yīng)了我們的選型過程,有興趣的人可以看一下。Raft in Rust (原子多播+撮合引擎)[5]?這篇文章更多的是想說明我們在使用 OpenRaft 的實際問題,并且通過我們的實現(xiàn),揭秘 OpenRaft 的一些機制。
代碼倉庫
大家在使用 OpenRaft 的時候,我相信很多人都查看了手冊:Getting Started - openraftThe openraft user guide.[6]
當(dāng)然,這是一個非常優(yōu)秀的手冊。我們從這個手冊里,會學(xué)習(xí)到如何使用 OpenRaft 實現(xiàn)自己的應(yīng)用。而且,openraft/example-raft-kv[7]?這個例子確實能夠很好的說明如何實現(xiàn)一個簡單的應(yīng)用。但是,這個例子是使用的內(nèi)存來做持久化實現(xiàn)。當(dāng)然內(nèi)存不會真正做持久化,所以很容易在節(jié)點退出后,丟失狀態(tài)。而我們真正需要的示例是一個能夠持久化的例子。
另外一個實例就是?databend/metasrv[8]?而這個示例里面,我們可以看到一個完整的 metadata 存儲服務(wù)的實現(xiàn)。實際上,metasrv 本身是一個類似于 etcd 的kv存儲。存儲整個 databend 集群的 metadata。這個示例里面,metasrv 使用了 sled 作為底層存儲。sled 既存儲 log,也存儲 statemachine 的狀態(tài)。這個例子,statemachine 所有的更新都直接在 sled 存儲里通過 sled 的事物完成了。所以,對于如何存儲 snapshot 這個問題,我們并不太容易看清楚。所以 snapshot 的產(chǎn)生和傳遞主要是在節(jié)點間同步的時候使用。
這里,大家可以看到我們開放的源代碼。雖然這個示例是基于 example-raft-kv 示例,沒有達到 metasrv 的生產(chǎn)強度。但是我們還是非常全面的表現(xiàn)出了 openraft 對 log, snapshot 處理的行為和能力。
GitHub - raymondshe/matchengine-raft[9]
應(yīng)用場景
和 metasrv 的場景不同。我們需要我們的 statemachine 盡量在內(nèi)存里面更新,盡量少落盤。雖然 sled 本地落盤的速度也很快,但是內(nèi)存操作的速度會更快。所以,我們基本上就是這樣進行操作的。

總體設(shè)計圖[10]
所以在這個圖里面,大家可以看到日志是通過 sled 進行存儲的。而這些日志由于通過 Raft 協(xié)議,實際上他們在每臺機器上的順序是一致的。所以,不同的 matchengine-raft 實例,在相同的日志流情況下,對狀態(tài)機的操作就是一致的。所以,不管我們從哪一個日志開始寫 snapshot,通過加載 snapshot 并且回放后續(xù)的日志,我們都可以恢復(fù)到最新狀態(tài)。
按照設(shè)計圖中顯示,當(dāng)前 StateMachine 的狀態(tài)是處理了第 9 個日志里的消息。這時候,系統(tǒng)保存了所有的消息到 sled。并且在第 3 個消息的時候落盤了一次 snapshot,并且在低 6 個消息的時候落盤了一次 snapshot。如果這臺機器當(dāng)機,我們是可以從編號為 3 的 snapshot 恢復(fù)狀態(tài)機,并且繼續(xù)處理 3,4,5,6,7,8,9 這 6 條消息來恢復(fù)當(dāng)前狀態(tài)。當(dāng)然,我們也可以從編號為 6 的 snapshot 恢復(fù)狀態(tài)機,并且繼續(xù)處理 7,8,9 這 3 條消息來恢復(fù)當(dāng)前狀態(tài)。
當(dāng)然我們可以選擇多少個消息進行一次落盤。當(dāng)然落盤的次數(shù)越多越可靠,但是性能影響比較大。好在 snapshot 的生成和落盤是異步的方式做的。
有興趣的朋友可以看一下 akka 的?EventSroucing[11]??模式。這種模式和 Raft 單節(jié)點非常相像。不同的是 OpenRaft 強調(diào)多實例一致性,而 Akka 則提供了非常多的方式來存儲 Log(Journal) 和 Snapshot。
實現(xiàn)細(xì)節(jié)
談到實現(xiàn)細(xì)節(jié)。我們還是回到官方文檔?geting-started[12]?來。我們也按照這個文檔的順序進行說明。
Raft 對于從應(yīng)用開發(fā)著的角度,我們可以簡化到下面的這張圖里。Raft 的分布式共識就是要保證驅(qū)動狀態(tài)機的指令能夠在 Log 里被一致的復(fù)制到各個節(jié)點里。

Raft 有兩個重要的組成部分:
如何一致的在節(jié)點之間復(fù)制日志
并且在狀態(tài)機里面如何消費這些日志
基于 OpenRaft 實現(xiàn)我們自己的 Raft 應(yīng)用其實并不復(fù)雜,只需要以下三部分:
定義好客戶端的請求和應(yīng)答
實現(xiàn)好存儲 RaftStore 來持久化狀態(tài)
實現(xiàn)一個網(wǎng)絡(luò)層,來保證 Raft 節(jié)點之間能相互傳遞消息。
好,那我們就開始吧:
1.定義好客戶端的請求和應(yīng)答
請求有可能是客戶端發(fā)出的驅(qū)動 Raft 狀態(tài)機的數(shù)據(jù),而應(yīng)答則是 Raft 狀態(tài)機會打給客戶端的數(shù)據(jù)。請求和應(yīng)答需要實現(xiàn) AppData 和 AppDataResponse 這里,我們的實現(xiàn)如下:
這兩個類型完全是應(yīng)用相關(guān)的,而且和 RaftStrage 的狀態(tài)機實現(xiàn)相關(guān)。
這里,Set 是 example-raft-kv 原示例就有的命令。
大家也注意到了,命令都是對狀態(tài)機有驅(qū)動的命令。也就是會對狀態(tài)機狀態(tài)造成改變的命令。如果我們需要取狀態(tài)機內(nèi)部數(shù)據(jù)的值返回給客戶端。我們大可不必定義到這里。
2.實現(xiàn) RaftStorage
這是整個項目非常關(guān)鍵的一部分。只要實現(xiàn)好?trait RaftStorage
,我們就把數(shù)據(jù)存儲和消費的行為定義好。RaftStoreage
?可以包裝一個像?RocksDB[13],Sled[14]?的本地 KV 存儲或者遠(yuǎn)端的 SQL DB。RaftStorage
?定義了下面一些 API
讀寫 Raft 狀態(tài),比方說 term,vote(term:任期,vote:投票結(jié)果)
讀寫日志
將日志的內(nèi)容應(yīng)用到狀態(tài)機
創(chuàng)建和安裝快照(snapshot)
在?ExampleStore[15], 這些內(nèi)存化存儲行為是非常明確簡單的。而我們不是要真正落盤了嗎?那我們就看一下 matchengine-rust 是怎么實現(xiàn)的。
這里是?matchengine-raft/src/store[16]

我們說明一些設(shè)計要點
ExampleStore 的數(shù)據(jù):
ExchangeStore
?里面主要是包含下面的成員變量。
幫助我們落盤的成員主要是 log, vote。而需要產(chǎn)生 snapshot 進行落盤的所有內(nèi)容都在 state_machine.
1.last_purged_log_id
:這是最后刪除的日志 ID。刪除日志本身可以節(jié)約存儲,但是,對我們來講,我了保證數(shù)據(jù)存儲的安全。在刪除日志之前,我們必須有這條日志 index 大的 snapshot 產(chǎn)生。否則,我們就沒有辦法通過 snapshot 來恢復(fù)數(shù)據(jù)。
2.log
:這是一個 sled::Tree,也就是一個 map。如果看著部分代碼的話,我們就可以清楚的明白 log 對象的結(jié)構(gòu)。key 是一個?log_id_index?
的 Big Endian 的字節(jié)片段。value 是通過 serd_json 進行序列化的內(nèi)容
3.state_machine:這里就是通過日志驅(qū)動的所有狀態(tài)的集合
StateMachine
?里面最重要的數(shù)據(jù)就是 orderbook 這部分就是撮合引擎里面重要的訂單表。存放買方和賣方的未成交訂單信息。這是主要的業(yè)務(wù)邏輯。data 這部分是原來例子中的 kv 存儲。我們還在這里沒有刪除。
這里?last_applied_log
,?last_menbership
?這些狀態(tài)和業(yè)務(wù)邏輯沒有太大關(guān)系。所以,如果您要實現(xiàn)自己的 StateMachine。還是盡量和例子保持一致。主要是因為這兩個狀態(tài)是通過?apply_to_state_machine()?
這個接口更新。也正好需要持久化。如果需要進一步隱藏 Raft 的細(xì)節(jié),我們還是建議 openraft 能將這兩個狀態(tài)進一步進行隱藏封裝。
對?state_machine?
的落盤操作主要集中在這里:store/store.rs[17]。有興趣的可以看一下。這里面比較有意思的問題是?orderbook?
本身無法被默認(rèn)的serde_json?
序列化/反序列化。所以我們才在?matchengine/mod.rs[18]?加了這段代碼:
4.vote
:就是對最后一次?vote?
的存儲。具體請看, 這段代碼倒不是因為這段代碼有多重要,只是由于代碼比較簡單,看可以少寫一些說明
但是這兒確實有個小坑,之前我沒有注意到?vote?
需要持久化,開始調(diào)試的時候產(chǎn)生了很多問題。直到找到 Openraft 作者?Zhang Yanpo[19]?才解決。也是觸發(fā)我想開源這個 openraft 文件持久化實現(xiàn)的誘因吧。感謝 Zhang Yanpo,好樣的。
5.其他的成員變量其實沒什么太好說的了。和原例子一樣。
對日志和快照的控制:
日志,快照相互配合,我們可以很好的持久化狀態(tài),并且恢復(fù)最新狀態(tài)。多久寫一次快照,保存多少日志。在這里我們使用了下面的代碼。
強烈建議大家看一下?Config in openraft::config - Rust[20]
重點看?snapshot_policy
, 代碼里可以清楚的標(biāo)識,我們需要 500 次 log 寫一次快照。也就是 openraft 會調(diào)用?build_snapshot()
?函數(shù)創(chuàng)建 snapshot。原示例里,snapshot 只是在內(nèi)存里保存在?current_snapshot
?變量里。而我們需要真實的落盤。請注意這段代碼的?self.write_snapshot()
這下我們有了 snapshot,當(dāng)然 snapshot 一方面可以用來在節(jié)點之間同步狀態(tài)。另一方面就是在啟動的時候恢復(fù)狀態(tài)。而 openraft 的實現(xiàn)非常好。實際上恢復(fù)狀態(tài)只需要回復(fù)到最新的 snapshot 就行。只要本地日志完備,openraft 會幫助你調(diào)用?apply_to_statemachine()
?來恢復(fù)到最新狀態(tài)。所以我們就有了?restore()
?函數(shù)。#[async_trait]
大家注意一下 snapshot 的操作。當(dāng)然,在這里,我們也恢復(fù)了 last_purged_log_id。當(dāng)然 store 這個函數(shù)會在 ExampleStore 剛剛構(gòu)建的時候調(diào)用。
如何確定 RaftStorage 是對的:
請查閱?Test suite for RaftStorage[21]?如果通過這個測試,一般來講, OpenRaft 就可以使用他了。
RaftStorage 的競爭狀態(tài):
在我們的設(shè)計里,在一個時刻,最多有一個線程會寫狀態(tài),但是,會有多個線程來進行讀取。比方說,可能有多個復(fù)制任務(wù)在同時度日志和存儲。
實現(xiàn)必須保證數(shù)據(jù)持久性:
調(diào)用者會假設(shè)所有的寫操作都被持久化了。而且 Raft 的糾錯機制也是依賴于可靠的存儲。
實現(xiàn) RaftNetwork
為了節(jié)點之間對日志能夠有共識,我們需要能夠讓節(jié)點之間進行通訊。trait?RaftNetwork?
就定義了數(shù)據(jù)傳輸?shù)男枨蟆?code>RaftNetwork?的實現(xiàn)可以是考慮調(diào)用遠(yuǎn)端的 Raft 節(jié)點的服務(wù)
ExampleNetwork[22]??顯示了如何調(diào)用傳輸消息。每一個 Raft 節(jié)點都應(yīng)該提供有這樣一個 RPC 服務(wù)。當(dāng)節(jié)點收到 raft rpc,服務(wù)會把請求傳遞給 raft 實例,并且通過?raft-server-endpoint[23]?返回應(yīng)答。
在實際情況下可能使用??Tonic gRPC[24]?是一個更好的選擇。 Databend-meta[25]?里有一個非常好的參考實現(xiàn)。
在我們的 matchengen-raft 實現(xiàn)里,我們解決了原示例中大量重連的問題。
1.維護一個可服用量的 client
這段代碼在:network/raft_network_impl.rs
2.在服務(wù)器端引入keep_alive
這段代碼在:lib.rs[26]
這樣的改動確實是對性能有一些提升。但是真的需要更快的話,我們使用 grpc,甚至使用 reliable multicast,比方說?pgm。啟動集群
啟動集群
由于我們保留了之前的 key/value 實現(xiàn)。所以之前的腳本應(yīng)該還是能夠工作的。而且之前的 key/value 有了真正的存儲。為了能夠運行集群:
啟動三個沒有初始化的 raft 節(jié)點;
初始化其中一臺 raft 節(jié)點;
把其他的 raft 節(jié)點加入到這個集群里;
更新 raft 成員配置。?example-raft-kv[27]?的 readme 文檔里面把這些步驟都介紹的比較清楚了。
下面兩個測試腳本是非常有用的:test-cluster.sh[28]?這個腳本可以簡練的掩飾如何用 curl 和 raft 集群進行交互。在腳本里,您可以看到所有 http 請求和應(yīng)答。
test_cluster.rs[29]?這個 rust 程序顯示了怎么使用 ExampleClient 操作集群,發(fā)送請求和接受應(yīng)答。這里我們要強調(diào)的是,在初始化 raft 集群的時候。我們需要上述的過程。如果集群已經(jīng)被初始化,并且我們已經(jīng)持久化了相應(yīng)的狀態(tài) (menbership, vote, log) 以后,再某些節(jié)點退出并且重新加入,我們就不需要再過多干預(yù)了。在使用 metasrv 啟動 meta service 的時候,我也遇到了相同的情況。所以還是要先啟動一個single node 以保證這個節(jié)點作為種子節(jié)點被合理初始化了。Deploy a Databend Meta Service Cluster | Databend[30]為了更好的啟動管理集群,我們在項目里添加了?test.sh[31]。用法如下:
我們可以在不同階段調(diào)用不同的命令。大家有興趣的話可以看一下代碼。這部分是主程序部分,包含了我們實現(xiàn)的所有命令。
未來的工作
當(dāng)前我們實現(xiàn)的 matchengine-raft 只是為了示例怎么通過 raft 應(yīng)用到撮合引擎這樣一個對性能,穩(wěn)定性,高可用要求都非??量痰膽?yīng)用場景。通過 raft 集群來完成撮合引擎的分布式管理。我們相信真正把這個玩具撮合引擎推向產(chǎn)品環(huán)境,我們還是需要進行很多工作:
1.優(yōu)化序列化方案,serd_json
?固然好,但是通過字符串進行編解碼還是差點兒意思。至少用到 bson 或者更好的用 protobuf, avro 等,提高編解碼速度,傳輸和存儲的開銷。
2.優(yōu)化 RaftNetwork, 在可以使用 multi-cast 的情況下使用 pgm,如果不行,可以使用 grpc。
3.撮合結(jié)果的分發(fā)。這部分在很多情況下依賴消息隊列中間件比較好。
4.增加更多的撮合算法。這部分完全是業(yè)務(wù)需求,和 openraft 無關(guān)。我們就不在這個文章里討論了。
5.完善測試和客戶端的調(diào)用。
6.完善壓測程序,準(zhǔn)備進一步調(diào)優(yōu)。
結(jié)論
通過這個簡單的小項目,我們:
1.實現(xiàn)了一個簡單的玩具撮合引擎。
2.驗證了 OpenRaft 在功能上對撮合引擎場景的支持能力。
3.給 OpenRaft 提供了一個基于?sled KV 存儲的日志存儲的參考實現(xiàn)。
4.給 OpenRaft 提供了一個基于本地文件的快照存儲的參考實現(xiàn)。
給大家透露一個小秘密,SAP也在使用?OpenRaft[32]?來構(gòu)建關(guān)鍵應(yīng)用。大家想想,都用到 Raft 協(xié)議了,一定是非常重要的應(yīng)用。
對于 Databend?社區(qū)的幫助,我表示由衷的感謝。 作為一個長期工作在軟件行業(yè)一線的老程序猿,看到中國開源軟件開始在基礎(chǔ)構(gòu)建發(fā)力,由衷的感到欣慰。也希望中國開源社群越來越好,越來越強大,走向軟件行業(yè)的頂端。
作者信息:沈勇 Decisive Density ?CTO引用鏈接
引用鏈接
[1]
?Openraft:?https://github.com/datafuselabs/openraft[2]
?Databend:?https://github.com/datafuselabs/databend[3]
?優(yōu)化選舉沖突:?https://datafuselabs.github.io/openraft/vote.html[4]
?Databend:?https://github.com/datafuselabs/databend[5]
?Raft in Rust (原子多播+撮合引擎):?http://t.csdn.cn/jcOnv[6]
?Getting Started - openraftThe openraft user guide.:?https://datafuselabs.github.io/openraft/getting-started.html[7]
?openraft/example-raft-kv:?https://github.com/datafuselabs/openraft/tree/main/examples/raft-kv-memstore[8]
?databend/metasrv:?https://github.com/datafuselabs/databend/tree/main/metasrv[9]
?GitHub - raymondshe/matchengine-raft:?https://github.com/raymondshe/matchengine-raft[10]
?總體設(shè)計圖:?https://excalidraw.com/#json=kSzwFGNGr_WNjytPO65RN,CjvsM4m_3efHnSIGK37Sow[11]
?EventSroucing:?https://doc.akka.io/docs/akka/current/typed/index-persistence.html[12]
?geting-started:?https://datafuselabs.github.io/openraft/getting-started.html[13]
?RocksDB:?https://docs.rs/rocksdb/latest/rocksdb/[14]
?Sled:?https://github.com/spacejam/sled[15]
?ExampleStore:?https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/store/mod.rs[16]
?matchengine-raft/src/store:?https://github.com/raymondshe/matchengine-raft/tree/master/src/store[17]
?store/store.rs:?https://github.com/raymondshe/matchengine-raft/blob/master/src/store/store.rs[18]
?matchengine/mod.rs:?https://github.com/raymondshe/matchengine-raft/blob/master/src/matchengine/mod.rs[19]
?Zhang Yanpo:?https://github.com/drmingdrmer[20]
?Config in openraft::config - Rust:?https://docs.rs/openraft/latest/openraft/config/struct.Config.html[21]
?Test suite for RaftStorage:?https://github.com/datafuselabs/openraft/blob/main/memstore/src/test.rs[22]
?ExampleNetwork:?https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/network/raft_network_impl.rs[23]
?raft-server-endpoint:?https://github.com/datafuselabs/openraft/blob/main/example-raft-kv/src/network/raft.rs[24]
?Tonic gRPC:?https://github.com/hyperium/tonic[25]
?databend-meta:?#L89[26]
?lib.rs:?https://github.com/raymondshe/matchengine-raft/blob/master/src/lib.rs[27]
?example-raft-kv:?https://github.com/datafuselabs/openraft/tree/main/examples/raft-kv-memstore[28]
?test-cluster.sh:?https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/test-cluster.sh[29]
?test_cluster.rs:?https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/tests/cluster/test_cluster.rs[30]
?Deploy a Databend Meta Service Cluster | Databend:?https://databend.rs/doc/manage/metasrv/metasrv-deploy[31]
?test.sh:?https://github.com/raymondshe/matchengine-raft/blob/master/test.sh[32]
?OpenRaft:?https://github.com/datafuselabs/openraft
關(guān)于 Databend
關(guān)于 DatabendDatabend 是一款開源、彈性、低成本,基于對象存儲也可以做實時分析的新式數(shù)倉。期待您的關(guān)注,一起探索云原生數(shù)倉解決方案,打造新一代開源 Data Cloud。
Databend 文檔:https://databend.rs/
Twitter:https://twitter.com/Datafuse_Labs
Slack:https://datafusecloud.slack.com/
Wechat:Databend
GitHub :https://github.com/datafuselabs/databend
