likes
comments
collection
share

源码分析笔记——OkHttp

作者站长头像
站长
· 阅读数 7

1.写在前面

工作以来,大部分时间都在忙于各种业务。业务的实现固然重要,不过我仍然坚信注重基础知识的积累才是正道。想想用了这么久的OkHttp竟然没有仔细品读他的源码,深感惭愧。是时候研究一下了。

2.关于OkHttp

2.1 简介

引用项目官方的一句话:An HTTP & HTTP/2 client for Android and Java applications。OkHttp是一个处理网络请求的高性能框架,由Square公司贡献(该公司还贡献了Picasso) 项目首页地址

2.2 优势

  • 共享Socket,减少对服务器的请求次数
  • 通过连接池,减少了请求延迟;
  • 支持GZIP来减少数据流量;
  • 缓存响应数据来减少完全重复的网络请求;
  • 使用 OkHttp 无需重写您程序中的网络代码。OkHttp实现了几乎和java.net.HttpURLConnection一样的API;
  • OkHttp会从很多常用的连接问题中自动恢复。如果您的服务器配置了多个IP地址,当第一个IP连接失败的时候,会自动尝试下一个IP。

3.开始啃源码

注:本文代码梳理基于 v3.10.0 版本,建议从github上clone一份代码然后跟着本文一起梳理一遍。

3.1 先描述下大体的分析步骤

本文会以请求流程为主线梳理OKHttp的源码,之所以不深入代码细节是因为过分深钻代码实现本身没有太多的指导意义,并且容易让初学者陷入只见树木不见森林的尴尬场面。由于OkHttp发起请求的时候从是否同步的角度来讲可以分为同步或异步。所有接下来的源码梳理也从这两个方面来分析。先上一张流程图:

源码分析笔记——OkHttp 上图来自Piasy的博客。这张图对OkHttp的流程的详细程度让我实在找不出重画的理由,所以照搬过来啦(其实是我懒)。

3.2 同步请求

3.2.1 如何发起同步请求?

完整代码如下:

OkHttpClient client = new OkHttpClient
                        .Builder().build();

String run(String url) throws IOException {
  Request request = new Request.Builder()
      .url(url)
      .build();

  Response response = client.newCall(request).execute();
  return response.body().string();
}

3.2.2 同步请求涉及到的类

按照3.2.1小节的demo,我们先来分析用到的这几个类分别扮演了什么样的角色,然后按照请求的流程将所有知识串联起来。

3.2.2.1 Request类

引用一下官方的类描述: An HTTP request. Instances of this class are immutable if their is null or itself immutable.很明显,这个类代表一个HTTP请求。来看下他的源码:

public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Object tag;

  private volatile CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }
  ...
  public static class Builder {
    HttpUrl url;
    String method;
    Headers.Builder headers;
    RequestBody body;
    Object tag;

    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }

    Builder(Request request) {
      this.url = request.url;
      this.method = request.method;
      this.body = request.body;
      this.tag = request.tag;
      this.headers = request.headers.newBuilder();
    }

    public Builder url(HttpUrl url) {
      if (url == null) throw new NullPointerException("url == null");
      this.url = url;
      return this;
    }
    ...

上述代码经过筛选,咱们只看关键的部分。由代码可见,Request的主要作用是用来描述一次HTTP请求,这个类中包含请求的地址、请求方法、请求体等类型数据,没什么难度。强调一点:构建Request这个类的使用需要使用内部类Builder,很明显,典型的建造器模式。

3.2.2.2 OkHttpClient类

顾名思义,这个类是OKHttp的一个“客户端角色”,按照官方介绍,他其实是一个“Call”的工厂。什么是“Call”呢?其实是一次请求的过程,这里先略过,后面再介绍。一起来看下OkHttpClient的源码。

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {

   public OkHttpClient() {
    this(new Builder());
  }

  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    ...
  }
  
 ...

   public static final class Builder {
    Dispatcher dispatcher;
    @Nullable Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;
    final List<Interceptor> interceptors = new ArrayList<>();
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    @Nullable Cache cache;
    @Nullable InternalCache internalCache;
    SocketFactory socketFactory;
    @Nullable SSLSocketFactory sslSocketFactory;
    @Nullable CertificateChainCleaner certificateChainCleaner;
    HostnameVerifier hostnameVerifier;
    CertificatePinner certificatePinner;
    Authenticator proxyAuthenticator;
    Authenticator authenticator;
    ConnectionPool connectionPool;
    Dns dns;
    boolean followSslRedirects;
    boolean followRedirects;
    boolean retryOnConnectionFailure;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

    public Builder() {
      dispatcher = new Dispatcher();
     ...
    }
}

上述代码经过精简,同样的,咱们只挑重点的看,OkHttpClient同样提供了建造器模式来创建一个实例,主要对拦截器、读写超时等属性进行配置。这里要注意,OkHttpClient的Builder已经为我们提供了默认的配置,我们只需要按需重写自定义的属性就好。

