Java OkHttp框架源码超详细解析

Java
274
0
0
2023-06-10
目录
  • 一、自己的理解的OkHttp
  • 二、OkHttp的使用方法
  • 三、基本对象介绍
  • 1.OkHttpClient
  • 2.request
  • 3.Call
  • 4.RealCall
  • 5.AsyncCall
  • 6.Dispatcher
  • 四、流程分析
  • 1.同步请求
  • 2.异步请求
  • 3.获取Response
  • 五、Interceptor
  • 六、RealInterceptorChain
  • 七、拦截器
  • 1.client.interceptors
  • 2.RetryAndFollowUpInterceptor
  • 3.BridgeInterceptor
  • 4.CacheInterceptor
  • 5.ConnectInterceptor
  • 6.client.networkInterceptors
  • 7.CallServerInterceptor

一、自己的理解的OkHttp

我理解的http的本质就是基于socket连接,把要传输的数据按照http协议的格式去封装后,传输在网络中,以此来实现的网络通信。

而OkHttp协议就是帮助我们,把我们把要传输的数据请求,按照http协议的格式,传输在Socket上,当然还有很多优化管理这些请求和连接的方法,例如:对于这些请求的管理:最多同时进行64个请求,同域名的最多同时进行5个请求。还有Socket连接池的管理。

二、OkHttp的使用方法

1.创建一个client,构建一个request

OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
      .url("https://www.baidu.com/")
      .build();

2.同步请求

Response response = client.newCall(request).execute();

3.异步请求

client.newCall(request).enqueue(new Callback() {
  @Override
  public void onFailure(@NotNull Call call, @NotNull IOException e) {
  //todo handle request failed
  }
  @Override
  public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
      //todo handle Response
  }
});

三、基本对象介绍

1.OkHttpClient

一个请求的配置类,采用了建造者模式,方便用户配置一些请求参数,如配置callTimeout,cookie,interceptor等等。

