Kotlin使用协程实现高效并发程序流程详解

手机APP/开发
391
0
0
2023-07-23
标签   Kotlin
目录
  • 1.协程的基本用法
  • 2.更多的作用域构建器
  • 3.使用协程简化回调的写法

协程属于Kotlin中非常有特色的一项技术,因为大部分编程语言中是没有协程这个概念的。那么什么是协程呢?它其实和线程有点相似,可以简单地将它理解成一种轻量级的线程。我们之前学习的线程是重量级的,它需要依靠操作系统的调度才能实现不同线程之间的切换,而使用协程却可以仅在编程语言的层面就能实现不同协程之间的切换,从而大大提示了并发编程的运行效率。

比如我们有如下foo()和bar()方法:

fun foo(){
  print()
  print()
  print()
}
fun bar(){
  print()
  print()
  print()
}

在没有开启线程的情况下,先后调用foo()和bar()这两个方法,那么理论上的输出结果一定是123456。而如果使用了协程,在协程A中去调用foo()方法,协程B中去调用bar()方法,虽然它们仍然会运行在同一个线程当中,但是在执行foo()方法时随时都有可能被挂起转而去执行bar()方法,执行bar()方法时也随时都要可能被挂起而继续执行foo()方法,最终输出结果也变得不确定了。

可以看出,协程允许我们在单线程模式下模拟多线程编程的效果,代码执行的挂起与恢复完全由编程语言来控制的,和操作系统无关。这种特性使得高并发程序的运行效率得到了极大的提升。

1.协程的基本用法

Kotlin并没有将协程纳入标准库的API中,而是以依赖库的形式提供的。所以如果我们想使用协程功能,需要先在app/build.gradle文件中添加如下依赖库:

  //协程
  implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2"
  implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2"

接下来创建一个coroutines.kt文件,并定义一个main()函数。

开启一个协程,最简单的方式就是使用Global.launch函数,如下所示:

fun main() {
    GlobalScope.launch {
        println("codes run in coroutine scope")
    }
}

GlobalScope.launch 函数可以创建一个协程的作用域,这样传递给launch函数的代码块(lambdal表达式)就是在协程中运行的了,这里我们在代码块中打印了一行日志。现在运行main()函数,结果并没有输出。

这是因为,Global.launch函数每次创建的都是一个顶层协程,这种协程当应用程序运行结束时也会跟着一起结束。刚才的日志之所以无法打印出来,就是因为代码块中的代码还没来得及运行,应用程序就结束了。

要解决这个问题,只需要让程序延迟一段时间再结束就行了。这里使用Thread.sleep()方法让主线程阻塞1秒钟,现在重新运行一下程序,你会发现日志可以正常打印了。

fun main() {
GlobalScope.launch {
    println("codes run in coroutine scope")
}
    Thread.sleep()
}

可是这种写法还是存在问题,如果代码块中的代码再1秒钟之内不能运行结束,那么就会被强制中断。

例如:

fun main() {
GlobalScope.launch {
    println("codes run in coroutine scope")
    delay()
    println("codes run in coroutine scope finished")
}
    Thread.sleep()
}

我们在代码块中加入了一个delay函数,并在之后又打印了一行日志。delay()函数可以让当前协程延迟指定时间后再运行,但它和Thread.sleep()方法不同。delay()函数是一个非阻塞式的挂起函数,它只会挂起当前协程,并不会影响其他协程的运行。而Thread.sleep()方法会阻塞当前的线程,这样运行在该线程下的所有协程都会被阻塞。注意,delay()函数只能在协程的作用域或其他挂起函数中调用。

这里我们让协程挂起1.5秒,但是主线程却只阻塞1秒,这样重新运行程序,你会发现代码块中新增的一条日志并没有打印出来,因为它还没来得及运行,应用程序就结束了。

借助runBlocking函数可以让应用程序在协程中所有代码都运行完了之后再结束。

fun main() {
    runBlocking{
        println("codes run in coroutine scope")
        delay()
        println("codes run in coroutine scope finished")
    }
}       

runBlocking函数同样会创建一个协程的作用域,但是它可以保证在协程作用域内的所有代码和子协程没有全部执行完之前一直阻塞当前线程。需要注意的是,runBlocking函数通常只应该再测试环境下使用,在正式环境中使用容易产生性能上的问题。

可以看到,两条日志都能够正常打印出来了。

一旦设计高并发的应用场景,协程相比于线程的优势就能体现出来了。

那么如何才能创建多个协程呢?使用launch函数就可以了,如下所示:

fun main() {
    runBlocking {
        launch {
            println("launch")
            delay()
            println("launch finished")
        }
        launch {
            println("launch")
            delay()
            println("launch finished")
        }
    }
}

