返回文章列表

Rust 异步编程:tokio 那些没人讲透的事

786·6 分钟阅读
RustTokio异步编程

异步编程最大的谎言是「加个 .await 就行了」。

写网络编程那篇文章的时候,很多读者留言说「tokio 会用,但总觉得哪里不对劲」。我太理解这种感觉了——async/await 语法把异步编程的门槛降到了地板,但也把理解难度抬到了天花板。

你写了 something().await,但你真的知道这行代码背后发生了什么吗?

这篇文章不教你怎么用 tokio——那些教程网上一大把。我想讲的是那些你以为懂了、但其实没懂的东西


先搞清楚一件事:async 到底在干什么

很多人把 async 理解成「多线程」或者「并行」。这是错的。

// 这两个东西完全不同
async fn fetch_data() -> String { ... }  // 异步函数
fn process_data() -> String { ... }      // 普通函数

async fn 编译后会变成一个状态机,不是一段「并行执行」的代码。它做的事情是:在遇到阻塞点(.await)的时候,把控制权交出去,让别的任务有机会跑。

同步世界(你熟悉的):
┌──────────────────────────────────────────────┐
│ 任务 A: ████████████████████ (全程占用 CPU)   │
│ 任务 B:                  ████████████████████ │
│ 总耗时: A + B                                  │
└──────────────────────────────────────────────┘

异步世界(tokio 在做的事):
┌──────────────────────────────────────────────┐
│ 任务 A: ██░░██░░██░░██ (IO 等待时让出 CPU)    │
│ 任务 B: ░░██░░██░░██░░ (A 等 IO 时跑起来)     │
│ 总耗时: max(A, B) 而不是 A + B                │
└──────────────────────────────────────────────┘

关键区别:异步不是让代码跑得更快,而是让等待的时间不浪费。

如果你的代码全是 CPU 计算没有 IO,async 对你没有任何帮助,反而会因为状态机的开销让性能更差。

经验法则:async 解决的是 IO 密集型问题,不是 CPU 密集型问题。 如果你的服务大部分时间在等数据库、等网络、等磁盘,用 async。如果在做大量计算,用线程池。


tokio Runtime:你选对了吗?

tokio 有两种 runtime,选错了性能差距可以到几倍。

current_thread:单线程,最轻量

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // 所有任务跑在一个线程上
    // 没有线程切换开销
    // 适合 IO 密集、并发量不大的场景
}

multi_thread:多线程,默认选项

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // 任务在线程间自动调度
    // 适合高并发、混合负载
    // worker_threads 默认 = CPU 核心数
}

怎么选?

场景 选哪个 原因
CLI 工具、简单 HTTP 服务 current_thread 轻量,启动快
Web API、微服务 multi_thread 需要利用多核
嵌入式、资源受限 current_thread 内存占用小
混合负载(IO + 计算) multi_thread 计算任务不会阻塞 IO

我踩过的坑: 一开始我把所有服务都用 multi_thread,觉得「多线程肯定比单线程好」。后来发现一个低并发的内部工具,用 current_thread 反而性能更好——因为省掉了线程切换的开销。

经验法则:不知道选什么就用默认的 multi_thread。但如果你的服务 QPS 不高(< 1000),试试 current_thread,可能有惊喜。


Pin 和 Unpin:最劝退的概念

这是 Rust 异步编程里最难理解的概念,没有之一。

为什么需要 Pin?

先看一个场景:

async fn example() {
    let data = vec![1, 2, 3];
    let reference = &data;  // 引用了栈上的 data
 
    some_async_op().await;  // 这里让出控制权
 
    println!("{:?}", reference);  // 继续用 reference
}

这个函数编译后变成一个状态机。await 之前的状态需要保存 datareference。问题是:如果这个 Future 被移动到内存的另一个位置,reference 就变成野指针了。

Future 在内存位置 A:
  data = [1, 2, 3]     ← 栈上
  reference = 0x7fff... ← 指向 data
 
Future 被移动到内存位置 B:
  data = [1, 2, 3]     ← 还在 A 的栈上
  reference = 0x7fff... ← 还是指向 A 的栈,但 A 已经不是这个 Future
  💥 悬垂指针

Pin 的作用就是:禁止 Future 被移动。 一旦 Pin 住,这个 Future 就钉在内存里不动了。

什么时候需要关心 Pin?

