Http 持久连接与 HttpClient 连接池

一、背景

HTTP协议是无状态的协议,即每一次请求都是互相独立的。因此它的最初实现是,每一个http请求都会打开一个tcp socket连接,当交互完毕后会关闭这个连接。

HTTP协议是全双工的协议,所以建立连接与断开连接是要经过三次握手与四次挥手的。显然在这种设计中,每次发送Http请求都会消耗很多的额外资源,即连接的建立与销毁。

于是,HTTP协议的也进行了发展,通过持久连接的方法来进行socket连接复用。

从图中可以看到:

  1. 在串行连接中,每次交互都要打开关闭连接
  2. 在持久连接中,第一次交互会打开连接,交互结束后连接并不关闭,下次交互就省去了建立连接的过程。

持久连接的实现有两种:HTTP/1.0+的keep-alive与HTTP/1.1的持久连接。

二、HTTP/1.0+的Keep-Alive

从1996年开始,很多HTTP/1.0浏览器与服务器都对协议进行了扩展,那就是“keep-alive”扩展协议。

注意,这个扩展协议是作为1.0的补充的“实验型持久连接”出现的。keep-alive已经不再使用了,最新的HTTP/1.1规范中也没有对它进行说明,只是很多应用延续了下来。

使用HTTP/1.0的客户端在首部中加上”Connection:Keep-Alive”,请求服务端将一条连接保持在打开状态。服务端如果愿意将这条连接保持在打开状态,就会在响应中包含同样的首部。如果响应中没有包含”Connection:Keep-Alive”首部,则客户端会认为服务端不支持keep-alive,会在发送完响应报文之后关闭掉当前连接。

通过keep-alive补充协议,客户端与服务器之间完成了持久连接,然而仍然存在着一些问题:

  • 在HTTP/1.0中keep-alive不是标准协议,客户端必须发送Connection:Keep-Alive来激活keep-alive连接。
  • 代理服务器可能无法支持keep-alive,因为一些代理是”盲中继”,无法理解首部的含义,只是将首部逐跳转发。所以可能造成客户端与服务端都保持了连接,但是代理不接受该连接上的数据。

三、HTTP/1.1的持久连接

HTTP/1.1采取持久连接的方式替代了Keep-Alive。

HTTP/1.1的连接默认情况下都是持久连接。如果要显式关闭,需要在报文中加上Connection:Close首部。即在HTTP/1.1中,所有的连接都进行了复用。

然而如同Keep-Alive一样,空闲的持久连接也可以随时被客户端与服务端关闭。不发送Connection:Close不意味着服务器承诺连接永远保持打开。

四、HttpClient如何生成持久连接

HttpClien中使用了连接池来管理持有连接,同一条TCP链路上,连接是可以复用的。HttpClient通过连接池的方式进行连接持久化。

其实“池”技术是一种通用的设计,其设计思想并不复杂:

  1. 当有连接第一次使用的时候建立连接
  2. 结束时对应连接不关闭,归还到池中
  3. 下次同个目的的连接可从池中获取一个可用连接
  4. 定期清理过期连接

所有的连接池都是这个思路,不过我们看HttpClient源码主要关注两点:

  • 连接池的具体设计方案,以供以后自定义连接池参考
  • 如何与HTTP协议对应上,即理论抽象转为代码的实现

4.1 HttpClient连接池的实现

HttpClient关于持久连接的处理在下面的代码中可以集中体现,下面从MainClientExec摘取了和连接池相关的部分,去掉了其他部分:

public class MainClientExec implements ClientExecChain {

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
     //从连接管理器HttpClientConnectionManager中获取一个连接请求ConnectionRequest
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn;
        final int timeout = config.getConnectionRequestTimeout();
        //从连接请求ConnectionRequest中获取一个被管理的连接HttpClientConnection
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
     //将连接管理器HttpClientConnectionManager与被管理的连接HttpClientConnection交给一个ConnectionHolder持有
        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            HttpResponse response;
            if (!managedConn.isOpen()) {
          //如果当前被管理的连接不是出于打开状态,需要重新建立连接
                establishRoute(proxyAuthState, managedConn, route, request, context);
            }
       //通过连接HttpClientConnection发送请求
            response = requestExecutor.execute(request, managedConn, context);
       //通过连接重用策略判断是否连接可重用         
            if (reuseStrategy.keepAlive(response, context)) {
                //获得连接有效期
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                //设置连接有效期
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
          //将当前连接标记为可重用状态
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
        }
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            //将当前连接释放到池中,供下次调用
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        } else {
            return new HttpResponseProxy(response, connHolder);
        }
}

这里看到了在Http请求过程中对连接的处理是和协议规范是一致的,这里要展开讲一下具体实现。

PoolingHttpClientConnectionManager是HttpClient默认的连接管理器,首先通过requestConnection()获得一个连接的请求,注意这里不是连接。

public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
                if (conn.isOpen()) {
                    final HttpHost host;
                    if (route.getProxyHost() != null) {
                        host = route.getProxyHost();
                    } else {
                        host = route.getTargetHost();
                    }
                    final SocketConfig socketConfig = resolveSocketConfig(host);
                    conn.setSocketTimeout(socketConfig.getSoTimeout());
                }
                return conn;
            }
        };
    }

可以看到返回的ConnectionRequest对象实际上是一个持有了Future<CPoolEntry>,CPoolEntry是被连接池管理的真正连接实例。

从上面的代码我们应该关注的是:

  • Future<CPoolEntry> future = this.pool.lease(route, state, null)
    •   如何从连接池CPool中获得一个异步的连接,Future<CPoolEntry>
  • HttpClientConnection conn = leaseConnection(future, timeout, tunit)
    •   如何通过异步连接Future<CPoolEntry>获得一个真正的连接HttpClientConnection

4.2 Future<CPoolEntry>

看一下CPool是如何释放一个Future<CPoolEntry>的,AbstractConnPool核心代码如下:

private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {
     //首先对当前连接池加锁,当前锁是可重入锁ReentrantLockthis.lock.lock();
        try {
        //获得一个当前HttpRoute对应的连接池,对于HttpClient的连接池而言,总池有个大小,每个route对应的连接也是个池,所以是“池中池”
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
          //死循环获得连接
                for (;;) {
            //从route对应的池中拿连接,可能是null,也可能是有效连接
                    entry = pool.getFree(state);
            //如果拿到null,就退出循环
                    if (entry == null) {
                        break;
                    }
            //如果拿到过期连接或者已关闭连接,就释放资源,继续循环获取
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
              //如果拿到有效连接就退出循环
                        break;
                    }
                }
          //拿到有效连接就退出
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
          //到这里证明没有拿到有效连接,需要自己生成一个                
                final int maxPerRoute = getMax(route);
                //每个route对应的连接最大数量是可配置的,如果超过了,就需要通过LRU清理掉一些连接
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
          //当前route池中的连接数,没有达到上线
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
            //判断连接池是否超过上线,如果超过了,需要通过LRU清理掉一些连接