查看原文
其他

OkHttp 用了那么久,你竟然不知道它的原理?

点击关注 👉 Java面试那些事儿 2022-04-29

大家好,我是D哥

点击关注下方公众号,Java面试资料 都在这里

作者:码农的书柜  
来源:https://www.jianshu.com/p/76d79a1407aa

最近,很多朋友反映大厂的面试喜欢挖底层知识,像OkHttp这些都是必问的问题。这里就给大家分享一篇非常有帮助的技术文吧。


OkHttp是一个高效的HTTP库:


  • 支持HTTP / 2,允许对同一主机的所有请求共享一个套接字

  • 通过连接池可减少请求延迟(如果HTTP / 2不可用)

  • 支持GZIP压缩减少数据流量

  • 响应缓存可以完全避免网络重复请求

  • 静默恢复处理常见的连接问题


本文就以请求使用为入口,来深入学习下OkHttp。



#请求流程分析


同步请求


Okhttp同步GET请求使用:

// 新建一个Okhttp客户端(也可以通过OkHttpClient.Builder来构造)OkHttpClient client = new OkHttpClient();// 构造一个请求对象Request request = new Request.Builder().url(url).build();// 执行同步请求,返回响应Response response = client.newCall(request).execute();// 从响应体中获取数据String str = response.body().string();

先来瞧瞧构建OkhttpClient的源码:

open class OkHttpClient internal constructor( builder: Builder) : Cloneable, Call.Factory, WebSocket.Factory { //若直接实例化OkHttpClient,则调用主构造函数以默认Builder作为参数 constructor() : this(Builder())
// 通过builder中的值赋值 @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool ...
class Builder constructor() { //分发器 internal var dispatcher: Dispatcher = Dispatcher() //连接池 internal var connectionPool: ConnectionPool = ConnectionPool() //应用拦截器集合 internal val interceptors: MutableList<Interceptor> = mutableListOf() //网络拦截器集合 internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() //事件监听工厂 internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() //连接失败是否重试 internal var retryOnConnectionFailure = true internal var authenticator: Authenticator = Authenticator.NONE //是否跟随重定向 internal var followRedirects = true internal var followSslRedirects = true //cookie internal var cookieJar: CookieJar = CookieJar.NO_COOKIES //磁盘缓存 internal var cache: Cache? = null //dns internal var dns: Dns = Dns.SYSTEM //代理设置 internal var proxy: Proxy? = null
... }
}

OkhttpClient可以通过构造者配置参数来构建,也可以直接实例化,直接实例化其实也是内部调用构造者,只是传入的是默认builder。


再来看看OkhttpClient的新调用方法
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

发现返回的是RealCall,接下来去RealCall中看后续的execute执行方法

