专业的编程技术博客社区

网站首页 > 博客文章 正文

Okhttp入门到精通(三)-调度器Dispatcher

baijin 2024-08-21 11:29:40 博客文章 11 ℃ 0 评论

本文为个人学习笔记分享,没有任何商业化行为,对其他文章的引用都会标记。如有侵权行为,请及时提醒更正!如需转载请表明出处。

一,OKHttp介绍

okhttp是一个第三方类库,用于android中请求网络。

这是一个开源项目,是安卓端最火热的轻量级框架,由移动支付Square公司贡献(该公司还贡献了Picasso和LeakCanary) 。用于替代HttpUrlConnection和Apache HttpClient(android API23 里已移除HttpClient)。

Okhttp简单使用

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
 .url("http://www.baidu.com")
 .build();
client.newCall(request).enqueue(new Callback() {
 @Override
 public void onFailure(Call call, IOException e) {
 }
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 if(response.isSuccessful()){//回调的方法执行在子线程。
 Log.d("kwwl","获取数据成功了");
 Log.d("kwwl","response.code()=="+response.code());
 Log.d("kwwl","response.body().string()=="+response.body().string());
 }
 }
});
 /**
 * Prepares the {@code request} to be executed at some point in the future.
 */
 @Override
 public Call newCall(Request request) {
 return RealCall.newRealCall(this, request, false /* for web socket */);
 }
 static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
 // Safely publish the Call instance to the EventListener.
 RealCall call = new RealCall(client, originalRequest, forWebSocket);
 call.eventListener = client.eventListenerFactory().create(call);
 return call;
 }
OkhttpClient 调用newCall方法,创建Call对象。
 Call对象调用enqueue方法将该次请求加入队列
 @Override public void enqueue(Callback responseCallback) {
 //不能重复执行
 synchronized (this) {
 if (executed) throw new IllegalStateException("Already Executed");
 executed = true;
 }
 captureCallStackTrace();
 eventListener.callStart(this);
 //交给 dispatcher调度器 进行调度
 client.dispatcher().enqueue(new AsyncCall(responseCallback));
 }
AsyncCall是什么?
final RealCall implements Call{
 ````
 //AsyncCall 是RealCall的一个内部类继承NamedRunnable
 final AsyncCall extends NamedRunnable{
 
 }
 ````
}
/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
 protected final String name;
 public NamedRunnable(String format, Object... args) {
 this.name = Util.format(format, args);
 }
 @Override public final void run() {
 String oldName = Thread.currentThread().getName();
 Thread.currentThread().setName(name);
 try {
 execute();
 } finally {
 Thread.currentThread().setName(oldName);
 }
 }
 protected abstract void execute();
}

AsyncCall 其实就是一个线程,RealCall 在调用enqueue方法时,调用内部属性Dispatcher 将AsyncCall线程加入队列中。

当我们构建Builder时,Okhttp会为我们创建一个默认的Dispatcher

 public Builder() {
 dispatcher = new Dispatcher();
 protocols = DEFAULT_PROTOCOLS;
 connectionSpecs = DEFAULT_CONNECTION_SPECS;
 eventListenerFactory = EventListener.factory(EventListener.NONE);
 proxySelector = ProxySelector.getDefault();
 cookieJar = CookieJar.NO_COOKIES;
 socketFactory = SocketFactory.getDefault();
 hostnameVerifier = OkHostnameVerifier.INSTANCE;
 certificatePinner = CertificatePinner.DEFAULT;
 proxyAuthenticator = Authenticator.NONE;
 authenticator = Authenticator.NONE;
 connectionPool = new ConnectionPool();
 dns = Dns.SYSTEM;
 followSslRedirects = true;
 followRedirects = true;
 retryOnConnectionFailure = true;
 connectTimeout = 10_000;
 readTimeout = 10_000;
 writeTimeout = 10_000;
 pingInterval = 0;
 }
1.Dispatcher是什么?

2.Dispatcher在Okhttp中主要执行哪些工作?

3.默认的Dispatcher都做了些什么?

二、调度器Dispatcher调度队列

public final class Dispatcher {
 private int maxRequests = 64;
 private int maxRequestsPerHost = 5;
 /**
 * Ready async calls in the order they'll be run.
 * 双端队列,支持首尾两端 双向开口可进可出
 */
 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
 /**
 * Running asynchronous calls. Includes canceled calls that haven't finished yet.
 */
 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
 /**
 * Running synchronous calls. Includes canceled calls that haven't finished yet.
 */
 private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
 ..................................................................................
 /**
 * 该方法主要的目的是将线程加入调度器队列
 * @param call AsyncCall 线程
 */
 synchronized void enqueue(AsyncCall call) {
 //TODO 同时请求不能超过并发数(64,可配置调度器调整)
 //TODO okhttp会使用共享主机即 地址相同的会共享socket
 //TODO 同一个host最多允许5条线程通知执行请求
 if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) <
 maxRequestsPerHost) {
 //TODO 加入运行队列 并交给线程池执行
 //TODO AsyncCall 是一个runnable,查看其execute实现
 runningAsyncCalls.add(call);
 executorService().execute(call);
 } else {
 //TODO 加入等候队列
 readyAsyncCalls.add(call);
 }
 }
}

