Flutter/Dart中的异步编程之Isolate

前言

我们编程是用多线程一般实现两种场景,一种是异步执行,一种是并行执行

我们都知道 Dart 是单线程异步编程模型 这一点 和js 很像,它天生解决了异步执行的问题,详情查看Flutter中的异步编程Future

但是并行执行怎么处理呢?

在 Dart 中,它的线程概念被称为 Isolate

它与我们之前理解的 Thread 概念有所不同,各个 isolate 之间是无法共享内存空间,isolate 之间有自己的 event loop。我们只能通过 Port 传递消息,然后在另一个 isolate 中处理然后将结果传递回来,这样我们的 UI 线程就有更多余力处理 pipeline,而不会被卡住。

将非常耗时的任务添加到事件队列后,会拖慢整个事件循环的处理,甚至是阻塞。可见基于事件循环的异步模型仍然是有很大缺点的,这时候我们就需要Isolate,这个单词的中文意思是隔离。

简单说,可以把它理解为Dart中的线程。但它又不同于线程,更恰当的说应该是微线程。它与线程最大的区别就是不能共享内存,因此也不存在锁竞争问题,两个Isolate完全是两条独立的执行线,且每个Isolate都有自己的事件循环,它们之间只能通过发送消息通信,所以它的资源开销低于线程。

下文中所说的Dart中的线程都是指的微线程。

所以说Isolate,一句话总结它的作用就是

Isolate可以实现异步并行多个任务

Future实现异步串行多个任务

使用场景

在 Dart 中 async 和 Future 无法解决所有耗时的工作。Dart 虽然支持 异步执行,但其实如果是通过 async 的话,只是把工作丟到同一个 event loop 中, 让他暂时不会卡住目前的工作 , 等到真的轮到它执行的时候 ,如果它真的很耗时,那 main isolate 还是会 freeze(冻结) 住的 (为什么会冻结? 主线程负责 UI的渲染 工作 但是 如果 密集型计算 很耗时 假如 这个计算 占用 1s的时间 你的UI就会卡住1s) 。Dart 主要的 task 都是在 main isolate 中完成的,isolate 像是个 single thread 的 process。如果真的想要让某些工作能夠同时执行,不要卡住 main isolate 的话,就得要自己产生新的 isolate 來执行。

Isolate虽好,但也有合适的使用场景,不建议滥用Isolate,应尽可能多的使用Dart中的事件循环机制去处理异步任务,这样才能更好的发挥Dart语言的优势。

在Dart中我们使用多线程计算的时候,整个计算的时间会比单线程还要多,额外的耗时是什么呢?

  • 创建 Isolate
  • 线程间传递数据

Isolate 实际上是比较重的,每当我们创建出来一个新的 Isolate 至少需要 2mb 左右的空间甚至更多,取决于我们具体 isolate 的用途。

那么应该在什么时候使用Future,什么时候使用Isolate呢?

一个最简单的判断方法是根据某些任务的平均时间来选择:

  • 方法执行在几毫秒或十几毫秒左右的,应使用Future
  • 如果一个任务需要几百毫秒或之上的,则建议创建单独的Isolate

一些常见的可以参考的场景

  • JSON 解码
  • 加密
  • 图像处理:比如剪裁
  • 网络请求:加载资源、图片

创建Isolate

Isolate由一对Port分别由用于接收消息的ReceivePort对象,和用于发送消息的SendPort对象构成。其中SendPort对象不用单独创建,它已经包含在ReceivePort对象之中。需要注意,一对Port对象只能单向发消息,这就如同一根自来水管,ReceivePortSendPort分别位于水管的两头,水流只能从SendPort这头流向ReceivePort这头。因此,两个Isolate之间的消息通信肯定是需要两根这样的水管的,这就需要两对Port对象。

Dart中创建

我们可以通过 Isolate.spawn 创建一个 isolate。

1
static Future<Isolate> spawn<T>(void entryPoint(T message),T message);

