Rust 异步编程:tokio 那些没人讲透的事
异步编程最大的谎言是「加个
.await就行了」。
写网络编程那篇文章的时候,很多读者留言说「tokio 会用,但总觉得哪里不对劲」。我太理解这种感觉了——async/await 语法把异步编程的门槛降到了地板,但也把理解难度抬到了天花板。
你写了 something().await,但你真的知道这行代码背后发生了什么吗?
这篇文章不教你怎么用 tokio——那些教程网上一大把。我想讲的是那些你以为懂了、但其实没懂的东西。
先搞清楚一件事:async 到底在干什么
很多人把 async 理解成「多线程」或者「并行」。这是错的。
// 这两个东西完全不同
async fn fetch_data() -> String { ... } // 异步函数
fn process_data() -> String { ... } // 普通函数async fn 编译后会变成一个状态机,不是一段「并行执行」的代码。它做的事情是:在遇到阻塞点(.await)的时候,把控制权交出去,让别的任务有机会跑。
同步世界(你熟悉的):
┌──────────────────────────────────────────────┐
│ 任务 A: ████████████████████ (全程占用 CPU) │
│ 任务 B: ████████████████████ │
│ 总耗时: A + B │
└──────────────────────────────────────────────┘
异步世界(tokio 在做的事):
┌──────────────────────────────────────────────┐
│ 任务 A: ██░░██░░██░░██ (IO 等待时让出 CPU) │
│ 任务 B: ░░██░░██░░██░░ (A 等 IO 时跑起来) │
│ 总耗时: max(A, B) 而不是 A + B │
└──────────────────────────────────────────────┘
关键区别:异步不是让代码跑得更快,而是让等待的时间不浪费。
如果你的代码全是 CPU 计算没有 IO,async 对你没有任何帮助,反而会因为状态机的开销让性能更差。
经验法则:async 解决的是 IO 密集型问题,不是 CPU 密集型问题。 如果你的服务大部分时间在等数据库、等网络、等磁盘,用 async。如果在做大量计算,用线程池。
tokio Runtime:你选对了吗?
tokio 有两种 runtime,选错了性能差距可以到几倍。
current_thread:单线程,最轻量
#[tokio::main(flavor = "current_thread")]
async fn main() {
// 所有任务跑在一个线程上
// 没有线程切换开销
// 适合 IO 密集、并发量不大的场景
}multi_thread:多线程,默认选项
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
// 任务在线程间自动调度
// 适合高并发、混合负载
// worker_threads 默认 = CPU 核心数
}怎么选?
| 场景 | 选哪个 | 原因 |
|---|---|---|
| CLI 工具、简单 HTTP 服务 | current_thread |
轻量,启动快 |
| Web API、微服务 | multi_thread |
需要利用多核 |
| 嵌入式、资源受限 | current_thread |
内存占用小 |
| 混合负载(IO + 计算) | multi_thread |
计算任务不会阻塞 IO |
我踩过的坑: 一开始我把所有服务都用 multi_thread,觉得「多线程肯定比单线程好」。后来发现一个低并发的内部工具,用 current_thread 反而性能更好——因为省掉了线程切换的开销。
经验法则:不知道选什么就用默认的
multi_thread。但如果你的服务 QPS 不高(< 1000),试试current_thread,可能有惊喜。
Pin 和 Unpin:最劝退的概念
这是 Rust 异步编程里最难理解的概念,没有之一。
为什么需要 Pin?
先看一个场景:
async fn example() {
let data = vec![1, 2, 3];
let reference = &data; // 引用了栈上的 data
some_async_op().await; // 这里让出控制权
println!("{:?}", reference); // 继续用 reference
}这个函数编译后变成一个状态机。await 之前的状态需要保存 data 和 reference。问题是:如果这个 Future 被移动到内存的另一个位置,reference 就变成野指针了。
Future 在内存位置 A:
data = [1, 2, 3] ← 栈上
reference = 0x7fff... ← 指向 data
Future 被移动到内存位置 B:
data = [1, 2, 3] ← 还在 A 的栈上
reference = 0x7fff... ← 还是指向 A 的栈,但 A 已经不是这个 Future 了
💥 悬垂指针Pin 的作用就是:禁止 Future 被移动。 一旦 Pin 住,这个 Future 就钉在内存里不动了。
什么时候需要关心 Pin?
大多数时候你不需要。tokio::spawn 和 .await 会帮你处理 Pin。但当你想手动操作 Future 的时候,就会碰到它:
use std::pin::Pin;
use std::future::Future;
// 返回一个 boxed Future 的时候
fn make_future() -> Pin<Box<dyn Future<Output = i32> + Send>> {
Box::pin(async { 42 })
}
// 或者用 pin! 宏(更轻量)
use tokio::pin;
async fn example() {
let fut = async { 42 };
pin!(fut); // fut 现在被 Pin 住了
// fut 不能被移动了,但可以安全地 .await
let result = fut.await;
}Unpin 是什么?
大多数类型都实现了 Unpin,意味着它们可以安全地被移动。async fn 生成的 Future 默认不实现 Unpin,因为它们可能包含自引用。
但你写的普通 struct 默认是 Unpin 的。只有当你在 struct 里存了 Pin<Box<dyn Future>> 的时候,才需要关心 Unpin。
经验法则:90% 的情况你不需要直接碰 Pin。
tokio::spawn要求 Future 是Send + 'static,它会帮你处理 Pin。只有在写底层库或者手动组合 Future 的时候才需要深入理解。
tokio::spawn vs spawn_blocking:别搞混了
这是最常见的误用。
tokio::spawn:用于异步任务
tokio::spawn(async move {
// 这里面只能跑异步代码
let result = fetch_from_db().await;
process(result).await;
});spawn_blocking:用于同步阻塞代码
tokio::task::spawn_blocking(move || {
// 这里面跑同步阻塞代码
// 比如:文件 IO、加密计算、调用同步库
std::fs::read_to_string("large_file.txt").unwrap()
}).await.unwrap();为什么不能在 spawn 里跑阻塞代码?
tokio worker thread 的工作方式:
worker 线程:
loop {
task = ready_queue.pop(); // 取一个就绪任务
task.poll(); // 执行,直到 .await
// 如果 task 里有阻塞代码(没有 .await)
// 这个 worker 就卡住了,其他任务都得等
}tokio 的 worker 线程是协作式调度——任务必须主动 .await 才会让出控制权。如果你在 spawn 里跑了一个 std::fs::read_to_string(),这个 worker 线程就被独占了,其他任务全部饿死。
真实的坑: 我在 spawn 里调了一个同步的 HTTP 客户端(reqwest::blocking),测试环境没问题(并发低),上线后 QPS 稍高就超时——因为所有 worker 线程都被阻塞在 HTTP 请求上了。
经验法则:在
tokio::spawn里用了任何std::的阻塞操作,都是 bug。 用tokio::fs、tokio::net、reqwest(异步版)替代。如果必须用同步库,包一层spawn_blocking。
select! 的陷阱
tokio::select! 很好用,但有几个坑。
坑一:取消安全性(Cancellation Safety)
tokio::select! {
result = fetch_a() => { /* A 先完成 */ }
result = fetch_b() => { /* B 先完成 */ }
}当 fetch_a() 先完成时,fetch_b() 的 Future 会被直接丢弃。如果 fetch_b() 正好在 .await 一个 IO 操作,那个操作会直接取消。
大多数时候这不是问题——TCP 读写、HTTP 请求被取消是安全的。但有些操作不是:
// 危险:select! 里用 channel
tokio::select! {
msg = rx.recv() => { /* 收到消息 */ }
_ = timeout() => { /* 超时 */ }
}
// 如果 recv() 正在等待,select! 取消它
// 消息可能已经从 channel 里取出来了,但你没处理
// 这叫「消息丢失」解决方案: 对于不安全取消的操作,用 tokio::sync::mpsc::Receiver::recv()(它是取消安全的),或者用 CancellationToken。
坑二:分支的公平性
loop {
tokio::select! {
_ = async_op_1() => { /* 处理 1 */ }
_ = async_op_2() => { /* 处理 2 */ }
}
}select! 的分支是随机选择的(不是按顺序)。如果两个分支同时就绪,每次循环随机选一个。这意味着 async_op_1 和 async_op_2 不会严格交替执行。
如果你需要严格的优先级,用 biased 关键字:
loop {
tokio::select! {
biased; // 按声明顺序优先
_ = high_priority() => { /* 优先处理 */ }
_ = low_priority() => { /* 其次 */ }
}
}坑三:在循环里 select
// 常见模式:循环处理多个 channel
loop {
tokio::select! {
msg = rx1.recv() => { handle(msg); }
msg = rx2.recv() => { handle(msg); }
else => { break; } // 所有 channel 关闭
}
}这个模式本身没问题,但要注意:else 分支只有在所有 recv() 都返回 None(channel 关闭)时才会触发。 如果你想在任意一个 channel 关闭时退出,需要额外处理。
经验法则:
select!里放的 Future 必须是「取消安全」的。 不确定的话,查文档。tokio 的文档会标注哪些函数是取消安全的。
异步取消:最隐蔽的 bug
这是 Rust 异步编程里最容易被忽略的问题。
场景:数据库事务
async fn transfer_money(from: AccountId, to: AccountId, amount: i64) -> Result<()> {
let mut tx = db.begin().await?;
// 扣款
tx.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from).await?;
// 加款
tx.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to).await?;
tx.commit().await?;
Ok(())
}如果这个 Future 在 扣款 和 加款 之间被取消(比如 select! 跳到了另一个分支),会发生什么?
钱扣了,但没加上去。
因为 tx 被 drop 了,Rust 不会自动 rollback——它怎么知道你是想 commit 还是 rollback?
解决方案:用 scopeguard 或者 Drop
async fn transfer_money(from: AccountId, to: AccountId, amount: i64) -> Result<()> {
let mut tx = db.begin().await?;
// 确保在任何情况下(包括取消)都会 rollback
let _guard = scopeguard::guard(tx, |mut tx| {
// 注意:这里不能 .await,所以要用 blocking
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(tx.rollback()).ok();
});
});
_guard.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from).await?;
_guard.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to).await?;
// 成功时手动 commit,然后 disarm guard
_guard.commit().await?;
scopeguard::ScopeGuard::into_inner(_guard); // 不再 rollback
Ok(())
}更简洁的方案: 大多数数据库库(sqlx)的事务在 drop 时会自动 rollback。但你得确认你的库有这个行为。
经验法则:在异步代码里,任何「多步操作」都要考虑「如果在中间被取消会怎样」。 数据库事务、文件写入、HTTP 请求,都是重灾区。
Send 约束:tokio::spawn 的隐藏门槛
tokio::spawn(async move {
let data = Rc::new(42); // Rc 不是 Send!
// ...
});编译报错:Rc<i32> cannot be sent between threads safely
为什么 tokio::spawn 要求 Send?
因为 multi_thread runtime 会把任务在线程间移动。一个 Future 可能在线程 A 上创建,在线程 B 上继续执行。如果它包含了 Rc、RefCell 等非线程安全类型,移动后就会出问题。
什么时候会踩这个坑?
最常见的场景是:在 async fn 里用了 Rc 或者 RefCell。
// 编译不过
async fn handle_request() {
let cache = Rc::new(HashMap::new()); // Rc 不是 Send
let data = fetch_data().await; // 这个 .await 让 Future 可能被跨线程移动
cache.insert("key", data);
}
// 解决方案:用 Arc
async fn handle_request() {
let cache = Arc::new(Mutex::new(HashMap::new())); // Arc + Mutex 是 Send
let data = fetch_data().await;
cache.lock().unwrap().insert("key", data);
}另一个隐藏的坑: std::sync::MutexGuard 不是 Send。如果你在 .await 之前 lock 了一个 std::sync::Mutex,然后 .await 了,编译器会报错。
// 编译不过
async fn bad() {
let lock = std::sync::Mutex::new(vec![]);
let guard = lock.lock().unwrap(); // MutexGuard 不是 Send
some_async_op().await; // Future 被移动时,guard 也被移动
guard.push(1);
}
// 解决方案:缩小锁的范围
async fn good() {
let lock = std::sync::Mutex::new(vec![]);
{
let mut guard = lock.lock().unwrap();
guard.push(1);
} // guard 在 .await 之前 drop
some_async_op().await;
}
// 或者用 tokio::sync::Mutex(支持异步等待)
async fn also_good() {
let lock = tokio::sync::Mutex::new(vec![]);
let mut guard = lock.lock().await; // 异步 lock,可以跨 .await 持有
some_async_op().await;
guard.push(1);
}经验法则:在
.await之前持有任何锁,都是潜在的Send问题。 要么缩小锁的范围,要么用tokio::sync::Mutex,要么用Arc<Mutex>替代Rc<RefCell>。
JoinSet:比 JoinHandle 更好用
以前 spawn 多个任务,收集结果是这样写的:
let mut handles = vec![];
for i in 0..10 {
handles.push(tokio::spawn(async move {
process(i).await
}));
}
for handle in handles {
let result = handle.await.unwrap();
// 处理结果
}问题是:任务按顺序等待,不是按完成顺序。 如果第 0 个任务最慢,你得等它完成才能处理第 1 个的结果。
JoinSet:按完成顺序处理
use tokio::task::JoinSet;
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn(async move {
process(i).await
});
}
// 谁先完成就先处理谁
while let Some(result) = set.join_next().await {
let value = result.unwrap();
println!("got: {}", value);
}JoinSet 的自动清理
JoinSet 被 drop 时,里面还没完成的任务会被自动取消。这比 Vec<JoinHandle> 好——JoinHandle 被 drop 时任务还在跑,只是你拿不到结果了。
async fn with_timeout() {
let mut set = JoinSet::new();
set.spawn(slow_task_1());
set.spawn(slow_task_2());
tokio::select! {
_ = set.join_next() => {
// 有一个完成了,处理结果
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
// 超时,set 被 drop,所有任务自动取消
}
}
}经验法则:需要 spawn 多个任务并「按完成顺序处理」,用
JoinSet。 需要「等待所有完成」,用futures::future::join_all。需要「等待任意一个完成」,用select!。
tokio::task::yield_now():什么时候该让出
有些场景下,你想主动让出 CPU:
async fn process_large_batch(items: Vec<Item>) {
for (i, item) in items.iter().enumerate() {
process_item(item).await;
// 每处理 100 个,让出一次
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}什么时候需要 yield?
- 长循环没有
.await——纯 CPU 计算的循环不会让出控制权,其他任务饿死 - 公平性要求高——不想让一个任务独占 worker 线程
- 测试——有些竞态条件在 yield 时才会暴露
// 不好的写法:纯 CPU 循环不让出
async fn cpu_heavy() {
for i in 0..1_000_000 {
compute(i); // 没有 .await,不会让出
}
}
// 好的写法:定期让出
async fn cpu_heavy_fair() {
for i in 0..1_000_000 {
compute(i);
if i % 10000 == 0 {
tokio::task::yield_now().await;
}
}
}
// 更好的写法:用 spawn_blocking
async fn cpu_heavy_best() {
tokio::task::spawn_blocking(move || {
for i in 0..1_000_000 {
compute(i);
}
}).await.unwrap();
}经验法则:如果你的 async 函数里有一个循环,但循环体里没有
.await,要么加yield_now(),要么把整个循环丢进spawn_blocking。
总结
Rust 异步编程的坑,本质上来自两个地方:
- async/await 语法糖藏住了复杂性——你以为在写同步代码,其实在写状态机
- tokio 的协作式调度需要你主动配合——不
.await就不让出,这是特性也是陷阱
最核心的几条规则:
spawn里不要有阻塞代码——用spawn_blocking.await之前不要持有锁——缩小范围或换tokio::sync::Mutexselect!里的 Future 要取消安全——查文档确认- 多步操作要考虑取消——用
scopeguard或 Drop 保护 - CPU 密集型任务不要跑在 async 里——用
spawn_blocking或线程池
异步编程不是「加个 .await 就行」。但理解了这些底层机制之后,你会发现 tokio 的设计其实很优雅——它给了你足够的控制权,同时也给了你足够的安全网。
最后一条建议:如果你的异步代码出了 bug,先检查是不是上面这些坑里的某一个。 我排查过的异步 bug,90% 都是这五个问题之一。