前言
很高兴遇见你~
众所周知, RxJava 是一个非常流行的第三方开源库,它能将复杂的逻辑简单化,提高我们的开发效率,一个这么好用的库,来让我们来学习一下吧:beer:
下面我抛出一些问题,如果你都知道,那么恭喜你,你对 RxJava 掌握的很透彻,如果你对下面这些问题有一些疑惑,那么你就可以接着往下看,我会由浅入深的给你讲解 RxJava,看完之后,这些问题你会非常明了
1、什么是 观察者模式 ?什么是装饰者模式?
2、观察者模式,装饰者模式在 RxJava 中的应用?
3、RxJava map 和 flatMap 操作符有啥区别?
4、如果有多个 subscribeOn ,会是一种什么情况?为啥?
5、如果有多个 observeOn ,会是一种什么情况?为啥?
6、RxJava 框架流思想设计?
7、RxJava 的 Subject 是什么?
8、如何通过 RxJava 实现一个自己的事件总线?
一、设计模式介绍
我们先了解一下下面两种设计模式:
1、观察者模式
2、装饰者模式
1.1、观察者模式
1.1.1、观察者模式定义
简单的理解:对象间存在一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知并被自动更新
1.1.2、观察者模式示例
//、定义一个观察者的接口 | |
Int erface Observer { | |
/** | |
* 接收事件的方法 | |
*/ | |
fun onChange(o: Any) | |
} | |
//、定义一个被观察者的接口 | |
interface Observable { | |
/** | |
* 添加观察者 | |
*/ | |
fun addObserver(observer: Observer) | |
/** | |
* 移除观察者 | |
*/ | |
fun removeObserver(observer: Observer) | |
/** | |
* 发送事件通知 | |
*/ | |
fun changeEvent(o: Any) | |
} | |
//、定义一个观察者的实现类 | |
class ObserverImpl: Observer { | |
override fun onChange(o: Any) { | |
//对事件进行打印 | |
println(o) | |
} | |
} | |
//、定义一个被观察者的实现类 | |
class ObservableImpl: Observable { | |
//存放观察者的集合 | |
private val observerList: MutableList<Observer> = LinkedList() | |
override fun addObserver(observer: Observer) { | |
observerList.add(observer) | |
} | |
override fun removeObserver(observer: Observer) { | |
observerList.remove(observer) | |
} | |
override fun changeEvent(o: Any) { | |
for (observer in observerList) { | |
observer.onChange(o) | |
} | |
} | |
} | |
//、测试 | |
fun main(){ | |
//、创建被观察者 | |
val observable = ObservableImpl() | |
//、创建观察者 | |
val observer = ObserverImpl() | |
val observer = ObserverImpl() | |
val observer = ObserverImpl() | |
//、添加观察者 | |
observable.addObserver(observer) | |
observable.addObserver(observer) | |
observable.addObserver(observer) | |
//、发送事件 | |
observable.changeEvent("erdai") | |
} | |
//打印结果 | |
erdai | |
erdai | |
erdai |
1.2、装饰者模式
1.2.1、装饰者模式定义
简单的理解:动态的给一个类进行功能增强
1.2.2、装饰者模式示例
举个例子:我想吃个蛋炒饭,但是单独一个蛋炒饭我觉得不好吃,我想在上面加火腿,加牛肉。我们使用装饰者模式来实现它
//、定义一个炒饭的接口 | |
interface Rice { | |
fun cook() | |
} | |
//、定义一个炒饭接口的实现类:蛋炒饭 | |
class EggFriedRice: Rice { | |
override fun cook() { | |
println("蛋炒饭") | |
} | |
} | |
//、定义一个炒饭的抽象装饰类 | |
abstract class RiceDecorate(var rice: Rice): Rice | |
//、往蛋炒饭中加火腿 | |
class HamFriedRiceDecorate(rice: Rice): RiceDecorate(rice) { | |
override fun cook() { | |
rice.cook() | |
println("加火腿") | |
} | |
} | |
//、往蛋炒饭中加牛肉 | |
class BeefFriedRiceDecorate(rice: Rice): RiceDecorate(rice) { | |
override fun cook() { | |
rice.cook() | |
println("加牛肉") | |
} | |
} | |
//、测试 | |
fun main(){ | |
//蛋炒饭 | |
val rice = EggFriedRice() | |
//加火腿 | |
val hamFriedRiceDecorate = HamFriedRiceDecorate(rice) | |
//加牛肉 | |
val beefFriedRiceDecorate = BeefFriedRiceDecorate(hamFriedRiceDecorate) | |
beefFriedRiceDecorate.cook() | |
} | |
//打印结果 | |
蛋炒饭 | |
加火腿 | |
加牛肉 |
装饰者模式的核心:定义一个抽象的装饰类继承顶级接口,然后持有这个顶级接口的引用,接下来就可以进行无限套娃了:smile:
二、手撸 RxJava 核心源码实现
ok,了解了两种设计模式,接下来我们正式进入 RxJava 的学习
2.1、RxJava 介绍
RxJava 是一个异步操作框架,其核心可以归纳为两点:1、异步事件流 2、响应式编程。接下来我们可以好好的去感受这两点
2.2、RxJava 操作符
RxJava 之所以强大源于它各种强大的操作符,掌握好这些操作符能让你对 RxJava 的使用得心应手,RxJava 操作符主要分为 6 大类:
每一个操作符背后都对应了一个具体的实现类,接下来我们就挑几个最常用,最核心的操作符:create,map,flatMap,observeOn,subscribeOn 进行手撸实现,相信看完这些操作符的实现后,你能融会贯通,举一反三
注意:下面这些操作符的实现和 RxJava 实现细节不尽相同,但核心思想是一致的,大家只要理解核心思想就好
2.3、create 操作符实现
create 是来创建一个被观察者对象,看了 RxJava create 操作符源码你会发现:
1、create 是使用观察者模式实现的,但 RxJava 里面使用的观察者模式和我们上面介绍的还有点不一样,它是一种变种的观察者模式
2、上面例子中我们是通过被观察者去发送事件,而 RxJava 里面定义了专门发送事件的接口,这样做的好处就是让被观察者和发射事件进行 解耦
//、定义一个观察者的顶级接口 | |
interface Observer<T> { | |
//建立了订阅关系 | |
fun onSubscribe() | |
//接收到正常事件 | |
fun onNext(t: T) | |
//接收到 error 事件 | |
fun onError(e: Throwable) | |
//接收到 onComplete 事件 | |
fun onComplete() | |
} | |
//、定义一个被观察者的顶级接口 | |
interface ObservableSource<T> { | |
//订阅观察者 | |
fun subscribe(observer: Observer<T>) | |
} | |
//、定义一个被观察者抽象类实现顶层被观察者接口 | |
abstract class Observable<T>: ObservableSource<T> { | |
override fun subscribe(observer: Observer<T>) { | |
subscribeActual(observer) | |
} | |
//实际订阅观察者的抽象方法,让子类去实现 | |
protected abstract fun subscribeActual(observer: Observer<T>) | |
//伴生类里面的方法,直接通过类名调用 | |
companion object{ | |
//这里是我们实现 create 操作符对外提供和 RxJava 类似的方法调用 | |
fun <T> create(source: ObservableOnSubscribe<T>): ObservableCreate<T>{ | |
return ObservableCreate(source) | |
} | |
} | |
} | |
//、定义一个与被观察者发射事件解耦的接口 | |
interface ObservableOnSubscribe<T> { | |
//通过 Emitter 发射事件 | |
fun subscribe(emitter: Emitter<T>) | |
} | |
//、定义事件发射器接口 | |
interface Emitter<T> { | |
//发送 onNext 事件 | |
fun onNext(t: T) | |
//发送 onError 事件 | |
fun onError(e: Throwable) | |
//发送 onComplete 事件 | |
fun onComplete() | |
} | |
//、定义 create 操作符的实现类 | |
class ObservableCreate<T>(var source: ObservableOnSubscribe<T>): Observable<T>() { | |
//实现父类订阅观察者的方法 | |
override fun subscribeActual(downStream: Observer<T>) { | |
//可以看到只要一订阅,首先就会接收 onSubscribe 事件 | |
downStream.onSubscribe() | |
//通过 ObservableOnSubscribe 里面的 Emitter 进行事件的发送,完成被观察者发送事件的解耦 | |
source.subscribe(CreateEmitter(downStream)) | |
} | |
//事件发射器实现类,可以看到传入了下游的观察者来接收我们发射的事件 | |
class CreateEmitter<T>(var downStream: Observer<T>): Emitter<T>{ | |
override fun onNext(t: T) { | |
downStream.onNext(t) | |
} | |
override fun onError(e: Throwable) { | |
downStream.onError(e) | |
} | |
override fun onComplete() { | |
downStream.onComplete() | |
} | |
} | |
} | |
//、测试 | |
fun main(){ | |
Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: Emitter<String>) { | |
//发射 onNext 事件 | |
emitter.onNext("erdai") | |
//发射 onComplete 事件 | |
emitter.onComplete() | |
} | |
}).subscribe(object : Observer<String>{ | |
override fun onSubscribe() { | |
println("onSubscribe") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
} | |
override fun onError(e: Throwable) { | |
} | |
override fun onComplete() { | |
println("onComplete") | |
} | |
}) | |
} | |
//打印结果 | |
onSubscribe | |
onNext:erdai | |
onComplete |
ok,上述代码就是 create 操作符的实现,大家如果没看明白可以多看几遍,也可以直接把我上面的代码直接拷贝到一个 kt 文件中去运行
2.4、map 操作符实现
map 是一个转换操作符,它能把一种类型转为为另外一种类型,如:Int -> String。
它的主要实现:观察者模式 + 装饰者模式 + 泛型
//、定义一个抽象装饰类,注意里面 泛型 的使用 | |
abstract class AbstractObservableWithUpstream<T,U>(var source: ObservableSource<T>): Observable<U>() | |
//、定义一个类型转换的接口 | |
interface Function <T,U> { | |
//传入 T 类型,返回 U 类型 | |
fun apply(t: T): U | |
} | |
//、定义 map 操作符的实现类 | |
class ObservableMap<T,U>(source: ObservableSource<T>,var function: Function<T,U>): AbstractObservableWithUpstream<T,U>(source) { | |
//实现父类订阅观察者的方法 | |
override fun subscribeActual(observer: Observer<U>) { | |
//接收 onSubscribe 事件 | |
observer.onSubscribe() | |
//完成事件的转换 | |
source.subscribe(MapObserver(function,observer)) | |
} | |
//MapObserver 接收 function 对类型进行转换,downStream 对事件进行接收 | |
class MapObserver<T,U>(var function: Function<T,U>,var downStream: Observer<U>): Observer<T>{ | |
override fun onSubscribe() { | |
} | |
override fun onNext(t: T) { | |
//核心:当接收到 T 类型,调用 function.apply 转换为 U 类型 | |
val u: U = function.apply(t) | |
downStream.onNext(u) | |
} | |
override fun onError(e: Throwable) { | |
downStream.onError(e) | |
} | |
override fun onComplete() { | |
downStream.onComplete() | |
} | |
} | |
} | |
//、Observable 中增加相应的调用方法 | |
fun <U> map(function: Function<T, U>): ObservableMap<T,U>{ | |
return ObservableMap(this, function) | |
} | |
//、测试 | |
fun main(){ | |
Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
} | |
}) | |
.map(object : Function<String, String >{ | |
override fun apply(t: String): String { | |
return "map 转换:$t" | |
} | |
}) | |
.subscribe(object : Observer<String>{ | |
override fun onSubscribe() { | |
println("onSubscribe") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
} | |
override fun onError(e: Throwable) { | |
} | |
override fun onComplete() { | |
println("onComplete") | |
} | |
}) | |
} | |
//打印结果 | |
onSubscribe | |
onNext:map 转换:erdai | |
onComplete |
2.5、flatMap 操作符实现
flatMap 操作符的实现其实和 map 类似,只不过是把 :Function<T, U> -> Function<T, ObservableSource> ,将一种类型转换为了一个被观察者的类型,被观察者的类型又可以进行一系列的转换,因此能拆分更细的粒度:
//、定义 flatMap 操作符的实现类 | |
class ObservableFlatMap<T,U>(source: ObservableSource<T>,var function: Function<T,ObservableSource<U>>): AbstractObservableWithUpstream<T,U>(source) { | |
override fun subscribeActual(observer: Observer<U>) { | |
observer.onSubscribe() | |
source.subscribe(FlatMapObserver(function,observer)) | |
} | |
//FlatMapObserver 接收 function 对类型进行转换,downStream 对事件进行接收 | |
class FlatMapObserver<T,U>(var function: Function<T,ObservableSource<U>>, var downStream: Observer<U>): Observer<T>{ | |
override fun onSubscribe() { | |
} | |
override fun onNext(t: T) { | |
//核心:当接收到 T 类型,调用 function.apply 转换为 ObservableSource<U> 类型 | |
val u: ObservableSource<U> = function.apply(t) | |
//对 u 进行更细粒度的拆分,在让下游观察者进行接收 | |
u.subscribe(object : Observer<U>{ | |
override fun onSubscribe() { | |
} | |
override fun onNext(t: U) { | |
downStream.onNext(t) | |
} | |
override fun onError(e: Throwable) { | |
} | |
override fun onComplete() { | |
} | |
}) | |
} | |
override fun onError(e: Throwable) { | |
downStream.onError(e) | |
} | |
override fun onComplete() { | |
downStream.onComplete() | |
} | |
} | |
} | |
//、Observable 中增加相应的调用方法 | |
fun <U> flatMap(function: Function<T,ObservableSource<U>>): ObservableFlatMap<T,U>{ | |
return ObservableFlatMap(this,function) | |
} | |
//、测试 | |
fun main(){ | |
Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
} | |
}).flatMap(object : Function<String,ObservableSource<String>>{ | |
override fun apply(t: String): ObservableSource<String> { | |
return Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("flatMap:$t") | |
} | |
}) | |
} | |
}) | |
.subscribe(object : Observer<String>{ | |
override fun onSubscribe() { | |
println("onSubscribe") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
} | |
override fun onError(e: Throwable) { | |
} | |
override fun onComplete() { | |
println("onComplete") | |
} | |
}) | |
} | |
//打印结果 | |
onSubscribe | |
onNext:flatMap:erdai | |
onComplete |
2.6、subscribeOn 操作符实现
subscribeOn 主要是用来决定我们订阅观察者是在哪个线程执行
//、定义一个抽象的调度器 | |
abstract class Scheduler { | |
abstract fun createWorker(): Worker | |
//定义一个抽象的 Worker | |
abstract class Worker{ | |
//真正决定线程执行 | |
abstract fun schedule(runnable: Runnable) | |
} | |
} | |
//、定义调度器的实现类,我们主要实现两种: | |
//.1、AndroidMainScheduler:Android 主线程 | |
//可以看到我们就是使用 Handler 将线程切换到主线程 | |
class AndroidMainScheduler(var handler: Handler): Scheduler() { | |
override fun createWorker(): Worker { | |
return AndroidMainWorker(handler) | |
} | |
class AndroidMainWorker(var handler: Handler): Worker(){ | |
override fun schedule(runnable: Runnable) { | |
handler.post(runnable) | |
} | |
} | |
} | |
//.2、NewThreadScheduler:开启一个新的子线程 | |
//可以看到我们就是使用线程池来执行 runnable | |
class NewThreadScheduler(var executorService: ExecutorService): Scheduler() { | |
override fun createWorker(): Worker { | |
return NewThreadWork(executorService) | |
} | |
class NewThreadWork(var executorService: ExecutorService): Worker(){ | |
override fun schedule(runnable: Runnable) { | |
executorService.execute(runnable) | |
} | |
} | |
} | |
//、定义一个线程调度器的工具类,类似 RxJava 的调用 | |
class Schedulers { | |
companion object{ | |
//切换到子线程 | |
fun newThread(): NewThreadScheduler{ | |
return NewThreadScheduler(Executors.newScheduledThreadPool()) | |
} | |
//切换到主线程 | |
fun mainThread(): AndroidMainScheduler{ | |
return AndroidMainScheduler(Handler(Looper.getMainLooper())) | |
} | |
} | |
} | |
//、定义 subscribeOn 操作符实现类 | |
class ObservableSubscribeOn<T>(source: ObservableSource<T>,var scheduler: Scheduler): AbstractObservableWithUpstream<T,T>(source) { | |
override fun subscribeActual(observer: Observer<T>) { | |
//接收订阅事件 | |
observer.onSubscribe() | |
//创建 Worker 决定我们代码所执行的线程 | |
val worker = scheduler.createWorker() | |
worker.schedule(SubscribeTask(SubscribeOnObserver(observer))) | |
} | |
//可以看到,Runnable 里面就只做了一个订阅操作,因此 subscribeOn 会决定我们订阅观察者的线程 | |
inner class SubscribeTask(var observer: SubscribeOnObserver<T>): Runnable{ | |
override fun run() { | |
source.subscribe(observer) | |
} | |
} | |
//如果我们没有使用 observeOn 切换线程,那么观察者接收事件的线程也会由 subscribeOn 线程决定 | |
class SubscribeOnObserver<T>(var observer: Observer<T>): Observer<T>{ | |
override fun onSubscribe() { | |
} | |
override fun onNext(t: T) { | |
observer.onNext(t) | |
} | |
override fun onError(e: Throwable) { | |
observer.onError(e) | |
} | |
override fun onComplete() { | |
observer.onComplete() | |
} | |
} | |
} | |
//、Observable 中增加相应的调用方法 | |
fun subscribeOn(scheduler: Scheduler): ObservableSubscribeOn<T>{ | |
return ObservableSubscribeOn(this,scheduler) | |
} | |
//、测试 | |
fun main(){ | |
Observable.create(object :ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
println("subscribe:${Thread.currentThread().name}") | |
} | |
}).subscribeOn(Schedulers.newThread()) | |
.subscribe(object : Observer<String>{ | |
override fun onSubscribe() { | |
println("onSubscribe:${Thread.currentThread().name}") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
println("onNext:${Thread.currentThread().name}") | |
} | |
override fun onError(e: Throwable) { | |
println("onError:${Thread.currentThread().name}") | |
} | |
override fun onComplete() { | |
println("onComplete") | |
println("onComplete:${Thread.currentThread().name}") | |
} | |
}) | |
} | |
//打印结果 | |
onSubscribe:main | |
onNext:erdai | |
onNext:pool--thread-1 | |
onComplete | |
onComplete:pool--thread-1 | |
subscribe:pool--thread-1 |
分析一下上面的打印结果:
1、onSubscribe 是在一开始订阅就触发的,此时 Worker 都还没创建,因此是在主线程执行的
2、因为我们没有使用 observeOn 对观察者接收事件的线程进行切换,所以 onNext,onComplete 接收事件的线程由 subscribeOn 切换的线程决定,
3、subscribe 在我们实际订阅观察者的方法里会执行它,因此是由 subscribeOn 切换的线程决定
2.7、observeOn 操作符实现
observeOn 是用来决定我们观察者接收事件是在哪个线程执行,实现相对复杂一点,它内部使用了一个队列来存储发送过来的 onNext 事件,然后通过 While 循环对队列中的事件进行处理,具体大家可以看我下面的实现,写了详细的注释
//、定义 observeOn 操作符实现类 | |
class ObservableObserveOn<T>(source: ObservableSource<T>, var scheduler: Scheduler): AbstractObservableWithUpstream<T, T>(source) { | |
override fun subscribeActual(observer: Observer<T>) { | |
//接收订阅事件 | |
observer.onSubscribe() | |
val worker = scheduler.createWorker() | |
source.subscribe(ObserveOnObserver(observer,worker)) | |
} | |
class ObserveOnObserver<T>(var observer: Observer<T>, var worker: Scheduler.Worker, var queue: Deque<T>? = null): Observer<T>,Runnable { | |
//标记是否事件都已经接收,一般在 onError 或 onComplete 时标记 | |
var done = false | |
//记录 onError 的异常 | |
var throwable: Throwable? = null | |
//是否能结束 While 循环:例如观察者接收了 onError 或 onComplete 事件,就可以结束循环了 | |
var over = false | |
init { | |
//如果队列为空,则新建 | |
if(queue == null){ | |
queue = ArrayDeque() | |
} | |
} | |
override fun onSubscribe() { | |
} | |
override fun onNext(t: T) { | |
if(done)return | |
//将接收的 onNext 事件加入队列中 | |
queue?.offer(t) | |
//执行调度 | |
schedule() | |
} | |
override fun onError(e: Throwable) { | |
if(done)return | |
//记录异常 | |
throwable = e | |
//标记接收事件完成 | |
done = true | |
//执行调度 | |
schedule() | |
} | |
override fun onComplete() { | |
if(done)return | |
//标记接收事件完成 | |
done = true | |
//执行调度 | |
schedule() | |
} | |
//可以看到这里进行了任务的执行,由 observeOn 决定执行的线程 | |
private fun schedule() { | |
worker.schedule(this) | |
} | |
override fun run() { | |
drainNormal() | |
} | |
//实际最终的逻辑就是在这个方法里面进行处理 | |
private fun drainNormal() { | |
//取当前的队列 | |
val q = queue | |
//取观察者 | |
val obs = observer | |
//while 循环取出队列里面的 onNext 事件 | |
while (true){ | |
//取 done 标记 | |
val d = done | |
//从队列中取出元素并出队 | |
val t = q?.poll() | |
//如果 t 为 null 表示队列里面没有事件了 | |
val empty = t == null | |
//检查是否能终止 While 循环 | |
if(checkTerminated(d,empty,obs)){ | |
return | |
} | |
//如果队列为空,跳出 While 循环 | |
if(empty)break | |
//观察者接收 onNext 事件 | |
t?.apply { | |
obs.onNext(this) | |
} | |
} | |
} | |
//检查是否能终止 While 循环 | |
private fun checkTerminated(d: Boolean, empty: Boolean, obs: Observer<T>): Boolean { | |
if(over){ | |
//如果能结束了,清空队列 | |
queue?.clear() | |
return true | |
} | |
//如果已经完成事件的发送 | |
if(d){ | |
val e = throwable | |
if(e != null){ | |
//如果有 onError 事件,标记结束,并接收 onError 事件 | |
over = true | |
obs.onError(e) | |
}else if(empty){ | |
//如果队列为空,标记结束,并接收 onComplete 事件 | |
over = true | |
obs.onComplete() | |
return true | |
} | |
} | |
return false | |
} | |
} | |
} | |
//、Observable 中增加相应的调用方法 | |
fun observeOn(scheduler: Scheduler): ObservableObserveOn<T>{ | |
return ObservableObserveOn(this,scheduler) | |
} | |
//、测试,因为涉及到 Handler 切换到主线程,我们这里放到 Activity 里面去测试 | |
class MainActivity : AppCompatActivity() { | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
Observable.create(object : ObservableOnSubscribe<String> { | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
println("subscribe:${Thread.currentThread().name}") | |
} | |
}) | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(Schedulers.mainThread()) | |
.subscribe(object : Observer<String> { | |
override fun onSubscribe() { | |
println("onSubscribe:${Thread.currentThread().name}") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
println("onNext:${Thread.currentThread().name}") | |
} | |
override fun onError(e: Throwable) { | |
println("onError:${Thread.currentThread().name}") | |
} | |
override fun onComplete() { | |
println("onComplete") | |
println("onComplete:${Thread.currentThread().name}") | |
} | |
}) | |
} | |
} | |
//打印结果 | |
onSubscribe:main | |
subscribe:pool--thread-1 | |
onNext:erdai | |
onNext:main | |
onComplete | |
onComplete:main |
分析一下上面的打印结果:
1、onSubscribe 是在一开始订阅就触发的,此时 Worker 都还没创建,因此是在主线程执行的
2、subscribe 在我们实际订阅观察者的方法里会执行它,因此是由 subscribeOn 切换的线程决定
3、observeOn 决定了观察者接收事件所在的线程,因此 onNext,onComplete 是在主线程执行的
三、RxJava 框架流思想设计
我们通过一段代码来分析 RxJava 的框架流设计:
class MainActivity : AppCompatActivity() { | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
//create 操作符 | |
Observable.create(object : ObservableOnSubscribe<String> { | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
println("subscribe:${Thread.currentThread().name}") | |
} | |
}) | |
//map 操作符 | |
.map(object : Function<String,String>{ | |
override fun apply(t: String): String { | |
println("map:${Thread.currentThread().name}") | |
return "map:$t" | |
} | |
}) | |
//flatMap 操作符 | |
.flatMap(object : Function<String,ObservableSource<String>>{ | |
override fun apply(t: String): ObservableSource<String> { | |
println("flatMap:${Thread.currentThread().name}") | |
return Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("flatMap:$t") | |
} | |
}) | |
} | |
}) | |
//subscribeOn 操作符 | |
.subscribeOn(Schedulers.newThread()) | |
//observeOn 操作符 | |
.observeOn(Schedulers.mainThread()) | |
//订阅 | |
.subscribe(object : Observer<String> { | |
override fun onSubscribe() { | |
println("onSubscribe:${Thread.currentThread().name}") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
println("onNext:${Thread.currentThread().name}") | |
} | |
override fun onError(e: Throwable) { | |
println("onError:${Thread.currentThread().name}") | |
} | |
override fun onComplete() { | |
println("onComplete") | |
println("onComplete:${Thread.currentThread().name}") | |
} | |
}) | |
} | |
} |
3.1、链式构建流
特点:从上往下
使用一段伪代码来分析 RxJava Observable 的构建
val source = ObservableOnSubscribe() | |
//create 操作符 | |
Observable.create(souce) ---> observable = ObservableCreate(source) | |
//map 操作符 | |
observable.map() ---> observable1 = ObservableMap(observable0) | |
//flatMap 操作符 | |
observable.flatMap() ---> observable2 = ObservableFlatMap(observable1) | |
//subscribeOn 操作符 | |
observable.subscribeOn() ---> observable3 = ObservableSubscribeOn(observable2) | |
//observeOn 操作符 | |
observable.observeOn() ---> observable4 = ObservableObserveOn(observable3) |
有没有发现规律: 我们在上游创建的 Observable(被观察者) 会被传入到下游。这就是典型的装饰者模式的应用,它的特点就是从上往下,无限套娃,动态的达到功能的增强
3.2、订阅流
特点:从下往上
使用一段伪代码来分析 RxJava 订阅的过程
val observe = Observer(){}... | |
observable.subscribe(observe5) ---> observable4.subscribeActual(observe5) | |
//observeOn 操作符 | |
val observe = ObserveOnObserver(observe5) | |
observable.subscribe(observe4) ---> observable3.subscribeActual(observe4) | |
//subscribeOn 操作符 | |
val observe = SubscribeOnObserver(observe4) | |
observable.subscribe(observe3) ---> observable2.subscribeActual(observe3) | |
//flatMap 操作符 | |
val observe = FlatMapObserver(observe3) | |
observable.subscribe(observe2) ---> observable1.subscribeActual(observe2) | |
//map 操作符 | |
val observe = MapObserver(observe2) | |
observable.subscribe(observe1) ---> observable0.subscribeActual(observe1) | |
//create 操作符 | |
val emitter = CreateEmitter(observe) | |
source.subscribe(emitter) |
有点递归的意思哈
可以发现规律: 我们在下游创建的 Observable 订阅时,会递归先执行上游的订阅,因此订阅流的特点就是从下往上
3.3、回调流
特点:从上往下
我们分析订阅流可以发现,观察者对象是从下往上传的,因此当 emitter 发送事件时,接收的顺序:
//create 操作符 | |
emitter -> observe | |
//map 操作符 | |
observe -> observe2 | |
//flatMap 操作符 | |
observe -> observe3 | |
//subscribeOn 操作符 | |
observe -> observe4 | |
//observeOn 操作符 | |
observe -> observe5 |
可以看到: 当 emitter 发送事件后,观察者收到事件的顺序是从上往下的
上面这三个流就是 RxJava 框架流的一个思想设计,对于你理解 RxJava 非常重要,如果没看明白,多看几遍
3.4、问题回顾
掌握了 RxJava 框架流,我们回顾一下前面提到的两个问题:
1、如果有多个 subscribeOn ,会是一种什么情况?为啥?
答:只有最上面那个 subscribeOn 切换的线程才会生效。因为 subscribeOn 的作用就是决定你订阅所执行的线程,而订阅流是从下往上的,因此你如果使用多个 subscribeOn 对线程进行切换,最终生效的只会是最上面那个
2、如果有多个 observeOn ,会是一种什么情况?为啥?
答:同理,只有最下游那个 observeOn 切换的线程才会生效。因为回调流是从上往下的,所以如果你创建了多个观察者接收事件,最终生效的只会是最下面那个
好好体会下下面这个例子:
class MainActivity : AppCompatActivity() { | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
Observable.create(object : ObservableOnSubscribe<String> { | |
override fun subscribe(emitter: Emitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
println("subscribe:${Thread.currentThread().name}") | |
} | |
}) | |
.subscribeOn(Schedulers.newThread()) | |
.map(object : Function<String, String> { | |
override fun apply(t: String): String { | |
println("map:${Thread.currentThread().name}") | |
return "map:$t" | |
} | |
}) | |
.subscribeOn(Schedulers.mainThread()) | |
.subscribeOn(Schedulers.newThread()) | |
.subscribeOn(Schedulers.mainThread()) | |
.observeOn(Schedulers.newThread()) | |
.observeOn(Schedulers.mainThread()) | |
.observeOn(Schedulers.newThread()) | |
.observeOn(Schedulers.mainThread()) | |
.subscribe(object : Observer<String> { | |
override fun onSubscribe() { | |
println("onSubscribe:${Thread.currentThread().name}") | |
} | |
override fun onNext(t: String) { | |
println("onNext:$t") | |
println("onNext:${Thread.currentThread().name}") | |
} | |
override fun onError(e: Throwable) { | |
println("onError:${Thread.currentThread().name}") | |
} | |
override fun onComplete() { | |
println("onComplete") | |
println("onComplete:${Thread.currentThread().name}") | |
} | |
}) | |
} | |
} | |
//打印结果 | |
onSubscribe:main | |
map:pool--thread-1 | |
subscribe:pool--thread-1 | |
onNext:map:erdai | |
onNext:main | |
onComplete | |
onComplete:main |
四、RxLifeCycle
实现 RxLifeCycle 之前,我们需要了解一下 compose 操作符
4.1、compose 操作符介绍
compose 操作符作用:传入一个上游的被观察者返回一个下游的被观察者,能起到一个代码复用的逻辑
注意:下面使用的是 RxJava 包下的类
class MyTransformer<T: Any>: ObservableTransformer<T,T> { | |
override fun apply(upstream: Observable<T>): ObservableSource<T> { | |
//完成订阅和回调线程的切换 | |
return upstream.subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
} | |
} | |
//实际应用 | |
class MainActivity : AppCompatActivity() { | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: ObservableEmitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
println("subscribe:${Thread.currentThread().name}") | |
} | |
}) | |
//compose 操作符需要传入一个 ObservableTransformer 类型的对象 | |
.compose(MyTransformer()) | |
.subscribe(object : Observer<String>{ | |
override fun onSubscribe(d: Disposable) { | |
println("onSubscribe:${Thread.currentThread().name}") | |
} | |
override fun onNext(t: String) { | |
println("onNext $t") | |
println("onNext:${Thread.currentThread().name}") | |
} | |
override fun onError(e: Throwable) { | |
println("onError:${Thread.currentThread().name}") | |
} | |
override fun onComplete() { | |
println("onComplete") | |
println("onComplete:${Thread.currentThread().name}") | |
} | |
}) | |
} | |
} | |
//打印结果 | |
onSubscribe:main | |
subscribe:RxCachedThreadScheduler- | |
onNext erdai | |
onNext:main | |
onComplete | |
onComplete:main |
4.2、RxLifeCycle 实现
我们上面写的代码是存在内存泄漏的,如果我们使用 RxJava 在 Activity 做一个网络请求,此时用户退出了当前 Activity ,但是网络请求还在继续,那么此时就会产生内存泄漏,因此我们需要在做网络请求的时候感知 Activity 的生命周期去做相应的逻辑处理,那么此时 RxLifeCycle 就派上用场了,直接上代码:
//、RxLifeCycle 实现类 | |
//.1、它实现了 LifecycleEventObserver,因此可以感知 LifecycleOwner 的声明周期 | |
//.2、它实现了 ObservableTransformer,因此我们可以使用 compose 操作符 | |
class RxLifeCycle<T: Any>: LifecycleEventObserver,ObservableTransformer<T,T> { | |
var compositeDisposable = CompositeDisposable() | |
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { | |
if(event == Lifecycle.Event.ON_DESTROY){ | |
//监听到 LifecycleOwner 生命周期为 Destroy 时,移除 Disposable 容器中的所有 Disposable 对象 | |
compositeDisposable.clear() | |
} | |
} | |
override fun apply(upstream: Observable<T>): ObservableSource<T> { | |
return upstream.doOnSubscribe{ | |
//将 Disposable 加入 Disposable 容器 | |
compositeDisposable.add(it) | |
} | |
} | |
companion object{ | |
//传入 LifecycleOwner,和 RxJava 进行生命周期绑定 | |
fun <T: Any> bindToDestroy(owner: LifecycleOwner): RxLifeCycle<T>{ | |
val rxLifeCycle = RxLifeCycle<T>() | |
owner.lifecycle.addObserver(rxLifeCycle) | |
return rxLifeCycle | |
} | |
} | |
} | |
//、使用 | |
class MainActivity : AppCompatActivity() { | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
Observable.create(object : ObservableOnSubscribe<String>{ | |
override fun subscribe(emitter: ObservableEmitter<String>) { | |
emitter.onNext("erdai") | |
emitter.onComplete() | |
println("subscribe:${Thread.currentThread().name}") | |
} | |
}) | |
.compose(MyTransformer()) | |
//绑定生命周期,防止内存泄漏 | |
.compose(RxLifeCycle.bindToDestroy(this)) | |
.subscribe(object : Observer<String>{ | |
override fun onSubscribe(d: Disposable) { | |
println("onSubscribe:${Thread.currentThread().name}") | |
} | |
override fun onNext(t: String) { | |
println("onNext $t") | |
println("onNext:${Thread.currentThread().name}") | |
} | |
override fun onError(e: Throwable) { | |
println("onError:${Thread.currentThread().name}") | |
} | |
override fun onComplete() { | |
println("onComplete") | |
println("onComplete:${Thread.currentThread().name}") | |
} | |
}) | |
} | |
} |
五、RxBus
实现 RxBus 前,我们需要先了解一下 Subject
5.1、Subject 介绍
1、Subject 既可以表示一个被观察者也可以表示一个观察者,实际它就是继承了 Observable 抽象类并实现了 Observer 接口
2、Subject 主要分为四种:
、AsyncSubject | |
、BehaviorSubject | |
、PublishSubject | |
、ReplaySubject |
5.1.1、AsyncSubject
特点:事件发射无论是在订阅前还是后,都只会接收最后一个事件
fun main(){ | |
val subject = AsyncSubject.create<String>() | |
subject.onNext("A") | |
subject.onNext("B") | |
subject.subscribe { | |
println(it) | |
} | |
subject.onNext("C") | |
subject.onNext("D") | |
subject.onComplete() | |
} | |
//打印结果 | |
D |
5.1.2、BehaviorSubject
特点:接收订阅前最后一个事件以及订阅后的所有事件
fun main(){ | |
val subject = BehaviorSubject.create<String>() | |
subject.onNext("A") | |
subject.onNext("B") | |
subject.subscribe { | |
println(it) | |
} | |
subject.onNext("C") | |
subject.onNext("D") | |
subject.onComplete() | |
} | |
//打印结果 | |
B | |
C | |
D |
5.1.3、PublishSubject
特点:只接收订阅后的所有事件
fun main(){ | |
val subject = PublishSubject.create<String>() | |
subject.onNext("A") | |
subject.onNext("B") | |
subject.subscribe { | |
println(it) | |
} | |
subject.onNext("C") | |
subject.onNext("D") | |
subject.onComplete() | |
} | |
//打印结果 | |
C | |
D |
5.1.4、ReplaySubject
特点:事件发射无论是在订阅前还是后,都会被全部接收
fun main(){ | |
val subject = ReplaySubject.create<String>() | |
subject.onNext("A") | |
subject.onNext("B") | |
subject.subscribe { | |
println(it) | |
} | |
subject.onNext("C") | |
subject.onNext("D") | |
subject.onComplete() | |
} | |
//打印结果 | |
A | |
B | |
C | |
D |
5.2、RxBus 实现
我们在日常开发中,事件总线用的最多的可能是 EventBus,殊不知 RxJava 也能通过 Subject 实现事件总线的功能,而且使用起来比 EventBus 还简单一些:
//、RxBus 实现类 | |
object RxBus { | |
//定义一个 PublishSubject 类型的 Subject,只接收订阅后的事件 | |
private val subject: Subject<Any> = PublishSubject.create<Any>().toSerialized() | |
//接收事件 | |
fun <T: Any> receive(clazz: Class<T>): Observable<T>{ | |
return subject.ofType(clazz) | |
} | |
//发送事件 | |
fun post(o: Any){ | |
subject.onNext(o) | |
} | |
} | |
//、使用 | |
class MainActivity : AppCompatActivity() { | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
RxBus.receive(String::class.java) | |
//绑定生命周期,防止内存泄漏 | |
.compose(RxLifeCycle.bindToDestroy(this)) | |
.subscribe { | |
println(it) | |
} | |
RxBus.post("erdai") | |
} | |
} | |
//打印结果 | |
erdai |
六、总结
本篇文章我们由浅入深对 RxJava 进行了全面的介绍:
1、介绍了 RxJava 中使用的两种设计模式:
、变种的观察者模式 | |
、装饰者模式 |
2、手撸了 RxJava 核心操作符的实现,希望你能举一反三,其它操作符的实现也是类似的套路
3、介绍了 RxJava 框架流思想设计:
、链式构建流:从上往下 | |
、订阅流:从下往上 | |
、回调流:从上往下 |
4、介绍了 compose 操作符并扩展实现了 RxLifeCycle
5、介绍了 Subject 并扩展实现了 RxBus
好了,本篇文章到这里就结束了,希望能给你带来帮助