"一篇就够"系列:RxJava 核心解密

Java
280
0
0
2023-06-18

前言

很高兴遇见你~

众所周知, RxJava 是一个非常流行的第三方开源库,它能将复杂的逻辑简单化,提高我们的开发效率,一个这么好用的库,来让我们来学习一下吧:beer:

下面我抛出一些问题,如果你都知道,那么恭喜你,你对 RxJava 掌握的很透彻,如果你对下面这些问题有一些疑惑,那么你就可以接着往下看,我会由浅入深的给你讲解 RxJava,看完之后,这些问题你会非常明了

1、什么是 观察者模式 ?什么是装饰者模式?

2、观察者模式,装饰者模式在 RxJava 中的应用?

3、RxJava map 和 flatMap 操作符有啥区别?

4、如果有多个 subscribeOn ,会是一种什么情况?为啥?

5、如果有多个 observeOn ,会是一种什么情况?为啥?

6、RxJava 框架流思想设计?

7、RxJava 的 Subject 是什么?

8、如何通过 RxJava 实现一个自己的事件总线?

一、设计模式介绍

我们先了解一下下面两种设计模式:

1、观察者模式

2、装饰者模式

1.1、观察者模式

1.1.1、观察者模式定义

简单的理解:对象间存在一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知并被自动更新

1.1.2、观察者模式示例

 //、定义一个观察者的接口
 Int erface Observer {
    /**
     * 接收事件的方法
     */
    fun onChange(o: Any)
}

//、定义一个被观察者的接口
interface Observable {
    /**
     * 添加观察者
     */
    fun addObserver(observer: Observer)
    /**
     * 移除观察者
     */
    fun removeObserver(observer: Observer)
    /**
     * 发送事件通知
     */
    fun changeEvent(o: Any)
}

//、定义一个观察者的实现类
class ObserverImpl: Observer {
    override fun onChange(o: Any) {
      	//对事件进行打印
        println(o)
    }
}

//、定义一个被观察者的实现类
class ObservableImpl: Observable {

    //存放观察者的集合
     private  val observerList: MutableList<Observer> = LinkedList()

    override fun addObserver(observer: Observer) {
        observerList.add(observer)
    }

    override fun removeObserver(observer: Observer) {
        observerList.remove(observer)
    }

    override fun changeEvent(o: Any) {
        for (observer in observerList) {
            observer.onChange(o)
        }
    }
}

//、测试
fun main(){
    //、创建被观察者
    val observable = ObservableImpl()
    //、创建观察者
    val observer = ObserverImpl()
    val observer = ObserverImpl()
    val observer = ObserverImpl()
    //、添加观察者
    observable.addObserver(observer)
    observable.addObserver(observer)
    observable.addObserver(observer)
    //、发送事件
    observable.changeEvent("erdai")
}
//打印结果
erdai
erdai
erdai

1.2、装饰者模式

1.2.1、装饰者模式定义

简单的理解:动态的给一个类进行功能增强

1.2.2、装饰者模式示例

举个例子:我想吃个蛋炒饭,但是单独一个蛋炒饭我觉得不好吃,我想在上面加火腿,加牛肉。我们使用装饰者模式来实现它

 //、定义一个炒饭的接口
interface Rice {
    fun cook()
}

//、定义一个炒饭接口的实现类:蛋炒饭
class EggFriedRice: Rice {

    override fun cook() {
        println("蛋炒饭")
    }
}

//、定义一个炒饭的抽象装饰类
 abstract  class RiceDecorate(var rice: Rice): Rice

//、往蛋炒饭中加火腿
class HamFriedRiceDecorate(rice: Rice): RiceDecorate(rice) {
    override fun cook() {
        rice.cook()
        println("加火腿")
    }
}

//、往蛋炒饭中加牛肉
class BeefFriedRiceDecorate(rice: Rice): RiceDecorate(rice) {

    override fun cook() {
        rice.cook()
        println("加牛肉")
    }
}

//、测试
fun main(){
    //蛋炒饭
    val rice = EggFriedRice()
    //加火腿
    val hamFriedRiceDecorate = HamFriedRiceDecorate(rice)
    //加牛肉
    val beefFriedRiceDecorate = BeefFriedRiceDecorate(hamFriedRiceDecorate)
    beefFriedRiceDecorate.cook()
}
//打印结果
蛋炒饭
加火腿
加牛肉

装饰者模式的核心:定义一个抽象的装饰类继承顶级接口,然后持有这个顶级接口的引用,接下来就可以进行无限套娃了:smile:

二、手撸 RxJava 核心源码实现

ok,了解了两种设计模式,接下来我们正式进入 RxJava 的学习