大多数时候你不需要。tokio::spawn.await 会帮你处理 Pin。但当你想手动操作 Future 的时候,就会碰到它:

use std::pin::Pin;
use std::future::Future;
 
// 返回一个 boxed Future 的时候
fn make_future() -> Pin<Box<dyn Future<Output = i32> + Send>> {
    Box::pin(async { 42 })
}
 
// 或者用 pin! 宏(更轻量)
use tokio::pin;
 
async fn example() {
    let fut = async { 42 };
    pin!(fut);  // fut 现在被 Pin 住了
 
    // fut 不能被移动了,但可以安全地 .await
    let result = fut.await;
}

Unpin 是什么?

大多数类型都实现了 Unpin,意味着它们可以安全地被移动。async fn 生成的 Future 默认不实现 Unpin,因为它们可能包含自引用。

但你写的普通 struct 默认是 Unpin 的。只有当你在 struct 里存了 Pin<Box<dyn Future>> 的时候,才需要关心 Unpin

经验法则:90% 的情况你不需要直接碰 Pin。 tokio::spawn 要求 Future 是 Send + 'static,它会帮你处理 Pin。只有在写底层库或者手动组合 Future 的时候才需要深入理解。


tokio::spawn vs spawn_blocking:别搞混了

这是最常见的误用。

tokio::spawn:用于异步任务

tokio::spawn(async move {
    // 这里面只能跑异步代码
    let result = fetch_from_db().await;
    process(result).await;
});

spawn_blocking:用于同步阻塞代码

tokio::task::spawn_blocking(move || {
    // 这里面跑同步阻塞代码
    // 比如:文件 IO、加密计算、调用同步库
    std::fs::read_to_string("large_file.txt").unwrap()
}).await.unwrap();

为什么不能在 spawn 里跑阻塞代码?

tokio worker thread 的工作方式:
 
worker 线程:
  loop {
      task = ready_queue.pop();  // 取一个就绪任务
      task.poll();               // 执行,直到 .await
      // 如果 task 里有阻塞代码(没有 .await)
      // 这个 worker 就卡住了,其他任务都得等
  }

tokio 的 worker 线程是协作式调度——任务必须主动 .await 才会让出控制权。如果你在 spawn 里跑了一个 std::fs::read_to_string(),这个 worker 线程就被独占了,其他任务全部饿死。

真实的坑: 我在 spawn 里调了一个同步的 HTTP 客户端(reqwest::blocking),测试环境没问题(并发低),上线后 QPS 稍高就超时——因为所有 worker 线程都被阻塞在 HTTP 请求上了。

经验法则:在 tokio::spawn 里用了任何 std:: 的阻塞操作,都是 bug。tokio::fstokio::netreqwest(异步版)替代。如果必须用同步库,包一层 spawn_blocking


select! 的陷阱

tokio::select! 很好用,但有几个坑。

坑一:取消安全性(Cancellation Safety)

tokio::select! {
    result = fetch_a() => { /* A 先完成 */ }
    result = fetch_b() => { /* B 先完成 */ }
}

fetch_a() 先完成时,fetch_b() 的 Future 会被直接丢弃。如果 fetch_b() 正好在 .await 一个 IO 操作,那个操作会直接取消。

大多数时候这不是问题——TCP 读写、HTTP 请求被取消是安全的。但有些操作不是:

// 危险:select! 里用 channel
tokio::select! {
    msg = rx.recv() => { /* 收到消息 */ }
    _ = timeout() => { /* 超时 */ }
}
 
// 如果 recv() 正在等待,select! 取消它
// 消息可能已经从 channel 里取出来了,但你没处理
// 这叫「消息丢失」

解决方案: 对于不安全取消的操作,用 tokio::sync::mpsc::Receiver::recv()(它是取消安全的),或者用 CancellationToken

坑二:分支的公平性

loop {
    tokio::select! {
        _ = async_op_1() => { /* 处理 1 */ }
        _ = async_op_2() => { /* 处理 2 */ }
    }
}

select! 的分支是随机选择的(不是按顺序)。如果两个分支同时就绪,每次循环随机选一个。这意味着 async_op_1async_op_2 不会严格交替执行。

如果你需要严格的优先级,用 biased 关键字:

loop {
    tokio::select! {
        biased;  // 按声明顺序优先
        _ = high_priority() => { /* 优先处理 */ }
        _ = low_priority() => { /* 其次 */ }
    }
}