3.2.2.3 Call类

直白来讲Call代表一次请求的过程,按照官方描述,这个类代表一个已经准备好将要被执行的请求。并且同一个Call实例不可以被执行两次。这里注意一点,Call可以通过原型模式复制一个与自己相同内容的对象。

/**
 * A call is a request that has been prepared for execution. A call can be canceled. As this object
 * represents a single request/response pair (stream), it cannot be executed twice.
 */
public interface Call extends Cloneable {
  Request request();

  Response execute() throws IOException;

  void cancel();

  boolean isExecuted();

  boolean isCanceled();

  Call clone();

  interface Factory {
    Call newCall(Request request);
  }
}

3.2.3 同步请求流程

按照3.2.1小节的demo,一起来梳理一下同步请求的流程,顺便把3.2.2小节中提到的知识点串联一下。 发起请求的时候,典型应用代码为:Response response = client.newCall(request).execute(); 所以一起来看下OkHttpClientnewCall方法做了什么事情:

OkHttpClient的newCall方法内容:

 /**
   * Prepares the {@code request} to be executed at some point in the future.
   */
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

上述代码可以看出newCall方法直接调用了RealCall.newRealCall方法。这个RealCall类实现了Call这个接口。来看下RealCall.newRealCall的内容。

RealCall.newRealCall方法体:

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

上述代码很清晰,newRealCall方法直接new了一个RealCall对象并且返回了。

分析到这里,咱们接着最开始的典型应用代码分析Response response = client.newCall(request).execute();综合上述的代码分析,这段代码可以替换成伪代码Response response = RealCall.execute();。所以下面一起来看下RealCall中的execute方法吧。

RealCall.execute方法:

 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }

重点分析上述代码两个地方:

  • client.dispatcher().executed(this);,这句代码中用client中的Dispatcher对当前Call进行了一次标记。这个方法内部用一个名叫runningSyncCalls的Deque对当前的Call进行了存储。由于Dispatcher在同步调用的过程中并不是主角,所以这里先占个坑后面分析异步调用的时候在分析一下。
  • Response result = getResponseWithInterceptorChain();我个人认为,这句调用才是同步调用乃至整个OkHttp的灵魂所在。下面重点分析一下。

来看下getResponseWithInterceptorChain()这个方法的方法体:

 Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

想必大家即使没有阅读过OkHttp的源码,一定也有所耳闻,OkHttp的精髓在于拦截器的使用。没错getResponseWithInterceptorChain这个方法就是负责将责任链上的每一个拦截器进行串联的方法。该方法的逻辑也比较通俗易懂,将开发者定义的拦截器与OkHttp内置的拦截器封装成一个List然后塞给了RealInterceptorChain,最后执行RealInterceptorChain的proceed方法。那这个RealInterceptorChain是什么呢?通俗一点来讲,这个类是一个具体的拦截器链,没错这是典型的责任链模式。

RealInterceptorChain类代码:

/**
 * A concrete interceptor chain that carries the entire interceptor chain: all application
 * interceptors, the OkHttp core, all network interceptors, and finally the network caller.
 */
public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final HttpCodec httpCodec;
  private final RealConnection connection;
  private final int index;
  private final Request request;
  private final Call call;
  private final EventListener eventListener;
  private final int connectTimeout;
  private final int readTimeout;
  private final int writeTimeout;
  private int calls;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
      EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.connection = connection;
    ...
  }


  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }
    
    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }
    return response;
  }
}

一开始分析到这里的时候,我有点蒙圈。弄了好久没弄懂这个责任链是怎么执行的。最后还是搬出了断点打法,终于看到了光明。再复杂的责任链(由于本人比较水,所以觉得OkHttp的责任链是比较复杂的,欢迎拍砖)把握好两点就够了:

  • 怎样获取的责任链上下一环的实例
  • 责任链是怎样串行执行的

带着上述的问题进行断点大法,终于找到了以下核心代码:

// Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

这段代码使用递归调用,如果责任链上还有下一环的拦截器实例,则取出该实例,并且调用该实例的intercept方法。前面分析getResponseWithInterceptorChain这个方法的时候对拦截器进行过介绍。OkHttp中所有拦截器都需要实现Interceptor这个接口。 这个接口唯一的方法便是Response intercept(Chain chain) throws IOException;

由于是递归调用,那什么时候结束呢?带着这个问题,我们再次看下getResponseWithInterceptorChain方法中添加的拦截器。为了方便大家查看,再次贴一下这个方法的源码吧

   Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

从方法体可以看到,OkHttp在添加了开发者自定义的Interceptor后,先后添加了RetryAndFollowUpInterceptor...CallServerInterceptor等拦截器。这里先给出一个结论,拦截器中的责任链在CallServerInterceptor这个类中结束。这个结论是怎么得到的呢?来看下OkHttp第一个原生拦截器RetryAndFollowUpInterceptor的代码:前面提到过OKHttp所有的拦截器都实现了Interceptor接口。所以直接来看下Interceptor中核心的方法intercept吧。