2.1、RxJava 介绍

RxJava 是一个异步操作框架,其核心可以归纳为两点:1、异步事件流 2、响应式编程。接下来我们可以好好的去感受这两点

2.2、RxJava 操作符

RxJava 之所以强大源于它各种强大的操作符,掌握好这些操作符能让你对 RxJava 的使用得心应手,RxJava 操作符主要分为 6 大类:

每一个操作符背后都对应了一个具体的实现类,接下来我们就挑几个最常用,最核心的操作符:create,map,flatMap,observeOn,subscribeOn 进行手撸实现,相信看完这些操作符的实现后,你能融会贯通,举一反三

注意:下面这些操作符的实现和 RxJava 实现细节不尽相同,但核心思想是一致的,大家只要理解核心思想就好

2.3、create 操作符实现

create 是来创建一个被观察者对象,看了 RxJava create 操作符源码你会发现:

1、create 是使用观察者模式实现的,但 RxJava 里面使用的观察者模式和我们上面介绍的还有点不一样,它是一种变种的观察者模式

2、上面例子中我们是通过被观察者去发送事件,而 RxJava 里面定义了专门发送事件的接口,这样做的好处就是让被观察者和发射事件进行 解耦

 //、定义一个观察者的顶级接口
interface Observer<T> {
    //建立了订阅关系
    fun onSubscribe()
    //接收到正常事件
    fun onNext(t: T)
    //接收到 error 事件
    fun onError(e: Throwable)
    //接收到 onComplete 事件
    fun onComplete()
}

//、定义一个被观察者的顶级接口
interface ObservableSource<T> {
    //订阅观察者
    fun subscribe(observer: Observer<T>)
}

//、定义一个被观察者抽象类实现顶层被观察者接口
abstract class Observable<T>: ObservableSource<T> {

    override fun subscribe(observer: Observer<T>) {
        subscribeActual(observer)
    }
		
    //实际订阅观察者的抽象方法,让子类去实现
    protected abstract fun subscribeActual(observer: Observer<T>)

    //伴生类里面的方法,直接通过类名调用    
    companion object{
      	//这里是我们实现 create 操作符对外提供和 RxJava 类似的方法调用
        fun <T> create(source: ObservableOnSubscribe<T>): ObservableCreate<T>{
            return ObservableCreate(source)
        }
    }
}

//、定义一个与被观察者发射事件解耦的接口
interface ObservableOnSubscribe<T> {
    //通过 Emitter 发射事件
    fun subscribe(emitter: Emitter<T>)
}

//、定义事件发射器接口
interface Emitter<T> {
    //发送 onNext 事件
    fun onNext(t: T)
    //发送  onError  事件
    fun onError(e: Throwable)
    //发送 onComplete 事件
    fun onComplete()
}

//、定义 create 操作符的实现类
class ObservableCreate<T>(var source: ObservableOnSubscribe<T>): Observable<T>() {
		
    //实现父类订阅观察者的方法
    override fun subscribeActual(downStream: Observer<T>) {
      	//可以看到只要一订阅,首先就会接收 onSubscribe 事件
        downStream.onSubscribe()
      	//通过 ObservableOnSubscribe 里面的 Emitter 进行事件的发送,完成被观察者发送事件的解耦
        source.subscribe(CreateEmitter(downStream))
    }

    //事件发射器实现类,可以看到传入了下游的观察者来接收我们发射的事件
    class CreateEmitter<T>(var downStream: Observer<T>): Emitter<T>{

        override fun onNext(t: T) {
            downStream.onNext(t)
        }

        override fun onError(e: Throwable) {
            downStream.onError(e)
        }

        override fun onComplete() {
            downStream.onComplete()
        }
    }
}

//、测试
fun main(){
    Observable.create(object : ObservableOnSubscribe<String>{
        override fun subscribe(emitter: Emitter<String>) {
            //发射 onNext 事件
            emitter.onNext("erdai")
            //发射 onComplete 事件
            emitter.onComplete()
        }
    }).subscribe(object : Observer<String>{
        override fun onSubscribe() {
            println("onSubscribe")
        }

        override fun onNext(t: String) {
            println("onNext:$t")
        }

        override fun onError(e: Throwable) {
        }

        override fun onComplete() {
            println("onComplete")
        }
    })
}
//打印结果
onSubscribe
onNext:erdai
onComplete

ok,上述代码就是 create 操作符的实现,大家如果没看明白可以多看几遍,也可以直接把我上面的代码直接拷贝到一个 kt 文件中去运行

2.4、map 操作符实现

map 是一个转换操作符,它能把一种类型转为为另外一种类型,如:Int -> String。

