前言
Android原生的多线程和异步
处理简直糟透了,反复的嵌套让代码看起来十分不明了,多线程上也没有iOS
的dispatch
好用,但是用了Rxjava
后就会有所改善,虽然代码量看起来会多一点,但是逻辑
就清晰
多了
本文代码对应的是Rxjava2
真前言
总的来说Rxjava
可以分为5块内容 分别为
- 发布者(Observable/Flowable/Single/Completable)
- 订阅者(Subscriber)
- 中转站(Subject)
- 线程(Scheduler)
- 操作符
形象的来说
发布者
就相当于报社
订阅者
就相当于用户
中转站
就相当于报亭
它既是订阅者
又是发布者
线程
是指定在哪个线程上处理操作符
则是把发布者的数据进行处理,再给订阅者
在发布者和订阅者之间传递的事件总共有三种
onNext()
: 发送事件的数据onCompleted()
: 事件队列完结。RxJava
不仅把每个事件单独处理,还会把它们看做一个队列。RxJava
规定,当不会再有新的onNext()
发出时,需要触发onCompleted()
方法作为标志。onError()
: 事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。- 在一个正确运行的事件序列中,
onCompleted()
和onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()
和onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
下面就说一下各块内容
发布者
对比
Observable/Flowable:
Observable
不支持背压(backpressure)
Flowable
是Rxjava2新增加的支持背压(backpressure)
背压(backpressure)
:只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。如果上游发送数据速度远大于下游接收数据的速度
用
Observable
就会内存溢出Flowable
则会抛弃掉处理不了的数据来防止溢出但是不能就都用
Flowable
因为Observable
的性能较高Single:
和Observable,Flowable一样会发送数据,不同的是订阅后只能接受到一次
普通Observable可以使用toSingle转换:
Observable.just(1).toSingle()
Completable
与Single类似,只能接受到一次完成(onComplete)或错误(onError)
同样也可以由普通的Observable转换而来:
Observable.just(1).toCompletable()
发布者发布事件 可以手动创建也可以调用内置方法
Observable
1 | Observable |
Flowable
1 | Flowable |
Single
1 | Single |
Completable
1 | Completable |
订阅者(Subscriber)
Observer/FlowableOnSubscribe/SingleOnSubscribe/CompletableOnSubscribe/Consumer/Subscriber
发布者 | 订阅者 |
---|---|
Observable | Observer/Consumer |
Flowable | FlowableOnSubscribe/Subscriber/Consumer |
Single | SingleObserver/Consumer/BiConsumer |
Completable | CompletableObserver/Action |
创建
1 | Observer<String> observer = new Observer<String>() { |
订阅
1 | observable.subscribe(observer); |
注意上面方法的顺序 看上去是发布者
订阅了订阅者
,之所以这样是因为链式代码的优雅
线程(Scheduler)
常用的方式是分线程
中处理数据,主线程
中使用数据生成页面
1 | Observable |
操作符
名称 | 解析 |
---|---|
amb() ambArray ambWith |
给定多个Observable,只让第一个发射数据的Observable发射全部数据 |
defaultIfEmpty() | 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 |
switchIfEmpty() | 如果原始Observable没有发射数据,它发射一个备用Observable的发射物 |
skipUntil() | 跳过原始Observable发射的数据,直到第二个Observable发射了一个数据, 然后发射原始Observable的剩余数据 |
skipWhile() | 判断成功的都跳过 一旦为假 发送剩余的所有数据 |
takeUntil() | 发送为真包括以前的数据 不再处理后续数据 |
takeWhile() | 发送为真的数据 一旦为假就不再处理后续数据 |
create
参见面发布者部分
just/range/fromArray
- just
1 | Observable observable = Observable.just("好好学习", "天天向上"); |
- range
1 | Observable.range(1,10); |
- fromArray
1 | String[] quotations = {"好好学习", "天天向上"}; |
interval/timer
1 | //延迟10s每10s发送一次 |
throttleFirst/throttleLast
throttleFirst
操作符:仅发送指定时间段内的第一个信号
throttleLast
操作符:仅发送指定时间段内的第一个信号
1 | RxView.clicks(mBtn) |
debounce
指定时间段内没有新的信号时 则发出最后一个信号
比如监听文本变化进行搜索
1 | RxTextView.textChanges(etKey) |
map
类型变换
1 | String[] strs = {"11","22","33"}; |
concatMap
concatMap(): 这是一个很有用但非常难理解的变换。
首先假设这么一种需求:上面的{"11","22","33"}
我们像最终获取到1,1,2,2,3,3
1 | String[] strs = {"11","22","33"}; |
用 map()
显然是不行的,因为 map()
是一对一的转化,而我现在的要求是一对多的转化,就需要用 flatMap()
了
Kotlin
1 | Observable |
注意
concatMap
中一定要发送onComplete
事件
flatMap
flatMap和concatMap最大的区别是concatMap发射的数据集是有序的,flatMap发射的数据集是无序的
filter
过滤
假如我们要大于5的数
1 | Integer[] nums = {3, 4, 5, 6, 7}; |
defaultIfEmpty
当未发送onNext
直接发送onComplete
时 onNext
收到的默认值
1 | Observable |
switchEmpty
如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源
1 | Observable |
zip
1 | Observable |
上面的代码会收到 101、202
也就是说多个Observable
都发送时 才处理数据
amb/ambArray/ambWith
给定多个Observable,只让第一个发射数据的Observable发射全部数据。
take/takeWhile/takeUntil
take
1 | //取前两个信号 |
takeWhile
1 | //发送为真的数据 一旦为假就不再处理后续数据 |
takeUntil
1 | //发送为真包括以前的数据 不再处理后续数据 |
skip/skipWhile/skipUntil
skip
1 | //取前两个信号 |
skipWhile:判断成功的都跳过 一旦为假 发送剩余的所有数据
1 | Observable |
会收到2、3、2 判断成功的都跳过 一旦为假 发送剩余的所有数据
skipUntil:跳过原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
1 | Observable |
收不到数据 因为第二个Observable延迟1s结束后 原始Observable已经没有剩余数据了
中转站(Subject)
Rxjava和Rxjava2对比
io.reactivex.subjects.AsyncSubject
,io.reactivex.subjects.BehaviorSubject
,io.reactivex.subjects.PublishSubject
,io.reactivex.subjects.ReplaySubject
,io.reactivex.subjects.UnicastSubject
在RxJava2中依然存在,但现在他们不支持backpressure
。
新出现的
io.reactivex.processors.AsyncProcessor
,io.reactivex.processors.BehaviorProcessor
,io.reactivex.processors.PublishProcessor
,io.reactivex.processors.ReplayProcessor
io.reactivex.processors.UnicastProcessor
支持backpressure
Subject 在平时开发时 用的不是很多
它分为四种
- PublishSubject(之后)
- BehaviorSubject(前一个事件+之后)
- ReplaySubject(所有事件)
- AsyncSubject(最后事件)
用法如下
1 | observable.subscribe(subject); |
区别
假如发布者
也就是报社 只发布周一到周五
的报纸 一天一份
如果我们在周三
早上来报厅订报
如果报厅是
PublishSubject
我们可以收到周三 周四 周五
的报纸如果报厅是
BehaviorSubject
我们可以收到周二 至 周五
的报纸如果报厅是
ReplaySubject
我们可以收到周一 至 周五
的报纸如果报厅是
AsyncSubject
我们可以收到周五
的报纸 但是发布的事件中如果有错误
那我们只会接受到错误
而不是错误的前一个事件
Android中应用
添加依赖
1 | implementation 'io.reactivex.rxjava2:rxandroid:2.0.2' |
详细示例可参考 github博客生成APP(Rxjava1)