返回文章列表

用 Rust 从零搭一个分布式 KV 存储:Fjall + OpenRaft 实战

941·6 分钟阅读
Rust分布式系统OpenRaftFjallKV 存储

"分布式存储听着吓人,其实就是「多台机器假装是一台机器」的艺术。"

有读者留言说 Fjall + Raft 用在分布式系统挺不错。我查了一下,Fjall 是 Rust 原生的 LSM-tree 存储引擎(类似 RocksDB),OpenRaft 是 Databend 团队写的 Raft 共识库。两个 crate 加起来,居然真的能用比较少的代码搭出一个分布式 KV 存储。

这篇文章就记录一下从零探索的过程——不生产代码,先把架构想清楚、把坑位标出来。


先说结论:什么时候需要自己写分布式 KV?

场景 推荐方案 理由
配置中心、服务发现 etcd / Consul 成熟方案,别重复造轮子
缓存层 Redis Cluster 生态无敌,客户端支持广
需要持久化 + 强一致的业务存储 自己写 etcd 不适合大数据量,Redis 不保证持久化
嵌入式 + 分布式(边缘计算、IoT) 自己写 需要轻量、无外部依赖
学习 Raft 共识原理 自己写 最好的学习方式就是动手

如果你的需求是「一个轻量的、嵌入式的、强一致的分布式 KV」——不想依赖 etcd/Consul 这些外部服务,Fjall + OpenRaft 是一个很自然的选择。

不适合的场景: 需要 SQL 查询、需要极高吞吐(百万级 QPS)、团队没有分布式系统经验。这些场景老老实实用 TiKV 或者直接上云服务。


整体架构:三节点长什么样

先看全局,3 个节点组成一个 Raft 集群:

┌─────────────────────────────────────────────────────┐
│                    客户端                             │
│              (写请求只发 Leader)                      │
└──────────────┬──────────────────┬───────────────────┘
               │                  │
               ▼                  ▼
┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│    Node 1        │  │    Node 2        │  │    Node 3        │
│  ┌────────────┐  │  │  ┌────────────┐  │  │  ┌────────────┐  │
│  │  OpenRaft  │  │  │  │  OpenRaft  │  │  │  │  OpenRaft  │  │
│  │  (共识层)   │  │  │  │  (Follower)│  │  │  │  (Follower)│  │
│  └─────┬──────┘  │  │  └─────┬──────┘  │  │  └─────┬──────┘  │
│        │         │  │        │         │  │        │         │
│  ┌─────▼──────┐  │  │  ┌─────▼──────┐  │  │  ┌─────▼──────┐  │
│  │   Fjall    │  │  │  │   Fjall    │  │  │  │   Fjall    │  │
│  │  ┌───────┐ │  │  │  │  ┌───────┐ │  │  │  │  ┌───────┐ │  │
│  │  │raft_log│ │  │  │  │  │raft_log│ │  │  │  │  │raft_log│ │  │
│  │  ├───────┤ │  │  │  │  ├───────┤ │  │  │  │  ├───────┤ │  │
│  │  │ sm    │ │  │  │  │  │ sm    │ │  │  │  │  │ sm    │ │  │
│  │  └───────┘ │  │  │  │  └───────┘ │  │  │  │  └───────┘ │  │
│  └────────────┘  │  │  └────────────┘  │  │  └────────────┘  │
└──────────────────┘  └──────────────────┘  └──────────────────┘

每个节点内部有两个 Fjall keyspace:

  • raft_log:存 Raft 日志条目(共识层用)
  • sm(state machine):存实际的 KV 数据(业务层用)

写请求的流程:

  1. 客户端把请求发给 Leader
  2. Leader 把操作写入 Raft log
  3. Leader 把 log 复制到 Follower 节点
  4. 多数节点确认后,Leader 把操作应用到 state machine
  5. 返回结果给客户端

读请求可以走 Leader(保证线性一致性),也可以走 Follower(可能读到旧数据,但性能更好)。


OpenRaft 的三个 Trait:你要实现什么

OpenRaft 的设计很聪明——它把 Raft 协议的「骨架」和「存储/网络」完全解耦了。你需要实现三个 trait:

Trait 职责 用 Fjall 怎么做
RaftLogStorage 存储 Raft 日志条目 一个 keyspace,key 是 log index
RaftStateMachine 存储业务数据 + 快照 一个 keyspace,按需做 snapshot
RaftNetwork 节点间通信 tonic gRPC 或自定义 TCP

关键洞察: OpenRaft 不关心你用什么存储引擎。你可以用内存、用 RocksDB、用 SQLite、用 Fjall——只要你正确实现这三个 trait。这就是 Rust trait 系统的威力:接口即契约,实现即自由。


实现 RaftLogStorage:用 Fjall 存日志

Raft 日志是一个有序的序列,每个条目有一个递增的 index。Fjall 的 LSM-tree 天然支持有序存储,用 range 查询就能高效遍历。

核心思路:

use fjall::{Keyspace, PersistMode};
use openraft::{LogId, Vote, storage::LogState};
 
// 日志条目的 key:把 u64 index 转成 big-endian bytes
// 为什么要 big-endian?因为 Fjall 按字节序排列,
// big-endian 保证 1 < 2 < 10 的顺序,而不是 1 < 10 < 2
fn log_key(index: u64) -> [u8; 8] {
    index.to_be_bytes()
}
 
// 存日志
fn append_log(ks: &Keyspace, entry: &Entry) -> Result<()> {
    let key = log_key(entry.log_id.index);
    let value = bincode::serialize(entry)?;
    ks.insert(key, value)?;
    Ok(())
}
 
// 读日志(范围查询)
fn get_logs(ks: &Keyspace, start: u64, end: u64) -> Result<Vec<Entry>> {
    let mut entries = Vec::new();
    for kv in ks.range(log_key(start)..log_key(end)) {
        let (_, value) = kv?;
        entries.push(bincode::deserialize(&value)?);
    }
    Ok(entries)
}

坑一:别用字符串做 key

第一次写的时候我用了 format!("{}", index) 做 key,结果 range 查询出来的顺序是 1, 10, 100, 2, 200...——字典序,不是数值序。

解法:u64::to_be_bytes() 转成固定长度的 big-endian 字节。这是 LSM-tree 存储引擎的通用技巧,RocksDB 也是一样的道理。

坑二:Vote 也要持久化

Raft 的 Vote(谁是 Leader、term 多少)必须持久化到磁盘。如果节点重启后丢失了 Vote 信息,可能会出现「脑裂」——两个节点同时认为自己是 Leader。

OpenRaft 会在 RaftLogStorage 里调用 save_vote()read_vote(),你需要把 Vote 也存到 Fjall 里。建议用一个单独的 key(比如 b"__vote__")来存。

坑三:Truncate 日志的时机

Raft 协议要求:如果 Leader 发现某个 Follower 的日志和自己不一致,需要 truncate(截断)Follower 的日志。OpenRaft 会调用 truncate_logs(after) 方法。

Fjall 没有原生的「删除范围」操作,你需要遍历 after.. 的所有 key 然后逐个删除。这里要注意:删除操作也要保证原子性,否则中途崩溃会导致日志不一致。


实现 RaftStateMachine:业务数据 + 快照

State machine 是「真正的数据」——你存在分布式 KV 里的 key-value 对。

// 应用日志条目到 state machine
async fn apply(&mut self, log_entry: &Entry<Request>) -> Result<Response> {
    match &log_entry.payload {
        EntryPayload::Normal(req) => {
            // 这才是真正的业务逻辑
            match req {
                Request::Set { key, value } => {
                    self.kv.insert(key, value)?;
                    Ok(Response { value: Some(value.clone()) })
                }
                Request::Get { key } => {
                    let val = self.kv.get(key)?;
                    Ok(Response { value: val })
                }
                Request::Delete { key } => {
                    self.kv.remove(key)?;
                    Ok(Response { value: None })
                }
            }
        }
        EntryPayload::Blank => Ok(Response { value: None }),
        EntryPayload::Membership(_) => Ok(Response { value: None }),
    }
}

坑四:快照(Snapshot)怎么做