当我们调用 Isolate.spawn 的时候,它将会返回一个对 isolate 的引用的 Future。我们可以通过这个 isolate 来控制创建出的 Isolate,例如 pause、resume、kill 等等。

  • entryPoint:这里传入我们想要在其他 isolate 中执行的方法,入参是一个任意类型的 message。entryPoint 只能是顶层方法或静态方法,且返回值为 void
  • message:创建 Isolate 第一个调用方法的入参,可以是任意值。

但是在此之前我们必须要创建两个 isolate 之间沟通的桥梁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import 'dart:isolate';
import 'dart:io';

void main() {
print("main isolate start");
create_isolate();
print("main isolate end");
}

// 创建一个新的 isolate
create_isolate() async{
ReceivePort rp = new ReceivePort();
SendPort port1 = rp.sendPort;

Isolate newIsolate = await Isolate.spawn(doWork, port1);

SendPort port2;
rp.listen((message){
print("main isolate message: $message");
if (message[0] == 0){
port2 = message[1];
}else{
port2?.send([1,"这条信息是 main isolate 发送的"]);
}
});
}

// 处理耗时任务
static void doWork(SendPort port1){
print("new isolate start");
ReceivePort rp2 = new ReceivePort();
SendPort port2 = rp2.sendPort;

rp2.listen((message){
print("doWork message: $message");
});

// 将新isolate中创建的SendPort发送到主isolate中用于通信
port1.send([0, port2]);
// 模拟耗时5秒
sleep(Duration(seconds:5));
port1.send([1, "doWork 任务完成"]);

print("new isolate end");
}

运行结果

main isolate start
main isolate end
new isolate start
main isolate message: [0, SendPort]
new isolate end
main isolate message: [1, doWork 任务完成]
doWork message: [1, 这条信息是 main isolate 发送的]

运行后都会创建两个进程,一个是主Isolate的微进程,一个是新Isolate的微进程,两个微进程都双向绑定了消息通信的通道,即使新的Isolate中的任务完成了,它的微进程也不会立刻退出,因此,当使用完自己创建的Isolate后,最好调用newIsolate.kill(priority: Isolate.immediate);Isolate立即杀死。

定义协议

我们的 entryPoint 只允许有一个入参,如果我们想要执行的方法需要传入其他参数怎么办呢。

其实很简单,我们定义一个协议就行了。

比如像下面这样我们定义一个 SpawnMessageProtocol 作为 message。

1
2
3
4
5
class SpawnMessageProtocol{
final SendPort sendPort;
final String url;
SpawnMessageProtocol(this.sendPort, this.url);
}

我们的耗时方法就可以写为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void doWork(SpawnMessageProtocol port1){
print("new isolate start");
ReceivePort rp2 = new ReceivePort();
SendPort port2 = rp2.sendPort;

rp2.listen((message){
print("doWork message: $message");
});

// 将新isolate中创建的SendPort发送到主isolate中用于通信
port1.sendPort.send([0, port2]);
// 模拟耗时5秒
sleep(Duration(seconds:5));
port1.sendPort.send([1, "doWork 任务完成"]);

print("new isolate end");
}

Flutter中创建

在Dart中创建一个Isolate显得有些繁琐,可惜的是Dart官方并未提供更高级封装。

但是,如果想在Flutter中创建Isolate,则有更简便的API,这是由Flutter官方进一步封装ReceivePort而提供的更简洁API。

使用compute函数来创建新的Isolate并执行耗时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import 'package:flutter/foundation.dart';
import 'dart:io';

// 创建一个新的Isolate,在其中运行任务doWork
create_new_task() async{
var str = "New Task";
var result = await compute(doWork, str);
print(result);
}

static String doWork(String value){
print("new isolate doWork start");
// 模拟耗时5秒
sleep(Duration(seconds:5));
print("new isolate doWork end");
return "complete:$value";
}