它的主要实现:观察者模式 + 装饰者模式 + 泛型

 //、定义一个抽象装饰类,注意里面 泛型 的使用
abstract class AbstractObservableWithUpstream<T,U>(var source: ObservableSource<T>): Observable<U>()

//、定义一个类型转换的接口
interface  Function <T,U> {
    //传入 T 类型,返回 U 类型
    fun apply(t: T): U
}

//、定义 map 操作符的实现类
class ObservableMap<T,U>(source: ObservableSource<T>,var function: Function<T,U>): AbstractObservableWithUpstream<T,U>(source) {

    //实现父类订阅观察者的方法
    override fun subscribeActual(observer: Observer<U>) {
      	//接收 onSubscribe 事件
        observer.onSubscribe()
      	//完成事件的转换
        source.subscribe(MapObserver(function,observer))
    }

    //MapObserver 接收 function 对类型进行转换,downStream 对事件进行接收
    class MapObserver<T,U>(var function: Function<T,U>,var downStream: Observer<U>): Observer<T>{
        override fun onSubscribe() {
        }

        override fun onNext(t: T) {
            //核心:当接收到 T 类型,调用 function.apply 转换为 U 类型
            val u: U = function.apply(t)
            downStream.onNext(u)
        }

        override fun onError(e: Throwable) {
            downStream.onError(e)
        }

        override fun onComplete() {
            downStream.onComplete()
        }

    }
}

//、Observable 中增加相应的调用方法
fun <U> map(function: Function<T, U>): ObservableMap<T,U>{
    return ObservableMap(this, function)
}

//、测试
fun main(){
    Observable.create(object : ObservableOnSubscribe<String>{
        override fun subscribe(emitter: Emitter<String>) {
            emitter.onNext("erdai")
            emitter.onComplete()
        }
    })
        .map(object : Function<String, String >{
            override fun apply(t: String): String {
                return "map 转换:$t"
            }
        })
        .subscribe(object : Observer<String>{
        override fun onSubscribe() {
            println("onSubscribe")
        }

        override fun onNext(t: String) {
            println("onNext:$t")
        }

        override fun onError(e: Throwable) {
        }

        override fun onComplete() {
            println("onComplete")
        }
    })
}

//打印结果
onSubscribe
onNext:map 转换:erdai
onComplete

2.5、flatMap 操作符实现

flatMap 操作符的实现其实和 map 类似,只不过是把 :Function<T, U> -> Function<T, ObservableSource> ,将一种类型转换为了一个被观察者的类型,被观察者的类型又可以进行一系列的转换,因此能拆分更细的粒度:

 //、定义 flatMap 操作符的实现类
class ObservableFlatMap<T,U>(source: ObservableSource<T>,var function: Function<T,ObservableSource<U>>): AbstractObservableWithUpstream<T,U>(source) {

    override fun subscribeActual(observer: Observer<U>) {
        observer.onSubscribe()
        source.subscribe(FlatMapObserver(function,observer))
    }

    //FlatMapObserver 接收 function 对类型进行转换,downStream 对事件进行接收
    class FlatMapObserver<T,U>(var function: Function<T,ObservableSource<U>>, var downStream: Observer<U>): Observer<T>{
        override fun onSubscribe() {
        }

        override fun onNext(t: T) {
            //核心:当接收到 T 类型,调用 function.apply 转换为 ObservableSource<U> 类型
            val u: ObservableSource<U> = function.apply(t)
            //对 u 进行更细粒度的拆分,在让下游观察者进行接收
            u.subscribe(object : Observer<U>{
                override fun onSubscribe() {

                }

                override fun onNext(t: U) {
                    downStream.onNext(t)
                }

                override fun onError(e: Throwable) {

                }

                override fun onComplete() {

                }
            })
        }

        override fun onError(e: Throwable) {
            downStream.onError(e)
        }

        override fun onComplete() {
            downStream.onComplete()
        }
    }
}

//、Observable 中增加相应的调用方法
fun <U> flatMap(function: Function<T,ObservableSource<U>>): ObservableFlatMap<T,U>{
    return ObservableFlatMap(this,function)
}

//、测试
fun main(){
    Observable.create(object : ObservableOnSubscribe<String>{
        override fun subscribe(emitter: Emitter<String>) {
            emitter.onNext("erdai")
            emitter.onComplete()
        }
    }).flatMap(object : Function<String,ObservableSource<String>>{
        override fun apply(t: String): ObservableSource<String> {
            return Observable.create(object : ObservableOnSubscribe<String>{
                override fun subscribe(emitter: Emitter<String>) {
                    emitter.onNext("flatMap:$t")
                }
            })
        }

    })
        .subscribe(object : Observer<String>{
        override fun onSubscribe() {
            println("onSubscribe")
        }

        override fun onNext(t: String) {
            println("onNext:$t")
        }

        override fun onError(e: Throwable) {
        }

        override fun onComplete() {
            println("onComplete")
        }
    })
}

