前言
Android原生的多线程和异步处理简直糟透了,反复的嵌套让代码看起来十分不明了,多线程上也没有iOS的dispatch好用,但是用了Rxjava后就会有所改善,虽然代码量看起来会多一点,但是逻辑就清晰多了
本文代码对应的是Rxjava2
真前言
总的来说Rxjava可以分为5块内容 分别为
- 发布者(Observable/Flowable/Single/Completable)
 - 订阅者(Subscriber)
 - 中转站(Subject)
 - 线程(Scheduler)
 - 操作符
 
形象的来说
发布者就相当于报社订阅者就相当于用户中转站就相当于报亭它既是订阅者又是发布者线程是指定在哪个线程上处理操作符则是把发布者的数据进行处理,再给订阅者
在发布者和订阅者之间传递的事件总共有三种
onNext(): 发送事件的数据onCompleted(): 事件队列完结。RxJava不仅把每个事件单独处理,还会把它们看做一个队列。RxJava规定,当不会再有新的onNext()发出时,需要触发onCompleted()方法作为标志。onError(): 事件队列异常。在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。- 在一个正确运行的事件序列中, 
onCompleted()和onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()和onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。 
下面就说一下各块内容
发布者
对比
Observable/Flowable:
Observable不支持背压(backpressure)Flowable是Rxjava2新增加的支持背压(backpressure)背压(backpressure):只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。如果上游发送数据速度远大于下游接收数据的速度
用
Observable就会内存溢出Flowable则会抛弃掉处理不了的数据来防止溢出但是不能就都用
Flowable因为Observable的性能较高Single:
和Observable,Flowable一样会发送数据,不同的是订阅后只能接受到一次
普通Observable可以使用toSingle转换:
Observable.just(1).toSingle()Completable
与Single类似,只能接受到一次完成(onComplete)或错误(onError)
同样也可以由普通的Observable转换而来:
Observable.just(1).toCompletable()
发布者发布事件 可以手动创建也可以调用内置方法
Observable
1  | Observable  | 
Flowable
1  | Flowable  | 
Single
1  | Single  | 
Completable
1  | Completable  | 
订阅者(Subscriber)
Observer/FlowableOnSubscribe/SingleOnSubscribe/CompletableOnSubscribe/Consumer/Subscriber
| 发布者 | 订阅者 | 
|---|---|
| Observable | Observer/Consumer | 
| Flowable | FlowableOnSubscribe/Subscriber/Consumer | 
| Single | SingleObserver/Consumer/BiConsumer | 
| Completable | CompletableObserver/Action | 
创建
1  | Observer<String> observer = new Observer<String>() {  | 
订阅
1  | observable.subscribe(observer);  | 
注意上面方法的顺序 看上去是发布者订阅了订阅者,之所以这样是因为链式代码的优雅
线程(Scheduler)
常用的方式是分线程中处理数据,主线程中使用数据生成页面
1  | Observable  | 
操作符
| 名称 | 解析 | 
|---|---|
| amb() ambArray ambWith  | 
给定多个Observable,只让第一个发射数据的Observable发射全部数据 | 
| defaultIfEmpty() | 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 | 
| switchIfEmpty() | 如果原始Observable没有发射数据,它发射一个备用Observable的发射物 | 
| skipUntil() | 跳过原始Observable发射的数据,直到第二个Observable发射了一个数据, 然后发射原始Observable的剩余数据  | 
| skipWhile() | 判断成功的都跳过 一旦为假 发送剩余的所有数据 | 
| takeUntil() | 发送为真包括以前的数据 不再处理后续数据 | 
| takeWhile() | 发送为真的数据 一旦为假就不再处理后续数据 | 
create
参见面发布者部分
just/range/fromArray
- just
 
1  | Observable observable = Observable.just("好好学习", "天天向上");  | 
- range
 
1  | Observable.range(1,10);  | 
- fromArray
 
1  | String[] quotations = {"好好学习", "天天向上"};  | 
interval/timer
1  | //延迟10s每10s发送一次  | 
throttleFirst/throttleLast
throttleFirst操作符:仅发送指定时间段内的第一个信号
throttleLast操作符:仅发送指定时间段内的第一个信号
1  | RxView.clicks(mBtn)  | 
debounce
指定时间段内没有新的信号时 则发出最后一个信号
比如监听文本变化进行搜索
1  | RxTextView.textChanges(etKey)  | 
map
类型变换
1  | String[] strs = {"11","22","33"};  | 
concatMap
concatMap(): 这是一个很有用但非常难理解的变换。
首先假设这么一种需求:上面的{"11","22","33"}我们像最终获取到1,1,2,2,3,3
1  | String[] strs = {"11","22","33"};  | 
用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化,就需要用 flatMap() 了
Kotlin
1  | Observable  | 
注意
concatMap中一定要发送onComplete事件
flatMap
flatMap和concatMap最大的区别是concatMap发射的数据集是有序的,flatMap发射的数据集是无序的
filter
过滤  
假如我们要大于5的数
1  | Integer[] nums = {3, 4, 5, 6, 7};  | 
defaultIfEmpty
当未发送onNext直接发送onComplete时 onNext收到的默认值
1  | Observable  | 
switchEmpty
如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源
1  | Observable  | 
zip
1  | Observable  | 
上面的代码会收到 101、202
也就是说多个Observable都发送时 才处理数据
amb/ambArray/ambWith
给定多个Observable,只让第一个发射数据的Observable发射全部数据。
take/takeWhile/takeUntil
take
1  | //取前两个信号  | 
takeWhile
1  | //发送为真的数据 一旦为假就不再处理后续数据  | 
takeUntil
1  | //发送为真包括以前的数据 不再处理后续数据  | 
skip/skipWhile/skipUntil
skip
1  | //取前两个信号  | 
skipWhile:判断成功的都跳过 一旦为假 发送剩余的所有数据
1  | Observable  | 
会收到2、3、2 判断成功的都跳过 一旦为假 发送剩余的所有数据
skipUntil:跳过原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
1  | Observable  | 
收不到数据 因为第二个Observable延迟1s结束后 原始Observable已经没有剩余数据了
中转站(Subject)
Rxjava和Rxjava2对比
io.reactivex.subjects.AsyncSubject,io.reactivex.subjects.BehaviorSubject,io.reactivex.subjects.PublishSubject,io.reactivex.subjects.ReplaySubject,io.reactivex.subjects.UnicastSubject
在RxJava2中依然存在,但现在他们不支持backpressure。
新出现的
io.reactivex.processors.AsyncProcessor,io.reactivex.processors.BehaviorProcessor,io.reactivex.processors.PublishProcessor,io.reactivex.processors.ReplayProcessorio.reactivex.processors.UnicastProcessor
支持backpressure
Subject 在平时开发时 用的不是很多
它分为四种
- PublishSubject(之后)
 - BehaviorSubject(前一个事件+之后)
 - ReplaySubject(所有事件)
 - AsyncSubject(最后事件)
 
用法如下
1  | observable.subscribe(subject);  | 
区别
假如发布者  也就是报社 只发布周一到周五的报纸 一天一份
如果我们在周三早上来报厅订报  
如果报厅是
PublishSubject我们可以收到周三 周四 周五的报纸如果报厅是
BehaviorSubject我们可以收到周二 至 周五的报纸如果报厅是
ReplaySubject我们可以收到周一 至 周五的报纸如果报厅是
AsyncSubject我们可以收到周五的报纸 但是发布的事件中如果有错误那我们只会接受到错误而不是错误的前一个事件
Android中应用
添加依赖
1  | implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'  | 
详细示例可参考 github博客生成APP(Rxjava1)