Dispatcher中共存有三个队列:

readyAsyncCalls 等待执行异步队列

runningAsyncCalls 正在执行异步队列

runningSyncCalls 正在执行同步队列

当调用Dispatcher 的enqueue方法时,首先:

1.判断同时请求的异步线程不能超过并发数(64,可配置调度器调整)

2.okhttp会使用共享主机即 地址相同的会共享socket 同一个host最多允许5条线程通知执行请求

如果满足条件,就加入runingAsyncCalls队列中,并执行线程的execute方法。

否则,调用readyAsyncCalls的add方法,加入等候队列。

在多线程开发中不要直接使用new Thread()的方式创建线程。

推荐使用线程池来管理线程。

下面是Okhttp中Dispatcher的线程池

 public synchronized ExecutorService executorService() {
 if (executorService == null) {
 //TODO 线程池
 //TODO 核心线程 最大线程 非核心线程闲置60秒回收 任务队列
 executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
 false));
 }
 return executorService;
 }
三、Okhttp任务线程

上面说到如果满足条件,就加入runingAsyncCalls队列中,并执行线程的execute方法。上文中Okhttp的任务线程由AsyncCall 声明:

 final class AsyncCall extends NamedRunnable {
 @Override protected void execute() {
 boolean signalledCallback = false;
 try {
 //TODO 责任链模式
 //TODO 拦截器链 执行请求
 Response response = getResponseWithInterceptorChain();
 //回调结果
 if (retryAndFollowUpInterceptor.isCanceled()) {
 signalledCallback = true;
 responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
 } else {
 signalledCallback = true;
 responseCallback.onResponse(RealCall.this, response);
 }
 } catch (IOException e) {
 if (signalledCallback) {
 // Do not signal the callback twice!
 Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
 } else {
 eventListener.callFailed(RealCall.this, e);
 responseCallback.onFailure(RealCall.this, e);
 }
 } finally {
 //TODO 移除队列
 client.dispatcher().finished(this);
 }
 }
 }

在AsyncCall的execute方法中:

1.执行请求获取Response

2.判断是否被取消,如果取消,回调onFailure()函数,并抛出IO异常

否则回调成功

3.最终将该任务线程移除Dispatcher线程队列

注:其中signalledCallback标记是为了防止当回调成功时,在onResponse()中捕获到异常时调用。判断是自己的异常还是Okhttp的异常。如果是用户自己的异常,Okhttp不进行处理,如果是Okhttp的异常回调出去。

 /**
 * Used by {@code AsyncCall#run} to signal completion.
 */
 void finished(AsyncCall call) {
 finished(runningAsyncCalls, call, true);
 }
 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
 int runningCallsCount;
 Runnable idleCallback;
 synchronized (this) {
 //TODO 移除队列
 if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
 //TODO 检查执行 readyAsyncCalls 中的请求
 if (promoteCalls) promoteCalls();
 runningCallsCount = runningCallsCount();
 idleCallback = this.idleCallback;
 }
 //闲置调用
 if (runningCallsCount == 0 && idleCallback != null) {
 idleCallback.run();
 }
 }
在execute()方法的最后,调用Dispatcher的finished方法。

1.将线程移除runningAsyncCalls队列

2.检查readyAsyncCalls队列中的请求,如果有满足条件的任务线程,就会将该任务加入runningAsyncCalls队列并执行run方法。

问题:如何执行队列切换?

 private void promoteCalls() {
 //TODO 检查 运行队列 与 等待队列
 if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
 if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
 for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
 AsyncCall call = i.next();
 //TODO 相同host的请求没有达到最大
 if (runningCallsForHost(call) < maxRequestsPerHost) {
 i.remove();
 runningAsyncCalls.add(call);
 executorService().execute(call);
 }
 if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
 }
 }

promoteCalls的主逻辑就是将符合条件等待队列的数据放入到执行队列中去。

1.检查runningAsyncCalls队列的数量是否大于等于maxRequests(64)

2.检查readyAsyncCalls队列是否为空

3.遍历readyAsyncCalls队列将符合条件的放入runningAsyncCalls队列:

条件1:相同host的请求没有达到最大

条件2:runningAsyncCalls队列的数量是否大于等于maxRequests(64)

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表