okhttp源码分析之同步、异步请求

xiaoxiao2021-02-28  22

核心类:

public interface Interceptor { Response intercept(Chain chain) throws IOException; interface Chain { Request request(); Response proceed(Request request) throws IOException; /** * Returns the connection the request will be executed on. This is only available in the chains * of network interceptors; for application interceptors this is always null. */ @Nullable Connection Connection(); } }

Interceptor 是个接口,包括一个方法intercept和一个接口Chain;接口Chain里有返回 Request的方法request(),返回Response 的方法proceed,返回Connection的方法 connection();大致可以判断,执行请求的方法是proceed,而且根据注释,Connection()只会在network interceptors(网络拦截器)有用,application interceptors(应用拦截器)永远是null;

写个例子:

Call call = mOkHttpClient.newBuilder(). retryOnConnectionFailure(true). writeTimeout(timeout, TimeUnit.SECONDS). retryOnConnectionFailure(true). build(). newCall(request);

call的两种使用: 1. call.enqueue(callback); 2. call.execute();

call.enqueue(callback)

@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }

关键在client.dispatcher().enqueue(new AsyncCall(responseCallback));

synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }

runningAsyncCalls和readyAsyncCalls都是Deque的双端队列。 如果运行中的异步调用(runningAsyncCalls)数比最大请求数小并且对于某个指定的host的运行中的异步调用(runningCallsForHost)数小于每个host的最大请求数,就把异步调用call放到集合runningAsyncCalls中去,并且执行。否则就异步调用call放到就绪的异步调用集合readyAsyncCalls中去。 Dispatcher的executorService()方法:

public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }

这里用了线程池,但是new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory(“OkHttp Dispatcher”, false));这一行,熟悉线程池的同学应该瞬间反应过来,这个就是CachedThreadPool的实现。开始以为是okhttp用runningAsyncCalls和readyAsyncCalls作为线程池中的异步任务队列,看到这里就知道不是了,还是用SynchronousQueue 无界阻塞队列完成的。CachedThreadPool线程池适合任务耗时少、任务数多的场景,当提交的任务数比线程处理的速度快时,会导致不停的创建线程,浪费CPU资源。当然,作为网络请求,应该是够了的,如果需要自定义,可以在Dispathcer中设置。因为SynchronousQueue接收的任务是Runnable,所以先预测AsyncCall肯定是实现了Runnable,找到AsyncCall继承了NamedRunnable:

public abstract class NamedRunnable implements Runnable { protected final String name; public NamedRunnable(String format, Object... args) { this.name = Util.format(format, args); } @Override public final void run() { String oldName = Thread.currentThread().getName(); Thread.currentThread().setName(name); try { execute(); } finally { Thread.currentThread().setName(oldName); } } protected abstract void execute(); }

NamedRunnable做了线程名处理;然后跑抽象方法execute()。直接找AsyncCall的实现吧:

@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }

这个execute()首先是在子线程跑的(runnable 中的run),其次做了两件事: 1. 发请求; 2. 处理response回调。 第2点不用多说了,实现1的关键一行: Response response = getResponseWithInterceptorChain();

Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }

有点懵逼,容我捋一捋。。。 好像就是做了Interceptor的一个list的填充,然后new 了个RealInterceptorChain,开启了所有操作。。。多说无益,硬头皮看RealInterceptorChain。proceed()方法吧。。

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); //。。。省略。。。 // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } //。。。省略。。。 return response; }

为节省篇幅,省略了一些代码,这些都是一些校验。剩下的部分。。挖槽,在RealInterceptorChain里又new 了一个RealInterceptorChain,不同的是,最外层的index是0,这里是index+1。从0开始,然后interceptor.intercept(chain)去处理拦截,这个chain都是RealInterceptorChain包装过的,对应index+1,index+1的RealInterceptorChain还会再次调用index+2的。。。一直到index达到interceptors.size()。也就是说,真正的链式调用在这里!getResponseWithInterceptorChain方法中开启了chain.proceed方法,会递归的从List的第一个Interceptor开始发送request,执行完后从最后一个Interceptor开始,向上返回response。再看看List里拦截器的顺序:

interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket));

也就是说,request先经过自定义的Interceptor,一直到CallServerInterceptor,然后从CallServerInterceptor 返回response一直到自定义的Interceptor。上个图:

call.execute:

@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } }

这个execute和上面那个不一样,上面是Runnable里的,这个是调用线程(可能会是主线程,所以要保证即使是同步线程,调用线程也应该是非主线程)里的,虽然client.dispatcher().executed(this);但是其实只是添加到了Deque队列里,真正执行请求的是 Response result = getResponseWithInterceptorChain();

转载请注明原文地址: https://www.6miu.com/read-2450336.html

最新回复(0)