这里的launch函数和我们刚才所使用的GlobalScope.launch函数不同。首先它必须在协程的作用域中才能调用,其次它会在当前协程的作用域下创建子协程。子协程的特点是如果外层作用域的协程结束了,该作用域下的所有子协程也会一同结束。相比而言,GlobalScope.launch函数创建的永远是顶层协程,这一点和线程比较像,因为线程也没有层级这一说,永远都是顶层的。

这里我们调用了两次launch函数,也就是创建了两个子协程。运行 程序结果如下:

可以看到,两个子协程中的日志是交替打印的,说明它们确实是像多线程那样并发运行的。然而这两个子协程实际却运行在同一个线程当中,只是由编程语言来决定如何在多个协程之间进行调度,让谁运行,让谁挂起。调度的过程完全不需要操作系统参与,这也就使得协程的并发效果很高。

例如:

fun main() {
    val start = System.currentTimeMillis()
    runBlocking {
        repeat(){
            launch {
                println("测试")
            }
        }
    }
    val  end=System.currentTimeMillis()
    println(end-start)
}

这里使用repeat函数循环创建了10万个协程,仅仅耗时了564毫秒,如果开启的是线程可能已经oom异常了。

不过,随着lanuch函数中的逻辑越来越复杂,可能你需要将部分代码提取到一个单独的函数中。这个时候就会出现一个问题,我们再launch函数中编写的代码是拥有协程作用域的,但是提取到一个单独函数中就没有协程作用域了。

那么我们应该如何调用像delay()这样的挂起函数呢?Kotlin提供了一个suspend关键字,使用它可以将任意函数声明成挂起函数,而挂起函数之间都是可以互相调用的,如下所示:

suspend fun printDot(){
    println(".")
    delay()
}

这样就可以在printDot()函数中调用delay()函数了

但是,suspend关键字只能将一个函数声明成挂起函数,是无法给它提供协程作用域的。比如你现在尝试在printDot()函数中调用launch函数,一定是无法调用成功的,因为launch函数要求必须在协程作用域当中才能调用。

这个问题可以借助coroutineScope函数来解决。coroutineScope函数也是一个挂起函数,因此可以在任何其他挂起函数中调用。它的特点是会继承外部的协程作用域并创建一个子作用域,借助这个特性,我们就可以给任意挂起函数提供协程作用域了。示例写法如下:

suspend fun printDot()= coroutineScope {
    launch {
        println(".")
        delay()
    }
}

可以看到,现在我们就可以在printDot()这个挂起函数中调用launch函数了。

另外,coroutineScope函数和runBlocking函数还有点类似,它可以保证其作用域内的所有代码和子协程在全部执行完之前,会一直阻塞当前协程。

fun main() {
    runBlocking {
        coroutineScope {
            launch {
                for(i in..10){
                    println(i)
                    delay()
                }
            }
        }
       println("coroutineScope finished")
    }
    println("runBlocking  finished")
}

这里先使用runBlocking函数创建了一个协程作用域,然后又使用coroutineScope 函数创建了一个子协程作用域。在coroutineScope 的作用域中,我们调用launch函数创建了一个子协程,并通过for循环依次打印数字1到10,每次打印间隔一秒钟。最后在runBlocking和corountineScope函数的结尾,分别又打印了一行日志。

由此可见,corountineScope函数确实是将协程阻塞住了,只有当他作用域内的所有代码和子协程都执行完毕之后,corountineScope函数之后的代码才能得到运行。

corountineScope函数只会阻塞当前协程,既不影响当前协程,也不影响任何线程,因此是不会造成性能上的问题的。而runBlocking函数由于会阻塞当前线程,如果你恰好又在主线程当中调用它的话中调用它的话,那么就有可能导致界面卡死的情况,所以不太推荐在实际项目中使用。

2.更多的作用域构建器

前面学习了GlobalScope.launch,runBlocking,launch,corountineScope这几种作用域构建器,它们都可以用于创建一个新的协程作用域。不管GlobalScope.launch和runBlocking函数都可以在任意地方调用的,corountineScope函数可以在协程作用域或挂起函数在调用,而launch函数只能在协程作用域中调用。而launch函数只能在协程作用域中调用。

runBlocking由于会阻塞线程,因此只建议在测试环境下使用。而GlobalScope.launch由于每次创建的都是顶层协程,一般也不太建议使用。因为管理成本比较高,比如我们再某个Activity中使用协程发起了一条网络请求,由于网络请求是耗时的,用户在服务器还没来得及响应的情况下就关闭了当前Activity,此时应该取消这条网络请求,或者至少不应该进行回调,因为Activity已经不存在了,回调也没意义。

那么协程要怎样取消呢?不管是GlobalScope.launch函数还是launch函数,它们都会返回一个job对象,只需要调用job对象的cancel()方法就可以取消协程了,如下所示:

val job= GlobalScope.launch { 
//处理具体的逻辑
}
job.cancel()

但是如果我们每次创建的都是顶层协程,那么当Activity关闭时,就需要逐个调用所有已创建的协程的cancel()方法。这样管理成本太大。

因此,GlobalScope.launch这种协程作用域构建器,在实际项目中不太常用。实际项目中比较常用的写法:

val job = Job()
val scope = CoroutineScope(job)
scope.launch { 
    //处理具体的逻辑
}
job.cancel()

我们先创建了一个Job对象,然后把它传入CoroutineScope()函数当中,CoroutineScope()函数会返回一个CoroutineScope对象,有了CoroutineScope对象之后,我们就可以随时调用它的lanuch函数来创建一个协程了。

现在所有调用CoroutineScope的launch 函数所创建的协程,都会被关联在Job对象的作用域下面。这样只需要调用一次cancel()方法,就可以将同一作用域内的所有协程全部取消,从而大大降低协程管理的成本。

我们已经指定了调用launch函数可以创建一个新的协程,但是launch函数只能用于执行一段逻辑,却不能获取执行的结果,因为它的返回值永远是一个Job对象。而我们使用async函数就可以实现创建一个协程并获取它的执行结果。

async函数必须在协程作用域当中才能调用,它会创建一个新的子协程并返回一个Deferred对象,如果我们想要获取async函数代码块的执行结果,只需要调用Deferred对象的await()方法即可,代码如下所示:

val job = Job()
val scope = CoroutineScope(job)
scope.launch { 
    //处理具体的逻辑
}
job.cancel()

事实上,在调用了async函数之后,代码块中的代码就会立刻开始执行。当调用await()方法时,如果代码块中的代码还没执行完,那么await()方法会将当前协程阻塞住,直到可以获得async函数的执行结果。例如:

   runBlocking {
       val start=System.currentTimeMillis()
        val result=async {
            delay()
        }.await()
        val result=async {
            delay()
        }.await()
        println("result is ${result+result2}")
        val end=System.currentTimeMillis()
        println("cost ${end-start} ms.")
    }

这里连续使用了两个async函数来执行任务,并在代码块中调用delay()方法进行1秒的延迟。而且await()方法在async函数代码块中的代码执行完之前会一直将当前协程阻塞住。

可以看到整段代码耗时是2032毫秒,说明这里的两个async函数确实是一种串行的关系,前一个执行完了后一个才能执行。

两个async函数完全可以同时执行从而提高效率,对上述代码进行修改

   runBlocking {
       val start=System.currentTimeMillis()
        val result=async {
            delay()
        }.await()
        val result=async {
            delay()
        }.await()
        println("result is ${result+result2}")
        val end=System.currentTimeMillis()
        println("cost ${end-start} ms.")
    }

现在我们不在每次调用async函数之后就立刻使用await()方法获取结果了,而是仅在需要用到async函数的执行结果时才调用await()方法进行获取,这样两个async函数就变成一种并行关系了。

可以看到代码运行耗时变成了1027毫秒,运行效率提升。

还有一个比较特殊的作用域构建器:withContext()函数。withContext()函数是一个挂起函数,大体可以将它理解成async函数的一种简化版写法,示例写法如下:

runBlocking {
    val result=withContext(Dispatchers.Default){+5
    }
    println(result)
}

调用withContext()函数之后,会立即执行代码块中的代码,同时将当前协程阻塞住。当代码块中的代码全部执行完之后,会将最后一行的执行结果作为withContext()函数的返回值返回,因此基本上相当于val result=async{5+5}.await()的写法。唯一不同的是,withContext()函数强制要求我们指定一个线程参数。

线程参数主要有以下3种值可选:Dispatchers.Default,Dispatchers.IO,Dispatchers.Main。Dispatchers.Default表示会使用一种默认低并发的线程策略,当你要执行的代码属于计算密集型任务时,开启过高的并发反而可能会影响任务的运行效率,此时就可以使用Dispatchers.Default。Dispatchers.IO表示会使用一种较高并发的线程策略,当你要执行的代码大多数时间是在阻塞和等待中,比如说执行网络请求时,为了能够支持更高的并发数量,此时就可以使用Dispatchers.IO。Dispatchers.Main则表示不会开启子线程,而是在Android主线程中执行代码,但是这个值只能在Android项目中使用。

3.使用协程简化回调的写法

通过回调机制实现获取异步网络请求数据响应的功能。回调机制基本上是依靠匿名类来实现的,但是匿名类的写法通常比较繁琐,比如如下代码:

HttpUtil.sendHttpRequest(address,object:HttpCallbackListener){
  override fun onFinish(response:String){
  //得到服务器返回的具体内容
  }
  override fun onError(e:Exception){
  //这里对异常情况进行处理
  }
})