RetryAndFollowUpInterceptor拦截器中的intercept方法:

 @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } 
      ...
    }
  }

看到没response = realChain.proceed(request, streamAllocation, null, null);通过这句代码,开始调用RealInterceptorChainproceed方法,而RealInterceptorChain方法的proceed具体逻辑是取出责任链上下一个拦截器并执行该拦截器的intercept方法,这样形成了一个具体的循环逻辑,责任链开始完美的执行。继续说下责任链的结束,由于CallServerInterceptor这个拦截器是责任链的最后一环,我们看下他的代码。

CallServerInterceptor的intercept方法

@Override public Response intercept(Chain chain) throws IOException {
  HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
  StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
  Request request = chain.request();

  long sentRequestMillis = System.currentTimeMillis();
  httpCodec.writeRequestHeaders(request);

  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
    BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    request.body().writeTo(bufferedRequestBody);
    bufferedRequestBody.close();
  }

  httpCodec.finishRequest();

  Response response = httpCodec.readResponseHeaders()
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  if (!forWebSocket || response.code() != 101) {
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  // 省略部分检查代码

  return response;

上述代码可以看到,在CallServerInterceptor拦截器中,会调用HttpCodec这个类来完成最终的请求。这里简单说下HttpCodec这个接口,在OkHttp中有两个实现类Http1Codec与Http2Codec,这两个实现类对应着Http1.x与Http2.x的请求。他们内部的具体请求都依赖于Okio,而Okio内部也是基于Socket的封装。回到原来的话题,CallServerInterceptor的intercept方法执行后并没有再次调用RealInterceptorChain的process方法,而是直接将Response这个响应体返回了。所以拦截器的责任链到这里终止寻找下一环,并且将Response逆向返回。这个时候代码执行完了拦截器的责任链,最终再次返回到RealCall的execute方法中

用张图总结一下拦截器链的调用过程:

源码分析笔记——OkHttp

至此OkHttp的同步流程就介绍完了。

3.3 异步请求

3.3.1 如何发起异步请求

异步请求的典型代码

Request request = new Request.Builder()
                .url(url)
                .build();
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                //TODO 失败的回调函数
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                //TODO 成功的回调函数
            }
        });

3.3.2 异步请求涉及到的类

3.3.2.1 Dispatcher类

同步请求的时候咱们短暂的遇到过这个类,但是通过请求中这个类不是主角,他仅仅将同步请求的Call存储到了一个名叫runningSyncCallsArrayDeque中。但是在异步请求中,Dispatcher类还是承担了很重要的角色。

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;
  private @Nullable ExecutorService executorService;
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }
  public Dispatcher() {
  }
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  ...
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  public synchronized void cancelAll() {
    for (AsyncCall call : readyAsyncCalls) {
      call.get().cancel();
    }

    for (AsyncCall call : runningAsyncCalls) {
      call.get().cancel();
    }

    for (RealCall call : runningSyncCalls) {
      call.cancel();
    }
  }
...
}

从上述Dispatcher的源码来看,该类用final关键字修饰,表示该类不可以被继承。他的内部有个public synchronized ExecutorService executorService()方法。很明显这个方法设置了一下ThreadPoolExecutor线程池。异步的任务都将放到这个线程池中执行。这与同步调用直接通过拦截器链的方式还是有明显差别的。

3.3.2.2 AsyncCall类

这个类继承自NamedRunnable类,而NamedRunnable类其实是对Runnable的简单封装,主要是为当前执行的线程设置一下名称。这里就不展开对NamedRunnable的介绍了。下面看下AsyncCall的源码。

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

当线程池调用AsyncCall的时候,execute方法会被执行,看下execute的内容,天啦噜,还是通过getResponseWithInterceptorChain方法发起的请求,这跟同步调用是一样一样滴。

3.3.3 异步请求流程梳理

还是从3.3.1小节的demo开始,一起来梳理一下异步流程。 开始调用的时候跟同步流程一样,都是经过client.newCall(request)的调用,很明显这一步已经走到了RealCall内部。再来看下RealCallenqueue方法。

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

enqueue方法会检查当前这个Call是否被请求过,如果已经被请求过,则会抛出异常。最后,该方法会将AsyncCall对象直接交给Dispatcher来执行,上面3.3.2对Dispatcher的介绍已经分析过了,Dispatcherenqueue方法会直接用executorService方法初始化的线程池执AsyncCall这个经过封装的 Runnable。在AsyncCall内部会完成开发者自定义的Callback回调。至此异步流程梳理完毕。

写在最后

本篇是我第一次写源码分析类的博客,写的可能比较乱。欢迎拍砖。

戒骄、戒躁、戒安逸。共勉~


About Me