//打印结果
onSubscribe
onNext:flatMap:erdai
onComplete

2.6、subscribeOn 操作符实现

subscribeOn 主要是用来决定我们订阅观察者是在哪个线程执行

 //、定义一个抽象的调度器
abstract class Scheduler {
		
    abstract fun createWorker(): Worker
    
    //定义一个抽象的 Worker
    abstract class Worker{
      	//真正决定线程执行
        abstract fun schedule(runnable: Runnable)
    }
}

//、定义调度器的实现类,我们主要实现两种:
//.1、AndroidMainScheduler:Android 主线程
//可以看到我们就是使用 Handler 将线程切换到主线程
class AndroidMainScheduler(var handler: Handler): Scheduler() {
    override fun createWorker(): Worker {
        return AndroidMainWorker(handler)
    }
    
    class AndroidMainWorker(var handler: Handler): Worker(){
        override fun schedule(runnable: Runnable) {
            handler.post(runnable)
        }
    }
}

//.2、NewThreadScheduler:开启一个新的子线程
//可以看到我们就是使用线程池来执行 runnable
class NewThreadScheduler(var executorService: ExecutorService): Scheduler() {

    override fun createWorker(): Worker {
        return NewThreadWork(executorService)
    }
  
    class NewThreadWork(var executorService: ExecutorService): Worker(){
        override fun schedule(runnable: Runnable) {
            executorService.execute(runnable)
        }
    }
}

//、定义一个线程调度器的工具类,类似 RxJava 的调用
class Schedulers {
  
    companion object{
      	//切换到子线程
        fun newThread(): NewThreadScheduler{
            return NewThreadScheduler(Executors.newScheduledThreadPool())
        }
				
      	//切换到主线程
        fun mainThread(): AndroidMainScheduler{
            return AndroidMainScheduler(Handler(Looper.getMainLooper()))
        }
    }
}

//、定义 subscribeOn 操作符实现类
class ObservableSubscribeOn<T>(source: ObservableSource<T>,var scheduler: Scheduler): AbstractObservableWithUpstream<T,T>(source) {

    override fun subscribeActual(observer: Observer<T>) {
      	//接收订阅事件
        observer.onSubscribe()
      	//创建 Worker 决定我们代码所执行的线程
        val worker = scheduler.createWorker()
        worker.schedule(SubscribeTask(SubscribeOnObserver(observer)))
    }


    //可以看到,Runnable 里面就只做了一个订阅操作,因此 subscribeOn 会决定我们订阅观察者的线程
    inner class SubscribeTask(var observer: SubscribeOnObserver<T>): Runnable{
        override fun run() {
            source.subscribe(observer)
        }
    }
  
    //如果我们没有使用 observeOn 切换线程,那么观察者接收事件的线程也会由 subscribeOn 线程决定
    class SubscribeOnObserver<T>(var observer: Observer<T>): Observer<T>{
        override fun onSubscribe() {
        }

        override fun onNext(t: T) {
            observer.onNext(t)
        }

        override fun onError(e: Throwable) {
            observer.onError(e)
        }

        override fun onComplete() {
            observer.onComplete()
        }

    }
}

//、Observable 中增加相应的调用方法
fun subscribeOn(scheduler: Scheduler): ObservableSubscribeOn<T>{
    return ObservableSubscribeOn(this,scheduler)
}

//、测试
fun main(){
    Observable.create(object :ObservableOnSubscribe<String>{
        override fun subscribe(emitter: Emitter<String>) {
            emitter.onNext("erdai")
            emitter.onComplete()
            println("subscribe:${Thread.currentThread().name}")
        }

    }).subscribeOn(Schedulers.newThread())
        .subscribe(object : Observer<String>{
            override fun onSubscribe() {
                println("onSubscribe:${Thread.currentThread().name}")
            }

            override fun onNext(t: String) {
                println("onNext:$t")
                println("onNext:${Thread.currentThread().name}")
            }

            override fun onError(e: Throwable) {
                println("onError:${Thread.currentThread().name}")
            }

            override fun onComplete() {
                println("onComplete")
                println("onComplete:${Thread.currentThread().name}")
            }
        })
}

//打印结果
onSubscribe:main
onNext:erdai
onNext:pool--thread-1
onComplete
onComplete:pool--thread-1
subscribe:pool--thread-1

分析一下上面的打印结果:

1、onSubscribe 是在一开始订阅就触发的,此时 Worker 都还没创建,因此是在主线程执行的

2、因为我们没有使用 observeOn 对观察者接收事件的线程进行切换,所以 onNext,onComplete 接收事件的线程由 subscribeOn 切换的线程决定,

3、subscribe 在我们实际订阅观察者的方法里会执行它,因此是由 subscribeOn 切换的线程决定

2.7、observeOn 操作符实现

observeOn 是用来决定我们观察者接收事件是在哪个线程执行,实现相对复杂一点,它内部使用了一个队列来存储发送过来的 onNext 事件,然后通过 While 循环对队列中的事件进行处理,具体大家可以看我下面的实现,写了详细的注释

 //、定义 observeOn 操作符实现类
class ObservableObserveOn<T>(source: ObservableSource<T>, var scheduler: Scheduler): AbstractObservableWithUpstream<T, T>(source) {

    override fun subscribeActual(observer: Observer<T>) {
      	//接收订阅事件
        observer.onSubscribe()
        val worker = scheduler.createWorker()
        source.subscribe(ObserveOnObserver(observer,worker))
    }

    class ObserveOnObserver<T>(var observer: Observer<T>, var worker: Scheduler.Worker, var queue: Deque<T>? = null): Observer<T>,Runnable {

        //标记是否事件都已经接收,一般在 onError 或 onComplete 时标记
        @Volatile
        var done = false

        //记录 onError 的异常
        @Volatile
        var throwable: Throwable? = null

        //是否能结束 While 循环:例如观察者接收了 onError 或 onComplete 事件,就可以结束循环了
        @Volatile
        var over = false

        init {
            //如果队列为空,则新建
            if(queue == null){
                queue = ArrayDeque()
            }
        }

        override fun onSubscribe() {
        }

        override fun onNext(t: T) {
            if(done)return
            //将接收的 onNext 事件加入队列中
            queue?.offer(t)
            //执行调度
            schedule()
        }

        override fun onError(e: Throwable) {
            if(done)return
            //记录异常
            throwable = e
            //标记接收事件完成
            done = true
            //执行调度
            schedule()

        }

        override fun onComplete() {
            if(done)return
            //标记接收事件完成
            done = true
            //执行调度
            schedule()
        }

        //可以看到这里进行了任务的执行,由 observeOn 决定执行的线程
        private fun schedule() {
            worker.schedule(this)
        }

        override fun run() {
            drainNormal()
        }

        //实际最终的逻辑就是在这个方法里面进行处理
        private fun drainNormal() {
            //取当前的队列
            val q = queue
            //取观察者
            val obs = observer

            //while 循环取出队列里面的 onNext 事件
            while (true){
                //取 done 标记
                val d = done
                //从队列中取出元素并出队
                val t = q?.poll()
                //如果 t 为 null 表示队列里面没有事件了
                val empty = t == null
                //检查是否能终止 While 循环
                if(checkTerminated(d,empty,obs)){
                    return
                }

                //如果队列为空,跳出 While 循环
                if(empty)break

                //观察者接收 onNext 事件
                t?.apply {
                    obs.onNext(this)
                }
            }

        }

        //检查是否能终止 While 循环
        private fun checkTerminated(d: Boolean, empty: Boolean, obs: Observer<T>): Boolean {
            if(over){
                //如果能结束了,清空队列
                queue?.clear()
                return true
            }

            //如果已经完成事件的发送
            if(d){
                val e = throwable
                if(e != null){
                    //如果有 onError 事件,标记结束,并接收 onError 事件
                    over = true
                    obs.onError(e)
                }else if(empty){
                    //如果队列为空,标记结束,并接收 onComplete 事件
                    over = true
                    obs.onComplete()
                    return true
                }
            }
            return false
        }
        
    }
} 

//、Observable 中增加相应的调用方法
fun observeOn(scheduler: Scheduler): ObservableObserveOn<T>{
    return ObservableObserveOn(this,scheduler)
}

//、测试,因为涉及到 Handler 切换到主线程,我们这里放到 Activity 里面去测试
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)


        Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: Emitter<String>) {
                emitter.onNext("erdai")
                emitter.onComplete()
                println("subscribe:${Thread.currentThread().name}")
            }

        })
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.mainThread())
            .subscribe(object : Observer<String> {
                override fun onSubscribe() {
                    println("onSubscribe:${Thread.currentThread().name}")
                }

                override fun onNext(t: String) {
                    println("onNext:$t")
                    println("onNext:${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    println("onError:${Thread.currentThread().name}")
                }

                override fun onComplete() {
                    println("onComplete")
                    println("onComplete:${Thread.currentThread().name}")
                }
            })
    }
}

