返回文章列表

消息队列的三次进化:用 Rust 从零写一个,顺便搞懂为什么这样设计

725·5 分钟阅读
Rust消息队列系统设计后端架构

"消息队列不就是发消息收消息吗?有什么好讲的?"

如果你这么想,说明你还没踩过坑。消息队列看起来简单,里面的水深得很。

这篇文章不教你搭 Kafka、配 NATS——那些教程网上一大把。我想做的是:用 Rust 从零写一个消息队列,在写的过程中把"为什么这样设计"讲清楚。

代码不是重点,设计决策才是重点。


先搞懂一件事:消息队列到底在解决什么问题?

想象一个外卖场景:

没有消息队列的世界:

  • 你点了外卖,店家接到单,立刻开始做
  • 做的过程中,你又改了地址,店家得停下来处理
  • 高峰期订单太多,店家忙不过来,后面的订单直接拒掉

有消息队列的世界:

  • 你下了单,订单进入一个"待处理队列"
  • 店家按自己的节奏从队列里取单,做完一个取一个
  • 高峰期订单堆积在队列里,不会丢,只是等的时间长一点

消息队列的本质就是:一个中间人,让发送方和接收方解耦。

  • 发送方不用管接收方在不在、忙不忙
  • 接收方不用管消息什么时候来的,按自己的节奏处理
  • 高峰期消息堆积在队列里,削峰填谷

用技术的话说:异步解耦 + 削峰填谷 + 流量整形

但这些只是表面。消息队列真正要解决的难题是:

  1. 消息放哪里? 内存快但会丢,磁盘慢但可靠
  2. 消费完了怎么标记? 消费到一半崩了怎么办?
  3. 消费者太多怎么分? 一条消息发给一个人还是一群人?
  4. 数据太大怎么扛? 单机扛不住,怎么水平扩展?

这些问题,不同时代的消息队列给出了不同的答案。我们从头来看。


第一代:传统消息队列——RabbitMQ 时代

2007 年 RabbitMQ 诞生,它的设计思路很直觉:

核心模型:信箱

想象公司前台的信箱:

  • 每个部门有自己的信箱(Queue)
  • 快递员(Producer)把信投到对应的信箱
  • 部门的人(Consumer)去信箱取信
  • 取走就没了,一封信只能被一个人取走
Producer → [Exchange] → [Queue] → Consumer

Exchange 负责"路由"——根据规则把消息分发到不同的 Queue。就像前台根据信封上的部门名,把信放到对应的信箱。

用 Rust 写一个最简版

先不管持久化、不管集群,用 50 行 Rust 写一个能跑的版本:

use tokio::sync::mpsc;
use std::collections::HashMap;
 
// 一个队列就是一个 channel
struct Broker {
    queues: HashMap<String, mpsc::Sender<String>>,
}
 
impl Broker {
    fn new() -> Self {
        Broker { queues: HashMap::new() }
    }
 
    // 创建队列
    fn create_queue(&mut self, name: &str) -> mpsc::Receiver<String> {
        let (tx, rx) = mpsc::channel(1024);
        self.queues.insert(name.to_string(), tx);
        rx
    }
 
    // 发送消息
    async fn publish(&self, queue: &str, msg: String) {
        if let Some(tx) = self.queues.get(queue) {
            let _ = tx.send(msg).await;
        }
    }
}
 
// 使用方式
#[tokio::main]
async fn main() {
    let mut broker = Broker::new();
    let mut rx = broker.create_queue("orders");
 
    // 生产者发消息
    broker.publish("orders", "订单001: 宫保鸡丁".into()).await;
    broker.publish("orders", "订单002: 鱼香肉丝".into()).await;
 
    // 消费者收消息
    while let Some(msg) = rx.recv().await {
        println!("处理: {}", msg);
    }
}

跑起来了,能收能发。但这个版本有 三个致命问题

  1. 消息存在内存里 —— 程序一重启,消息全没了
  2. 没有确认机制 —— 消费者收到消息就没了,处理到一半崩了消息就丢了
  3. 单机 —— 一台机器扛不住就没办法了

这三个问题,就是消息队列二十年来不断进化的驱动力。

这一代的代表

RabbitMQ、ActiveMQ、ZeroMQ

