// We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection();
/** * <ul> * <li><strong>Connections:</strong> physical socket connections to remote servers. These are * potentially slow to establish so it is necessary to be able to cancel a connection * currently being connected. * <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on * connections. Each connection has its own allocation limit, which defines how many * concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream * at a time, HTTP/2 typically carry multiple. * <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and * its follow up requests. We prefer to keep all streams of a single call on the same * connection for better behavior and locality. * </ul> */
/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. */ private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException { while (true) { RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks. synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } }
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { // 如果这个连接不健康, // 禁用这条连接, 重复寻找可用的连接 noNewStreams(); continue; }
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ /** * 返回一个连接. 优先使用已存在的连接, 其次是从连接池中获取, 最后新建一个连接 */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled)throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; Connection releasedConnection; Socket toClose; synchronized (connectionPool) { ... // 省略代码
// Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); if (this.connection != null) { //如果当前connection不为空可以直接使用 // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! releasedConnection = null; }
//当前这个connection不能使用,尝试从连接池里面获取一个请求 if (result == null) { // Attempt to get a connection from the pool. // Internal是一个抽象类,instance是在OkHttpClient中实现的,get方法实现的时候从pool的get方法 Internal.instance.get(connectionPool, address, this, null); if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } closeQuietly(toClose);
... // 省略代码
if (result != null) { // 找到一条可复用的连接 // If we found an already-allocated or pooled connection, we're done. return result; }
// 到达这里表示没有找到 // 切换路由再在连接池里面找下,如果有则返回
// If we need a route selection, make one. This is a blocking operation. boolean newRouteSelection = false; // 检查是否有其他路由 if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); }
synchronized (connectionPool) { if (canceled) thrownew IOException("Canceled");
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. // 有其他路由, 遍历RooteSelector List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } }
if (!foundPooledConnection) { //没找到则创建一条 if (selectedRoute == null) { selectedRoute = routeSelection.next(); }
// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; result = new RealConnection(connectionPool, selectedRoute); //往连接中增加流 acquire(result, false); } }
// If we found a pooled connection on the 2nd time around, we're done. //如果第二次找到了可以复用的,则返回 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }
// Do TCP + TLS handshakes. This is a blocking operation. // 建立连接,开始握手 result.connect( connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener); // 将这条路由从错误缓存中清除 routeDatabase().connected(result.route());
// Pool the connection. //将这个请求加入连接池 Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. // 如果是多路复用,则合并 if (result.isMultiplexed()) { // 返回的是一个重复的socket socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } // 关闭重复的socket closeQuietly(socket);
/** * Background threads are used to cleanup expired connections. There will be at most a single * thread running per connection pool. The thread pool executor permits the pool itself to be * garbage collected. */ /** 被用来清理超时连接的后台线程. 在每个连接池中最多只会有一个这样的线程(虽然它是个线程池). */ privatestaticfinal Executor executor = new ThreadPoolExecutor(0/* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L/* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */ // 允许的每个host地址可以维持的最大空闲连接数量, 默认为5 privatefinalint maxIdleConnections; // 允许的线程空闲的最大时间, 默认为5分钟 privatefinallong keepAliveDurationNs; // 清理的task privatefinal Runnable cleanupRunnable = new Runnable() { @Overridepublicvoidrun(){ while (true) { long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } }; // 连接池中的连接集合 privatefinal Deque<RealConnection> connections = new ArrayDeque<>(); // 用来记录连接失败的路线名单 ?? 暂时没懂啥意思... final RouteDatabase routeDatabase = new RouteDatabase(); // 标记清理线程是否在运行 boolean cleanupRunning;
/** * Performs maintenance on this pool, evicting the connection that has been idle the longest if * either it has exceeded the keep alive limit or the idle connections limit. * * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns * -1 if no further cleanups are required. */
longcleanup(long now){ int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due. // 找到需要清理的connection, 如果没有找到, 那么确定下次清理的时间. 也就是说, 只会清理一个合适的connection synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next();
// If the connection is in use, keep searching. // pruneAndGetAllocationCount方法判断当前connection是否正在使用中 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; }
idleConnectionCount++;
// If the connection is ready to be evicted, we're done. long idleDurationNs = now - connection.idleAtNanos; // 记录空闲最长的那个connection, 并且记录空闲的最长时间 if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } }
if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { // We've found a connection to evict. Remove it from the list, then close it below (outside // of the synchronized block). // 如果 最长空闲的connection空闲时间超过限制, 或者 如果空闲的connection数量超过限制 // 从connections集合中remove掉该connection connections.remove(longestIdleConnection); } elseif (idleConnectionCount > 0) { // A connection will be ready to evict soon. // 最长空闲的connection没有超出时间限制, 返回下次开启清理的时间 return keepAliveDurationNs - longestIdleDurationNs; } elseif (inUseConnectionCount > 0) { // All connections are in use. It'll be at least the keep alive duration 'til we run again. // 没有空闲的connection, 等待keepAliveDurationNs时间之后再次清理 return keepAliveDurationNs; } else { // No connections, idle or in use. // 根本没有connection, 返回-1, 直接终止清理任务 cleanupRunning = false; return -1; } }