Android 中 RxJava2 的实际使用

Android 中 RxJava 的使用

Rx相关依赖

1
2
3
4
5
6
7
8
9
10
// RxAndroid and RxJava 2 core
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.1.10'
// RxBinding (provides RxView / RxTextView used in the samples)
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'
// RxLifecycle (provides bindToLifecycle used in the samples)
implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
// Networking library
implementation 'com.lzy.net:okgo:3.0.4'
implementation 'com.lzy.net:okrx2:2.0.2'
// JSON conversion
implementation 'com.alibaba:fastjson:1.2.46'

Rx相关的库

防止View点击多次

throttleFirst操作符:仅发送指定时间段内的第一个信号

throttleLast操作符:仅发送指定时间段内的最后一个信号

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Debounce button clicks: only the first click inside each 1-second window is delivered.
// throttleFirst(2, TimeUnit.SECONDS) would forward the first event of every 2-second window.
// throttleWithTimeout(2, TimeUnit.SECONDS) behaves like debounce: an event is forwarded only
// after 2 seconds have passed without a newer event arriving.
RxView.clicks(register_button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribeOn(AndroidSchedulers.mainThread())
        // Parameterized instead of the raw Observer type to avoid unchecked warnings.
        .subscribe(new Observer<Object>() {
            @Override
            public void onNext(@NonNull Object o) {
                Log.e("收到点击事件","==="+System.currentTimeMillis());
            }

            @Override
            public void onError(@NonNull Throwable e) {
                // Report the failure instead of silently dropping it.
                Log.e("收到点击事件", "click stream failed", e);
            }

            @Override
            public void onComplete() {
                // Nothing to do when the click stream completes.
            }

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                // The Disposable is not retained in this sample.
            }
        });

Kotlin(顺序和上面的Java一样)

1
2
3
4
5
6
7
8
9
10
11
12
// Same click throttling as the Java sample: first click of each 1-second window only.
// The four lambdas are positional: onNext, onError, onComplete, onSubscribe.
RxView.clicks(register_button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe({
// onNext: a click passed the throttle

}, {
// onError

}, {
// onComplete

}, {
// onSubscribe: receives the Disposable

})

倒计时(用RxBinding)

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Countdown on a button: ignore further clicks for 60 seconds and run a 1-second ticker.
RxView.clicks(button)
        .subscribeOn(AndroidSchedulers.mainThread())
        .throttleFirst(60, TimeUnit.SECONDS) // clicks are accepted again after 60 s
        // Parameterized instead of the raw Observer type to avoid unchecked warnings.
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                // The Disposable is not retained in this sample.
            }

            @Override
            public void onNext(@NonNull Object o) {
                // One tick per second, delivered on the main thread; take(60) limits it to
                // ticks 0..59, so the label runs from 60 down to 1.
                // NOTE(review): this inner subscription is never disposed here; confirm the
                // hosting screen cannot be destroyed while the countdown is running.
                Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
                        .take(60)
                        .subscribe(new Observer<Long>() {

                            @Override
                            public void onSubscribe(@NonNull Disposable d) {
                                // The Disposable is not retained in this sample.
                            }

                            @Override
                            public void onNext(@NonNull Long aLong) {
                                button.setText(60 - aLong + "S后可发送");
                            }

                            @Override
                            public void onError(@NonNull Throwable e) {
                                // No error handling in this sample.
                            }

                            @Override
                            public void onComplete() {
                                // Countdown finished: restore the original label.
                                button.setText("发送验证码");
                            }
                        });
            }

            @Override
            public void onError(@NonNull Throwable e) {
                // No error handling in this sample.
            }

            @Override
            public void onComplete() {
                // Nothing to do when the click stream completes.
            }
        });

Kotlin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Countdown on a button: ignore further clicks for 60 seconds and run a 1-second ticker.
RxView.clicks(register_button)
.subscribeOn(AndroidSchedulers.mainThread())
.throttleFirst(60, TimeUnit.SECONDS)// clicks are accepted again after 60 s
.subscribe {
// One tick per second on the main thread, limited to 60 ticks by take(60).
Observable
.interval(1,TimeUnit.SECONDS,AndroidSchedulers.mainThread())
.take(60)
.subscribe({
// onNext: show the remaining seconds
register_button.text = "${60 - it.toInt()}S后可发送"
},{
// onError: nothing is done in this sample

},{
// onComplete: restore the original label
register_button.text = "发送验证码"
})
}

倒计时(未用RxBinding)

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Function;

/**
 * Countdown helper built on {@code Observable.interval}.
 */