特点: 功能丰富(路由、优先级、延迟队列),但单机性能有上限,扩展靠搭集群。

适合场景: 企业内部系统,消息量不大,但需要灵活的路由规则。


第二代:分布式日志——Kafka 时代

2011 年 LinkedIn 开源了 Kafka,它对消息队列的理解完全不一样。

核心思想:消息不是"一封信",而是一条"日志"

传统消息队列把消息当"信件"——取走就没了。

Kafka 把消息当"日志"——写进去就不动了,消费者自己记录读到哪里了。

用图书馆来类比:

  • 传统 MQ(RabbitMQ): 借书——你借走了,别人就看不到了,你还回来别人才能借
  • Kafka: 阅览室——书一直在那里,你自己拿个书签标记看到第几页,别人看别人的

这个设计决策带来了巨大的好处:

  1. 消息可以重复消费 —— 没读好?倒回去重新读
  2. 多个消费者互不干扰 —— 你看到第 5 页,我看到第 10 页,互不影响
  3. 消息可以保留很久 —— 不用删,磁盘便宜

用 Rust 改造我们的代码

把内存队列换成"追加日志":

use std::fs::{File, OpenOptions};
use std::io::{Write, BufWriter};
use std::path::Path;
 
struct LogQueue {
    writer: BufWriter<File>,
    offset: u64,  // 当前写到哪里了
}
 
impl LogQueue {
    fn new(path: &Path) -> std::io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(LogQueue {
            writer: BufWriter::new(file),
            offset: 0,
        })
    }
 
    // 追加写入,返回这条消息的 offset
    fn append(&mut self, msg: &str) -> std::io::Result<u64> {
        let bytes = msg.as_bytes();
        // 写法:[4字节长度][消息内容]
        let len = bytes.len() as u32;
        self.writer.write_all(&len.to_le_bytes())?;
        self.writer.write_all(bytes)?;
        self.writer.flush()?;
 
        let current_offset = self.offset;
        self.offset += 4 + bytes.len() as u64;
        Ok(current_offset)
    }
}

但这里有个大坑:什么时候刷盘?

你注意到了吗?上面用了 BufWriter——数据先存在内存缓冲区,攒够了再一次性写磁盘。

好处: 顺序写磁盘很快,批量写比一条一条写快 100 倍。

坏处: 程序崩了,缓冲区里还没刷到磁盘的消息就丢了。

这就是消息队列最经典的 trade-off:性能 vs 可靠性。

Kafka 的默认策略是:攒一批再刷盘,接受可能丢几条消息。 因为 Kafka 设计的目标场景是日志收集——丢几条日志没关系,但吞吐量必须高。

如果你的场景是金融交易——一条都不能丢——那就得每条消息都 fsync,代价是性能暴跌。

// 可靠模式:每条消息都刷盘
fn append_reliable(&mut self, msg: &str) -> std::io::Result<u64> {
    // ... 写入逻辑同上 ...
    self.writer.flush()?;
    self.writer.get_ref().sync_data()?;  // 强制刷盘,性能杀手
    Ok(current_offset)
}

我的经验: 大部分场景不需要每条都刷盘。攒 100 条或每秒刷一次,性能和可靠性都够用。

Consumer Group:一群人分着干活

Kafka 另一个杀手级设计是 Consumer Group:

                    ┌─ Consumer A (处理分区 0, 1)
Producer → Topic ──┤
                    └─ Consumer B (处理分区 2, 3)
  • 一个 Topic(主题)可以有多个 Partition(分区)
  • 一个 Consumer Group 里的消费者,每人负责几个分区
  • 分区数决定了最大并行度

为什么这样设计? 因为这是水平扩展的关键。

单个消费者处理速度有上限。想快怎么办?加消费者。但消费者多了,怎么分配消息?

传统 MQ 的做法是"竞争消费"——多个消费者抢同一个队列,谁抢到算谁的。问题是:你没法控制哪条消息给谁,也没法保证顺序。

Kafka 的做法是"分区绑定"——消费者和分区一一对应。好处是:

  1. 顺序保证 —— 同一个分区内的消息,一定是按顺序消费的
  2. 无锁 —— 不用抢,分好了就是你的
  3. 好扩展 —— 想加消费者?加分区就行