在多少个地方发起网络请求,就需要编写多少次这样的匿名类实现。有没有更简单的写法呢?Kotlin的协程使我们的这种设想成为了可能,只需要借助SuspendCoroutine函数就能将传统回调机制的写法大幅简化。

SuspendCoroutine函数必须在协程作用域或挂起函数才能调用,它接收一个Lambda表达式参数,主要作用是将当前协程立即挂起,然后在一个普通的线程中执行Lambda表达式中的代码。Lambda表达式的参数列表上会传入一个Continuation参数,调用它的resume()方法或resumeWithException()可以让协程恢复执行。

接下来我们借助这个函数对传统的回调写法进行优化。首先定义一个request()函数,代码如下:

suspend fun request(address:String):String{
  return suspendCoroutine{continuation ->
  HttpUtil.sendHttpRequest(address,object:HttpCallbackListener{
    override fun onFinish(response:String){
    continuation.resume(response)
  }
  override fun onError(e:Exception){
    continuation.resumeWithException(e)
  }
  })
  }
}

可以看到,request()函数是一个挂起函数,并且接收一个address参数。在request()函数的内部,我们调用了suspendCoroutine函数,这样当前协程就会被立即挂起,而Lambda表达式中的代码则会在普通线程中执行。接下来我们再Lambda表达式中调用HttpUtil.sendHttpRequest()方法发起网络请求,并通过传统回调的方式监听请求结果。如果成功就调用continuation的resume()方法恢复挂起的协程,并传入服务器响应的数据,该值会成为suspendCoroutine函数的返回值。如果请求失败,就调用Continuation的resumeWithException()恢复被挂起的协程,并传入具体的异常原因。

这样不管之后我们要发起多少次网络请求,都不需要再重复进行回调实现了。比如说获取百度页面的响应数据,就可以这样写:

suspend fun getBaiduResponse(){
  try{
  val response=request("https://www.baidu.com/")
  //对服务器响应的数据进行处理
  }catch(e:Exception){
  //对异常情况进行处理
  }
}

由于getBaiduResponse()是一个挂起函数,因此当它调用了request()函数时,当前的协程就已经被立刻挂起,然后一直等待网络请求成功或失败后,当前协程才能恢复运行。这样即使不使用回调的写法,我们也能够获得异步网络请求的响应数据,而如果请求失败,则会进入catch语句当中。

事实上,suspendCoroutine函数几乎可以用于简化任何回调的写法,比如之前使用Retrofit来发起网络请求需要这样写:

val appService=ServiceCreator.create<AppService>()
appService.getAppData().enqueue(object:Callback<List<App>>{
  override fun onResponse(call:Call<List<App>>,response:Response<List<App>>){
  //得到服务器返回的数据
  }
  override fun onFailure(call:Call<List<App>>,t:Throwable){
  //在这里对异常情况进行处理
  }
})

使用suspendCoroutine函数,就可以对上述写法进行大幅度的简化。

由于不同的接口返回的数据类型不同,所以我们要使用泛型的方式,定义一个await()函数,代码如下所示:

suspend fun <T> Call<T>.await():T{
  return suspendCoroutine{continuation->
  enqueue(object:Callback<T>{
    override fun onResponse(call:Call<T>,response:Response<T>){
      val body=response.body()
      if(body!=null){
        continuation.resume(body)
        else continuation.resumeWithException(RuntimeException("response body is null"))
      }
      override fun onFailure(call:Call<T>,t:Throwable){
        continuation.resumeWithException(t)
      }
    })
  }
}

首先await()函数仍然是一个挂起函数,然后我们给它声明了一个泛型T,并将await()函数定义成Call< T >的扩展函数,这样所有返回值是Call类型的Retrofit网络请求接口就都可以直接调用await()函数了。

await()函数使用了suspendCoroutine函数来挂起当前协程,并且由于扩展函数的原因,我们现在拥有了Call对象的上下文,那么这里就可以直接调用enqueue()方法让Retrofit发起网络请求。使用同样的方式对Retrofit响应的数据或网络请求失败的情况进行处理。

有了await()函数之后,我们调用可以变得更简便,如下:

suspend fun getAppData(){
try{
val appList=ServiceCreator.create<AppService>().getAppData().await()
//对服务器响应的数据进行处理
}catch(e:Exception){
//对异常情况进行处理
}
}

这样只需要简单调用await()函数就可以让Retrofit发起网络请求,并直接获得服务器响应的数据。当然每次网络请求都要进行一次try catch处理也比较麻烦,在不处理的情况下,如果发生异常会一层层往上抛出,一直到被某一层函数处理位置,因此我们可以统一一个入口函数只进行一次try catch。