//打印结果
onSubscribe:main
subscribe:pool--thread-1
onNext:erdai
onNext:main
onComplete
onComplete:main

分析一下上面的打印结果:

1、onSubscribe 是在一开始订阅就触发的,此时 Worker 都还没创建,因此是在主线程执行的

2、subscribe 在我们实际订阅观察者的方法里会执行它,因此是由 subscribeOn 切换的线程决定

3、observeOn 决定了观察者接收事件所在的线程,因此 onNext,onComplete 是在主线程执行的

三、RxJava 框架流思想设计

我们通过一段代码来分析 RxJava 的框架流设计:

 class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
				
      	//create 操作符
        Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: Emitter<String>) {
                emitter.onNext("erdai")
                emitter.onComplete()
                println("subscribe:${Thread.currentThread().name}")
            }

        })
      	    //map 操作符
            .map(object : Function<String,String>{
                override fun apply(t: String): String {
                    println("map:${Thread.currentThread().name}")
                    return "map:$t"
                }
            })
      	    //flatMap 操作符
            .flatMap(object : Function<String,ObservableSource<String>>{
                override fun apply(t: String): ObservableSource<String> {
                    println("flatMap:${Thread.currentThread().name}")
                    return Observable.create(object : ObservableOnSubscribe<String>{
                        override fun subscribe(emitter: Emitter<String>) {
                            emitter.onNext("flatMap:$t")
                        }
                    })
                }

            })
      	    //subscribeOn 操作符
            .subscribeOn(Schedulers.newThread())
      	    //observeOn 操作符
            .observeOn(Schedulers.mainThread())
      	    //订阅
            .subscribe(object : Observer<String> {
                override fun onSubscribe() {
                    println("onSubscribe:${Thread.currentThread().name}")
                }

                override fun onNext(t: String) {
                    println("onNext:$t")
                    println("onNext:${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    println("onError:${Thread.currentThread().name}")
                }

                override fun onComplete() {
                    println("onComplete")
                    println("onComplete:${Thread.currentThread().name}")
                }
            })
    }
}

3.1、链式构建流

特点:从上往下

使用一段伪代码来分析 RxJava Observable 的构建

 val source = ObservableOnSubscribe()
//create 操作符
Observable.create(souce) ---> observable = ObservableCreate(source)
//map 操作符
observable.map() ---> observable1 = ObservableMap(observable0)
//flatMap 操作符
observable.flatMap() ---> observable2 = ObservableFlatMap(observable1)
//subscribeOn 操作符
observable.subscribeOn() ---> observable3 = ObservableSubscribeOn(observable2)
//observeOn 操作符
observable.observeOn() ---> observable4 = ObservableObserveOn(observable3)

有没有发现规律: 我们在上游创建的 Observable(被观察者) 会被传入到下游。这就是典型的装饰者模式的应用,它的特点就是从上往下,无限套娃,动态的达到功能的增强

3.2、订阅流

特点:从下往上

使用一段伪代码来分析 RxJava 订阅的过程

 val observe = Observer(){}...
observable.subscribe(observe5) ---> observable4.subscribeActual(observe5)

//observeOn 操作符
val observe = ObserveOnObserver(observe5)
observable.subscribe(observe4) ---> observable3.subscribeActual(observe4)

//subscribeOn 操作符
val observe = SubscribeOnObserver(observe4)
observable.subscribe(observe3) ---> observable2.subscribeActual(observe3)

//flatMap 操作符
val observe = FlatMapObserver(observe3)
observable.subscribe(observe2) ---> observable1.subscribeActual(observe2)

//map 操作符
val observe = MapObserver(observe2)
observable.subscribe(observe1) ---> observable0.subscribeActual(observe1)

//create 操作符
val emitter = CreateEmitter(observe)
source.subscribe(emitter)

有点递归的意思哈

可以发现规律: 我们在下游创建的 Observable 订阅时,会递归先执行上游的订阅,因此订阅流的特点就是从下往上

3.3、回调流

特点:从上往下

我们分析订阅流可以发现,观察者对象是从下往上传的,因此当 emitter 发送事件时,接收的顺序:

 //create 操作符
emitter -> observe

//map 操作符
observe -> observe2

//flatMap 操作符
observe -> observe3

//subscribeOn 操作符
observe -> observe4

//observeOn 操作符
observe -> observe5

可以看到: 当 emitter 发送事件后,观察者收到事件的顺序是从上往下的

上面这三个流就是 RxJava 框架流的一个思想设计,对于你理解 RxJava 非常重要,如果没看明白,多看几遍

3.4、问题回顾

