OKHttp源码解析(一)

xiaoxiao2021-02-28  62

一、OKHttp的综述

OkHttp是一个高效的Http客户端,有如下的特点:

支持HTTP2/SPDY黑科技socket自动选择最好路线,并支持自动重连拥有自动维护的socket连接池,减少握手次数拥有队列线程池,轻松写并发拥有Interceptors轻松处理请求与响应(比如透明GZIP压缩,LOGGING)

基于Headers的缓存策略

Okhttp的整体流程图

二、网络请求

用OkHttpClient.newCall(request)进行execute/enenqueue时,实际是将请求Call放到了Dispatcher中,okhttp使用Dispatcher进行线程分发,它有两种方法,一个是普通的同步单线程;另一种是使用了队列进行并发任务的分发(Dispatch)与回调。接下来分析第二种并发任务的分发。 1. Dispatcher的结构 maxRequests = 64: 最大并发请求数为64 maxRequestsPerHost = 5: 每个主机最大请求数为5 Dispatcher: 分发者,也就是生产者(默认在主线程) AsyncCall: 队列中需要处理的Runnable(包装了异步回调接口) ExecutorService:消费者池(也就是线程池) Deque:缓存(用数组实现,可自动扩容,无大小限制) Deque:正在运行的任务,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存

根据生产者消费者模型的模型理论,当入队(enqueue)请求时,如果满足(runningRequests<64 && runningRequestsPerHost<5),那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行。如果消费者缓存满了,就放入readyAsyncCalls进行缓存等待。 当任务执行完成后,调用finished的promoteCalls()函数,手动移动缓存区(可以看出这里是主动清理的,因此不会发生死锁) 当我们希望使用OkHttp的异步请求时,一般进行如下构造

OkHttpClient client = new OkHttpClient.Builder().build(); Request request = new Request.Builder() .url("http://qq.com").get().build(); client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } });

根据代码,我们可以发现实际上是Dispatcher进行了入队操作

synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { //添加正在运行的请求 runningAsyncCalls.add(call); //线程池执行请求 executorService().execute(call); } else { //添加到缓存队列排队等待 readyAsyncCalls.add(call); } }

如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。 请求元素RealCall#execute(它实现了Runnable接口),它内部实现的execute方法如下

@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }

调用 getResponseWithInterceptorChain() 函数获取 HTTP 返回结果,从函数名可以看出,这一步还会进行一系列“拦截”操作。

Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }

Interceptor 是 OkHttp 最核心的一个东西,不要误以为它只负责拦截请求进行一些额外的处理(例如 cookie),实际上它把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都只是一个 Interceptor,它们再连接成一个 Interceptor.Chain,环环相扣,最终圆满完成一次网络请求。

三、Interceptor责任链

以下是主要的拦截器的种类: RetryAndFollowUpInterceptor 负责失败重试以及重定向的 BridgeInterceptor 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的 CacheInterceptor 负责读取缓存直接返回、更新缓存的 ConnectInterceptor 负责和服务器建立连接的 networkInterceptors 配置 OkHttpClient 时设置的 CallServerInterceptor 负责向服务器发送请求数据、从服务器读取响应数据的

1、ConnectInterceptor(建立连接) 核心代码拦截如下:

@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); }

创建了一个 HttpCodec 对象,它是对 HTTP 协议操作的抽象,有两个实现:Http1Codec 和 Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。 2、CallServerInterceptor(发送和接收数据) 核心代码拦截如下:

@Override public Response intercept(Chain chain) throws IOException { HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream(); StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation(); Request request = chain.request(); long sentRequestMillis = System.currentTimeMillis(); httpCodec.writeRequestHeaders(request); if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength()); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } httpCodec.finishRequest(); Response response = httpCodec.readResponseHeaders() .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); if (!forWebSocket || response.code() != 101) { response = response.newBuilder() .body(httpCodec.openResponseBody(response)) .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } // 省略部分检查代码 return response; }

向服务器发送 request header; 如果有 request body,就向服务器发送; 读取 response header,先构造一个 Response 对象; 如果有 response body,就在 3 的基础上加上 body 构造一个新的 Response 对象;

四、连接池

HTTP中的keepalive连接在网络性能优化中,对于延迟降低与速度提升的有非常重要的作用。 Okhttp支持5个并发KeepAlive连接,默认链路生命为5分钟(链路空闲后,保持存活的时间)

1、连接池的的关键对象:

Connection: 对jdk的socket物理连接的包装,它内部有List

//Socket清理的Runnable,每当put操作时,就会被主动调用 //注意put操作是在网络线程 //而Socket清理是在`OkHttp ConnectionPool`线程池中调用 while (true) { //执行清理并返回下场需要清理的时间 long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { synchronized (ConnectionPool.this) { try { //在timeout内释放锁与时间片 ConnectionPool.this.wait(TimeUnit.NANOSECONDS.toMillis(waitNanos)); } catch (InterruptedException ignored) { } } } }

这段死循环实际上是一个阻塞的清理任务,首先进行清理(clean),并返回下次需要清理的间隔时间,然后调用wait(timeout)进行等待以释放锁与时间片,当等待时间到了后,再次进行清理,并返回下次要清理的间隔时间

long cleanup(long now) { int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; //遍历`Deque`中所有的`RealConnection`,标记泄漏的连接 synchronized (this) { for (RealConnection connection : connections) { // 查询此连接内部StreamAllocation的引用数量 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; } idleConnectionCount++; //选择排序法,标记出空闲连接 long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { //如果(`空闲socket连接超过5个` //且`keepalive时间大于5分钟`) //就将此泄漏连接从`Deque`中移除 connections.remove(longestIdleConnection); } else if (idleConnectionCount > 0) { //返回此连接即将到期的时间,供下次清理 //这里依据是在上文`connectionBecameIdle`中设定的计时 return keepAliveDurationNs - longestIdleDurationNs; } else if (inUseConnectionCount > 0) { //全部都是活跃的连接,5分钟后再次清理 return keepAliveDurationNs; } else { //没有任何连接,跳出循环 cleanupRunning = false; return -1; } } //关闭连接,返回`0`,也就是立刻再次清理 closeQuietly(longestIdleConnection.socket()); return 0; }

遍历Deque中所有的RealConnection,标记泄漏的连接 如果被标记的连接满足(空闲socket连接超过5个&&keepalive时间大于5分钟),就将此连接从Deque中移除,并关闭连接,返回0,也就是将要执行wait(0),提醒立刻再次扫描 如果(目前还可以塞得下5个连接,但是有可能泄漏的连接(即空闲时间即将达到5分钟)),就返回此连接即将到期的剩余时间,供下次清理 如果(全部都是活跃的连接),就返回默认的keep-alive时间,也就是5分钟后再执行清理 如果(没有任何连接),就返回-1,跳出清理的死循环 这里的“并发”==(“空闲”+“活跃”)==5 标记并找到最不活跃的连接:pruneAndGetAllocationCount()方法中依据弱引用是否为null而判断这个连接是否泄漏

参考链接:https://blog.piasy.com/2016/07/11/Understand-OkHttp/

转载请注明原文地址: https://www.6miu.com/read-2631432.html

最新回复(0)