Rust语法之多线程(Tokio)

多线程

在 Rust 中,可以使用多线程来并发地操作 Vec

下面是一个使用 Rust 实现多线程操作 Vec 的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use std::thread;
use std::sync::{Arc, Mutex};

fn main() {
let mut v = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
let v = Arc::new(Mutex::new(v));

let mut handles = vec![];
for i in 0..3 {
let v = Arc::clone(&v);
let handle = thread::spawn(move || {
let mut v = v.lock().unwrap();
for j in (i * 3)..((i + 1) * 3) {
v[j] *= 2;
}
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

let v = v.lock().unwrap();
println!("{:?}", v);
}

该示例代码创建了一个包含 9 个元素的 Vec,然后使用 ArcMutex 包装了该 Vec。接着,我们创建了 3 个线程,每个线程负责修改 Vec 的三分之一元素的值。在每个线程的执行体中,我们使用 Mutex 来获取 Vec 的写锁,并修改 Vec 中的元素。最后,我们等待所有线程完成,并输出修改后的 Vec

需要注意的是,在使用多线程操作 Vec 时,需要注意避免数据竞争等问题。在上述示例代码中,我们使用了 ArcMutex 来保护 Vec 的访问,并确保了多个线程不会同时访问同一个元素,从而避免了数据竞争问题。同时,由于使用了 Mutex,并发性能可能会受到一定的影响,因此需要根据具体情况进行权衡和选择。

tokio实现异步

tokio是一个基于Futures和Async IO的异步编程库,它提供了一组基于Future的API,允许程序员编写非阻塞的、异步的网络应用程序。

tokio提供了一系列异步I/O相关的原语,包括异步TCP和UDP套接字、异步文件操作、异步定时器等等,这些原语可以用来构建异步网络应用程序。

tokio库的核心是reactor,它是一个事件循环,用于驱动异步任务的执行。tokio提供了一组异步原语,这些原语会向reactor注册事件,当事件触发时,reactor会通知对应的任务进行执行。

使用tokio编写异步程序通常需要使用async/await语法,将异步任务包装成Future,然后利用tokio提供的运行时来驱动这些Future的执行。tokio提供了一组运行时相关的API,例如tokio::spawn函数可以将一个Future注册到运行时中进行执行,tokio::run函数可以启动运行时。

API概要

Tokio 1.27是一个相对较新的版本,其API概要如下:

  • 异步任务调度:tokio::spawn, tokio::task::spawn, tokio::task::spawn_blocking, tokio::task::yield_now
  • 网络编程:tokio::net::TcpListener, tokio::net::TcpStream, tokio::net::UdpSocket
  • 异步文件I/O:tokio::fs::File, tokio::fs::OpenOptions, tokio::fs::create_dir_all, tokio::fs::read, tokio::fs::read_to_end, tokio::fs::write, tokio::fs::write_all
  • 异步定时器:tokio::time::sleep, tokio::time::Instant, tokio::time::Duration
  • 异步锁和同步原语:tokio::sync::{Mutex, RwLock, Semaphore}
  • 异步信号处理:tokio::signal::{ctrl_c, unix::{Signal, signal}}
  • 异步进程和命令执行:tokio::process::{Command, Child, ChildStdin, ChildStdout, ChildStderr}
  • 异步编解码:tokio::codec::{Framed, Lines, Decoder, Encoder}
  • 异步HTTP编程:tokio::task::spawn_blocking, tokio::time::timeout, tokio::io::{AsyncReadExt, AsyncWriteExt}, tokio::net::TcpStream

除此之外,Tokio还提供了一系列辅助函数和宏,例如tokio::select!, tokio::try_join!, tokio::time::timeout!等,这些工具可以帮助开发者更方便地编写异步代码。此外,Tokio还提供了一套完整的测试框架,可以帮助开发者编写和运行异步测试用例。

引用

在Rust中使用tokio库,需要在项目的Cargo.toml文件中添加tokio库的依赖声明,例如:

1
2
[dependencies]
tokio = { version = "1.27.0", features = ["full"] }

这个声明会告诉Cargo在构建项目时自动下载和编译tokio库,并将其链接到可执行文件中。

异步IO

以下是一个使用Tokio 1.27实现异步I/O的简单示例,它实现了一个简单的TCP Echo服务器,监听在本地8080端口,当有客户端连接时,它会将客户端发送的数据原封不动地返回给客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;

println!("Listening on: {}", listener.local_addr()?);

loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
eprintln!("Failed to read from socket; err = {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("Failed to write to socket; err = {:?}", e);
return;
}
}
});
}
}

在这个例子中,我们首先使用TcpListener::bind函数创建了一个TCP监听器,它监听在本地8080端口上。然后我们进入一个无限循环,等待客户端连接。