掌握了 RxJava 框架流,我们回顾一下前面提到的两个问题:

1、如果有多个 subscribeOn ,会是一种什么情况?为啥?

答:只有最上面那个 subscribeOn 切换的线程才会生效。因为 subscribeOn 的作用就是决定你订阅所执行的线程,而订阅流是从下往上的,因此你如果使用多个 subscribeOn 对线程进行切换,最终生效的只会是最上面那个

2、如果有多个 observeOn ,会是一种什么情况?为啥?

答:同理,只有最下游那个 observeOn 切换的线程才会生效。因为回调流是从上往下的,所以如果你创建了多个观察者接收事件,最终生效的只会是最下面那个

好好体会下下面这个例子:

 class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: Emitter<String>) {
                emitter.onNext("erdai")
                emitter.onComplete()
                println("subscribe:${Thread.currentThread().name}")
            }

        })
            .subscribeOn(Schedulers.newThread())
            .map(object : Function<String, String> {
                override fun apply(t: String): String {
                    println("map:${Thread.currentThread().name}")
                    return "map:$t"
                }

            })
            .subscribeOn(Schedulers.mainThread())
            .subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.mainThread())
            .observeOn(Schedulers.newThread())
            .observeOn(Schedulers.mainThread())
            .observeOn(Schedulers.newThread())
      	    .observeOn(Schedulers.mainThread())
            .subscribe(object : Observer<String> {
                override fun onSubscribe() {
                    println("onSubscribe:${Thread.currentThread().name}")
                }

                override fun onNext(t: String) {
                    println("onNext:$t")
                    println("onNext:${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    println("onError:${Thread.currentThread().name}")
                }

                override fun onComplete() {
                    println("onComplete")
                    println("onComplete:${Thread.currentThread().name}")
                }

            })
    }
}

//打印结果
onSubscribe:main
map:pool--thread-1
subscribe:pool--thread-1
onNext:map:erdai
onNext:main
onComplete
onComplete:main

四、RxLifeCycle

实现 RxLifeCycle 之前,我们需要了解一下 compose 操作符

4.1、compose 操作符介绍

compose 操作符作用:传入一个上游的被观察者返回一个下游的被观察者,能起到一个代码复用的逻辑

注意:下面使用的是 RxJava 包下的类

 class MyTransformer<T: Any>: ObservableTransformer<T,T> {

    override fun apply(upstream: Observable<T>): ObservableSource<T> {
      	//完成订阅和回调线程的切换
        return upstream.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
    }
}


//实际应用
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        Observable.create(object : ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("erdai")
                emitter.onComplete()
                println("subscribe:${Thread.currentThread().name}")
            }
        })	
      	    //compose 操作符需要传入一个 ObservableTransformer 类型的对象
            .compose(MyTransformer())
            .subscribe(object : Observer<String>{
                override fun onSubscribe(d: Disposable) {
                    println("onSubscribe:${Thread.currentThread().name}")
                }

                override fun onNext(t: String) {
                    println("onNext $t")
                    println("onNext:${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    println("onError:${Thread.currentThread().name}")
                }

                override fun onComplete() {
                    println("onComplete")
                    println("onComplete:${Thread.currentThread().name}")
                }
            })
    }
}

//打印结果
onSubscribe:main
subscribe:RxCachedThreadScheduler-
onNext erdai
onNext:main
onComplete
onComplete:main

4.2、RxLifeCycle 实现

我们上面写的代码是存在内存泄漏的,如果我们使用 RxJava 在 Activity 做一个网络请求,此时用户退出了当前 Activity ,但是网络请求还在继续,那么此时就会产生内存泄漏,因此我们需要在做网络请求的时候感知 Activity 的生命周期去做相应的逻辑处理,那么此时 RxLifeCycle 就派上用场了,直接上代码:

 //、RxLifeCycle 实现类
//.1、它实现了 LifecycleEventObserver,因此可以感知 LifecycleOwner 的声明周期
//.2、它实现了 ObservableTransformer,因此我们可以使用 compose 操作符
class RxLifeCycle<T: Any>: LifecycleEventObserver,ObservableTransformer<T,T> {

    var compositeDisposable = CompositeDisposable()

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if(event == Lifecycle.Event.ON_DESTROY){
            //监听到 LifecycleOwner 生命周期为 Destroy 时,移除 Disposable 容器中的所有 Disposable 对象
            compositeDisposable.clear()
        }
    }

    override fun apply(upstream: Observable<T>): ObservableSource<T> {
        return upstream.doOnSubscribe{
            //将 Disposable 加入 Disposable 容器
            compositeDisposable.add(it)
        }
    }

    companion object{
      	//传入 LifecycleOwner,和 RxJava 进行生命周期绑定
        fun <T: Any> bindToDestroy(owner: LifecycleOwner): RxLifeCycle<T>{
            val rxLifeCycle = RxLifeCycle<T>()
            owner.lifecycle.addObserver(rxLifeCycle)
            return rxLifeCycle
        }
    }
}