坑三:在循环里 select

// 常见模式:循环处理多个 channel
loop {
    tokio::select! {
        msg = rx1.recv() => { handle(msg); }
        msg = rx2.recv() => { handle(msg); }
        else => { break; }  // 所有 channel 关闭
    }
}

这个模式本身没问题,但要注意:else 分支只有在所有 recv() 都返回 None(channel 关闭)时才会触发。 如果你想在任意一个 channel 关闭时退出,需要额外处理。

经验法则:select! 里放的 Future 必须是「取消安全」的。 不确定的话,查文档。tokio 的文档会标注哪些函数是取消安全的。


异步取消:最隐蔽的 bug

这是 Rust 异步编程里最容易被忽略的问题。

场景:数据库事务

async fn transfer_money(from: AccountId, to: AccountId, amount: i64) -> Result<()> {
    let mut tx = db.begin().await?;
 
    // 扣款
    tx.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from).await?;
 
    // 加款
    tx.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to).await?;
 
    tx.commit().await?;
    Ok(())
}

如果这个 Future 在 扣款加款 之间被取消(比如 select! 跳到了另一个分支),会发生什么?

钱扣了,但没加上去。

因为 tx 被 drop 了,Rust 不会自动 rollback——它怎么知道你是想 commit 还是 rollback?

解决方案:用 scopeguard 或者 Drop

async fn transfer_money(from: AccountId, to: AccountId, amount: i64) -> Result<()> {
    let mut tx = db.begin().await?;
 
    // 确保在任何情况下(包括取消)都会 rollback
    let _guard = scopeguard::guard(tx, |mut tx| {
        // 注意:这里不能 .await,所以要用 blocking
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(tx.rollback()).ok();
        });
    });
 
    _guard.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from).await?;
    _guard.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to).await?;
 
    // 成功时手动 commit,然后 disarm guard
    _guard.commit().await?;
    scopeguard::ScopeGuard::into_inner(_guard); // 不再 rollback
    Ok(())
}

更简洁的方案: 大多数数据库库(sqlx)的事务在 drop 时会自动 rollback。但你得确认你的库有这个行为。

经验法则:在异步代码里,任何「多步操作」都要考虑「如果在中间被取消会怎样」。 数据库事务、文件写入、HTTP 请求,都是重灾区。


Send 约束:tokio::spawn 的隐藏门槛

tokio::spawn(async move {
    let data = Rc::new(42);  // Rc 不是 Send!
    // ...
});

编译报错:Rc<i32> cannot be sent between threads safely

为什么 tokio::spawn 要求 Send?

因为 multi_thread runtime 会把任务在线程间移动。一个 Future 可能在线程 A 上创建,在线程 B 上继续执行。如果它包含了 RcRefCell 等非线程安全类型,移动后就会出问题。

什么时候会踩这个坑?

最常见的场景是:在 async fn 里用了 Rc 或者 RefCell

// 编译不过
async fn handle_request() {
    let cache = Rc::new(HashMap::new());  // Rc 不是 Send
    let data = fetch_data().await;        // 这个 .await 让 Future 可能被跨线程移动
    cache.insert("key", data);
}
 
// 解决方案:用 Arc
async fn handle_request() {
    let cache = Arc::new(Mutex::new(HashMap::new()));  // Arc + Mutex 是 Send
    let data = fetch_data().await;
    cache.lock().unwrap().insert("key", data);
}

另一个隐藏的坑: std::sync::MutexGuard 不是 Send。如果你在 .await 之前 lock 了一个 std::sync::Mutex,然后 .await 了,编译器会报错。

// 编译不过
async fn bad() {
    let lock = std::sync::Mutex::new(vec![]);
    let guard = lock.lock().unwrap();  // MutexGuard 不是 Send
    some_async_op().await;             // Future 被移动时,guard 也被移动
    guard.push(1);
}
 
// 解决方案:缩小锁的范围
async fn good() {
    let lock = std::sync::Mutex::new(vec![]);
    {
        let mut guard = lock.lock().unwrap();
        guard.push(1);
    }  // guard 在 .await 之前 drop
    some_async_op().await;
}
 
// 或者用 tokio::sync::Mutex(支持异步等待)
async fn also_good() {
    let lock = tokio::sync::Mutex::new(vec![]);
    let mut guard = lock.lock().await;  // 异步 lock,可以跨 .await 持有
    some_async_op().await;
    guard.push(1);
}

