目录
- 一、select是什么
- 二、select和Channel
一、select是什么
select——>用于选择更快的结果。
基于场景理解
比如客户端要查询一个商品的详情。两个服务:缓存服务,速度快但信息可能是旧的;网络服务,速度慢但信息一定是最新的。
如何实现上述逻辑:
runBlocking { | |
suspend fun getCacheInfo(productId: String): Product { | |
delay(100L) | |
return Product(productId, 8.9) | |
} | |
suspend fun getNetworkInfo(productId: String): Product? { | |
delay(200L) | |
return Product(productId, 8.8) | |
} | |
fun updateUI(product: Product) { | |
println("${product.productId} : ${product.price}") | |
} | |
val startTime = System.currentTimeMillis() | |
val productId = "001" | |
val cacheInfo = getCacheInfo(productId) | |
if (cacheInfo != null) { | |
updateUI(cacheInfo) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
val latestInfo = getNetworkInfo(productId) | |
if (latestInfo != null) { | |
updateUI(latestInfo) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
} |
001 : 8.9
Time cost: 113
001 : 8.8
Time cost: 324
上述程序分为四步:第一步:查询缓存信息;第二步:缓存服务返回信息,更新 UI;第三步:查询网络服务;第四步:网络服务返回信息,更新 UI。
用户可以第一时间看到商品的信息,虽然它暂时会展示旧的信息,但由于我们同时查询了网络服务,旧缓存信息也马上会被替代成新的信息。但是可能存在一些问题:如果程序卡在了缓存服务,获取网络服务就会无法执行。是因为 getCacheInfo() 它是一个挂起函数,只有这个程序执行成功以后,才可以继续执行后面的任务。能否做到:两个挂起函数同时执行,谁返回的速度更快,就选择哪个结果。答案是使用select。
runBlocking { | |
suspend fun getCacheInfo(productId: String): Product { | |
delay(100L) | |
return Product(productId, 8.9) | |
} | |
suspend fun getNetworkInfo(productId: String): Product { | |
delay(200L) | |
return Product(productId, 8.8) | |
} | |
fun updateUI(product: Product) { | |
println("${product.productId} : ${product.price}") | |
} | |
val startTime = System.currentTimeMillis() | |
val productId = "001" | |
val product = select<Product?> { | |
async { | |
getCacheInfo(productId) | |
}.onAwait { | |
it | |
} | |
async { | |
getNetworkInfo(productId) | |
}.onAwait { | |
it | |
} | |
} | |
if (product != null) { | |
updateUI(product) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
} |
001 : 8.9
Time cost: 134
Process finished with exit code 0
由于缓存的服务更快,所以,select 确实帮我们选择了更快的那个结果。我们的 select 可以在缓存服务出现问题的时候,灵活选择网络服务的结果。从而避免用户等待太长的时间,得到糟糕的体验。
在上述代码中,用户大概率是会展示旧的缓存信息。但实际场景下,我们是需要进一步更新最新信息的。
runBlocking { | |
suspend fun getCacheInfo(productId: String): Product { | |
delay(100L) | |
return Product(productId, 8.9) | |
} | |
suspend fun getNetworkInfo(productId: String): Product { | |
delay(200L) | |
return Product(productId, 8.8) | |
} | |
fun updateUI(product: Product) { | |
println("${product.productId} : ${product.price}") | |
} | |
val startTime = System.currentTimeMillis() | |
val productId = "001" | |
val cacheDeferred = async { | |
getCacheInfo(productId) | |
} | |
val latestDeferred = async { | |
getNetworkInfo(productId) | |
} | |
val product = select<Product?> { | |
cacheDeferred.onAwait { | |
it.copy(isCache = true) | |
} | |
latestDeferred.onAwait { | |
it.copy(isCache = false) | |
} | |
} | |
if (product != null) { | |
updateUI(product) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
if (product != null && product.isCache) { | |
val latest = latestDeferred.await() ?: return@runBlocking | |
updateUI(latest) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
} |
001 : 8.9
Time cost: 124
001 : 8.8
Time cost: 228
Process finished with exit code 0
如果是多个服务的缓存场景呢?
runBlocking { | |
val startTime = System.currentTimeMillis() | |
val productId = "001" | |
suspend fun getCacheInfo(productId: String): Product { | |
delay(100L) | |
return Product(productId, 8.9) | |
} | |
suspend fun getCacheInfo2(productId: String): Product { | |
delay(50L) | |
return Product(productId, 8.85) | |
} | |
suspend fun getNetworkInfo(productId: String): Product { | |
delay(200L) | |
return Product(productId, 8.8) | |
} | |
fun updateUI(product: Product) { | |
println("${product.productId} : ${product.price}") | |
} | |
val cacheDeferred = async { | |
getCacheInfo(productId) | |
} | |
val cacheDeferred2 = async { | |
getCacheInfo2(productId) | |
} | |
val latestDeferred = async { | |
getNetworkInfo(productId) | |
} | |
val product = select<Product?> { | |
cacheDeferred.onAwait { | |
it.copy(isCache = true) | |
} | |
cacheDeferred2.onAwait { | |
it.copy(isCache = true) | |
} | |
latestDeferred.onAwait { | |
it.copy(isCache = true) | |
} | |
} | |
if (product != null) { | |
updateUI(product) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
if (product != null && product.isCache) { | |
val latest = latestDeferred.await() | |
updateUI(latest) | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} | |
} |
Log
001 : 8.85
Time cost: 79
001 : 8.8
Time cost: 229
Process finished with exit code 0
select 代码模式,可以提升程序的整体响应速度。
二、select和Channel
runBlocking { | |
val startTime = System.currentTimeMillis() | |
val channel1 = produce { | |
send(1) | |
delay(200L) | |
send(2) | |
delay(200L) | |
send(3) | |
} | |
val channel2 = produce { | |
delay(100L) | |
send("a") | |
delay(200L) | |
send("b") | |
delay(200L) | |
send("c") | |
} | |
channel1.consumeEach { | |
println(it) | |
} | |
channel2.consumeEach { | |
println(it) | |
} | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} |
Log
1
2
3
a
b
c
Time cost: 853
Process finished with exit code 0
上述代码串行执行,可以使用select进行优化。
runBlocking { | |
val startTime = System.currentTimeMillis() | |
val channel1 = produce { | |
send(1) | |
delay(200L) | |
send(2) | |
delay(200L) | |
send(3) | |
} | |
val channel2 = produce { | |
delay(100L) | |
send("a") | |
delay(200L) | |
send("b") | |
delay(200L) | |
send("c") | |
} | |
suspend fun selectChannel( | |
channel1: ReceiveChannel<Int>, | |
channel2: ReceiveChannel<String> | |
): Any { | |
return select<Any> { | |
if (!channel1.isClosedForReceive) { | |
channel1.onReceive { | |
it.also { | |
println(it) | |
} | |
} | |
} | |
if (!channel2.isClosedForReceive) { | |
channel2.onReceive { | |
it.also { | |
println(it) | |
} | |
} | |
} | |
} | |
} | |
repeat(6) { | |
selectChannel(channel1, channel2) | |
} | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} |
Log
1
a
2
b
3
c
Time cost: 574
Process finished with exit code 0
从代码执行结果可以发现程序的执行耗时有效减少。onReceive{} 是 Channel 在 select 当中的语法,当 Channel 当中有数据以后,它就会被回调,通过这个 Lambda,将结果传出去。执行了 6 次 select,目的是要把两个管道中的所有数据都消耗掉。
如果Channel1不生产数据了,程序会如何执行?
runBlocking { | |
val startTime = System.currentTimeMillis() | |
val channel1 = produce<String> { | |
delay(5000L) | |
} | |
val channel2 = produce<String> { | |
delay(100L) | |
send("a") | |
delay(200L) | |
send("b") | |
delay(200L) | |
send("c") | |
} | |
suspend fun selectChannel( | |
channel1: ReceiveChannel<String>, | |
channel2: ReceiveChannel<String> | |
): String = select<String> { | |
channel1.onReceive { | |
it.also { | |
println(it) | |
} | |
} | |
channel2.onReceive { | |
it.also { | |
println(it) | |
} | |
} | |
} | |
repeat(3) { | |
selectChannel(channel1, channel2) | |
} | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} |
Log
a
b
c
Time cost: 570
Process finished with exit code 0
如果不知道Channel的个数,如何避免ClosedReceiveChannelException?
使用:onReceiveCatching{}
runBlocking { | |
val startTime = System.currentTimeMillis() | |
val channel1 = produce<String> { | |
delay(5000L) | |
} | |
val channel2 = produce<String> { | |
delay(100L) | |
send("a") | |
delay(200L) | |
send("b") | |
delay(200L) | |
send("c") | |
} | |
suspend fun selectChannel( | |
channel1: ReceiveChannel<String>, | |
channel2: ReceiveChannel<String> | |
): String = select<String> { | |
channel1.onReceiveCatching { | |
it.getOrNull() ?: "channel1 is closed!" | |
} | |
channel2.onReceiveCatching { | |
it.getOrNull() ?: "channel2 is closed!" | |
} | |
} | |
repeat(6) { | |
val result = selectChannel(channel1, channel2) | |
println(result) | |
} | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} |
Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 584
Process finished with exit code 0
得到所有结果以后,程序不会立即退出,因为 channel1 一直在 delay()。
所以我们需要在6次repeat之后将channel关闭。
runBlocking { | |
val startTime = System.currentTimeMillis() | |
val channel1 = produce<String> { | |
delay(15000L) | |
} | |
val channel2 = produce<String> { | |
delay(100L) | |
send("a") | |
delay(200L) | |
send("b") | |
delay(200L) | |
send("c") | |
} | |
suspend fun selectChannel( | |
channel1: ReceiveChannel<String>, | |
channel2: ReceiveChannel<String> | |
): String = select<String> { | |
channel1.onReceiveCatching { | |
it.getOrNull() ?: "channel1 is closed!" | |
} | |
channel2.onReceiveCatching { | |
it.getOrNull() ?: "channel2 is closed!" | |
} | |
} | |
repeat(6) { | |
val result = selectChannel(channel1, channel2) | |
println(result) | |
} | |
channel1.cancel() | |
channel2.cancel() | |
println("Time cost: ${System.currentTimeMillis() - startTime}") | |
} |
Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 612
Process finished with exit code 0
Deferred、Channel 的 API:
public interface Deferred : CoroutineContext.Element { | |
public suspend fun join() | |
public suspend fun await(): T | |
public val onJoin: SelectClause0 | |
public val onAwait: SelectClause1<T> | |
} | |
public interface SendChannel<in E> | |
public suspend fun send(element: E) | |
public val onSend: SelectClause2<E, SendChannel<E>> | |
} | |
public interface ReceiveChannel<out E> { | |
public suspend fun receive(): E | |
public suspend fun receiveCatching(): ChannelResult<E> | |
public val onReceive: SelectClause1<E> | |
public val onReceiveCatching: SelectClause1<ChannelResult<E>> | |
} |
当 select 与 Deferred 结合使用的时候,当并行的 Deferred 比较多的时候,你往往需要在得到一个最快的结果以后,去取消其他的 Deferred。
通过 async 并发执行协程,也可以借助 select 得到最快的结果。
runBlocking { | |
suspend fun <T> fastest(vararg deferreds: Deferred<T>): T = select { | |
fun cancelAll() = deferreds.forEach { | |
it.cancel() | |
} | |
for (deferred in deferreds) { | |
deferred.onAwait { | |
cancelAll() | |
it | |
} | |
} | |
} | |
val deferred1 = async { | |
delay(100L) | |
println("done1") | |
"result1" | |
} | |
val deferred2 = async { | |
delay(200L) | |
println("done2") | |
"result2" | |
} | |
val deferred3 = async { | |
delay(300L) | |
println("done3") | |
"result3" | |
} | |
val deferred4 = async { | |
delay(400L) | |
println("done4") | |
"result4" | |
} | |
val deferred5 = async { | |
delay(5000L) | |
println("done5") | |
"result5" | |
} | |
val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5) | |
println(fastest) | |
} |
Log
done1
result1
Process finished with exit code 0