每当有一个客户端连接时,我们使用tokio::spawn函数将一个异步任务注册到tokio运行时中,该任务的作用是处理与客户端的交互。具体来说,我们将交互处理的逻辑封装到一个无限循环中,该循环通过socket.read()异步读取客户端发送的数据,并将其原封不动地通过socket.write_all()异步返回给客户端。如果读取或写入失败,任务会立即退出。

在异步任务中使用tokio的异步API需要使用async/await语法,例如使用TcpListener::accept().await来等待客户端连接,使用socket.read().await来异步读取客户端发送的数据,使用socket.write_all().await来异步写入响应内容。同时,我们使用tokio::spawn将异步任务放入tokio运行时中进行异步执行,避免了阻塞主线程。

这个例子仅仅是一个最简单的使用tokio实现异步I/O的例子,tokio提供了丰富的异步API和工具,可以帮助开发者编写高效、健壮的异步网络应用程序。

测试

使用CMD中输入

1
telnet 127.0.0.1 8080

可以看到我们输入的任何字符都会返回同样的字符。

异步任务调度

以下是一个使用Tokio 1.27进行异步任务调度的简单示例,其中异步方法有返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::time::Duration;
use tokio::task;

async fn square(n: u32) -> u32 {
n * n
}

#[tokio::main]
async fn main() {
let handle = task::spawn(async {
let result = square(5).await;
println!("Result: {}", result);
result
});

println!("Waiting for task to complete...");
let result = handle.await.unwrap();
println!("Task returned: {}", result);
}

在这个示例中,我们定义了一个异步函数square,它接受一个整数n,并返回n的平方值。在main函数中,我们使用tokio::task::spawn函数创建了一个异步任务,并在任务完成后打印了返回值。最后,我们等待任务完成并打印另一条消息。

与之前的示例不同,这个示例使用了asyncawait关键字。在spawn闭包中,我们使用await关键字调用square函数,并将结果赋值给变量result。我们还通过在闭包的最后一行返回result来返回结果值。在main函数中,我们使用await关键字等待任务完成,并将结果值存储在变量result中。

需要注意的是,在异步任务中使用return语句将返回值返回给tokio::task::JoinHandle是不推荐的。相反,我们应该在闭包的最后一行使用表达式返回值。这样可以确保正确地处理异步任务的返回值。

异步任务的异常处理

在异步编程中,异步任务中可能会发生错误或异常。Tokio 1.27提供了一些方式来处理这些错误或异常,以便我们可以正确地处理它们。

以下是一个使用Tokio 1.27进行异步任务调度并处理异常的简单示例:

添加依赖

1
2
3
[dependencies]
tokio = { version = "1.27.0", features = ["full"] }
rand = "0.8.4"

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use tokio::task;

async fn maybe_fail() -> Result<u32, &'static str> {
if rand::random() {
Ok(42)
} else {
Err("something went wrong!")
}
}

#[tokio::main]
async fn main() {
let handle = task::spawn(async {
let result = maybe_fail().await;
match result {
Ok(value) => {
println!("Task completed successfully with value: {}", value);
value
},
Err(e) => {
eprintln!("Task failed: {}", e);
0
},
}
});

println!("Waiting for task to complete...");
let result = handle.await;
match result {
Ok(value) => println!("Task returned successfully with value: {}", value),
Err(e) => eprintln!("Task panicked: {:?}", e),
}
}

在这个示例中,我们定义了一个异步函数maybe_fail,它模拟了一个有50%的概率成功的操作,并在其他情况下返回错误信息。该函数返回Result<u32, &'static str>类型的值,其中Ok表示成功并返回一个u32类型的值,而Err表示失败并返回一个静态字符串&'static str

main函数中,我们使用tokio::task::spawn函数创建了一个异步任务,并使用await关键字等待任务完成。在任务完成后,我们使用match表达式检查任务的结果。如果结果为Ok(value),则打印成功消息并使用value变量来访问异步函数的返回值;否则打印错误消息并返回一个默认值0

在等待任务完成时,我们也使用了match表达式来检查任务的结果。如果结果为Ok(value),则打印成功消息并使用value变量来访问异步函数的返回值;否则打印错误消息。需要注意的是,如果异步任务中发生了panic,这个示例将使用eprintln!打印出错误消息。这是因为异步任务中的panic将被捕获并封装为JoinError类型,该类型将被返回给调用方。

更简单的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async fn maybe_fail() -> Result<u32, &'static str> {
if rand::random() {
Ok(42)
} else {
Err("something went wrong!")
}
}

#[tokio::main]
async fn main() {
let result = maybe_fail().await;
match result {
Ok(value) => {
println!("Task completed successfully with value: {}", value);
}
Err(e) => {
eprintln!("Task failed: {}", e);
}
}
}