前言
subscribeOn
指定Observable自身在哪个调度器上执行
observeOn
指定一个观察者在哪个调度器上观察这个Observable
subscribeOn这个操作符指定的是Observable自身在哪个调度器上执行,而且跟调用的位置没有关系。
而observableOn则是指定一个观察者在哪个调度器上观察这个Observable
多次调用
- 如果多次调用
subscribeOn 只有第一次调用subscribeOn时选择的调度器.subscribeOn(Schedulers.newThread())有作用,而后来选择的都没有作用。
- 当每次调用了
observableOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用observableOn切换了调度器。
这说明了subscribeOn这个操作符,与调用的位置无关,而且只有第一次调用时会指定Observable自己在哪个调度器执行。
其实有一种情况特殊,就是在doOnSubscribe操作符之后调用,可以使doOnSubscribe在指定的调度器中执行。
比如如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| Observable<String> obs = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { System.out.println("Observable:" + Thread.currentThread().getId()); emitter.onNext("数据"); emitter.onComplete(); } });
obs .subscribeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(String o) throws Exception { System.out.println("map01:" + Thread.currentThread().getId()); return o; } }) .observeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(String o) throws Exception { System.out.println("map02:" + Thread.currentThread().getId()); return o; } }) .observeOn(Schedulers.newThread()) .map(new Function<String, String>() { @Override public String apply(String o) throws Exception { System.out.println("map03:" + Thread.currentThread().getId()); return o; } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Exception { System.out.println("subscribe:" + Thread.currentThread().getId()); } });
|
会打印
1 2 3 4 5
| Observable:12 map01:12 map02:13 map03:14 subscribe:15
|
如果不指定subscribeOn 那么会在当前所在线程中操作
只要不调用observeOn 那么后续的操作一直会在之前的线程中操作
多次调用subscribeOn(Schedulers.io())会在不同的线程中操作