open class OkHttpClient internal constructor(builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {constructor() : this(Builder())
  class Builder constructor() {  //调度器
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//整体流程拦截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络流程拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//流程监听器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败时是否重连
internal var retryOnConnectionFailure = true
//服务器认证设置
internal var authenticator: Authenticator = Authenticator.NONE
//是否重定向
internal var followRedirects = true
//是否从HTTP重定向到HTTPS
internal var followSslRedirects = true
//cookie设置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//缓存设置
internal var cache: Cache? = null
//DNS设置
internal var dns: Dns = Dns.SYSTEM
//代理设置
internal var proxy: Proxy? = null
//代理选择器设置
internal var proxySelector: ProxySelector? = null
//代理服务器认证设置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var xTrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
//协议
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//域名校验
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
//请求超时
internal var callTimeout =
//连接超时
internal var connectTimeout =_000
//读取超时
internal var readTimeout =_000
//写入超时
internal var writeTimeout =_000
internal var pingInterval =
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
···省略代码···

2.request

同样是请求参数的配置类,也同样采用了建造者模式,但相比于OkHttpClient,Request就十分简单了,只有四个参数,分别是请求URL、请求方法、请求头、请求体。

class Request internal constructor(@get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,@get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,internal val tags: Map<Class<*>, Any>
) {open class Builder {
//请求的URL
internal var url: HttpUrl? = null
//请求方法,如:GET、POST..
internal var method: String
//请求头
internal var headers: Headers.Builder
//请求体
internal var body: RequestBody? = null
  ···省略代码···

3.Call

请求调用接口,表示这个请求已经准备好可以执行,也可以取消,只能执行一次。

interface Call : Cloneable {/** 返回发起此调用的原始请求 */
  fun request(): Request/**
   * 同步请求,立即执行。 * 
   * 抛出两种异常: *. 请求失败抛出IOException;
   *. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/@Throws(IOException::class)
  fun execute(): Response/**
   * 异步请求,将请求安排在将来的某个时间点执行。 * 如果在执行过一回的前提下再次执行抛出IllegalStateException */
  fun enqueue(responseCallback: Callback)/** 取消请求。已经完成的请求不能被取消 */
  fun cancel()/** 是否已被执行  */
  fun isExecuted(): Boolean/** 是否被取消   */
  fun isCanceled(): Boolean/** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */
  fun timeout(): Timeout/** 克隆这个call,创建一个新的相同的Call */
  public override fun clone(): Call/** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */
  fun interface Factory {  fun newCall(request: Request): Call
  }
}

4.RealCall

OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCall是Call接口的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的response及其它数据流。 通过使用方法也可知,创建RealCall对象后,就要调用同步或异步请求方法,所以它里面还包含同步请求 execute()与异步请求 enqueue()方法。(后面具体展开分析)

5.AsyncCall

异步请求调用,是RealCall的一个内部类,就是一个Runnable,被dispatcher调度器中的线程池所执行。

inner class AsyncCall(  //用户传入的响应回调方法
private val responseCallback: Callback
  ) : Runnable {  //同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger()
  private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
  this.callsPerHost = other.callsPerHost
}
···省略代码···  fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()
  var success = false
  try {
    //调用线程池执行
    executorService.execute(this)
    success = true
  } catch (e: RejectedExecutionException) {
    val ioException = InterruptedIOException("executor rejected")
    ioException.initCause(e)
    noMoreExchanges(ioException)
    //请求失败,调用 Callback.onFailure() 方法
    responseCallback.onFailure(this@RealCall, ioException)
  } finally {
    if (!success) {
      //请求失败,调用调度器finish方法
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}
override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var signalledCallback = false
    timeout.enter()
    try {
      //请求成功,获取到服务器返回的response
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      //调用 Callback.onResponse() 方法,将 response 传递出去
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      } else {
        //请求失败,调用 Callback.onFailure() 方法
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      //请求出现异常,调用cancel方法来取消请求
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        canceledException.addSuppressed(t)
        //请求失败,调用 Callback.onFailure() 方法
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      //请求结束,调用调度器finish方法
      client.dispatcher.finished(this)
    }
  }
}
  }

6.Dispatcher

调度器,用来调度Call对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall对象。

class Dispatcher constructor() {@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService  get() {
  if (executorServiceOrNull == null) {
    //创建一个缓存线程池,来处理请求调用,这个线程池的核心线程数是,等待队列的长度也是0,意味着
    //线程池会直接创建新的线程去处理请求
    executorServiceOrNull = ThreadPoolExecutor(, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
        SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
  }
  return executorServiceOrNull!!
}
  /** 已准备好的异步请求队列 */@get:Synchronized
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()/** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()/** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */
  private val runningSyncCalls = ArrayDeque<RealCall>()
···省略代码···
}

四、流程分析

1.同步请求

client.newCall(request).execute();

newCall方法就是创建一个RealCall对象,然后执行其execute()方法。

  RealCall.ktoverride fun execute(): Response {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//请求超时开始计时
timeout.enter()
//开启请求监听
callStart()
try {
  //调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中
  client.dispatcher.executed(this)
  //调用getResponseWithInterceptorChain 方法拿到 response
  return getResponseWithInterceptorChain()
} finally {
  //执行完毕,调度器将该 call 从 runningSyncCalls队列中移除
  client.dispatcher.finished(this)
}
  }Dispatcher.kt
  @Synchronized internal fun executed(call: RealCall) {  runningSyncCalls.add(call)
  }

调用调度器executed方法,就是将当前的RealCall对象加入到runningSyncCalls队列中,然后调用getResponseWithInterceptorChain方法拿到response。

2.异步请求

  RealCall.ktoverride fun enqueue(responseCallback: Callback) {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//开启请求监听
callStart()
//新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

然后调用调度器的enqueue方法

  Dispatcher.ktinternal fun enqueue(call: AsyncCall) {
//加锁,保证线程安全
synchronized(this) {
  //将该请求调用加入到 readyAsyncCalls 队列中
  readyAsyncCalls.add(call)
  // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
  // the same host.
  if (!call.call.forWebSocket) {
    //通过域名来查找有没有相同域名的请求,有则复用。
    val existingCall = findExistingCallWithHost(call.host)
    if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
  }
}
//执行请求
promoteAndExecute()
  }private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
//判断是否有请求正在执行
val isRunning: Boolean
//加锁,保证线程安全
synchronized(this) {
  //遍历 readyAsyncCalls 队列
  val i = readyAsyncCalls.iterator()
  while (i.hasNext()) {
    val asyncCall = i.next()
    //runningAsyncCalls 的数量不能大于最大并发请求数
    if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
    //同域名最大请求数,同一个域名最多允许5条线程同时执行请求
    if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
    //从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中
    i.remove()
    asyncCall.callsPerHost.incrementAndGet()
    executableCalls.add(asyncCall)
    runningAsyncCalls.add(asyncCall)
  }
  //通过运行队列中的请求数量来判断是否有请求正在执行
  isRunning = runningCallsCount() >
}
//遍历可执行队列,调用线程池来执行AsyncCall
for (i in until executableCalls.size) {
  val asyncCall = executableCalls[i]
  asyncCall.executeOn(executorService)
}
return isRunning
  }

调度器的enqueue方法就是将AsyncCall加入到readyAsyncCalls队列中,然后调用promoteAndExecute方法来执行请求,promoteAndExecute方法做的其实就是遍历readyAsyncCalls队列,然后将符合条件的请求用线程池执行,也就是会执行AsyncCall.run()方法。

AsyncCall 方法的具体代码看上面的这边就不在此展示了,简单来说就是调用getResponseWithInterceptorChain方法拿到response,然后通过Callback.onResponse方法传递出去。反之,如果请求失败,捕获了异常,就通过Callback.onFailure将异常信息传递出去。 最终,请求结束,调用调度器finish方法。

  Dispatcher.kt/** 异步请求调用结束方法 */
  internal fun finished(call: AsyncCall) {  call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
  }/** 同步请求调用结束方法 */
  internal fun finished(call: RealCall) {  finished(runningSyncCalls, call)
  }private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
  //将当前请求调用从 正在运行队列 中移除
  if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
  idleCallback = this.idleCallback
}
//继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
  //如果执行完了所有请求,处于闲置状态,调用闲置回调方法
  idleCallback.run()
}
  }

请求结束,异步请求,把当前同域名的计数减一,然后后面和同步一样,都是把请求从正在执行的队列中移除,然后继续执行剩余请求。

3.获取Response

接着就是看看getResponseWithInterceptorChain方法是如何拿到response的。

  internal fun getResponseWithInterceptorChain(): Response {  //拦截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
  interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//构建拦截器责任链
val chain = RealInterceptorChain(
    call = this,
    interceptors = interceptors,
    index =,
    exchange = null,
    request = originalRequest,
    connectTimeoutMillis = client.connectTimeoutMillis,
    readTimeoutMillis = client.readTimeoutMillis,
    writeTimeoutMillis = client.writeTimeoutMillis
)
//如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了
var calledNoMoreExchanges = false
try {
  //执行拦截器责任链来获取 response
  val response = chain.proceed(originalRequest)
  //如果被取消,关闭响应,抛出异常
  if (isCanceled()) {
    response.closeQuietly()
    throw IOException("Canceled")
  }
  return response
} catch (e: IOException) {
  calledNoMoreExchanges = true
  throw noMoreExchanges(e) as Throwable
} finally {
  if (!calledNoMoreExchanges) {
    noMoreExchanges(null)
  }
}
  }

简单概括一下:这里采用了责任链设计模式,通过拦截器构建了以RealInterceptorChain责任链,然后执行proceed方法来得到response。

那么,这又涉及拦截器是什么?拦截器责任链又是什么?

五、Interceptor

只声明了一个拦截器方法,在子类中具体实现,还包含一个Chain接口,核心方法是proceed(request)处理请求来获取response。

fun interface Interceptor {/** 拦截方法 */
  @Throws(IOException::class)fun intercept(chain: Chain): Response
  interface Chain {  /** 原始请求数据 */
fun request(): Request
/** 核心方法,处理请求,获取response */
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }
}

六、RealInterceptorChain

拦截器链就是实现Interceptor.Chain接口,重点就是复写的proceed方法。

class RealInterceptorChain(internal val call: RealCall,
  private val interceptors: List<Interceptor>,private val index: Int,
  internal val exchange: Exchange?,internal val request: Request,
  internal val connectTimeoutMillis: Int,internal val readTimeoutMillis: Int,
  internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
···省略代码···private var calls: Int =
  override fun call(): Call = calloverride fun request(): Request = request
  @Throws(IOException::class)override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
  check(exchange.finder.sameHostAndPort(request.url)) {
    "network interceptor ${interceptors[index -]} must retain the same host and port"
  }
  check(calls ==) {
    "network interceptor ${interceptors[index -]} must call proceed() exactly once"
  }
}
//index+, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器
val next = copy(index = index +, request = request)
//取出当前拦截器
val interceptor = interceptors[index]
//执行当前拦截器的拦截方法
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
    "interceptor $interceptor returned null")