public class RxCountDown {
    /**
     * Builds a stream that emits the remaining seconds, starting at {@code time} and
     * decreasing by one every second until it reaches 0, then completes.
     *
     * @param time number of seconds to count down from; negative values are treated as 0
     * @return an Observable emitting {@code time, time - 1, ..., 0}, observed on the
     *         Android main thread
     */
    public static Observable<Integer> countdown(int time) {
        // Clamp negative input to zero.
        final int countTime = Math.max(time, 0);

        // Converts the tick index (0, 1, 2, ...) into the remaining seconds.
        final Function<Long, Integer> toRemaining = new Function<Long, Integer>() {
            @Override
            public Integer apply(Long tick) throws Exception {
                return countTime - tick.intValue();
            }
        };

        // First tick fires immediately, then one tick per second.
        Observable<Long> ticks = Observable.interval(0, 1, TimeUnit.SECONDS)
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread());

        // countTime + 1 values so that both countTime and 0 are emitted.
        return ticks.map(toRemaining).take(countTime + 1);
    }
}

调用方式(Kotlin)

1
2
3
4
5
6
7
8
9
10
11
12
13
// Countdown: 60-second resend timer for the verification-code button.
RxCountDown.countdown(60)
.doOnSubscribe {
// Disable the button as soon as the countdown is subscribed.
valicode_button.isEnabled = false
}
.subscribe({
// onNext: `it` is the value emitted by countdown(); show it on the button.
valicode_button.setText("(${it})秒后重发")
}, {
// onError: nothing is done in this sample

}, {
// onComplete: re-enable the button and restore its label.
valicode_button.isEnabled = true
valicode_button.text = "获取"
})

防止重复调用

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// React to edits of the username field only after the user has paused typing for 400 ms,
// skipping blank input, and deliver the lower-cased characters to the observer.
RxTextView.textChanges(username_edittext)
        .debounce(400, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
        .filter(new Predicate<CharSequence>() {
            @Override
            public boolean test(CharSequence charSequence) throws Exception {
                // Drop empty / whitespace-only input.
                return charSequence.toString().trim().length() > 0;
            }
        })
        // A plain map replaces the previous flatMap over a single shared PublishSubject.
        // That version pushed the value into the subject before flatMap had subscribed to it
        // (losing the first value) and added one more subscription to the same subject on
        // every input (duplicating later values).
        .map(new Function<CharSequence, char[]>() {
            @Override
            public char[] apply(CharSequence charSequence) throws Exception {
                return charSequence.toString().toLowerCase().toCharArray();
            }
        })
        // RxBinding view sources must be subscribed on the main thread; subscribing on
        // Schedulers.io() makes the source fail with IllegalStateException.
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<char[]>() {
            @Override
            public void onSubscribe(Disposable d) {
                // The Disposable is not retained in this sample.
            }

            @Override
            public void onNext(char[] chars) {
                // Handle the debounced, lower-cased input here.
            }

            @Override
            public void onError(Throwable e) {
                // No error handling in this sample.
            }

            @Override
            public void onComplete() {
                // Nothing to do when the text-change stream completes.
            }
        });

延迟执行

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Delayed execution: timer emits once after 3 seconds and the observer is called on the
// Android main thread.
Observable.timer(3, TimeUnit.SECONDS)
// NOTE(review): bindToLifecycle presumably comes from an RxLifecycle component base class
// and ends the stream with the host's lifecycle — confirm against the enclosing class.
.compose(this.<Long>bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {

}

// Called when the delay has elapsed; put the delayed work here.
@Override
public void onNext(Long aLong) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

Kotlin

1
2
3
4
5
6
// Delayed execution: timer emits once after 100 ms; the lambda runs on the Android main thread.
Observable.timer(100,TimeUnit.MILLISECONDS)
.compose(this.bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Delayed work goes here.

}

循环执行

Java

1
2
3
4
5
6
7
8
9
10
// Periodic execution: initial delay of 3 s, then one emission every 3 s (time unit: seconds).
Observable.interval(3,3,TimeUnit.SECONDS)
.compose(this.<Long>bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
// Called on the Android main thread for every tick; only onNext is handled here.
@Override
public void accept(Long aLong) throws Exception {

}
});

Kotlin

1
2
3
4
5
6
// Periodic execution: initial delay of 1 s, then one emission every second; the lambda runs
// on the Android main thread.
Observable.interval(1,1,TimeUnit.SECONDS)
.compose(this.bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Periodic work goes here.

}

分线程操作

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Thread switching: the source is subscribed on Schedulers.io() and the observer is called on
// the Android main thread.
Observable
.create(new ObservableOnSubscribe<String>() {
// Emits one String and then completes; runs on the scheduler given to subscribeOn below.
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("发送的数据");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}

// Receives the emitted String on the Android main thread.
@Override
public void onNext(String s) {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
}
});