compute函数有两个必须的参数,第一个是待执行的函数,这个函数必须是一个顶级函数或静态方法,不能是类的实例方法,第二个参数为动态的消息类型,可以是被运行函数的参数。

需要注意,使用compute应导入'package:flutter/foundation.dart'包。

线程管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import 'dart:isolate';
typedef LikeCallback = void Function(Object value);

class ThreadManagement {
//entryPoint 必须是静态方法
static Future<Map> runtask (void entryPoint(SendPort message), LikeCallback(Object value),{Object parameter})async{
final response = ReceivePort();
Isolate d = await Isolate.spawn(entryPoint, response.sendPort);
// 调用sendReceive自定义方法
if(parameter!=null){
SendPort sendPort = await response.first;
ReceivePort receivePort = ReceivePort();
sendPort.send([parameter, receivePort.sendPort]);
receivePort.listen((value){
receivePort.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":receivePort,
};
}else{
response.listen((value){
response.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":response,
};
}
}
}

调用无参任务

不带参数任务

1
2
3
4
5
6
7
8
9
static void getbannerthread(SendPort port) async {
var c = await HttpManager.isolationnetFetch(
"http://www.psvmc.cn/getbanner"
,null,
null,
new Options(method: 'GET')
);
port.send(c);
}

port.send(c); 是回调计算结果

调用任务

1
2
3
4
5
ThreadManagement.runtask(API.getbannerthread, (value){
if(value != null){
//业务逻辑
}
});

调用有参任务

带参数任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static getVideolisttask(SendPort port) async {
ReceivePort receivePort = ReceivePort();
port.send(receivePort.sendPort);
// 监听外界调用
await for (var msg in receivePort) {
Map requestURL =msg[0];
SendPort callbackPort =msg[1];
receivePort.close();
var res = await HttpManager.isolationnetFetch(
"http://xxxxxx?type="+requestURL["type"]+"&after="+requestURL["after"]
,null,
null,
new Options(method: 'GET')
);
callbackPort.send(res);
}
}

执行带参数的任务

1
2
3
4
5
6
7
8
ThreadManagement.runtask(API.getVideolisttask, (value){
if(value != null){
//业务逻辑
}
},parameter: {
"type":"hot",
"after":"1"
});

线程池

使用LoadBalancer

如何减少 isolate 创建所带来的消耗呢。自然一个想法就是能否创建一个线程池,初始化到那里。当我们需要使用的时候再拿来用就好了。

实际上 dart team 已经为我们写好一个非常实用的 package,其中就包括 LoadBalancer

我们现在 pubspec.yaml 中添加 isolate 的依赖。

1
isolate: ^2.0.2

然后我们可以通过 LoadBalancer 创建出指定个数的 isolate。

由于 dart 天生支持顶层函数,我们可以在 dart 文件中直接创建这个 LoadBalancer

1
Future<LoadBalancer> loadBalancer = LoadBalancer.create(2, IsolateRunner.spawn);

这段代码将会创建出一个 isolate 线程池,并自动实现了负载均衡。

下面我们再来看看应该如何使用 LoadBalancer 中的 isolate。

1
2
3
4
5
int useLoadBalancer() async {
final lb = await loadBalancer;
int res = await lb.run<int, int>(_doSomething, 110);
return res;
}

我们关注的只有 Future<R> run<R, P>(FutureOr<R> function(P argument), argument, 方法。我们还是需要传入一个 function 在某个 isolate 中运行,并传入其参数 argument。run 方法将会返回我们执行方法的返回值。

整体和 compute 使用感觉上差不多,但是当我们多次使用额外的 isolate 的时候,不再需要重复创建了。

并且 LoadBalancer 还支持 runMultiple,可以让一个方法在多线程中执行。

LoadBalancer 经过测试,它会在第一次使用其 isolate 的时候初始化线程池。

当应用打开后,即使我们在顶层函数中调用了 LoadBalancer.create,但是还是只会有一个 Isolate。

当我们调用 run 方法时,才真正创建出了实际的 isolate。