用 Rust 从零搭一个分布式 KV 存储:Fjall + OpenRaft 实战
"分布式存储听着吓人,其实就是「多台机器假装是一台机器」的艺术。"
有读者留言说 Fjall + Raft 用在分布式系统挺不错。我查了一下,Fjall 是 Rust 原生的 LSM-tree 存储引擎(类似 RocksDB),OpenRaft 是 Databend 团队写的 Raft 共识库。两个 crate 加起来,居然真的能用比较少的代码搭出一个分布式 KV 存储。
这篇文章就记录一下从零探索的过程——不生产代码,先把架构想清楚、把坑位标出来。
先说结论:什么时候需要自己写分布式 KV?
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 配置中心、服务发现 | etcd / Consul | 成熟方案,别重复造轮子 |
| 缓存层 | Redis Cluster | 生态无敌,客户端支持广 |
| 需要持久化 + 强一致的业务存储 | 自己写 | etcd 不适合大数据量,Redis 不保证持久化 |
| 嵌入式 + 分布式(边缘计算、IoT) | 自己写 | 需要轻量、无外部依赖 |
| 学习 Raft 共识原理 | 自己写 | 最好的学习方式就是动手 |
如果你的需求是「一个轻量的、嵌入式的、强一致的分布式 KV」——不想依赖 etcd/Consul 这些外部服务,Fjall + OpenRaft 是一个很自然的选择。
不适合的场景: 需要 SQL 查询、需要极高吞吐(百万级 QPS)、团队没有分布式系统经验。这些场景老老实实用 TiKV 或者直接上云服务。
整体架构:三节点长什么样
先看全局,3 个节点组成一个 Raft 集群:
┌─────────────────────────────────────────────────────┐
│ 客户端 │
│ (写请求只发 Leader) │
└──────────────┬──────────────────┬───────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│ ┌────────────┐ │ │ ┌────────────┐ │ │ ┌────────────┐ │
│ │ OpenRaft │ │ │ │ OpenRaft │ │ │ │ OpenRaft │ │
│ │ (共识层) │ │ │ │ (Follower)│ │ │ │ (Follower)│ │
│ └─────┬──────┘ │ │ └─────┬──────┘ │ │ └─────┬──────┘ │
│ │ │ │ │ │ │ │ │
│ ┌─────▼──────┐ │ │ ┌─────▼──────┐ │ │ ┌─────▼──────┐ │
│ │ Fjall │ │ │ │ Fjall │ │ │ │ Fjall │ │
│ │ ┌───────┐ │ │ │ │ ┌───────┐ │ │ │ │ ┌───────┐ │ │
│ │ │raft_log│ │ │ │ │ │raft_log│ │ │ │ │ │raft_log│ │ │
│ │ ├───────┤ │ │ │ │ ├───────┤ │ │ │ │ ├───────┤ │ │
│ │ │ sm │ │ │ │ │ │ sm │ │ │ │ │ │ sm │ │ │
│ │ └───────┘ │ │ │ │ └───────┘ │ │ │ │ └───────┘ │ │
│ └────────────┘ │ │ └────────────┘ │ │ └────────────┘ │
└──────────────────┘ └──────────────────┘ └──────────────────┘
每个节点内部有两个 Fjall keyspace:
- raft_log:存 Raft 日志条目(共识层用)
- sm(state machine):存实际的 KV 数据(业务层用)
写请求的流程:
- 客户端把请求发给 Leader
- Leader 把操作写入 Raft log
- Leader 把 log 复制到 Follower 节点
- 多数节点确认后,Leader 把操作应用到 state machine
- 返回结果给客户端
读请求可以走 Leader(保证线性一致性),也可以走 Follower(可能读到旧数据,但性能更好)。
OpenRaft 的三个 Trait:你要实现什么
OpenRaft 的设计很聪明——它把 Raft 协议的「骨架」和「存储/网络」完全解耦了。你需要实现三个 trait:
| Trait | 职责 | 用 Fjall 怎么做 |
|---|---|---|
RaftLogStorage |
存储 Raft 日志条目 | 一个 keyspace,key 是 log index |
RaftStateMachine |
存储业务数据 + 快照 | 一个 keyspace,按需做 snapshot |
RaftNetwork |
节点间通信 | tonic gRPC 或自定义 TCP |
关键洞察: OpenRaft 不关心你用什么存储引擎。你可以用内存、用 RocksDB、用 SQLite、用 Fjall——只要你正确实现这三个 trait。这就是 Rust trait 系统的威力:接口即契约,实现即自由。
实现 RaftLogStorage:用 Fjall 存日志
Raft 日志是一个有序的序列,每个条目有一个递增的 index。Fjall 的 LSM-tree 天然支持有序存储,用 range 查询就能高效遍历。
核心思路:
use fjall::{Keyspace, PersistMode};
use openraft::{LogId, Vote, storage::LogState};
// 日志条目的 key:把 u64 index 转成 big-endian bytes
// 为什么要 big-endian?因为 Fjall 按字节序排列,
// big-endian 保证 1 < 2 < 10 的顺序,而不是 1 < 10 < 2
fn log_key(index: u64) -> [u8; 8] {
index.to_be_bytes()
}
// 存日志
fn append_log(ks: &Keyspace, entry: &Entry) -> Result<()> {
let key = log_key(entry.log_id.index);
let value = bincode::serialize(entry)?;
ks.insert(key, value)?;
Ok(())
}
// 读日志(范围查询)
fn get_logs(ks: &Keyspace, start: u64, end: u64) -> Result<Vec<Entry>> {
let mut entries = Vec::new();
for kv in ks.range(log_key(start)..log_key(end)) {
let (_, value) = kv?;
entries.push(bincode::deserialize(&value)?);
}
Ok(entries)
}坑一:别用字符串做 key
第一次写的时候我用了 format!("{}", index) 做 key,结果 range 查询出来的顺序是 1, 10, 100, 2, 200...——字典序,不是数值序。
解法: 用 u64::to_be_bytes() 转成固定长度的 big-endian 字节。这是 LSM-tree 存储引擎的通用技巧,RocksDB 也是一样的道理。
坑二:Vote 也要持久化
Raft 的 Vote(谁是 Leader、term 多少)必须持久化到磁盘。如果节点重启后丢失了 Vote 信息,可能会出现「脑裂」——两个节点同时认为自己是 Leader。
OpenRaft 会在 RaftLogStorage 里调用 save_vote() 和 read_vote(),你需要把 Vote 也存到 Fjall 里。建议用一个单独的 key(比如 b"__vote__")来存。
坑三:Truncate 日志的时机
Raft 协议要求:如果 Leader 发现某个 Follower 的日志和自己不一致,需要 truncate(截断)Follower 的日志。OpenRaft 会调用 truncate_logs(after) 方法。
Fjall 没有原生的「删除范围」操作,你需要遍历 after.. 的所有 key 然后逐个删除。这里要注意:删除操作也要保证原子性,否则中途崩溃会导致日志不一致。
实现 RaftStateMachine:业务数据 + 快照
State machine 是「真正的数据」——你存在分布式 KV 里的 key-value 对。
// 应用日志条目到 state machine
async fn apply(&mut self, log_entry: &Entry<Request>) -> Result<Response> {
match &log_entry.payload {
EntryPayload::Normal(req) => {
// 这才是真正的业务逻辑
match req {
Request::Set { key, value } => {
self.kv.insert(key, value)?;
Ok(Response { value: Some(value.clone()) })
}
Request::Get { key } => {
let val = self.kv.get(key)?;
Ok(Response { value: val })
}
Request::Delete { key } => {
self.kv.remove(key)?;
Ok(Response { value: None })
}
}
}
EntryPayload::Blank => Ok(Response { value: None }),
EntryPayload::Membership(_) => Ok(Response { value: None }),
}
}坑四:快照(Snapshot)怎么做
Raft 需要定期做快照——把 state machine 的当前状态序列化保存,然后删掉快照之前的日志。这样日志不会无限增长。
用 Fjall 做快照有两个方案:
| 方案 | 做法 | 优缺点 |
|---|---|---|
| 全量导出 | 遍历整个 keyspace,序列化到文件 | 简单,但数据量大时慢 |
| Fjall 原生快照 | 利用 Fjall 的 MVCC snapshot | 快,但需要理解 Fjall 的 snapshot 语义 |
我建议先用全量导出,等遇到性能瓶颈再优化。先让它跑起来,再让它跑得快。
坑五:Compaction 和 Raft Log Purge 的配合
Fjall 有自己的 compaction 机制(LSM-tree 后台合并),OpenRaft 也有 log purge(删旧日志)。两者是独立的,你需要自己协调:
- OpenRaft 调用
purge_log(before)时,你删掉 Fjall 里before之前的日志 - Fjall 的 compaction 会回收这些删除操作产生的空间
- 别忘了调用
db.persist(PersistMode::SyncAll)确保数据落盘
实现 RaftNetwork:节点间怎么通信
OpenRaft 需要你实现 RaftNetwork trait,提供三个方法:
| 方法 | 作用 |
|---|---|
send_append_entries() |
Leader → Follower:复制日志 |
send_install_snapshot() |
Leader → Follower:同步快照 |
send_vote() |
Candidate → 其他节点:请求投票 |
通信层有几种选择:
| 方案 | 库 | 优缺点 |
|---|---|---|
| gRPC | tonic | 生态好,protobuf 序列化效率高,但引入较重 |
| 自定义 TCP | tokio::net | 轻量可控,但要自己处理协议 |
| QUIC | quinn | 之前网络编程文章讲过,连接迁移是杀手锏 |
如果你已经熟悉 tonic(微服务文章里用过),直接用 gRPC 是最快的路径。
坑六:超时和重试
Raft 的心跳超时和选举超时是核心参数。太短会导致频繁选举(集群不稳定),太长会导致故障恢复慢。
OpenRaft 的默认配置:
Config {
heartbeat_interval: 1000, // 心跳间隔 1s
election_timeout_min: 3000, // 选举超时下限 3s
election_timeout_max: 5000, // 选举超时上限 5s
..Default::default()
}生产建议: 先用默认值跑起来,然后根据你的网络延迟调整。跨机房部署的话,这些值都需要调大。
客户端接入:写请求怎么找到 Leader?
一个绕不开的问题:客户端怎么知道谁是 Leader?
Client → Node 2 (Follower)
"写 key=foo, value=bar"
→ 拒绝,我不是 Leader
→ 告诉客户端:Leader 是 Node 1
Client → Node 1 (Leader)
"写 key=foo, value=bar"
→ 写入成功
OpenRaft 提供了 Raft::ensure_linearizable() 方法来做线性一致性读。对于写请求,如果不是 Leader 会返回错误,错误里包含 Leader 信息。
坑七:Leader 切换期间的请求
Leader 挂了或者网络分区的时候,客户端会收到短暂的错误。你的客户端需要:
- 缓存当前 Leader 信息
- 写失败时,重试并更新 Leader
- 设置合理的超时(别无限等待)
async fn write_with_retry(&self, key: String, value: Vec<u8>) -> Result<()> {
let mut leader = self.current_leader.read().await;
for attempt in 0..3 {
match self.send_write(*leader, &key, &value).await {
Ok(resp) => return Ok(resp),
Err(RaftError::ForwardToLeader { leader_id: Some(new_leader), .. }) => {
leader = new_leader;
*self.current_leader.write().await = new_leader;
// 重试
}
Err(e) => return Err(e),
}
}
Err(anyhow!("max retries exceeded"))
}测试:怎么验证它真的能用
分布式系统最怕的不是功能 bug,而是时序 bug——正常情况下没问题,特定时序下才出错。
基础测试:三节点启动
#[tokio::test]
async fn test_basic_cluster() {
// 启动 3 个节点
let nodes = start_cluster(3).await;
// 写入数据
nodes[0].write("key1", "value1").await.unwrap();
// 等待复制
tokio::time::sleep(Duration::from_millis(500)).await;
// 从任意节点读取
let val = nodes[2].read("key1").await.unwrap();
assert_eq!(val, Some("value1".into()));
}进阶测试:模拟节点宕机
#[tokio::test]
async fn test_leader_failure() {
let nodes = start_cluster(3).await;
// 写入一些数据
nodes[0].write("key1", "value1").await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
// 杀掉 Leader
let leader = nodes[0].raft.current_leader().await.unwrap();
nodes[leader as usize].shutdown().await;
// 等待新 Leader 选举
tokio::time::sleep(Duration::from_secs(3)).await;
// 集群应该仍然可用(2/3 节点存活)
let new_leader = find_leader(&nodes).await;
nodes[new_leader].write("key2", "value2").await.unwrap();
}高级测试:网络分区
网络分区测试比较难模拟。可以用 iptables 或者在应用层用 proxy 来模拟。OpenRaft 自带了一些 chaos testing 的工具,但文档里说还没完成——这是个已知的坑。
性能和调优:从哪里下手
先看 OpenRaft 官方的基准测试数据:
| 场景 | 吞吐量 |
|---|---|
| 单写(1 client) | ~33,000 put/s |
| 64 并发 | ~912,000 put/s |
| 4096 并发 | ~3,548,000 put/s |
| Batch 写(4 entries/batch, 4096 并发) | ~5,615,000 put/s |
注意:这是 OpenRaft 框架本身的测试,用的是内存存储。 实际用 Fjall 的时候,性能瓶颈会在磁盘 I/O 上。
调优方向
| 方向 | 具体操作 |
|---|---|
| Batch 写 | 把多个写操作打包成一个 Raft log entry,减少共识轮次 |
| 并行复制 | OpenRaft 默认并行复制日志到各 Follower,确认 max_inflight_log 参数 |
| Fjall Memtable 大小 | 增大 max_memtable_size 减少 flush 频率 |
| Block Cache | 配置为可用内存的 20-25%,Fjall 官方推荐 |
| 压缩算法 | 默认 LZ4 已经很好,CPU 敏感场景可以关闭压缩 |
| 持久化策略 | PersistMode::SyncAll 最安全但最慢,PersistMode::SyncData 折中 |
读性能优化
如果不需要线性一致性读,可以让 Follower 直接响应读请求。这叫「ReadIndex」优化,OpenRaft 支持 ensure_linearizable() 来保证一致性读,不调用它就是 follower read。
和 etcd/TiKV 的对比
| 维度 | Fjall + OpenRaft | etcd | TiKV |
|---|---|---|---|
| 语言 | Rust | Go | Rust |
| 存储引擎 | LSM-tree (Fjall) | bbolt (B+ tree) | RocksDB (LSM-tree) |
| 共识协议 | Raft (OpenRaft) | Raft | Raft (自研) |
| 部署方式 | 嵌入式 | 独立服务 | 独立服务 |
| 数据量上限 | 取决于磁盘 | 几 GB 内最佳 | TB 级 |
| 学习成本 | 中等 | 低(有 etcdctl) | 高(组件多) |
| 适合场景 | 嵌入式/边缘 | 配置中心 | 大规模存储 |
你的定位: 如果你需要一个「嵌入到自己应用里的分布式 KV」,不想单独部署 etcd 集群,Fjall + OpenRaft 是一个很好的选择。
下一步
这篇文章先把架构和坑位理清楚了。如果你真的想动手写,建议这个顺序:
- 先跑通 OpenRaft 的官方 example(raft-kv-memstore),理解整体流程
- 把 memstore 替换成 Fjall,实现
RaftLogStorage和RaftStateMachine - 加上 tonic gRPC 做网络层
- 写测试,特别是 Leader 切换和日志截断的场景
- 做 benchmark,看看 Fjall 在你的场景下性能如何
记住:先让它跑起来,再让它跑得快。 分布式系统最大的坑不是性能,是一致性——你的代码在正常情况下都能跑,但只有在节点宕机、网络分区、日志截断这些极端情况下才会暴露问题。
Rust 的所有权系统在这个场景下特别有用。它会在编译期就阻止你做那些「在并发下不安全」的操作——比如同时读写同一个 Fjall keyspace、在异步上下文里持有不安全的引用。编译器挡掉的 bug,比你在测试里发现的多得多。
参考资料
- Fjall GitHub — LSM-tree 嵌入式存储引擎
- Fjall docs.rs — API 文档
- OpenRaft GitHub — Raft 共识协议实现
- OpenRaft Guide — 官方入门指南
- OpenRaft Examples — 官方示例代码
- Raft 论文 — 理论基础,强烈建议读一遍