Raft 需要定期做快照——把 state machine 的当前状态序列化保存,然后删掉快照之前的日志。这样日志不会无限增长。

用 Fjall 做快照有两个方案:

方案 做法 优缺点
全量导出 遍历整个 keyspace,序列化到文件 简单,但数据量大时慢
Fjall 原生快照 利用 Fjall 的 MVCC snapshot 快,但需要理解 Fjall 的 snapshot 语义

我建议先用全量导出,等遇到性能瓶颈再优化。先让它跑起来,再让它跑得快。

坑五:Compaction 和 Raft Log Purge 的配合

Fjall 有自己的 compaction 机制(LSM-tree 后台合并),OpenRaft 也有 log purge(删旧日志)。两者是独立的,你需要自己协调:

  • OpenRaft 调用 purge_log(before) 时,你删掉 Fjall 里 before 之前的日志
  • Fjall 的 compaction 会回收这些删除操作产生的空间
  • 别忘了调用 db.persist(PersistMode::SyncAll) 确保数据落盘

实现 RaftNetwork:节点间怎么通信

OpenRaft 需要你实现 RaftNetwork trait,提供三个方法:

方法 作用
send_append_entries() Leader → Follower:复制日志
send_install_snapshot() Leader → Follower:同步快照
send_vote() Candidate → 其他节点:请求投票

通信层有几种选择:

方案 优缺点
gRPC tonic 生态好,protobuf 序列化效率高,但引入较重
自定义 TCP tokio::net 轻量可控,但要自己处理协议
QUIC quinn 之前网络编程文章讲过,连接迁移是杀手锏

如果你已经熟悉 tonic(微服务文章里用过),直接用 gRPC 是最快的路径。

坑六:超时和重试

Raft 的心跳超时和选举超时是核心参数。太短会导致频繁选举(集群不稳定),太长会导致故障恢复慢。

OpenRaft 的默认配置:

Config {
    heartbeat_interval: 1000,    // 心跳间隔 1s
    election_timeout_min: 3000,  // 选举超时下限 3s
    election_timeout_max: 5000,  // 选举超时上限 5s
    ..Default::default()
}

生产建议: 先用默认值跑起来,然后根据你的网络延迟调整。跨机房部署的话,这些值都需要调大。


客户端接入:写请求怎么找到 Leader?

一个绕不开的问题:客户端怎么知道谁是 Leader?

Client → Node 2 (Follower)
         "写 key=foo, value=bar"
         → 拒绝,我不是 Leader
         → 告诉客户端:Leader 是 Node 1

Client → Node 1 (Leader)
         "写 key=foo, value=bar"
         → 写入成功

OpenRaft 提供了 Raft::ensure_linearizable() 方法来做线性一致性读。对于写请求,如果不是 Leader 会返回错误,错误里包含 Leader 信息。

坑七:Leader 切换期间的请求

Leader 挂了或者网络分区的时候,客户端会收到短暂的错误。你的客户端需要:

  1. 缓存当前 Leader 信息
  2. 写失败时,重试并更新 Leader
  3. 设置合理的超时(别无限等待)
async fn write_with_retry(&self, key: String, value: Vec<u8>) -> Result<()> {
    let mut leader = self.current_leader.read().await;
    for attempt in 0..3 {
        match self.send_write(*leader, &key, &value).await {
            Ok(resp) => return Ok(resp),
            Err(RaftError::ForwardToLeader { leader_id: Some(new_leader), .. }) => {
                leader = new_leader;
                *self.current_leader.write().await = new_leader;
                // 重试
            }
            Err(e) => return Err(e),
        }
    }
    Err(anyhow!("max retries exceeded"))
}

测试:怎么验证它真的能用

分布式系统最怕的不是功能 bug,而是时序 bug——正常情况下没问题,特定时序下才出错。

基础测试:三节点启动

#[tokio::test]
async fn test_basic_cluster() {
    // 启动 3 个节点
    let nodes = start_cluster(3).await;
 
    // 写入数据
    nodes[0].write("key1", "value1").await.unwrap();
 
    // 等待复制
    tokio::time::sleep(Duration::from_millis(500)).await;
 
    // 从任意节点读取
    let val = nodes[2].read("key1").await.unwrap();
    assert_eq!(val, Some("value1".into()));
}

