如何处理并发, Green Thread? Async? OS Thread?
当你学习完基础的编程语言语法后, 你可能会接触到并发编程相关的概念, 例如协程, 线程, 异步编程等 这些概念有时会让人感到困惑, 因为它们看起来很相似, 有的是很甚至自相矛盾, 但实际上它们解决的问题和实现方式是不同的 最近读了一本书叫做Asynchronous Programming in Rust 不得不说让人豁然开朗, 有种丰收的喜悦, 感觉浑身充满了力量 因此本人决定胡言乱语一通, 记录一下自己的理解
本文有 AI 修饰 若我的无知冒犯了你, 欢迎在评论区补充!
背景
在这之前, 让我们先明白并发和并行所解决的问题 让我们把调度的单位叫做"任务"(Task), 他可以指代任何需要调度的工作单元, 例如函数调用, 协程, 线程等
什么是任务?
-
例如多线程你新开的线程就是一个任务
- 在Rust中可以用std::thread::spawn新建一个线程:
let handle = std::thread::spawn(|| { // 这是一个任务 }); // 你在这有了handle之后就可以等待这个任务完成 handle.join().unwrap();- 在Java中你新建的Thread也是一个任务:
public class MyThread extends Thread { public void run() { // 这是一个任务 } } MyThread t = new MyThread(); t.start();- 协程库中新建的协程也是一个任务:
go func() { // 这是一个任务 }()- 一个future(Rust)或者promise(JS)也是一个任务:
async fn task() { // 这是一个任务(Rust) }async function task() { // 这是一个任务(JS) }
在这之前,让我们先理解并发和并行所解决的问题
让我们把调度的单位叫做"任务"(Task),它可以指代任何需要调度的工作单元,例如函数调用、协程、线程等
运行时/调度器
任务的完成需要一个关键的资源——CPU时间片 我们把提供任务 CPU 时间片资源的组件叫做运行时/调度器
在 OS thread 模型中, 操作系统内核充当调度器的角色, 它负责在多个线程之间分配 CPU 时间片
在 Green Thread 模型中, 语言运行时或库实现了自己的调度器, 它在用户态管理多个 Green Thread 的调度
在异步编程模型中, 运行时/调度器负责管理任务的状态转换和执行顺序
并发和并行解决的核心问题(来源书籍第一个章节)
让我们用一个生动的例子来理解:想象你经营一家只卖Guinness啤酒的酒吧,制作一杯完美的Guinness需要:
- 倾斜45度倒酒至3/4满(15秒)
- 等待沉淀(100秒) ← 注意这里只是等待
- 倒满剩余部分(5秒)
- 上菜
每位调酒师就像一个CPU核心,每个订单就是一个任务.
方案1: 同步执行(一个调酒师)
调酒师: 接单 → 倒酒(15s) → [等待100s] → 倒满(5s) → 上菜 → 接下一单
问题: 调酒师在等待沉淀的100秒里什么都不做,只是站着看着酒杯.每小时只能服务30个客人,效率极低
对应到编程: 这就像同步阻塞的代码,线程在等待IO时被完全阻塞,CPU空闲浪费.
// 同步阻塞示例
fn serve_sync() {
pour(); // 15秒
sleep(100); // ← CPU在这里干等着,什么都不做
top_up(); // 5秒
serve();
}方案2: 并行执行(12个调酒师,同步工作)
12个调酒师,每人独立处理订单,互不干扰
改进: 吞吐量达到360杯/小时(12 × 30) 问题: 成本太高!每个调酒师仍然有大量等待时间,资源利用率很低
对应到编程: 这就像为每个IO请求都创建一个新线程,虽然能处理更多请求,但线程开销巨大(内存、上下文切换)
方案3: 并发执行(一个调酒师,异步工作)
调酒师: 接单1 → 倒酒1(15s) → 放下让它沉淀
→ 接单2 → 倒酒2(15s) → 放下让它沉淀
→ 接单3 → 倒酒3(15s) → 放下让它沉淀
→ ...
→ 订单1沉淀好了 → 倒满1(5s) → 上菜1
→ 订单2沉淀好了 → 倒满2(5s) → 上菜2
关键突破: 调酒师在等待沉淀时不再闲着,而是继续接新订单! 结果: 实际吞吐量达到180杯/小时,理论最大240杯/小时
对应到编程: 这就是异步编程!一个线程可以管理多个IO任务,在等待IO时切换到其他任务.
// 异步非阻塞示例
async fn serve_async() {
pour().await; // 15秒,实际工作
tokio::spawn(async { // 启动一个后台任务等待沉淀
sleep(100).await; // 在等待期间,当前线程可以处理其他订单
top_up().await; // 5秒,实际工作
serve().await;
});
// 立即返回,可以接下一个订单
}方案4: 并行+并发(2个调酒师,异步协作)
两个调酒师都异步工作,且可以"偷"对方的任务:
- 调酒师1可以开始倒酒并放下沉淀
- 调酒师2如果发现调酒师1在忙,可以把已沉淀好的酒倒满并上菜
最佳结果: 460杯/小时(每人230杯/小时),接近理论极限
对应到编程: 这就是现代异步运行时(如Tokio)的工作方式——多个工作线程共享一个任务队列,动态负载均衡
1. 最初始的想法, OS 线程
How it works?
操作系统为我们提供了线程这一抽象资源, 让我们可以并行处理多个任务.每个线程的本质在于资源的分割:同一进程中的线程共享内存空间、文件描述符(fd)、UID 等资源,但每个线程拥有自己独立的栈空间(通常几百KB到几MB)和寄存器上下文(包括程序计数器PC、栈指针SP等).操作系统内核作为调度者,负责线程的创建、销毁和调度.
当线程进行上下文切换时,CPU 首先保存当前线程的寄存器状态(PC、SP、通用寄存器等)到内核的线程控制块(TCB)中,然后切换到内核态,OS调度器选择下一个要运行的线程,恢复其寄存器状态,最后从内核态返回用户态继续执行.整个过程虽然涉及内核态切换,但因为同一进程内的线程共享内存空间(页表相同),不需要刷新TLB,所以线程切换比进程切换快得多.
代表性语言和示例
几乎所有语言都提供线程支持:
C/C++ (POSIX Threads):
pthread_t thread;
pthread_create(&thread, NULL, task_function, arg);
pthread_join(thread, NULL);Java:
Thread thread = new Thread(() -> {
// 任务代码
});
thread.start();
thread.join();Python:
import threading
thread = threading.Thread(target=task_function, args=(arg,))
thread.start()
thread.join()Rust:
let handle = std::thread::spawn(|| {
// 任务代码
});
handle.join().unwrap();存在的问题
OS 线程虽然好理解,但高并发的时候问题就来了。首先吃内存:每个线程要几百KB到几MB的栈,你想开10000个线程?光栈就得好几个G。其次虽然切线程比切进程快,但还是得在用户态和内核态之间来回切,还要保存恢复寄存器,频繁切的话性能就不行了。
共享内存:方便数据交换,但也引入了数据竞争、死锁等经典并发问题,需要小心翼翼地使用各种同步原语, 复杂的情况男的要死. 最后,由于调度权在内核手中,OS可能在任意时刻抢占你的线程,而内核的调度策略是通用的,不一定适合你的应用场景,这种不可控性有时会带来意想不到的性能问题
2. 绿色线程(Green Threads) / 纤程(Fibers)
What are they?
Green Thread 本质上是语言运行时在用户态实现的轻量级线程,它模仿了 OS 的线程模型,但调度完全由用户态的运行时控制,而不是内核. 每个 Green Thread 仍然有独立的栈空间(但通常只有2-8KB起步)和独立的寄存器上下文,关键区别在于它们的调度不需要内核参与,整个调度过程都在用户态完成
How it works?
1. M:N 线程模型
大多数 Green Thread 实现采用 M:N 模型:
- M 个 Green Threads 映射到 N 个 OS 线程上(M >> N)
- 用户态的调度器决定哪个 Green Thread 运行在哪个 OS 线程上
例如 Go 的 GMP 模型:
- G (Goroutine): Green Thread,代表一个任务
- M (Machine): OS 线程,真正执行代码
- P (Processor): 调度器上下文,维护 Goroutine 队列
[G1, G2, G3, G4, G5, ...] (成千上万个 Goroutines)
↓ 用户态调度器
[P1] [P2] [P3] [P4] (GOMAXPROCS 个处理器)
↓ ↓ ↓ ↓
[M1] [M2] [M3] [M4] (OS线程,通常和CPU核心数相同)
2. 上下文切换过程
当 Green Thread 让出 CPU 时(例如在 I/O 操作点),运行时首先保存当前 Goroutine 的栈指针(SP)和程序计数器(PC),然后从调度器的运行队列中取出下一个可运行的 Goroutine,恢复其 SP 和 PC,继续执行。这里的关键在于:整个过程在用户态完成,不需要系统调用,不需要内核态切换!相比 OS 线程的上下文切换,这个开销要小得多
3. 动态栈增长
Green Thread 的栈这么小(Go 才2KB起步),不够用咋办?
栈复制(Stack Copying) - Go 用的方案:
初始栈: 2KB
[func_a 的栈帧]
[func_b 的栈帧] ← SP (快满了!)
检测到栈快满 → 分配新的更大栈(例如4KB)
新栈: 4KB
[func_a 的栈帧] ← 复制
[func_b 的栈帧] ← 复制
[空闲空间] ← 更多空间
然后更新所有指向旧栈的指针 → SP 指向新栈
栈分段(Segmented Stack) - 早期 Go 的方案(已废弃):
栈段1: [func_a] → 栈段2: [func_b] → 栈段3: [func_c]
(2KB) (2KB) (2KB)
这方案有个坑叫"热分裂"(hot split):要是有个函数频繁调用,刚好卡在栈边界上,就会不停地分配释放栈段,性能拉胯
4. 非阻塞 I/O 的透明处理
你写的代码看着像是阻塞的,其实背地里是非阻塞:
// 用户写的代码
conn.Read(buf) // 看起来是阻塞调用运行时背地里干的事:
- 发起非阻塞系统调用(比如 epoll_wait)
- 要是数据还没准备好 → 注册个 epoll 事件
- 保存当前 Goroutine 的状态
- 把 CPU 让出来,跑其他 Goroutine
- epoll 事件来了 → 把这个 Goroutine 唤醒
- 继续执行 conn.Read 后面的代码
5. FFI 和系统调用的处理
// Cgo 调用
C.some_c_function()- 把当前 Goroutine 绑到一个专门的 OS 线程上
- 这个 OS 线程有标准的大栈(几MB那种)
- 在这个标准栈上执行 C 函数
- C 函数返回后,Goroutine 解绑,又可以到处跑了
- 在这个标准栈上执行 C 函数
- C 函数返回后,Goroutine 解绑,可以继续迁移
代表性语言和示例
Go (Goroutine):
// 创建100万个并发任务
for i := 0; i < 1000000; i++ {
go func(id int) {
result := doWork(id) // 看起来是阻塞,实际非阻塞
fmt.Println(result)
}(i)
}
// 每个 Goroutine 只占用约2KB内存Erlang/Elixir (Process - 实际上是 Green Thread):
# 创建一个轻量级进程
spawn(fn ->
result = do_work()
IO.puts(result)
end)
# Erlang VM 可以运行数百万个进程Java Project Loom (Virtual Threads):
// JDK 21+ 的虚拟线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 1000000; i++) {
executor.submit(() -> {
// 虚拟线程,调度在少量平台线程上
String result = blockingIO();
System.out.println(result);
});
}
}Ruby (Fibers):
fiber = Fiber.new do
puts "Fiber started"
Fiber.yield # 显式让出控制权
puts "Fiber resumed"
end
fiber.resume # 启动/恢复 Fiber优缺点
- 优点:
- 用起来方便, 语言层面支持, 不需要额外的库
- 轻量级, 可以创建大量的 Green Thread, 适合高并发场景
- 而且这种基于栈的协程, 也可以很方便的实现抢占式调度, 在 Go 中, 是通过编译器在函数调用点插入调度点来实现的
- 缺点:
- 栈增长很麻烦, 会引入很多的复杂性
- 你还是有上下文的切换, 虽然没有内核态切换, 但用户态的切换也不是免费的, 需要Store/Restore寄存器上下文
- FFI 交互复杂, 有使用过 Cgo 的人都知道有各种的垃圾坑
- 需要为不同的平台实现不同的运行时
3. 回调函数 (Callback Functions)
What is it?
回调函数是一种基于事件驱动的异步编程模型,核心思想是: "我现在发起一个操作,你先去做,做完了通知我".
How it works?
基本流程
回调函数的工作模式很简单:当你发起一个异步操作(如文件读取、网络请求)时,你传入一个回调函数,然后主线程立即继续执行其他代码而不阻塞。当异步操作完成后,事件循环会调用你提供的回调函数来处理结果。这种模式的核心思想是"注册-等待-通知"。
事件循环机制(以 Node.js 为例)
// 主线程执行
console.log("1. 开始");
// 发起异步操作,注册回调
readFile("data.txt", function callback(data) {
console.log("3. 文件内容:", data);
});
console.log("2. 继续执行");
// 输出顺序: 1 → 2 → 3底层发生了什么:readFile() 首先把文件读取请求提交给操作系统(使用非阻塞 I/O),同时把 callback 函数注册到事件队列中,然后立即返回,让主线程继续执行(打印 "2. 继续执行")。此时文件读取在后台进行,当操作系统完成读取后,事件循环检测到 I/O 完成事件,从队列中取出 callback 函数执行,最终打印出 "3. 文件内容"。
回调地狱(Callback Hell)的本质
当你有多个异步操作要按顺序来的时候,代码就开始套娃了:
fetchUser(userId, function(user) {
fetchPosts(user.id, function(posts) {
fetchComments(posts[0].id, function(comments) {
fetchReplies(comments[0].id, function(replies) {
// 已经嵌套4层了...看着头疼吧
console.log(replies);
});
});
});
});每个异步操作完了才能发起下一个,维护起来很恶心
代表性语言和示例
JavaScript (Node.js) - 早期模式:
// 经典的 Node.js 回调风格
const fs = require('fs');
fs.readFile('file.txt', 'utf8', (err, data) => {
if (err) {
console.error('Error:', err);
return;
}
console.log('Content:', data);
});
// HTTP 请求回调
const http = require('http');
http.get('http://api.example.com', (res) => {
let data = '';
res.on('data', (chunk) => { data += chunk; });
res.on('end', () => {
console.log('Response:', data);
});
});Python (Twisted 框架):
from twisted.internet import reactor, defer
def on_success(result):
print(f"Success: {result}")
def on_error(failure):
print(f"Error: {failure}")
# 注册回调
d = defer.Deferred()
d.addCallback(on_success)
d.addErrback(on_error)
# 某处触发回调
d.callback("Operation completed")Java (CompletableFuture):
CompletableFuture.supplyAsync(() -> {
return fetchData(); // 异步执行
}).thenAccept(data -> {
System.out.println("Data: " + data); // 回调处理
}).exceptionally(ex -> {
System.err.println("Error: " + ex);
return null;
});C (libuv - Node.js 的底层):
// libuv 的异步文件读取
uv_fs_t req;
uv_fs_read(loop, &req, file, &buf, 1, -1, on_read_callback);
void on_read_callback(uv_fs_t* req) {
if (req->result < 0) {
fprintf(stderr, "Read error\n");
} else {
printf("Read %ld bytes\n", req->result);
}
uv_fs_req_cleanup(req);
}优缺点
- 优点:
- 实现简单, 概念直观, 不需要语言特殊支持
- 性能好, 单线程 + 非阻塞 I/O 可以处理大量并发
- 缺点:
- 回调地狱: 嵌套深度随依赖链增长,代码难以维护
- 错误处理困难: 每层都要处理错误,容易遗漏
- 调试困难: 栈帧不连续,很难追踪异步调用链
4. Async/Await 和 Future/Promise
What are they?
这是一种**基于状态机的无栈协程(Stackless Coroutine)**实现.核心概念:
- Future/Promise: 代表一个"尚未完成的计算"的对象
- async/await: 让你用同步的代码风格编写异步逻辑的语法糖
关键区别于 Green Thread: 没有独立的栈,所有状态保存在堆上的状态机对象中.
How it works?
1. 编译器的状态机转换
当你写一个 async 函数:
async fn fetch_and_process() -> String {
let response = http_get("http://api.com").await; // 等待点1
let data = parse(response).await; // 等待点2
format!("Result: {}", data)
}
```:
编译器会把它转换成一个状态机:
Future/Promise 代表一个"尚未完成的计算"的对象,而 async/await 则是让你用同步的代码风格编写异步逻辑的语法糖.与 Green Thread 最关键的区别在于:Async/Await **没有独立的栈**,所有状态都保存在堆上的状态机对象中,这使得每个任务的内存占用可以降到几百字节
```rust
enum FetchAndProcessState {
Start,
WaitingForHttp { /* 保存的局部变量 */ },
WaitingForParse { response: String },
Done,
}
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<String> {
loop {
match self.state {
Start => {
// 发起 HTTP 请求
let fut = http_get("http://api.com");
match fut.poll(cx) {
Poll::Pending => {
self.state = WaitingForHttp;
return Poll::Pending; // 告诉运行时:我还没完成
}
Poll::Ready(response) => {
self.state = WaitingForParse { response };
}
}
}
WaitingForParse { response } => {
let fut = parse(response);
match fut.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(data) => {
return Poll::Ready(format!("Result: {}", data));
}
}
}
_ => panic!("Invalid state"),
}
}
}
}编译器把每个 .await 当成状态转换点,局部变量都存在状态机的枚举里。这里没有传统的函数调用栈,全被拍扁成状态机的 poll() 方法了。每次 poll 的时候,状态机看看当前是啥状态,决定下一步干啥。要是某个操作还没完成就返回 Pending(等会再来),全部搞定了就返回 Ready
2. 运行时的调度机制(以 Tokio 为例)
[任务队列]
Task1 (Future)
Task2 (Future)
Task3 (Future)
↓
[运行时调度器]
Worker Thread 1 ────┐
Worker Thread 2 ────┤ 从队列取任务并 poll()
Worker Thread 3 ────┤
Worker Thread 4 ────┘
↓
[Reactor (I/O 事件驱动)]
epoll/kqueue 监听 I/O 事件
I/O 就绪时唤醒对应的 Future
调度流程:
- 运行时从任务队列取出一个 Future
- 调用 Future.poll()
- 运行时不停地从任务队列里拿 Future 出来 poll。poll 返回 Pending?说明还没搞完,运行时就暂停它,继续处理下一个;返回 Ready(value)?任务完成了,可以拿结果了。底层 I/O 准备好的时候,Reactor 会把对应的 Future 叫醒,重新丢回队列,等着下次被 poll
[Goroutine 1: 有独立栈 2KB]
栈上的局部变量
函数调用链
[Goroutine 2: 有独立栈 2KB]
栈上的局部变量
函数调用链
Async/Await (Rust):
[Future 1: 状态机对象,约100-500字节]
enum State { ... } ← 所有状态塞这里
[Future 2: 状态机对象,约100-500字节]
enum State { ... }
内存占用差距挺大的:Future 通常就几百字节,Goroutine 怎么着也得2KB起步。
4. 为什么不能抢占式调度?
async fn long_compute() {
// 要是这里有个超耗时的计算循环
for i in 0..1000000000 {
// 没有 .await 点!
compute(i);
}
// 运行时没法中途打断
}因为状态机只在 .await 点才保存状态和让出控制权,要是一个 async 函数里有长时间计算又没有 .await,就会把整个线程堵住
对比 Go:编译器在每个函数调用点塞了调度检查,可以随时抢占
代表性语言和示例
Rust (tokio):
use tokio;
#[tokio::main]
async fn main() {
let task1 = tokio::spawn(async {
let data = fetch_data().await;
process(data).await
});
let task2 = tokio::spawn(async {
let result = compute().await;
save(result).await。
**对比 Go**:编译器在每个函数调用点塞了调度检查,可以随时抢占。
// 并发等待多个任务
let (r1, r2) = tokio::join!(task1, task2);
}
async fn fetch_data() -> String {
// 编译成状态机
tokio::time::sleep(Duration::from_secs(1)).await;
"data".to_string()
}JavaScript (原生支持):
// async 函数返回 Promise
async function fetchUserData(userId) {
const user = await fetch(`/api/users/${userId}`);
const posts = await fetch(`/api/posts/${user.id}`);
return { user, posts };
}
// Promise 链式调用(底层实现)
function fetchUserDataPromise(userId) {
return fetch(`/api/users/${userId}`)
.then(user => fetch(`/api/posts/${user.id}`)
.then(posts => ({ user, posts })));
}
// 并发执行多个异步任务
const [users, posts, comments] = await Promise.all([
fetch('/api/users'),
fetch('/api/posts'),
fetch('/api/comments')
]);Python (asyncio):
import asyncio
async def fetch_data(url):
# await 让出控制权
response = await http_client.get(url)
data = await response.json()
return data
async def main():
# 并发运行多个任务
results = await asyncio.gather(
fetch_data("http://api1.com"),
fetch_data("http://api2.com"),
fetch_data("http://api3.com")
)
print(results)
# 运行事件循环
asyncio.run(main())C# (.NET):
public async Task<string> FetchAndProcessAsync() {
// async 方法返回 Task<T>
var response = await httpClient.GetAsync("http://api.com");
var content = await response.Content.ReadAsStringAsync();
var result = await ProcessAsync(content);
return result;
}
// 并发执行
var task1 = FetchDataAsync("url1");
var task2 = FetchDataAsync("url2");
await Task.WhenAll(task1, task2);Kotlin (Coroutines):
suspend fun fetchData(): String {
delay(1000) // 挂起点
return "data"
}
fun main() = runBlocking {
// 启动多个协程
val job1 = launch {
val data = fetchData()
println(data)
}
val job2 = launch {
val result = compute()
println(result)
}
job1.join()
job2.join()
}优缺点
- 优点:
- 代码结构清晰, 易于维护, 像写同步代码一样写异步代码
- 基于状态机实现, 性能好, 没有额外的上下文切换开销, 内存占用极低(每个任务几百字节)
- 零成本抽象(Rust): 编译后性能接近手写状态机
- 缺点:
- 函数被转成状态机了,调试可能比较难,堆栈跟踪不完整
- 不能抢占式调度,只能在 await 点让出 CPU,长计算会把整个线程堵住
- 需要编译器/解释器支持 async/await 语法,增加了语言实现的复杂度
- "函数传染性":async 函数只能在 async 上下文里 await,会一层层传播
Comments
Loading comments...