这一代的代表

Kafka、Pulsar(早期)、RocketMQ

特点: 高吞吐、可扩展、消息可回溯。但运维复杂(Kafka 要搭 ZooKeeper),延迟不如 RabbitMQ 低

适合场景: 日志收集、大数据管道、事件溯源。


第三代:云原生——NATS 时代

2010 年代末,容器化和微服务火了,Kafka 的问题也暴露了:

  1. 太重了 —— ZooKeeper + Broker + 一堆配置,搭起来费劲
  2. 运维成本高 —— 扩容要 rebalance,分区数改不了
  3. 延迟不够低 —— 追求吞吐,牺牲了延迟

NATS 走了一条完全不同的路。

核心思想:先简单,再加功能

NATS 最初的设计理念是:大部分消息不需要持久化。

想想现实场景:

  • 传感器数据(温度、湿度)—— 丢了就丢了,下一秒还有新的
  • 实时通知 —— 推送失败重试一下就行
  • 服务间调用 —— 调用方在线等着,不需要存盘

NATS 的核心就是一个极快的内存消息总线,不持久化、不保证送达、但极快极简单。

后来用户说"我们也需要持久化",NATS 就加了 JetStream——一个可选的持久化层。

这种"先做减法,再按需加"的思路,和 Kafka 的"一步到位"完全相反。

用 Rust 模拟 NATS 的设计

use tokio::sync::broadcast;
 
struct NatsCore {
    // broadcast channel:一个消息发给所有订阅者
    subjects: dashmap::DashMap<String, broadcast::Sender<String>>,
}
 
impl NatsCore {
    fn new() -> Self {
        NatsCore { subjects: dashmap::DashMap::new() }
    }
 
    // 发布消息(fire and forget)
    fn publish(&self, subject: &str, msg: &str) {
        if let Some(tx) = self.subjects.get(subject) {
            let _ = tx.send(msg.to_string());  // 没人订阅?丢掉
        }
    }
 
    // 订阅主题
    fn subscribe(&self, subject: &str) -> broadcast::Receiver<String> {
        self.subjects
            .entry(subject.to_string())
            .or_insert_with(|| broadcast::channel(1024).0)
            .subscribe()
    }
}

注意 let _ = tx.send(...) —— 如果没有订阅者,消息直接丢掉。这就是 NATS 的"不保证送达"。

JetStream:给 NATS 加上"记忆"

当用户需要持久化时,NATS 的做法不是改核心,而是加一层

Publisher → NATS Core → JetStream (持久化) → Consumer

JetStream 的设计也很有意思:

  • 消息存储在 Stream 里 —— 类似 Kafka 的 Topic
  • 消费者通过 Consumer 对象读取 —— 有 ACK、重试、死信队列
  • 但不需要 ZooKeeper —— 集群管理靠内置的 RAFT 协议
// JetStream 模式:持久化 + ACK
struct JetStream {
    store: Vec<Vec<u8>>,  // 简化版,实际会写磁盘
    consumers: HashMap<String, usize>,  // consumer -> 已消费到的 offset
}
 
impl JetStream {
    fn publish(&mut self, msg: &[u8]) -> u64 {
        let offset = self.store.len() as u64;
        self.store.push(msg.to_vec());
        offset
    }
 
    fn ack(&mut self, consumer: &str, offset: u64) {
        self.consumers.insert(consumer.to_string(), offset as usize);
    }
 
    // 重连后从上次的位置继续消费
    fn resume(&self, consumer: &str) -> &[Vec<u8>] {
        let offset = self.consumers.get(consumer).copied().unwrap_or(0);
        &self.store[offset..]
    }
}

NATS 的设计哲学:简单的事情简单做,复杂的事情按需做。

这一代的代表

NATS、Redpanda(Kafka 兼容但去掉了 ZooKeeper)、各种云厂商的 Serverless MQ

特点: 轻量、运维简单、云原生友好。但功能不如 Kafka 丰富,生态没那么大。

适合场景: 微服务通信、IoT、实时应用。


我们的手搓项目:设计谱系在哪?

写到这里,你应该对消息队列的设计演进有了一个全景图。我们从零写的这个 MQ,其实就是在这些设计决策中做选择:

设计决策 我们的选择 为什么
存储 追加日志(WAL) 简单、可靠、性能好
刷盘策略 攒一批再刷 平衡性能和可靠性
消费模型 Pull(拉取) 消费者掌控节奏
消费者分组 分区绑定 顺序保证 + 无锁
集群 暂不做 先把单机写好

我们的定位:一个"麻雀虽小五脏俱全"的教学级 MQ。

它不会有 Kafka 的百万 TPS,不会有 NATS 的毫秒延迟,不会有 RabbitMQ 的复杂路由。但它能帮你搞懂消息队列的核心设计思想


实现中的几个坑(真实踩过的)

坑一:消息顺序不是你以为的"顺序"

你可能觉得:我按顺序写入,消费者按顺序读取,不就是顺序的吗?

单线程没问题,多线程就完了。

假设两个生产者同时发消息:

Producer A: 发送消息 1
Producer B: 发送消息 2

实际写入顺序可能是 1, 2,也可能是 2, 1——取决于谁先拿到锁。

解决方案: 要么单线程写入(牺牲性能),要么按 key 分区(同一个 key 的消息保证顺序)。

坑二:消费者 ACK 不是"收到"而是"处理完"

很多初学者以为:消费者收到消息就 ACK。

错!应该是处理完才 ACK。

消费者收到消息 → 开始处理 → 处理到一半崩了 → 没有 ACK → 重启后这条消息会被重新投递

如果你收到就 ACK,处理到一半崩了,消息就丢了。

// 错误示范
async fn consume_bad(rx: &mut Receiver<Message>) {
    while let Some(msg) = rx.recv().await {
        ack(&msg);  // 先 ACK
        process(msg).await;  // 再处理——如果这里崩了,消息丢了
    }
}
 
// 正确做法
async fn consume_good(rx: &mut Receiver<Message>) {
    while let Some(msg) = rx.recv().await {
        process(msg).await;  // 先处理
        ack(&msg);  // 再 ACK
    }
}

坑三:消息堆积时的背压

消费者处理速度跟不上生产者怎么办?消息在队列里堆积,内存越来越大,最后 OOM。

解决方案:背压(Backpressure)。

// 当队列满了,阻塞生产者
let (tx, mut rx) = mpsc::channel(10000);  // 最多存 10000 条
 
// 生产者
tx.send(msg).await.unwrap();  // 队列满了会阻塞,不会丢消息
 
// 或者用 try_send,队列满了就报错
match tx.try_send(msg) {
    Ok(_) => {},
    Err(_) => println!("队列满了,消息被丢弃"),  // 生产者决定怎么处理
}

关键决策:队列满了,是阻塞生产者还是丢弃消息? 这取决于你的业务场景。


和真实 MQ 的差距

老老实实承认:我们写的这个,和真正的生产级 MQ 差了十万八千里。

我们没做的:

功能 Kafka 怎么做的 NATS 怎么做的
副本同步 ISR 机制,Leader + Follower RAFT 协议
Leader 选举 ZooKeeper 管理 内置 RAFT
消息回溯 按 offset 随便跳 按时间/序号回溯
消息压缩 Snappy/LZ4/ZSTD 可选压缩
事务 两阶段提交 支持
死信队列 需要自己实现 JetStream 内置

但这些不影响我们理解核心原理。 就像你不需要会造发动机才能学会开车——但如果你想成为好司机,了解发动机原理绝对有帮助。


总结:写完这个你学到了什么?

消息队列这二十年的进化,本质上是在回答几个问题:

  1. 消息放哪里? → 内存 → 磁盘日志 → 可选持久化
  2. 怎么消费? → 抢占 → 分区绑定 → 灵活订阅
  3. 怎么扩展? → 单机集群 → 分区 → 云原生
  4. 怎么保证可靠? → ACK + 重试 + 幂等

不同的时代,不同的回答。没有"最好的"设计,只有"最适合你的场景"的设计。

如果你只记住一件事,记住这个:

消息队列不是"发消息收消息"那么简单。它的核心是在性能、可靠性、复杂度之间做取舍。每一个设计决策背后,都有一个真实的生产事故在推动。