0%

Rx中的Subject使用

RxJava 在我们的工程中可以说是到处都在用了,但是我们大部分使用的是 Observable.justSchedulersObservable.zip 这样的api,像 Subject 这种我们使用的很少,下面介绍一下 Subject 的使用

Cold Observable 和 Hot Observable

Observable 分为 Cold Observable 和 Hot Observable:

Hot Observable 跟 EventBus 或者广播比较类似,是一对多的关系,订阅它的对象收到的都是同一个数据源发出的事件及数据,不管有没有订阅者,事件都会发生

而 Cold Observable,是一对一的关系,每个事件都是独立发生,比如请求一个网络接口,订阅了网络请求才会发出

我们平时使用的基本都是 Cold Observable,如常见的 just 、create、网络库返回的 Observable

Subject

Subject 可以很方便的控制 onNextonComplete 逻辑

如果我们需要基于之前的 Callback ,创建一个 Observable ,使用 create 方法只能在 call 里面去调用 onNext

Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
//这里发射数据源
...
}
})

如果使用 Subject ,可以很方便的处理数据发射时机

PublishSubject<String> subject = PublishSubject.create();
subject.onNext("1");
subject.onNext("2");
subject.onComplete();

可能会用到的的 Subject 及其特性:

Subject 发射行为
AsyncSubject 不论订阅发生在什么时候,只会发射最后一个数据
BehaviorSubject 发送订阅之前一个数据和订阅之后的全部数据
ReplaySubject 不论订阅发生在什么时候,都发射全部数据
PublishSubject 发送订阅之后全部数据

光看这个表格,会感觉 Subject 没什么卵用,下面我们重点讲讲 ReplaySubject

ReplaySubject

val subject=ReplaySubject.create<String>()
subject.onNext("1")
subject.onNext("2")
subject.subscribe {
log(it)
}
subject.onNext("3")
subject.onNext("4")

log输出结果:

1
2
3
4

然后,这有啥用?

ReplaySubject 默认会创建一个容量为 16 的动态链表,用于记录发射数据,如果数据不断增加,链表会每次增加50% 的容量。但是我们也可以通过 ReplaySubject.createWithSize 来指定链表的大小

如果我们把上面的缓存大小指定为一个,并且只调用一次 onNext

val subject=ReplaySubject.createWithSize<String>(1)
subject.onNext("1")
subject.subscribe {
log(it)
}
val subject=ReplaySubject.createWithSize<String>(1)
subject.subscribe {
log(it)
}
subject.onNext("1")

log 都会输出

1

仔细想想,这种就跟我们经常使用到的预加载很像,我们可能需要在接口回来之前或者接口回来之后处理逻,
那么我们一般会这样写代码:

预加载数据类:

//获取启动广告数据API
Observable<AdInfoBean> getAdInfo()

Callback callback

AdInfoBean adInfoBean

getAdInfo().subscribe{
if(callback!=null){
calback.onResult()
}else{
adInfoBean=it
}
}

广告展示的地方:

if(adInfoBean!=null){
//使用 adInfoBean
}else{
setCallback(new Callback(){
void onResult(){
//使用 adInfoBean
}
})
}

但是我们使用上面说的 ReplaySubject ,利用它可以 “存一个” 的特性,就可以很优雅的解决这个问题:

首先保存一个 replaySubject 的引用

//获取启动广告数据API
Observable<AdInfoBean> getAdInfo()

replaySubject = getAdInfo().replay(1)

然后在广告展示的地方:

replaySubject.subscribe{
//使用 adInfoBean
}

这样无论 getAdInfo() 的结果在什么时机回来,我们都可以用同样的流程去处理接口返回的数据

上面使用了 replay(1) 方法来将 Cold Observable 转换成 Hot Observable,还有很多别的转换方法,如 publish 。 Hot Observable 也可以转换成 Cold Observable,这个可以使用的时候自行搜索

基本上只要涉及这种可能在 subscribe 之前发生,也可能在 subscribe 之后发生的异步问题,都可以用 ReplaySubject 解决。

ReplaySubject 有个需要注意的点:如果你需要指定 size 为1,那么需要使用 createWithSize(1) 方法。如果使用 create(1) ,那么只是指定了 ReplaySubject 的初始容量为 1,后续有事件发生时,链表会自动扩容

BehaviorSubject

BehaviorSubject 的特性是有个初始状态,当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

根据这个特性我目前能想到的就是登录状态的监听,BehaviorSubject 首先发射当前的登录态,后面有登录状态变化时,再发射变化后的状态。

需要处理登录状态的页面,直接订阅即可:

behaviorSubject.subscribe{
if(logined){
...
}else{
...
}
}

PublishSubject

PublishSubject 跟 ReplaySubject 的区别是订阅之后不会收到订阅之前的时间。类似粘性广播和非粘性广播的区别

RxBus 就是基于 PublishSubject 实现的。 RxBus 跟 EvenBus 一样,都是使用的一个事件总线,使用多了会感觉逻辑很乱。但是我们直接使用 PublishSubject 的话,我们可以把需要对外暴露的事件包装成一个 PublishSubject ,需要监听这个事件的地方直接订阅这个 Observable ,这样就是一个一对多的关系,后续查找哪里使用了这个 Observable 也很方便,跨 Module 暴露的时候,也可以少一点 XXXCallback 之类的类

AsyncSubject

asyncSubject 只有当数据源执行了 complete() ,订阅者才能收到事件,并且是最后一个事件。

这个subject 的作用暂时还没想到。

使用注意事项

Subject 会一直持有 订阅者的引用,直到Subject 的生命周期结束。订阅静态 Subject 或者对生命周期有特殊要求时,需要手动调用 unsubscribe()来解除引用