进阶测试:模拟节点宕机

#[tokio::test]
async fn test_leader_failure() {
    let nodes = start_cluster(3).await;
 
    // 写入一些数据
    nodes[0].write("key1", "value1").await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;
 
    // 杀掉 Leader
    let leader = nodes[0].raft.current_leader().await.unwrap();
    nodes[leader as usize].shutdown().await;
 
    // 等待新 Leader 选举
    tokio::time::sleep(Duration::from_secs(3)).await;
 
    // 集群应该仍然可用(2/3 节点存活)
    let new_leader = find_leader(&nodes).await;
    nodes[new_leader].write("key2", "value2").await.unwrap();
}

高级测试:网络分区

网络分区测试比较难模拟。可以用 iptables 或者在应用层用 proxy 来模拟。OpenRaft 自带了一些 chaos testing 的工具,但文档里说还没完成——这是个已知的坑。


性能和调优:从哪里下手

先看 OpenRaft 官方的基准测试数据:

场景 吞吐量
单写(1 client) ~33,000 put/s
64 并发 ~912,000 put/s
4096 并发 ~3,548,000 put/s
Batch 写(4 entries/batch, 4096 并发) ~5,615,000 put/s

注意:这是 OpenRaft 框架本身的测试,用的是内存存储。 实际用 Fjall 的时候,性能瓶颈会在磁盘 I/O 上。

调优方向

方向 具体操作
Batch 写 把多个写操作打包成一个 Raft log entry,减少共识轮次
并行复制 OpenRaft 默认并行复制日志到各 Follower,确认 max_inflight_log 参数
Fjall Memtable 大小 增大 max_memtable_size 减少 flush 频率
Block Cache 配置为可用内存的 20-25%,Fjall 官方推荐
压缩算法 默认 LZ4 已经很好,CPU 敏感场景可以关闭压缩
持久化策略 PersistMode::SyncAll 最安全但最慢,PersistMode::SyncData 折中

读性能优化

如果不需要线性一致性读,可以让 Follower 直接响应读请求。这叫「ReadIndex」优化,OpenRaft 支持 ensure_linearizable() 来保证一致性读,不调用它就是 follower read。


和 etcd/TiKV 的对比

维度 Fjall + OpenRaft etcd TiKV
语言 Rust Go Rust
存储引擎 LSM-tree (Fjall) bbolt (B+ tree) RocksDB (LSM-tree)
共识协议 Raft (OpenRaft) Raft Raft (自研)
部署方式 嵌入式 独立服务 独立服务
数据量上限 取决于磁盘 几 GB 内最佳 TB 级
学习成本 中等 低(有 etcdctl) 高(组件多)
适合场景 嵌入式/边缘 配置中心 大规模存储

你的定位: 如果你需要一个「嵌入到自己应用里的分布式 KV」,不想单独部署 etcd 集群,Fjall + OpenRaft 是一个很好的选择。


下一步

这篇文章先把架构和坑位理清楚了。如果你真的想动手写,建议这个顺序:

  1. 先跑通 OpenRaft 的官方 example(raft-kv-memstore),理解整体流程
  2. 把 memstore 替换成 Fjall,实现 RaftLogStorageRaftStateMachine
  3. 加上 tonic gRPC 做网络层
  4. 写测试,特别是 Leader 切换和日志截断的场景
  5. 做 benchmark,看看 Fjall 在你的场景下性能如何

记住:先让它跑起来,再让它跑得快。 分布式系统最大的坑不是性能,是一致性——你的代码在正常情况下都能跑,但只有在节点宕机、网络分区、日志截断这些极端情况下才会暴露问题。

Rust 的所有权系统在这个场景下特别有用。它会在编译期就阻止你做那些「在并发下不安全」的操作——比如同时读写同一个 Fjall keyspace、在异步上下文里持有不安全的引用。编译器挡掉的 bug,比你在测试里发现的多得多。


参考资料