Rust异步编程

包括以下内容:

当编写IO密集型应用,如服务器时,程序员常常创建大量线程并在线程之间频繁切换以应对高并发需求。传统的线程并发模型由于上下文切换成本较大,在应对这类场景时显得力不从心。其他主流的并发编程范式有以下几种:

  • 事件驱动型编程。以使用回调函数为主要编程方式。其缺点在于冗长、非线性的控制流,以及难以debug或跟踪函数调用。
  • 协程。例如Goroutine,类似于系统线程但开销更低。其缺点是runtime包装了过多抽象,用户无法接触底层环境。
  • actor模型
  • async/await

Rust选择async(异步)编程,这是一种用顺序执行的逻辑实现并发的编程方式。它实现原理较难理解,但却有诸多好处,例如与协程同样轻量化等。Rust中的协程具有一些特点,例如零额外开销,惰性求值,提供单线程分时并发等待。当面对高IO场景时,应当选择异步编程。

Rust只为async提供了必要的trait,但没有提供实际的运行时。编写能够运行的异步代码需要依赖社区提供的第三方库,本文使用tokio库作为范例。

Async的基本使用

在函数前添加async关键字即可将函数转化为异步调用。

1
2
3
async fn do_something() {
println!("Hello world!");
}

异步函数的特点是返回值为impl Future的对象。调用异步函数并不会执行原函数,只有在Future对象上使用关键字.await或轮询(polling)它时,才会惰性地执行异步函数。

1
2
3
4
5
6
7
8
9
async fn call() {
let future = do_something();
println!("print first");
future.await;
}

async fn do_something() {
println!("Hello world!");
}
1
2
3
# 只有调用了.await之后才输出"Hello world!"
print first
Hello world!

当我们直接调用.await时,程序是顺序执行的。调用.await会使线程阻塞直到任务执行完成,这使得我们编写程序的逻辑依然是顺序的。

1
2
3
4
5
6
7
8
9
10
11
async fn loop_3() {
for i in 0..3 {
println!("{i}");
thread::sleep(Duration::from_millis(100));
}
}

async fn call() {
loop_10().await;
loop_10().await;
}
1
2
3
4
5
6
7
# 体现顺序性
0
1
2
0
1
2

要使协程能够并发完成,需要依赖Executor对Future进行轮询,其具体的原理在之后的内容中介绍。对于tokio库,我们可以使用类似Rust线程的语法完成异步并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async fn loop_3() {
for i in 0..3 {
println!("{i}");
thread::sleep(Duration::from_millis(100));
}
}

async fn call() {
let mut handles = vec![];
for i in 0..2 {
handles.push(tokio::spawn(loop_3()));
}
for handle in handles {
handle.await.unwrap();
}
}
1
2
3
4
5
6
7
# 体现并发
0
0
1
1
2
2

除了在fn前添加async关键字外,我们还可以使用async修饰任意表达式块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async fn loop_3() {
for i in 0..3 {
println!("{i}");
thread::sleep(Duration::from_millis(100));
}
}

// 等价于
fn loop_3() -> impl Future<Output=()> {
async {
for i in 0..3 {
println!("{i}");
thread::sleep(Duration::from_millis(100));
}
}
}

async同样存在着生命周期问题。当异步调用的函数包含引用参数时,则Future的生命周期与引用参数一致。这意味着Future必须在参数被drop之前.await

1
2
3
4
5
6
async fn foo(x: &u8) -> u8 { *x }

// 等价于
fn foo(x: &u8) -> impl Future<Output=u8> + 'a {
async move { *x }
}

要想避免这一问题,可以在该异步函数中.await所有与引用参数相关的异步操作。

最后,由于.await只能在异步函数中调用,而Rust中的main函数原生不能声明为异步函数,故我们尚不能运行异步函数。tokio解决这一问题的方法是使用#[tokio::main]属性为异步提供runtime,使得main可以被声明为异步。

1
2
3
4
#[tokio::main]
async fn main() {
call().await
}
1
2
3
4
5
6
7
// 等价于
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
call().await
})
}

Async的基本原理

Future基础

Future是异步编程的核心,它是一个可以产生值的异步计算。Future的核心特征是它可以被轮询,轮询将推动Future完成计算任务。

1
2
3
4
5
6
fn poll(&self, wake: fn()) -> Poll<Self::Ouput> {}

enum Poll<T> {
Ready(T),
Pending,
}

调用poll方法进行一次轮询,当Future完成计算时,返回Poll::Ready(T)枚举值,否则返回Pending表示未完成。

除了主动轮询以外,Future还通过wake函数主动通知执行者Executor可以继续轮询,这避免了Executor只能通过不断轮询判断Future是否执行完成。

Waker与Executor

Waker的作用是允许Future主动通知Executor其可以被继续轮询。

Future被轮询时,会接收一个带有Waker的上下文,Future的职责是将Waker保存下来,并在任务已经完成时,调用该waker对象的wake方法通知Executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Pin 表示对象在内存中不可移动,Context为包含了上下文的Waker
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// ...
if (self.not_ready) {
self.waker = Some(cx.waker().clone())
}
// ...
}

fn do_some_stuff(&self) {
// ... do some stuff
// when finish jobs
self.waker.wake()
}

Waker则必须具有与Executor通信的能力。Executor是一个管理一组Future直到其完成的对象。它的职责是在适当的时刻轮询管理的Future,而这个时刻由Waker决定。

我们可以假定Executor中包含一个任务队列,一个任务包含一个Future和一个WakerExecutor会不断轮询队首的Future。对于Waker来说,它的wake()方法便类似于向队尾中添加其负责的Future

tokio的基本使用

tokio是一个提供了异步运行时,具有强大异步功能的网络编程库。

许多tokio的API都类似于Rust标准库。以tokio::spawn为例,它接收一个Future并提供并发能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#[tokio::main]
async fn main() {
let mut handles = vec![];
for i in 0..2 {
let handle = tokio::spawn(async {
for j in 0..10 {
thread::sleep(Duration::from_millis(100));
println!("{j}");
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}

类似于thread::spawntokio::spawn接收Future并返回句柄,在句柄上调用.await即类似于在线程句柄上调用join

除此之外,tokio还提供了一些异步的网络编程接口,如TcpStreamTcpSocket,其用法与标准库非常类似。当进行异步编程时,应使用tokio提供的接口代替标准库。