if (exchange != null) {
  check(index + >= interceptors.size || next.calls == 1) {
    "network interceptor $interceptor must call proceed() exactly once"
  }
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
  }
}

链式调用,最终会向下执行拦截器列表中的每个拦截器,然后向上返回Response。

七、拦截器

各类拦截器的总结,按顺序:

  • client.interceptors:这是由开发者设置的,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义header、自定义log等等。
  • RetryAndFollowUpInterceptor:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
  • BridgeInterceptor:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。
  • CacheInterceptor:这里主要是缓存的相关处理,会根据用户在OkHttpClient里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建response。
  • ConnectInterceptor:这里主要就是负责建立连接,会建立TCP连接或者TLS连接。
  • Client.networkInterceptors:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。
  • CallServerInterceptor:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的response。

接下来我们按顺序,从上往下,对这些拦截器进行一一解读。

1.client.interceptors

这是用户自己定义的拦截器,称为应用拦截器,会保存在OkHttpClient的interceptors: List<Interceptor>列表中。 他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦截方法,我们可以通过它来添加自定义Header信息,如:

class HeaderInterceptor implements Interceptor {  @Override
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request().newBuilder()
            .addHeader("device-android", "xxxxxxxxxxx")
            .addHeader("country-code", "ZH")
            .build();
    return chain.proceed(request);
}
}
//然后在 OkHttpClient 中加入
OkHttpClient client = new OkHttpClient.Builder()  .connectTimeout(, TimeUnit.SECONDS)
.readTimeout(, TimeUnit.SECONDS)
.writeTimeout(, TimeUnit.SECONDS)
.cookieJar(new MyCookieJar())
.addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器
.build();