经验法则:在 .await 之前持有任何锁,都是潜在的 Send 问题。 要么缩小锁的范围,要么用 tokio::sync::Mutex,要么用 Arc<Mutex> 替代 Rc<RefCell>


JoinSet:比 JoinHandle 更好用

以前 spawn 多个任务,收集结果是这样写的:

let mut handles = vec![];
for i in 0..10 {
    handles.push(tokio::spawn(async move {
        process(i).await
    }));
}
 
for handle in handles {
    let result = handle.await.unwrap();
    // 处理结果
}

问题是:任务按顺序等待,不是按完成顺序。 如果第 0 个任务最慢,你得等它完成才能处理第 1 个的结果。

JoinSet:按完成顺序处理

use tokio::task::JoinSet;
 
let mut set = JoinSet::new();
for i in 0..10 {
    set.spawn(async move {
        process(i).await
    });
}
 
// 谁先完成就先处理谁
while let Some(result) = set.join_next().await {
    let value = result.unwrap();
    println!("got: {}", value);
}

JoinSet 的自动清理

JoinSet 被 drop 时,里面还没完成的任务会被自动取消。这比 Vec<JoinHandle> 好——JoinHandle 被 drop 时任务还在跑,只是你拿不到结果了。

async fn with_timeout() {
    let mut set = JoinSet::new();
    set.spawn(slow_task_1());
    set.spawn(slow_task_2());
 
    tokio::select! {
        _ = set.join_next() => {
            // 有一个完成了,处理结果
        }
        _ = tokio::time::sleep(Duration::from_secs(5)) => {
            // 超时,set 被 drop,所有任务自动取消
        }
    }
}

经验法则:需要 spawn 多个任务并「按完成顺序处理」,用 JoinSet 需要「等待所有完成」,用 futures::future::join_all。需要「等待任意一个完成」,用 select!


tokio::task::yield_now():什么时候该让出

有些场景下,你想主动让出 CPU:

async fn process_large_batch(items: Vec<Item>) {
    for (i, item) in items.iter().enumerate() {
        process_item(item).await;
 
        // 每处理 100 个,让出一次
        if i % 100 == 0 {
            tokio::task::yield_now().await;
        }
    }
}

什么时候需要 yield?

  1. 长循环没有 .await——纯 CPU 计算的循环不会让出控制权,其他任务饿死
  2. 公平性要求高——不想让一个任务独占 worker 线程
  3. 测试——有些竞态条件在 yield 时才会暴露
// 不好的写法:纯 CPU 循环不让出
async fn cpu_heavy() {
    for i in 0..1_000_000 {
        compute(i);  // 没有 .await,不会让出
    }
}
 
// 好的写法:定期让出
async fn cpu_heavy_fair() {
    for i in 0..1_000_000 {
        compute(i);
        if i % 10000 == 0 {
            tokio::task::yield_now().await;
        }
    }
}
 
// 更好的写法:用 spawn_blocking
async fn cpu_heavy_best() {
    tokio::task::spawn_blocking(move || {
        for i in 0..1_000_000 {
            compute(i);
        }
    }).await.unwrap();
}

经验法则:如果你的 async 函数里有一个循环,但循环体里没有 .await,要么加 yield_now(),要么把整个循环丢进 spawn_blocking


总结

Rust 异步编程的坑,本质上来自两个地方:

  1. async/await 语法糖藏住了复杂性——你以为在写同步代码,其实在写状态机
  2. tokio 的协作式调度需要你主动配合——不 .await 就不让出,这是特性也是陷阱

最核心的几条规则:

  1. spawn 里不要有阻塞代码——用 spawn_blocking
  2. .await 之前不要持有锁——缩小范围或换 tokio::sync::Mutex
  3. select! 里的 Future 要取消安全——查文档确认
  4. 多步操作要考虑取消——用 scopeguard 或 Drop 保护
  5. CPU 密集型任务不要跑在 async 里——用 spawn_blocking 或线程池

异步编程不是「加个 .await 就行」。但理解了这些底层机制之后,你会发现 tokio 的设计其实很优雅——它给了你足够的控制权,同时也给了你足够的安全网。

最后一条建议:如果你的异步代码出了 bug,先检查是不是上面这些坑里的某一个。 我排查过的异步 bug,90% 都是这五个问题之一。