消息队列的三次进化:用 Rust 从零写一个,顺便搞懂为什么这样设计
"消息队列不就是发消息收消息吗?有什么好讲的?"
如果你这么想,说明你还没踩过坑。消息队列看起来简单,里面的水深得很。
这篇文章不教你搭 Kafka、配 NATS——那些教程网上一大把。我想做的是:用 Rust 从零写一个消息队列,在写的过程中把"为什么这样设计"讲清楚。
代码不是重点,设计决策才是重点。
先搞懂一件事:消息队列到底在解决什么问题?
想象一个外卖场景:
没有消息队列的世界:
- 你点了外卖,店家接到单,立刻开始做
- 做的过程中,你又改了地址,店家得停下来处理
- 高峰期订单太多,店家忙不过来,后面的订单直接拒掉
有消息队列的世界:
- 你下了单,订单进入一个"待处理队列"
- 店家按自己的节奏从队列里取单,做完一个取一个
- 高峰期订单堆积在队列里,不会丢,只是等的时间长一点
消息队列的本质就是:一个中间人,让发送方和接收方解耦。
- 发送方不用管接收方在不在、忙不忙
- 接收方不用管消息什么时候来的,按自己的节奏处理
- 高峰期消息堆积在队列里,削峰填谷
用技术的话说:异步解耦 + 削峰填谷 + 流量整形。
但这些只是表面。消息队列真正要解决的难题是:
- 消息放哪里? 内存快但会丢,磁盘慢但可靠
- 消费完了怎么标记? 消费到一半崩了怎么办?
- 消费者太多怎么分? 一条消息发给一个人还是一群人?
- 数据太大怎么扛? 单机扛不住,怎么水平扩展?
这些问题,不同时代的消息队列给出了不同的答案。我们从头来看。
第一代:传统消息队列——RabbitMQ 时代
2007 年 RabbitMQ 诞生,它的设计思路很直觉:
核心模型:信箱
想象公司前台的信箱:
- 每个部门有自己的信箱(Queue)
- 快递员(Producer)把信投到对应的信箱
- 部门的人(Consumer)去信箱取信
- 取走就没了,一封信只能被一个人取走
Producer → [Exchange] → [Queue] → Consumer
Exchange 负责"路由"——根据规则把消息分发到不同的 Queue。就像前台根据信封上的部门名,把信放到对应的信箱。
用 Rust 写一个最简版
先不管持久化、不管集群,用 50 行 Rust 写一个能跑的版本:
use tokio::sync::mpsc;
use std::collections::HashMap;
// 一个队列就是一个 channel
struct Broker {
queues: HashMap<String, mpsc::Sender<String>>,
}
impl Broker {
fn new() -> Self {
Broker { queues: HashMap::new() }
}
// 创建队列
fn create_queue(&mut self, name: &str) -> mpsc::Receiver<String> {
let (tx, rx) = mpsc::channel(1024);
self.queues.insert(name.to_string(), tx);
rx
}
// 发送消息
async fn publish(&self, queue: &str, msg: String) {
if let Some(tx) = self.queues.get(queue) {
let _ = tx.send(msg).await;
}
}
}
// 使用方式
#[tokio::main]
async fn main() {
let mut broker = Broker::new();
let mut rx = broker.create_queue("orders");
// 生产者发消息
broker.publish("orders", "订单001: 宫保鸡丁".into()).await;
broker.publish("orders", "订单002: 鱼香肉丝".into()).await;
// 消费者收消息
while let Some(msg) = rx.recv().await {
println!("处理: {}", msg);
}
}跑起来了,能收能发。但这个版本有 三个致命问题:
- 消息存在内存里 —— 程序一重启,消息全没了
- 没有确认机制 —— 消费者收到消息就没了,处理到一半崩了消息就丢了
- 单机 —— 一台机器扛不住就没办法了
这三个问题,就是消息队列二十年来不断进化的驱动力。
这一代的代表
RabbitMQ、ActiveMQ、ZeroMQ
特点: 功能丰富(路由、优先级、延迟队列),但单机性能有上限,扩展靠搭集群。
适合场景: 企业内部系统,消息量不大,但需要灵活的路由规则。
第二代:分布式日志——Kafka 时代
2011 年 LinkedIn 开源了 Kafka,它对消息队列的理解完全不一样。
核心思想:消息不是"一封信",而是一条"日志"
传统消息队列把消息当"信件"——取走就没了。
Kafka 把消息当"日志"——写进去就不动了,消费者自己记录读到哪里了。
用图书馆来类比:
- 传统 MQ(RabbitMQ): 借书——你借走了,别人就看不到了,你还回来别人才能借
- Kafka: 阅览室——书一直在那里,你自己拿个书签标记看到第几页,别人看别人的
这个设计决策带来了巨大的好处:
- 消息可以重复消费 —— 没读好?倒回去重新读
- 多个消费者互不干扰 —— 你看到第 5 页,我看到第 10 页,互不影响
- 消息可以保留很久 —— 不用删,磁盘便宜
用 Rust 改造我们的代码
把内存队列换成"追加日志":
use std::fs::{File, OpenOptions};
use std::io::{Write, BufWriter};
use std::path::Path;
struct LogQueue {
writer: BufWriter<File>,
offset: u64, // 当前写到哪里了
}
impl LogQueue {
fn new(path: &Path) -> std::io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
Ok(LogQueue {
writer: BufWriter::new(file),
offset: 0,
})
}
// 追加写入,返回这条消息的 offset
fn append(&mut self, msg: &str) -> std::io::Result<u64> {
let bytes = msg.as_bytes();
// 写法:[4字节长度][消息内容]
let len = bytes.len() as u32;
self.writer.write_all(&len.to_le_bytes())?;
self.writer.write_all(bytes)?;
self.writer.flush()?;
let current_offset = self.offset;
self.offset += 4 + bytes.len() as u64;
Ok(current_offset)
}
}但这里有个大坑:什么时候刷盘?
你注意到了吗?上面用了 BufWriter——数据先存在内存缓冲区,攒够了再一次性写磁盘。
好处: 顺序写磁盘很快,批量写比一条一条写快 100 倍。
坏处: 程序崩了,缓冲区里还没刷到磁盘的消息就丢了。
这就是消息队列最经典的 trade-off:性能 vs 可靠性。
Kafka 的默认策略是:攒一批再刷盘,接受可能丢几条消息。 因为 Kafka 设计的目标场景是日志收集——丢几条日志没关系,但吞吐量必须高。
如果你的场景是金融交易——一条都不能丢——那就得每条消息都 fsync,代价是性能暴跌。
// 可靠模式:每条消息都刷盘
fn append_reliable(&mut self, msg: &str) -> std::io::Result<u64> {
// ... 写入逻辑同上 ...
self.writer.flush()?;
self.writer.get_ref().sync_data()?; // 强制刷盘,性能杀手
Ok(current_offset)
}我的经验: 大部分场景不需要每条都刷盘。攒 100 条或每秒刷一次,性能和可靠性都够用。
Consumer Group:一群人分着干活
Kafka 另一个杀手级设计是 Consumer Group:
┌─ Consumer A (处理分区 0, 1)
Producer → Topic ──┤
└─ Consumer B (处理分区 2, 3)
- 一个 Topic(主题)可以有多个 Partition(分区)
- 一个 Consumer Group 里的消费者,每人负责几个分区
- 分区数决定了最大并行度
为什么这样设计? 因为这是水平扩展的关键。
单个消费者处理速度有上限。想快怎么办?加消费者。但消费者多了,怎么分配消息?
传统 MQ 的做法是"竞争消费"——多个消费者抢同一个队列,谁抢到算谁的。问题是:你没法控制哪条消息给谁,也没法保证顺序。
Kafka 的做法是"分区绑定"——消费者和分区一一对应。好处是:
- 顺序保证 —— 同一个分区内的消息,一定是按顺序消费的
- 无锁 —— 不用抢,分好了就是你的
- 好扩展 —— 想加消费者?加分区就行
这一代的代表
Kafka、Pulsar(早期)、RocketMQ
特点: 高吞吐、可扩展、消息可回溯。但运维复杂(Kafka 要搭 ZooKeeper),延迟不如 RabbitMQ 低。
适合场景: 日志收集、大数据管道、事件溯源。
第三代:云原生——NATS 时代
2010 年代末,容器化和微服务火了,Kafka 的问题也暴露了:
- 太重了 —— ZooKeeper + Broker + 一堆配置,搭起来费劲
- 运维成本高 —— 扩容要 rebalance,分区数改不了
- 延迟不够低 —— 追求吞吐,牺牲了延迟
NATS 走了一条完全不同的路。
核心思想:先简单,再加功能
NATS 最初的设计理念是:大部分消息不需要持久化。
想想现实场景:
- 传感器数据(温度、湿度)—— 丢了就丢了,下一秒还有新的
- 实时通知 —— 推送失败重试一下就行
- 服务间调用 —— 调用方在线等着,不需要存盘
NATS 的核心就是一个极快的内存消息总线,不持久化、不保证送达、但极快极简单。
后来用户说"我们也需要持久化",NATS 就加了 JetStream——一个可选的持久化层。
这种"先做减法,再按需加"的思路,和 Kafka 的"一步到位"完全相反。
用 Rust 模拟 NATS 的设计
use tokio::sync::broadcast;
struct NatsCore {
// broadcast channel:一个消息发给所有订阅者
subjects: dashmap::DashMap<String, broadcast::Sender<String>>,
}
impl NatsCore {
fn new() -> Self {
NatsCore { subjects: dashmap::DashMap::new() }
}
// 发布消息(fire and forget)
fn publish(&self, subject: &str, msg: &str) {
if let Some(tx) = self.subjects.get(subject) {
let _ = tx.send(msg.to_string()); // 没人订阅?丢掉
}
}
// 订阅主题
fn subscribe(&self, subject: &str) -> broadcast::Receiver<String> {
self.subjects
.entry(subject.to_string())
.or_insert_with(|| broadcast::channel(1024).0)
.subscribe()
}
}注意 let _ = tx.send(...) —— 如果没有订阅者,消息直接丢掉。这就是 NATS 的"不保证送达"。
JetStream:给 NATS 加上"记忆"
当用户需要持久化时,NATS 的做法不是改核心,而是加一层:
Publisher → NATS Core → JetStream (持久化) → Consumer
JetStream 的设计也很有意思:
- 消息存储在 Stream 里 —— 类似 Kafka 的 Topic
- 消费者通过 Consumer 对象读取 —— 有 ACK、重试、死信队列
- 但不需要 ZooKeeper —— 集群管理靠内置的 RAFT 协议
// JetStream 模式:持久化 + ACK
struct JetStream {
store: Vec<Vec<u8>>, // 简化版,实际会写磁盘
consumers: HashMap<String, usize>, // consumer -> 已消费到的 offset
}
impl JetStream {
fn publish(&mut self, msg: &[u8]) -> u64 {
let offset = self.store.len() as u64;
self.store.push(msg.to_vec());
offset
}
fn ack(&mut self, consumer: &str, offset: u64) {
self.consumers.insert(consumer.to_string(), offset as usize);
}
// 重连后从上次的位置继续消费
fn resume(&self, consumer: &str) -> &[Vec<u8>] {
let offset = self.consumers.get(consumer).copied().unwrap_or(0);
&self.store[offset..]
}
}NATS 的设计哲学:简单的事情简单做,复杂的事情按需做。
这一代的代表
NATS、Redpanda(Kafka 兼容但去掉了 ZooKeeper)、各种云厂商的 Serverless MQ
特点: 轻量、运维简单、云原生友好。但功能不如 Kafka 丰富,生态没那么大。
适合场景: 微服务通信、IoT、实时应用。
我们的手搓项目:设计谱系在哪?
写到这里,你应该对消息队列的设计演进有了一个全景图。我们从零写的这个 MQ,其实就是在这些设计决策中做选择:
| 设计决策 | 我们的选择 | 为什么 |
|---|---|---|
| 存储 | 追加日志(WAL) | 简单、可靠、性能好 |
| 刷盘策略 | 攒一批再刷 | 平衡性能和可靠性 |
| 消费模型 | Pull(拉取) | 消费者掌控节奏 |
| 消费者分组 | 分区绑定 | 顺序保证 + 无锁 |
| 集群 | 暂不做 | 先把单机写好 |
我们的定位:一个"麻雀虽小五脏俱全"的教学级 MQ。
它不会有 Kafka 的百万 TPS,不会有 NATS 的毫秒延迟,不会有 RabbitMQ 的复杂路由。但它能帮你搞懂消息队列的核心设计思想。
实现中的几个坑(真实踩过的)
坑一:消息顺序不是你以为的"顺序"
你可能觉得:我按顺序写入,消费者按顺序读取,不就是顺序的吗?
单线程没问题,多线程就完了。
假设两个生产者同时发消息:
Producer A: 发送消息 1
Producer B: 发送消息 2
实际写入顺序可能是 1, 2,也可能是 2, 1——取决于谁先拿到锁。
解决方案: 要么单线程写入(牺牲性能),要么按 key 分区(同一个 key 的消息保证顺序)。
坑二:消费者 ACK 不是"收到"而是"处理完"
很多初学者以为:消费者收到消息就 ACK。
错!应该是处理完才 ACK。
消费者收到消息 → 开始处理 → 处理到一半崩了 → 没有 ACK → 重启后这条消息会被重新投递
如果你收到就 ACK,处理到一半崩了,消息就丢了。
// 错误示范
async fn consume_bad(rx: &mut Receiver<Message>) {
while let Some(msg) = rx.recv().await {
ack(&msg); // 先 ACK
process(msg).await; // 再处理——如果这里崩了,消息丢了
}
}
// 正确做法
async fn consume_good(rx: &mut Receiver<Message>) {
while let Some(msg) = rx.recv().await {
process(msg).await; // 先处理
ack(&msg); // 再 ACK
}
}坑三:消息堆积时的背压
消费者处理速度跟不上生产者怎么办?消息在队列里堆积,内存越来越大,最后 OOM。
解决方案:背压(Backpressure)。
// 当队列满了,阻塞生产者
let (tx, mut rx) = mpsc::channel(10000); // 最多存 10000 条
// 生产者
tx.send(msg).await.unwrap(); // 队列满了会阻塞,不会丢消息
// 或者用 try_send,队列满了就报错
match tx.try_send(msg) {
Ok(_) => {},
Err(_) => println!("队列满了,消息被丢弃"), // 生产者决定怎么处理
}关键决策:队列满了,是阻塞生产者还是丢弃消息? 这取决于你的业务场景。
和真实 MQ 的差距
老老实实承认:我们写的这个,和真正的生产级 MQ 差了十万八千里。
我们没做的:
| 功能 | Kafka 怎么做的 | NATS 怎么做的 |
|---|---|---|
| 副本同步 | ISR 机制,Leader + Follower | RAFT 协议 |
| Leader 选举 | ZooKeeper 管理 | 内置 RAFT |
| 消息回溯 | 按 offset 随便跳 | 按时间/序号回溯 |
| 消息压缩 | Snappy/LZ4/ZSTD | 可选压缩 |
| 事务 | 两阶段提交 | 支持 |
| 死信队列 | 需要自己实现 | JetStream 内置 |
但这些不影响我们理解核心原理。 就像你不需要会造发动机才能学会开车——但如果你想成为好司机,了解发动机原理绝对有帮助。
总结:写完这个你学到了什么?
消息队列这二十年的进化,本质上是在回答几个问题:
- 消息放哪里? → 内存 → 磁盘日志 → 可选持久化
- 怎么消费? → 抢占 → 分区绑定 → 灵活订阅
- 怎么扩展? → 单机集群 → 分区 → 云原生
- 怎么保证可靠? → ACK + 重试 + 幂等
不同的时代,不同的回答。没有"最好的"设计,只有"最适合你的场景"的设计。
如果你只记住一件事,记住这个:
消息队列不是"发消息收消息"那么简单。它的核心是在性能、可靠性、复杂度之间做取舍。每一个设计决策背后,都有一个真实的生产事故在推动。