2.RetryAndFollowUpInterceptor

第二个拦截器,从它的名字也可知道,它负责请求失败的重试工作与重定向的后续请求工作,同时它会对连接做一些初始化工作。

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {  val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount =
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
  //这里会新建一个ExchangeFinder,ConnectInterceptor会使用到
  call.enterNetworkInterceptorExchange(request, newExchangeFinder)
  var response: Response
  var closeActiveExchange = true
  try {
    if (call.isCanceled()) {
      throw IOException("Canceled")
    }
    try {
      response = realChain.proceed(request)
      newExchangeFinder = true
    } catch (e: RouteException) {
      //尝试通过路由连接失败。该请求将不会被发送。
      if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
        throw e.firstConnectException.withSuppressed(recoveredFailures)
      } else {
        recoveredFailures += e.firstConnectException
      }
      newExchangeFinder = false
      continue
    } catch (e: IOException) {
      //尝试与服务器通信失败。该请求可能已发送。
      if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
        throw e.withSuppressed(recoveredFailures)
      } else {
        recoveredFailures += e
      }
      newExchangeFinder = false
      continue
    }
    // Attach the prior response if it exists. Such responses never have a body.
    //尝试关联上一个response,注意:body是为null
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
              .body(null)
              .build())
          .build()
    }
    val exchange = call.interceptorScopedExchange
    //会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向
    val followUp = followUpRequest(response, exchange)
    if (followUp == null) {
      if (exchange != null && exchange.isDuplex) {
        call.timeoutEarlyExit()
      }
      closeActiveExchange = false
      return response
    }
    //如果请求体是一次性的,不需要再次重试
    val followUpBody = followUp.body
    if (followUpBody != null && followUpBody.isOneShot()) {
      closeActiveExchange = false
      return response
    }
    response.body?.closeQuietly()
    //最大重试次数,不同的浏览器是不同的,比如:Chrome为,Safari则是16
    if (++followUpCount > MAX_FOLLOW_UPS) {
      throw ProtocolException("Too many follow-up requests: $followUpCount")
    }
    request = followUp
    priorResponse = response
  } finally {
    call.exitNetworkInterceptorExchange(closeActiveExchange)
  }
}
  }/** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/
  private fun recover(  e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
  ): Boolean {  //客户端禁止重试
if (!client.retryOnConnectionFailure) return false
//不能再次发送该请求体
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
//发生的异常是致命的,无法恢复,如:ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false
//没有更多的路由来尝试重连
if (!call.retryAfterFailure()) return false
// 对于失败恢复,使用带有新连接的相同路由选择器
return true
  }
···省略代码··· 

3.BridgeInterceptor

从它的名字可以看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,比如:添加Content-Type,添加Cookie,添加User-Agent等等。再将服务器返回的response做一些处理转换为客户端需要的response。比如:移除响应头中的Content-Encoding、Content-Length等等。

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {  //获取原始请求数据
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
//重新构建请求头,请求体信息
val body = userRequest.body
val contentType = body.contentType()
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.header("Host", userRequest.url.toHostHeader())
requestBuilder.header("Connection", "Keep-Alive")
   ···省略代码···  //添加cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
  requestBuilder.header("Cookie", cookieHeader(cookies))
}
//添加user-agent
if (userRequest.header("User-Agent") == null) {
  requestBuilder.header("User-Agent", userAgent)
}
//重新构建一个Request,然后执行下一个拦截器来处理该请求
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
//创建一个新的responseBuilder,目的是将原始请求数据构建到response中
val responseBuilder = networkResponse.newBuilder()
    .request(userRequest)
if (transparentGzip &&
    "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
    networkResponse.promisesBody()) {
  val responseBody = networkResponse.body
  if (responseBody != null) {
    val gzipSource = GzipSource(responseBody.source())
    val strippedHeaders = networkResponse.headers.newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build()
    //修改response header信息,移除Content-Encoding,Content-Length信息
    responseBuilder.headers(strippedHeaders)
    val contentType = networkResponse.header("Content-Type")
    //修改response body信息
    responseBuilder.body(RealResponseBody(contentType, -L, gzipSource.buffer()))
  }
}
return responseBuilder.build()
···省略代码···

4.CacheInterceptor

用户可以通过OkHttpClient.cache来配置缓存,缓存拦截器通过CacheStrategy来判断是使用网络还是缓存来构建response。

class CacheInterceptor(internal val cache: Cache?) : Interceptor {@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {  val call = chain.call()
//通过request从OkHttpClient.cache中获取缓存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//创建一个缓存策略,用来确定怎么使用缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//为空表示不使用网络,反之,则表示使用网络
val networkRequest = strategy.networkRequest
//为空表示不使用缓存,反之,则表示使用缓存
val cacheResponse = strategy.cacheResponse
//追踪网络与缓存的使用情况
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
//有缓存但不适用,关闭它
if (cacheCandidate != null && cacheResponse == null) {
  cacheCandidate.body?.closeQuietly()
}
//如果网络被禁止,但是缓存又是空的,构建一个code为的response,并返回
if (networkRequest == null && cacheResponse == null) {
  return Response.Builder()
      .request(chain.request())
      .protocol(Protocol.HTTP__1)
      .code(HTTP_GATEWAY_TIMEOUT)
      .message("Unsatisfiable Request (only-if-cached)")
      .body(EMPTY_RESPONSE)
      .sentRequestAtMillis(-L)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build().also {
        listener.satisfactionFailure(call, it)
      }
}
//如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response
if (networkRequest == null) {
  return cacheResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .build().also {
        listener.cacheHit(call, it)
      }
}
//为缓存添加监听
if (cacheResponse != null) {
  listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
  listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
  //责任链往下处理,从服务器返回response 赋值给 networkResponse
  networkResponse = chain.proceed(networkRequest)
} finally {
  //捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。
  if (networkResponse == null && cacheCandidate != null) {
    cacheCandidate.body?.closeQuietly()
  }
}
//如果有缓存
if (cacheResponse != null) {
  //且网络返回response code为的时候,使用缓存内容新构建一个Response返回。
  if (networkResponse?.code == HTTP_NOT_MODIFIED) {
    val response = cacheResponse.newBuilder()
        .headers(combine(cacheResponse.headers, networkResponse.headers))
        .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
        .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()
    networkResponse.body!!.close()
    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    cache!!.trackConditionalCacheHit()
    cache.update(cacheResponse, response)
    return response.also {
      listener.cacheHit(call, it)
    }
  } else {
    //否则关闭缓存响应体
    cacheResponse.body?.closeQuietly()
  }
}
//构建网络请求的response
val response = networkResponse!!.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build()
//如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中
if (cache != null) {
  //根据response的code,header以及CacheControl.noStore来判断是否可以缓存
  if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
    // 将该response存入缓存
    val cacheRequest = cache.put(response)
    return cacheWritingResponse(cacheRequest, response).also {
      if (cacheResponse != null) {
        listener.cacheMiss(call)
      }
    }
  }
  //根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除
  if (HttpMethod.invalidatesCache(networkRequest.method)) {
    try {
      //缓存无效,将该请求缓存从client缓存配置中移除
      cache.remove(networkRequest)
    } catch (_: IOException) {
      // The cache cannot be written.
    }
  }
}
return response
  }
···省略代码···  

5.ConnectInterceptor

负责实现与服务器真正建立起连接,

object ConnectInterceptor : Interceptor {@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {  val realChain = chain as RealInterceptorChain
//初始化一个exchange对象
val exchange = realChain.call.initExchange(chain)
//根据这个exchange对象来复制创建一个新的连接责任链
val connectedChain = realChain.copy(exchange = exchange)
//执行该连接责任链
return connectedChain.proceed(realChain.request)
  }
}

一扫下来,代码十分简单,拦截方法里就只有三步。

  • 初始化一个exchange对象。
  • 然后根据这个exchange对象来复制创建一个新的连接责任链。
  • 执行该连接责任链。

那这个exchange对象又是什么呢?

RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {  ...省略代码...
//这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的
val exchangeFinder = this.exchangeFinder!!
//返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)
val codec = exchangeFinder.find(client, chain)
//根据exchangeFinder与codec新构建一个Exchange对象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
  ...省略代码...  return result
  }

具体看看ExchangeFinder.find()这一步,

ExchangeFinder.kt
fun find(  client: OkHttpClient,
chain: RealInterceptorChain
  ): ExchangeCodec {  try {
  //查找合格可用的连接,返回一个 RealConnection 对象
  val resultConnection = findHealthyConnection(
      connectTimeout = chain.connectTimeoutMillis,
      readTimeout = chain.readTimeoutMillis,
      writeTimeout = chain.writeTimeoutMillis,
      pingIntervalMillis = client.pingIntervalMillis,
      connectionRetryEnabled = client.retryOnConnectionFailure,
      doExtensiveHealthChecks = chain.request.method != "GET"
  )
  //根据连接,创建并返回一个请求响应编码器:HttpExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议
  return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
  trackFailure(e.lastConnectException)
  throw e
} catch (e: IOException) {
  trackFailure(e)
  throw RouteException(e)
}
  }

继续往下看findHealthyConnection方法

ExchangeFinder.ktprivate fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
  ): RealConnection {  while (true) {
  //重点:查找连接
  val candidate = findConnection(
      connectTimeout = connectTimeout,
      readTimeout = readTimeout,
      writeTimeout = writeTimeout,
      pingIntervalMillis = pingIntervalMillis,
      connectionRetryEnabled = connectionRetryEnabled
  )
  //检查该连接是否合格可用,合格则直接返回该连接
  if (candidate.isHealthy(doExtensiveHealthChecks)) {
    return candidate
  }
  //如果该连接不合格,标记为不可用,从连接池中移除
  candidate.noNewExchanges()
...省略代码...
}
  }

所以核心方法就是findConnection,我们继续深入看看该方法:

private fun findConnection(  connectTimeout: Int, 
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
  ): RealConnection {  if (call.isCanceled()) throw IOException("Canceled")
//第一次,尝试重连 call 中的 connection,不需要去重新获取连接
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
  var toClose: Socket? = null
  synchronized(callConnection) {
    if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
      toClose = call.releaseConnectionNoEvents()
    }
  }
  //如果 call 中的 connection 还没有释放,就重用它。
  if (call.connection != null) {
    check(toClose == null)
    return callConnection
  }
  //如果 call 中的 connection 已经被释放,关闭Socket.
  toClose?.closeQuietly()
  eventListener.connectionReleased(call, callConnection)
}
//需要一个新的连接,所以重置一些状态
refusedStreamCount =
connectionShutdownCount =
otherFailureCount =
//第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
  val result = call.connection!!
  eventListener.connectionAcquired(call, result)
  return result
}
//连接池中是空的,准备下次尝试连接的路由
val routes: List<Route>?
val route: Route
...省略代码...
  //第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用
  if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
    val result = call.connection!!
    eventListener.connectionAcquired(call, result)
    return result
  }
  route = localRouteSelection.next()
}
//第四次,手动创建一个新连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
  newConnection.connect(
      connectTimeout,
      readTimeout,
      writeTimeout,
      pingIntervalMillis,
      connectionRetryEnabled,
      call,
      eventListener
  )
} finally {
  call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
//第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
//这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
  val result = call.connection!!
  nextRouteToTry = route
  newConnection.socket().closeQuietly()
  eventListener.connectionAcquired(call, result)
  return result
}
synchronized(newConnection) {
  //将手动创建的新连接放入连接池
  connectionPool.put(newConnection)
  call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
  }

在代码中可以看出,一共做了5次尝试去得到连接:

  • 第一次,尝试重连 call 中的 connection,不需要去重新获取连接。
  • 第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用。
  • 第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用。
  • 第四次,手动创建一个新连接。
  • 第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。

这一步就是为了建立连接。

6.client.networkInterceptors

该拦截器称为网络拦截器,与client.interceptors一样也是由用户自己定义的,同样是以列表的形式存在OkHttpClient中。

那这两个拦截器有什么不同呢?

其实他两的不同都是由于他们所处的位置不同所导致的,应用拦截器处于第一个位置,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的位置,它不一定会被执行,而且可能会被执行多次,比如:在RetryAndFollowUpInterceptor失败或者CacheInterceptor直接返回缓存的情况下,我们的网络拦截器是不会被执行的。

7.CallServerInterceptor

到了这里,客户端与服务器已经建立好了连接,接着就是将请求头与请求体发送给服务器,以及解析服务器返回的response了。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {  val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
  //写入请求头
  exchange.writeRequestHeaders(request)
  //如果不是GET请求,并且请求体不为空
  if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
    //当请求头为"Expect:-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。
    //POST请求,先发送请求头,在获取到继续状态后继续发送请求体
    if ("-continue".equals(request.header("Expect"), ignoreCase = true)) {
      //刷新请求,即发送请求头
      exchange.flushRequest()
      //解析响应头
      responseBuilder = exchange.readResponseHeaders(expectContinue = true)
      exchange.responseHeadersStart()
      invokeStartEvent = false
    }
    //写入请求体
    if (responseBuilder == null) {
      if (requestBody.isDuplex()) {
        //如果请求体是双公体,就先发送请求头,稍后在发送请求体
        exchange.flushRequest()
        val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
        //写入请求体
        requestBody.writeTo(bufferedRequestBody)
      } else {
        //如果获取到了"Expect:-continue"响应,写入请求体
        val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
        requestBody.writeTo(bufferedRequestBody)
        bufferedRequestBody.close()
      }
   ···省略代码···
    //请求结束,发送请求体
    exchange.finishRequest()
···省略代码···
try {
  if (responseBuilder == null) {
    //读取响应头
    responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
    ···省略代码···
  //构建一个response
  var response = responseBuilder
      .request(request)
      .handshake(exchange.connection.handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build()
  var code = response.code
  ···省略代码···
  return response
···省略代码···

简单概括一下:写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的response,并返回。 这里CallServerInterceptor是拦截器责任链中最后一个拦截器了,所以他不会再调用chain.proceed()方法往下执行,而是将这个构建的response往上传递给责任链中的每个拦截器。

总结一下流程: