Interceptor 是个接口,包括一个方法intercept和一个接口Chain;接口Chain里有返回 Request的方法request(),返回Response 的方法proceed,返回Connection的方法 connection();大致可以判断,执行请求的方法是proceed,而且根据注释,Connection()只会在network interceptors(网络拦截器)有用,application interceptors(应用拦截器)永远是null;
写个例子:
Call call = mOkHttpClient.newBuilder(). retryOnConnectionFailure(true). writeTimeout(timeout, TimeUnit.SECONDS). retryOnConnectionFailure(true). build(). newCall(request);call的两种使用: 1. call.enqueue(callback); 2. call.execute();
关键在client.dispatcher().enqueue(new AsyncCall(responseCallback));
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }runningAsyncCalls和readyAsyncCalls都是Deque的双端队列。 如果运行中的异步调用(runningAsyncCalls)数比最大请求数小并且对于某个指定的host的运行中的异步调用(runningCallsForHost)数小于每个host的最大请求数,就把异步调用call放到集合runningAsyncCalls中去,并且执行。否则就异步调用call放到就绪的异步调用集合readyAsyncCalls中去。 Dispatcher的executorService()方法:
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }这里用了线程池,但是new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory(“OkHttp Dispatcher”, false));这一行,熟悉线程池的同学应该瞬间反应过来,这个就是CachedThreadPool的实现。开始以为是okhttp用runningAsyncCalls和readyAsyncCalls作为线程池中的异步任务队列,看到这里就知道不是了,还是用SynchronousQueue 无界阻塞队列完成的。CachedThreadPool线程池适合任务耗时少、任务数多的场景,当提交的任务数比线程处理的速度快时,会导致不停的创建线程,浪费CPU资源。当然,作为网络请求,应该是够了的,如果需要自定义,可以在Dispathcer中设置。因为SynchronousQueue接收的任务是Runnable,所以先预测AsyncCall肯定是实现了Runnable,找到AsyncCall继承了NamedRunnable:
public abstract class NamedRunnable implements Runnable { protected final String name; public NamedRunnable(String format, Object... args) { this.name = Util.format(format, args); } @Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { execute(); } finally { Thread.currentThread().setName(oldName); } } protected abstract void execute(); }NamedRunnable做了线程名处理;然后跑抽象方法execute()。直接找AsyncCall的实现吧:
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }这个execute()首先是在子线程跑的(runnable 中的run),其次做了两件事: 1. 发请求; 2. 处理response回调。 第2点不用多说了,实现1的关键一行: Response response = getResponseWithInterceptorChain();
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }有点懵逼,容我捋一捋。。。 好像就是做了Interceptor的一个list的填充,然后new 了个RealInterceptorChain,开启了所有操作。。。多说无益,硬头皮看RealInterceptorChain。proceed()方法吧。。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); //。。。省略。。。 // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } //。。。省略。。。 return response; }为节省篇幅,省略了一些代码,这些都是一些校验。剩下的部分。。挖槽,在RealInterceptorChain里又new 了一个RealInterceptorChain,不同的是,最外层的index是0,这里是index+1。从0开始,然后interceptor.intercept(chain)去处理拦截,这个chain都是RealInterceptorChain包装过的,对应index+1,index+1的RealInterceptorChain还会再次调用index+2的。。。一直到index达到interceptors.size()。也就是说,真正的链式调用在这里!getResponseWithInterceptorChain方法中开启了chain.proceed方法,会递归的从List的第一个Interceptor开始发送request,执行完后从最后一个Interceptor开始,向上返回response。再看看List里拦截器的顺序:
interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket));也就是说,request先经过自定义的Interceptor,一直到CallServerInterceptor,然后从CallServerInterceptor 返回response一直到自定义的Interceptor。上个图:
这个execute和上面那个不一样,上面是Runnable里的,这个是调用线程(可能会是主线程,所以要保证即使是同步线程,调用线程也应该是非主线程)里的,虽然client.dispatcher().executed(this);但是其实只是添加到了Deque队列里,真正执行请求的是 Response result = getResponseWithInterceptorChain();