OkHttp 用了那么久,你竟然不知道它的原理?
大家好,我是D哥
点击关注下方公众号,Java面试资料 都在这里
来源:https://www.jianshu.com/p/76d79a1407aa
最近,很多朋友反映大厂的面试喜欢挖底层知识,像OkHttp这些都是必问的问题。这里就给大家分享一篇非常有帮助的技术文吧。
OkHttp是一个高效的HTTP库:
支持HTTP / 2,允许对同一主机的所有请求共享一个套接字
通过连接池可减少请求延迟(如果HTTP / 2不可用)
支持GZIP压缩减少数据流量
响应缓存可以完全避免网络重复请求
静默恢复处理常见的连接问题
…
本文就以请求使用为入口,来深入学习下OkHttp。
#请求流程分析
同步请求
Okhttp同步GET请求使用:
// 新建一个Okhttp客户端(也可以通过OkHttpClient.Builder来构造)
OkHttpClient client = new OkHttpClient();
// 构造一个请求对象
Request request = new Request.Builder().url(url).build();
// 执行同步请求,返回响应
Response response = client.newCall(request).execute();
// 从响应体中获取数据
String str = response.body().string();
先来瞧瞧构建OkhttpClient的源码:
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
//若直接实例化OkHttpClient,则调用主构造函数以默认Builder作为参数
constructor() : this(Builder())
// 通过builder中的值赋值
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
...
class Builder constructor() {
//分发器
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//应用拦截器集合
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络拦截器集合
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//事件监听工厂
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败是否重试
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
//是否跟随重定向
internal var followRedirects = true
internal var followSslRedirects = true
//cookie
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//磁盘缓存
internal var cache: Cache? = null
//dns
internal var dns: Dns = Dns.SYSTEM
//代理设置
internal var proxy: Proxy? = null
...
}
}
OkhttpClient可以通过构造者配置参数来构建,也可以直接实例化,直接实例化其实也是内部调用构造者,只是传入的是默认builder。
再来看看OkhttpClient的新调用方法
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
发现返回的是RealCall,接下来去RealCall中看后续的execute执行方法
override fun execute(): Response {
//确认call没有执行过并置executed为true,否则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
//标记执行中
client.dispatcher.executed(this)
//通过拦截器链获取网络响应
return getResponseWithInterceptorChain()
} finally {
//标记执行结束
client.dispatcher.finished(this)
}
}
看起来比较精简,通过拦截器链获取网络响应,然后返回响应(拦截器链路在后续拦截器分析)。
异步请求
Okhttp异步GET请求使用:
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
// 执行异步请求,通过回调返回响应
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
// 从回调中通过响应体获取数据
String str = response.body().string();
}
});
异步请求流程大致和同步请求相似,但是最后的执行方法是enqueue,并传入回调对象。
我们来看看源码:
override fun enqueue(responseCallback: Callback) {
//确认call没有执行过并置executed为true,否则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//监听回调
callStart()
//调用Dispatcher的enqueue方法,并传入一个AsyncCall对象
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
内部调用了客户端的分发器的enqueue方法,并把AsyncCall(responseCallback)作为参数传入,AsyncCall是继承自Runnable,且是RealCall的内部类,我们先看Dispatcher.enqueue()方法
class Dispatcher constructor() {
/** '异步准备执行'队列 */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** '异步正在执行'队列,包括取消但至今还没结束的 */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** ‘同步正在执行’队列*/
private val runningSyncCalls = ArrayDeque<RealCall>()
...
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//加入准备执行队列
readyAsyncCalls.add(call)
...
}
// 执行
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//检查请求是否超过最大请求数
if (runningAsyncCalls.size >= this.maxRequests) break
//检查请求是否超过一个Host对应的最大请求数
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//调用AsyncCall的executeOn()方法
asyncCall.executeOn(executorService)
}
return isRunning
}
}
可以从上面代码看出,就是将符合条件的调用从readyAsyncCalls队列提升到runningAsyncCalls,并调用 AsyncCall的executeOn() 方法,把线程池传入。
我们来看看AsyncCall:
inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//使用线程池执行自己的run()方法
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
...
//失败回调
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//标记结束
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//和同步请求一样,通过拦截器链获取网络响应
val response = getResponseWithInterceptorChain()
signalledCallback = true
//回调成功
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
...
} else {
//回调失败
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
...
//回调失败
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//标记结束
client.dispatcher.finished(this)
}
}
}
}
使用线程池来执行自己,接下来就看run()方法,发现和同步请求一样,通过拦截器链获取网络响应,再调用回调对象的回调方法返回响应。
#拦截器分析
请求大致流程知道了,我们来看看重头戏,拦截器链里面做了什么操作。
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// 建立一个拦截器列表
val interceptors = mutableListOf<Interceptor>()
// 用户设置的所有应用拦截器
interceptors += client.interceptors
// 处理错误恢复和重定向的拦截器
interceptors += RetryAndFollowUpInterceptor(client)
// 桥接拦截器,桥接应用层和网络层代码
interceptors += BridgeInterceptor(client.cookieJar)
// 缓存拦截器
interceptors += CacheInterceptor(client.cache)
// 服务器连接拦截器
interceptors += ConnectInterceptor
if (!forWebSocket) {
// 用户设置的所有网络拦截器
interceptors += client.networkInterceptors
}
// 服务器请求拦截器
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
//使用责任链模式开启链式调用
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
//返回响应
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
我们在看下RealInterceptorChain的proceed方法:
@Throws(IOException::class)
override fun proceed(request: Request): Response {
...
// 复制一个RealInterceptorChain,用于调用链中的下一个拦截器
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 调用下一个拦截器的intercept方法,获取response返回给上一个拦截器
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
...
return response
}
接下来我们来具体看看各个拦截器的作用
1. RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor 处理错误恢复和重定向,它会判断错误是否满足条件进行重试,还有根据返回的响应判断是否需要重定向请求。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
// 初始化ExchangeFinder(后续ConnectInterceptor会用到ExchangeFinder来查找连接)
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
//执行下一个拦截器,获取响应
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
...
// 满足条件则重试
continue
} catch (e: IOException) {
...
// 满足条件则重试
continue
}
// 赋上重定向之前的响应(响应体置空)
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
//判断是否需重定向,若需则返回重定向请求
val followUp = followUpRequest(response, exchange)
//不需要重定向则直接返回response
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
// 若该请求只可传输一次,则返回响应
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
//超过重定向最大次数则抛出异常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
//将请求重新赋值为重定向的请求,继续循环,再次发送
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
...
}
2. BridgeInterceptor
BridgeInterceptor 桥接应用层和网络层的代码,对用户的请求进行加工(如对请求头进行设置添加),也对网络响应做相应的处理(如解压服务端返回的 gzip 压缩数据)。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// 获取用户请求
val userRequest = chain.request()
// 真正发送的网络请求的构建者
val requestBuilder = userRequest.newBuilder()
// 用户请求的请求体
val body = userRequest.body
// 对请求头的设置
...
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
// 执行下一个拦截器,获取网络响应
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 若因配置问题,服务端返回gzip压缩的数据,则做相应的解压缩
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
// GzipSource对象,用于解压
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
}
3. CacheInterceptor
CacheInterceptor 承担着缓存的查找与保存的职责。根据策略判断是使用缓存还是走网络请求,对于返回的响应,满足条件则进行缓存。
class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 检查缓存策略
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
// 若还需发送网络请求,则networkRequest不为空
val networkRequest = strategy.networkRequest
// 若存在可用缓存,则cacheResponse不为空
val cacheResponse = strategy.cacheResponse
...
// 如果我们被禁止使用网络,并且无可用缓存,则返回失败
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// 如果不需要网络请求,缓存可用,则返回缓存
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
...
var networkResponse: Response? = null
try {
// 若无缓存可用,则执行下一个拦截器,获取响应
networkResponse = chain.proceed(networkRequest)
} finally {
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 如果我们还有缓存响应,且网络响应code为304,则更新缓存响应,并返回
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
// 合并响应头、更新为网络请求时间和网络响应时间等
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
cache!!.trackConditionalCacheHit()
// 更新缓存
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
// 包装网络响应
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
// 若用户配置了缓存
if (cache != null) {
// 判断是否满足缓存条件
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将网络响应写入缓存,并返回
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
// 根据请求方法判断是否为无效请求,是则从缓存移除相对应响应
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// cache无法被写
}
}
}
return response
}
}
4. ConnectInterceptor
ConnectInterceptor 主要是给网络请求提供一个连接,并交给下一个拦截器处理,这里还没有发送请求到服务器获取响应。在获取连接对象的时候,使用了连接池ConnectionPool来复用连接。
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 查找新连接或池里的连接以承载即将到来的请求和响应
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
ConnectInterceptor 看似代码很少,其实代码都在深处,看下initExchange方法
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...
// codec(ExchangeCodec) 是一个连接所用的编码解码器,用于编码HTTP请求和解码HTTP响应
val codec = exchangeFinder.find(client, chain)
// result(Exchange)是封装这个编码解码器的一个工具类,用于管理ExchangeCodec,处理实际的 I/O
val result = Exchange(this, eventListener, exchangeFinder, codec)
...
return result
}
ExchangeCodec持有连接,可通过其编码请求到服务端和获取服务端的响应并解码,我们依方法进入到最深处,看看是连接是如何获取的(代码已做简化处理)。
private fun findConnection(): RealConnection {
// 1、复用当前连接
val callConnection = call.connection
if (callConnection != null) {
//检查这个连接是否可用和可复用
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
return callConnection
}
//2、从连接池中获取可用连接
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
//3、从连接池中获取可用连接,通过一组路由routes(涉及知识点Http2多路复用)
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
return result
}
route = localRouteSelection.next()
// 4、创建新连接,进行tcp连接
val newConnection = RealConnection(connectionPool, route)
newConnection.connect
// 5、再获取一次连接,在新建连接过程中可能有其他竞争连接被创建了,如可用防止浪费
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
// 关闭刚刚创建的新连接
newConnection.socket().closeQuietly()
return result
}
//6、还是要使用创建的新连接,放入连接池,并返回
connectionPool.put(newConnection)
return newConnection
}
5. CallServerInterceptor
CallServerInterceptor 是真正向服务器发起请求并获取响应的,它是拦责任链的最后一个拦截器,拿到响应后返回给上一个拦截器。代码已做简化(省略了很多条件判断和处理)。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// ConnectInterceptor获取到的,持有编码解码器
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
var responseBuilder: Response.Builder? = null
try {
// 写入请求头
exchange.writeRequestHeaders(request)
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
if (...) {
// 写入请求体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
} else {
...
}
} else {
// 无请求体
exchange.noRequestBody()
}
} catch (e: IOException) {...}
try {
if (responseBuilder == null) {
// 读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
// 构建响应
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
// 读取响应体
response = if (forWebSocket && code == 101) {
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
...
return response
} catch (e: IOException) {
...
}
}
}
#总结
以上就是对OkHttp的源码解析,可以看出它是一个结构清晰的优质源码库,各个模块通过设计模式解耦。
总结下流程:首先通过OkHttpClient对象调用newCall方法得到RealCall实例,再通过调用RealCall的execute方法或enqueue方法,这两个方法最终都会调用到getResponseWithInterceptorChain方法,运用责任链模式,开始一层层传入各个拦截器,每个拦截器都有着自己都职责,最终在CallServerInterceptor发出请求并获取响应,然后层层返回响应。
最后,D哥也建了一个技术群,主要探讨一些新的技术和开源项目值不值得去研究及IDEA使用的“骚操作”,有兴趣入群的同学,可长按扫描下方二维码,一定要备注:城市+昵称+技术方向,根据格式备注,可更快被通过且邀请进群。
▲长按扫描