override fun execute(): Response { //确认call没有执行过并置executed为true,否则抛出异常 check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter() callStart() try { //标记执行中 client.dispatcher.executed(this) //通过拦截器链获取网络响应 return getResponseWithInterceptorChain() } finally { //标记执行结束 client.dispatcher.finished(this) }}

看起来比较精简,通过拦截器链获取网络响应,然后返回响应(拦截器链路在后续拦截器分析)。


异步请求


Okhttp异步GET请求使用:

OkHttpClient client = new OkHttpClient();Request request = new Request.Builder().url(url).build();// 执行异步请求,通过回调返回响应client.newCall(request).enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) {}
@Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { // 从回调中通过响应体获取数据 String str = response.body().string(); }});

异步请求流程大致和同步请求相似,但是最后的执行方法是enqueue,并传入回调对象。


我们来看看源码:

override fun enqueue(responseCallback: Callback) { //确认call没有执行过并置executed为true,否则抛出异常 check(executed.compareAndSet(false, true)) { "Already Executed" } //监听回调 callStart() //调用Dispatcher的enqueue方法,并传入一个AsyncCall对象 client.dispatcher.enqueue(AsyncCall(responseCallback))}

内部调用了客户端的分发器的enqueue方法,并把AsyncCall(responseCallback)作为参数传入,AsyncCall是继承自Runnable,且是RealCall的内部类,我们先看Dispatcher.enqueue()方法

class Dispatcher constructor() { /** '异步准备执行'队列 */ private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** '异步正在执行'队列,包括取消但至今还没结束的 */ private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** ‘同步正在执行’队列*/ private val runningSyncCalls = ArrayDeque<RealCall>() ... internal fun enqueue(call: AsyncCall) { synchronized(this) { //加入准备执行队列 readyAsyncCalls.add(call) ... } // 执行 promoteAndExecute() }
private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //检查请求是否超过最大请求数 if (runningAsyncCalls.size >= this.maxRequests) break //检查请求是否超过一个Host对应的最大请求数 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 }
for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] //调用AsyncCall的executeOn()方法 asyncCall.executeOn(executorService) }
return isRunning }}

可以从上面代码看出,就是将符合条件的调用从readyAsyncCalls队列提升到runningAsyncCalls,并调用 AsyncCall的executeOn() 方法,把线程池传入。


我们来看看AsyncCall:
inner class AsyncCall( private val responseCallback: Callback ) : Runnable { ... fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()
var success = false try { //使用线程池执行自己的run()方法 executorService.execute(this) success = true } catch (e: RejectedExecutionException) { ... //失败回调 responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { //标记结束 client.dispatcher.finished(this) // This call is no longer running! } } }
override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { //和同步请求一样,通过拦截器链获取网络响应 val response = getResponseWithInterceptorChain() signalledCallback = true //回调成功 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { ... } else { //回调失败 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { ... //回调失败 responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { //标记结束 client.dispatcher.finished(this) } } } }

使用线程池来执行自己,接下来就看run()方法,发现和同步请求一样,通过拦截器链获取网络响应,再调用回调对象的回调方法返回响应。


#拦截器分析


请求大致流程知道了,我们来看看重头戏,拦截器链里面做了什么操作。

@Throws(IOException::class)internal fun getResponseWithInterceptorChain(): Response { // 建立一个拦截器列表 val interceptors = mutableListOf<Interceptor>() // 用户设置的所有应用拦截器 interceptors += client.interceptors // 处理错误恢复和重定向的拦截器 interceptors += RetryAndFollowUpInterceptor(client) // 桥接拦截器,桥接应用层和网络层代码 interceptors += BridgeInterceptor(client.cookieJar) // 缓存拦截器 interceptors += CacheInterceptor(client.cache) // 服务器连接拦截器 interceptors += ConnectInterceptor if (!forWebSocket) { // 用户设置的所有网络拦截器 interceptors += client.networkInterceptors } // 服务器请求拦截器 interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis )
var calledNoMoreExchanges = false try { //使用责任链模式开启链式调用 val response = chain.proceed(originalRequest) if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } //返回响应 return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } }}

我们在看下RealInterceptorChain的proceed方法:

@Throws(IOException::class) override fun proceed(request: Request): Response { ... // 复制一个RealInterceptorChain,用于调用链中的下一个拦截器 val next = copy(index = index + 1, request = request) val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS") // 调用下一个拦截器的intercept方法,获取response返回给上一个拦截器 val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null") ... return response }

接下来我们来具体看看各个拦截器的作用


1. RetryAndFollowUpInterceptor


RetryAndFollowUpInterceptor 处理错误恢复和重定向,它会判断错误是否满足条件进行重试,还有根据返回的响应判断是否需要重定向请求。

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { // 初始化ExchangeFinder(后续ConnectInterceptor会用到ExchangeFinder来查找连接) call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") }
try { //执行下一个拦截器,获取响应 response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { ... // 满足条件则重试 continue } catch (e: IOException) { ... // 满足条件则重试 continue }
// 赋上重定向之前的响应(响应体置空) if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() }
val exchange = call.interceptorScopedExchange //判断是否需重定向,若需则返回重定向请求 val followUp = followUpRequest(response, exchange)
//不需要重定向则直接返回response if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response }
val followUpBody = followUp.body // 若该请求只可传输一次,则返回响应 if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response }
response.body?.closeQuietly()
//超过重定向最大次数则抛出异常 if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") }
//将请求重新赋值为重定向的请求,继续循环,再次发送 request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) } } } ...}

2. BridgeInterceptor


BridgeInterceptor 桥接应用层和网络层的代码,对用户的请求进行加工(如对请求头进行设置添加),也对网络响应做相应的处理(如解压服务端返回的 gzip 压缩数据)。

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { // 获取用户请求 val userRequest = chain.request() // 真正发送的网络请求的构建者 val requestBuilder = userRequest.newBuilder()
// 用户请求的请求体 val body = userRequest.body // 对请求头的设置 ... var transparentGzip = false if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true requestBuilder.header("Accept-Encoding", "gzip") }
val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) }
if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) }
// 执行下一个拦截器,获取网络响应 val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder() .request(userRequest)
// 若因配置问题,服务端返回gzip压缩的数据,则做相应的解压缩 if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { // GzipSource对象,用于解压 val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } }
return responseBuilder.build() }}

3. CacheInterceptor


CacheInterceptor 承担着缓存的查找与保存的职责。根据策略判断是使用缓存还是走网络请求,对于返回的响应,满足条件则进行缓存。

class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 检查缓存策略 val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() // 若还需发送网络请求,则networkRequest不为空 val networkRequest = strategy.networkRequest // 若存在可用缓存,则cacheResponse不为空 val cacheResponse = strategy.cacheResponse
...
// 如果我们被禁止使用网络,并且无可用缓存,则返回失败 if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } }
// 如果不需要网络请求,缓存可用,则返回缓存 if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } }
...
var networkResponse: Response? = null try { // 若无缓存可用,则执行下一个拦截器,获取响应 networkResponse = chain.proceed(networkRequest) } finally { if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } }
// 如果我们还有缓存响应,且网络响应code为304,则更新缓存响应,并返回 if (cacheResponse != null) { if (networkResponse?.code == HTTP_NOT_MODIFIED) { // 合并响应头、更新为网络请求时间和网络响应时间等 val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()
networkResponse.body!!.close()
cache!!.trackConditionalCacheHit() // 更新缓存 cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { cacheResponse.body?.closeQuietly() } }
// 包装网络响应 val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()
// 若用户配置了缓存 if (cache != null) { // 判断是否满足缓存条件 if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // 将网络响应写入缓存,并返回 val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { listener.cacheMiss(call) } } }
// 根据请求方法判断是否为无效请求,是则从缓存移除相对应响应 if (HttpMethod.invalidatesCache(networkRequest.method)) { try { cache.remove(networkRequest) } catch (_: IOException) { // cache无法被写 } } }
return response }}

4. ConnectInterceptor


ConnectInterceptor 主要是给网络请求提供一个连接,并交给下一个拦截器处理,这里还没有发送请求到服务器获取响应。在获取连接对象的时候,使用了连接池ConnectionPool来复用连接。

object ConnectInterceptor : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain // 查找新连接或池里的连接以承载即将到来的请求和响应 val exchange = realChain.call.initExchange(chain) val connectedChain = realChain.copy(exchange = exchange) return connectedChain.proceed(realChain.request) }}

ConnectInterceptor 看似代码很少,其实代码都在深处,看下initExchange方法

internal fun initExchange(chain: RealInterceptorChain): Exchange { ... // codec(ExchangeCodec) 是一个连接所用的编码解码器,用于编码HTTP请求和解码HTTP响应 val codec = exchangeFinder.find(client, chain) // result(Exchange)是封装这个编码解码器的一个工具类,用于管理ExchangeCodec,处理实际的 I/O val result = Exchange(this, eventListener, exchangeFinder, codec) ... return result}

ExchangeCodec持有连接,可通过其编码请求到服务端和获取服务端的响应并解码,我们依方法进入到最深处,看看是连接是如何获取的(代码已做简化处理)。

private fun findConnection(): RealConnection {
// 1、复用当前连接 val callConnection = call.connection if (callConnection != null) { //检查这个连接是否可用和可复用 if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } return callConnection }
//2、从连接池中获取可用连接 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
//3、从连接池中获取可用连接,通过一组路由routes(涉及知识点Http2多路复用) if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! return result } route = localRouteSelection.next()
// 4、创建新连接,进行tcp连接 val newConnection = RealConnection(connectionPool, route) newConnection.connect
// 5、再获取一次连接,在新建连接过程中可能有其他竞争连接被创建了,如可用防止浪费 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! // 关闭刚刚创建的新连接 newConnection.socket().closeQuietly() return result }
//6、还是要使用创建的新连接,放入连接池,并返回 connectionPool.put(newConnection) return newConnection}

5. CallServerInterceptor


CallServerInterceptor 是真正向服务器发起请求并获取响应的,它是拦责任链的最后一个拦截器,拿到响应后返回给上一个拦截器。代码已做简化(省略了很多条件判断和处理)。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain // ConnectInterceptor获取到的,持有编码解码器 val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body val sentRequestMillis = System.currentTimeMillis()
var responseBuilder: Response.Builder? = null try { // 写入请求头 exchange.writeRequestHeaders(request)
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { if (...) { // 写入请求体 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } else { ... } } else { // 无请求体 exchange.noRequestBody() } } catch (e: IOException) {...}
try { if (responseBuilder == null) { // 读取响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() invokeStartEvent = false } } // 构建响应 var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code // 读取响应体 response = if (forWebSocket && code == 101) { response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() .body(exchange.openResponseBody(response)) .build() } ... return response } catch (e: IOException) { ... } }}

#总结


以上就是对OkHttp的源码解析,可以看出它是一个结构清晰的优质源码库,各个模块通过设计模式解耦。


总结下流程:首先通过OkHttpClient对象调用newCall方法得到RealCall实例,再通过调用RealCall的execute方法或enqueue方法,这两个方法最终都会调用到getResponseWithInterceptorChain方法,运用责任链模式,开始一层层传入各个拦截器,每个拦截器都有着自己都职责,最终在CallServerInterceptor发出请求并获取响应,然后层层返回响应。



技术交流群


最后,D哥也建了一个技术群,主要探讨一些新的技术和开源项目值不值得去研究及IDEA使用的“骚操作”,有兴趣入群的同学,可长按扫描下方二维码,一定要备注:城市+昵称+技术方向,根据格式备注,可更快被通过且邀请进群。



▲长按扫描


热门推荐:

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存