//、使用
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        Observable.create(object : ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("erdai")
                emitter.onComplete()
                println("subscribe:${Thread.currentThread().name}")
            }
        })	
            .compose(MyTransformer())
      	    //绑定生命周期,防止内存泄漏
      	    .compose(RxLifeCycle.bindToDestroy(this))
            .subscribe(object : Observer<String>{
                override fun onSubscribe(d: Disposable) {
                    println("onSubscribe:${Thread.currentThread().name}")
                }

                override fun onNext(t: String) {
                    println("onNext $t")
                    println("onNext:${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    println("onError:${Thread.currentThread().name}")
                }

                override fun onComplete() {
                    println("onComplete")
                    println("onComplete:${Thread.currentThread().name}")
                }
            })
    }
}

五、RxBus

实现 RxBus 前,我们需要先了解一下 Subject

5.1、Subject 介绍

1、Subject 既可以表示一个被观察者也可以表示一个观察者,实际它就是继承了 Observable 抽象类并实现了 Observer 接口

2、Subject 主要分为四种:

、AsyncSubject
、BehaviorSubject
、PublishSubject
、ReplaySubject

5.1.1、AsyncSubject

特点:事件发射无论是在订阅前还是后,都只会接收最后一个事件

 fun main(){
    val subject = AsyncSubject.create<String>()
    subject.onNext("A")
    subject.onNext("B")
    subject.subscribe {
        println(it)
    }
    subject.onNext("C")
    subject.onNext("D")
    subject.onComplete()
}

//打印结果
D

5.1.2、BehaviorSubject

特点:接收订阅前最后一个事件以及订阅后的所有事件

 fun main(){
    val subject = BehaviorSubject.create<String>()
    subject.onNext("A")
    subject.onNext("B")
    subject.subscribe {
        println(it)
    }
    subject.onNext("C")
    subject.onNext("D")
    subject.onComplete()
}

//打印结果
B
C
D

5.1.3、PublishSubject

特点:只接收订阅后的所有事件

 fun main(){
    val subject = PublishSubject.create<String>()
    subject.onNext("A")
    subject.onNext("B")
    subject.subscribe {
        println(it)
    }
    subject.onNext("C")
    subject.onNext("D")
    subject.onComplete()
}

//打印结果
C
D

5.1.4、ReplaySubject

特点:事件发射无论是在订阅前还是后,都会被全部接收

 fun main(){
    val subject = ReplaySubject.create<String>()
    subject.onNext("A")
    subject.onNext("B")
    subject.subscribe {
        println(it)
    }
    subject.onNext("C")
    subject.onNext("D")
    subject.onComplete()
}

//打印结果
A
B
C
D

5.2、RxBus 实现

我们在日常开发中,事件总线用的最多的可能是 EventBus,殊不知 RxJava 也能通过 Subject 实现事件总线的功能,而且使用起来比 EventBus 还简单一些:

 //、RxBus 实现类
object RxBus {
    //定义一个 PublishSubject 类型的 Subject,只接收订阅后的事件
    private val subject: Subject<Any> = PublishSubject.create<Any>().toSerialized()
    
    //接收事件
    fun <T: Any> receive(clazz: Class<T>): Observable<T>{
        return subject.ofType(clazz)
    }

    //发送事件
    fun post(o: Any){
        subject.onNext(o)
    }
}

//、使用
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        RxBus.receive(String::class.java)
      	    //绑定生命周期,防止内存泄漏
            .compose(RxLifeCycle.bindToDestroy(this))
            .subscribe {
                println(it)
            }

        RxBus.post("erdai")
    }
}

//打印结果
erdai

六、总结

本篇文章我们由浅入深对 RxJava 进行了全面的介绍:

1、介绍了 RxJava 中使用的两种设计模式:

、变种的观察者模式
、装饰者模式

2、手撸了 RxJava 核心操作符的实现,希望你能举一反三,其它操作符的实现也是类似的套路

3、介绍了 RxJava 框架流思想设计:

、链式构建流:从上往下
、订阅流:从下往上
、回调流:从上往下

4、介绍了 compose 操作符并扩展实现了 RxLifeCycle

5、介绍了 Subject 并扩展实现了 RxBus

好了,本篇文章到这里就结束了,希望能给你带来帮助

感谢你阅读这篇文章