RxJava 在我们的工程中可以说是到处都在用了,但是我们大部分使用的是 Observable.just
、Schedulers
、Observable.zip
这样的api,像 Subject
这种我们使用的很少,下面介绍一下 Subject
的使用
Cold Observable 和 Hot Observable
Observable 分为 Cold Observable 和 Hot Observable:
Hot Observable 跟 EventBus 或者广播比较类似,是一对多的关系,订阅它的对象收到的都是同一个数据源发出的事件及数据,不管有没有订阅者,事件都会发生
而 Cold Observable,是一对一的关系,每个事件都是独立发生,比如请求一个网络接口,订阅了网络请求才会发出
我们平时使用的基本都是 Cold Observable,如常见的 just 、create、网络库返回的 Observable
Subject
Subject 可以很方便的控制 onNext
、onComplete
逻辑
如果我们需要基于之前的 Callback ,创建一个 Observable ,使用 create 方法只能在 call 里面去调用 onNext
Observable.create(new Observable.OnSubscribe<Object>() { |
如果使用 Subject ,可以很方便的处理数据发射时机
PublishSubject<String> subject = PublishSubject.create(); |
可能会用到的的 Subject 及其特性:
Subject | 发射行为 |
---|---|
AsyncSubject | 不论订阅发生在什么时候,只会发射最后一个数据 |
BehaviorSubject | 发送订阅之前一个数据和订阅之后的全部数据 |
ReplaySubject | 不论订阅发生在什么时候,都发射全部数据 |
PublishSubject | 发送订阅之后全部数据 |
光看这个表格,会感觉 Subject 没什么卵用,下面我们重点讲讲 ReplaySubject
ReplaySubject
val subject=ReplaySubject.create<String>() |
log输出结果:
1 |
然后,这有啥用?
ReplaySubject 默认会创建一个容量为 16 的动态链表,用于记录发射数据,如果数据不断增加,链表会每次增加50% 的容量。但是我们也可以通过 ReplaySubject.createWithSize
来指定链表的大小
如果我们把上面的缓存大小指定为一个,并且只调用一次 onNext
val subject=ReplaySubject.createWithSize<String>(1) |
val subject=ReplaySubject.createWithSize<String>(1) |
log 都会输出
1 |
仔细想想,这种就跟我们经常使用到的预加载很像,我们可能需要在接口回来之前或者接口回来之后处理逻,
那么我们一般会这样写代码:
预加载数据类:
//获取启动广告数据API |
广告展示的地方:
if(adInfoBean!=null){ |
但是我们使用上面说的 ReplaySubject
,利用它可以 “存一个” 的特性,就可以很优雅的解决这个问题:
首先保存一个 replaySubject 的引用
//获取启动广告数据API |
然后在广告展示的地方:
replaySubject.subscribe{ |
这样无论 getAdInfo()
的结果在什么时机回来,我们都可以用同样的流程去处理接口返回的数据
上面使用了 replay(1)
方法来将 Cold Observable 转换成 Hot Observable,还有很多别的转换方法,如 publish
。 Hot Observable 也可以转换成 Cold Observable,这个可以使用的时候自行搜索
基本上只要涉及这种可能在 subscribe 之前发生,也可能在 subscribe 之后发生的异步问题,都可以用 ReplaySubject 解决。
ReplaySubject 有个需要注意的点:如果你需要指定 size 为1,那么需要使用
createWithSize(1)
方法。如果使用create(1)
,那么只是指定了 ReplaySubject 的初始容量为 1,后续有事件发生时,链表会自动扩容
BehaviorSubject
BehaviorSubject 的特性是有个初始状态,当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
根据这个特性我目前能想到的就是登录状态的监听,BehaviorSubject 首先发射当前的登录态,后面有登录状态变化时,再发射变化后的状态。
需要处理登录状态的页面,直接订阅即可:
behaviorSubject.subscribe{ |
PublishSubject
PublishSubject 跟 ReplaySubject 的区别是订阅之后不会收到订阅之前的时间。类似粘性广播和非粘性广播的区别
RxBus 就是基于 PublishSubject 实现的。 RxBus 跟 EvenBus 一样,都是使用的一个事件总线,使用多了会感觉逻辑很乱。但是我们直接使用 PublishSubject 的话,我们可以把需要对外暴露的事件包装成一个 PublishSubject ,需要监听这个事件的地方直接订阅这个 Observable ,这样就是一个一对多的关系,后续查找哪里使用了这个 Observable 也很方便,跨 Module 暴露的时候,也可以少一点 XXXCallback 之类的类
AsyncSubject
asyncSubject 只有当数据源执行了 complete()
,订阅者才能收到事件,并且是最后一个事件。
这个subject 的作用暂时还没想到。
使用注意事项
Subject 会一直持有 订阅者的引用,直到Subject 的生命周期结束。订阅静态 Subject 或者对生命周期有特殊要求时,需要手动调用 unsubscribe()来解除引用