likes
comments
collection
share

连接池——Jedis连接池

作者站长头像
站长
· 阅读数 10

Jedis连接池

  Redis连接池和数据库连接池一样,也是预先创建和管理一组连接,这样当需要与Redis服务器交互时,就可以直接复用连接。Redis的客户端JedisLettuce都实现了连接池的功能。我们本篇文章先以 Jedis为例,从连接的获取、归还、关闭、创建几个方面详细介绍具体的实现功能。

多线程的使用

  在数据库中如果多个线程复用一个连接会存在数据库事务问题,那么在Redis中我们先来看下在多线程环境下使用一个连接会产生什么问题?

  首先启动两个线程,共同操作同一个 Jedis 实例,每一个线程循环 500 次,分别读取 Key 为 a 和 b 的值

Jedis jedis = new Jedis("127.0.0.1", 6379);
     new Thread(() -> {
         for (int i = 0; i < 500; i++) {
             String result = jedis.get("a");
             System.out.println(result);
         }
     }).start();
     new Thread(() -> {
         for (int i = 0; i < 500; i++) {
             String result = jedis.get("b");
             System.out.println(result);
         }
     }).start();

  执行程序多次,可以看到日志中出现了各种奇怪的异常信息,有的未知答复错误,还有的是连接关闭异常等

错误1:redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply: 1

错误2:java.io.IOException: Socket Closed

  那我们先来看下 Jedis常用的(3.x)版本的源码

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands,
    ModuleCommands{}
    
 public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

      protected final Client client;
  
  }
  public class Client extends BinaryClient implements Commands {}
  public class BinaryClient extends Connection {}
  
  public class Connection implements Closeable {
      private Socket socket;
      private RedisOutputStream outputStream;
      private RedisInputStream inputStream;
  }    

  首先Jedis 继承了 BinaryJedisBinaryJedis 中保存了单个 Client 的实例,Client最终继承了 ConnectionConnection 中保存了单个 Socket 的实例以及对应的两个读写流一个 RedisOutputStream 一个是 RedisInputStream

连接池——Jedis连接池

  BinaryClient 封装了各种 Redis 命令,其最终会调用的是 sendCommand 方法,发现其发送命令时是直接操作 RedisOutputStream 写入字节。

private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }

  所以在多线程环境下使用 Jedis ,其实就是在复用RedisOutputStream。如果多个线程在执行操作,那么无法保证整条命令是原子写入 Socket。比如,写操作互相干扰,多条命令相互穿插的话,必然不是合法的 Redis 命令也就导致等等各种问题。

  这也说明了Jedis是非线程安全。但是可以通过JedisPool连接池去管理实例,在多线程情况下让每个线程有自己独立的Jedis实例,可变为线程安全。

//使用redis连接池,不会有线程安全问题
private static JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
     new Thread(() -> {
         try (Jedis jedis = jedisPool.getResource()) {
             for (int i = 0; i < 1000; i++) {
                 String result = jedis.get("a");
                 System.out.println(result);
             }
         }
     }).start();
     new Thread(() -> {
         try (Jedis jedis = jedisPool.getResource()) {
             for (int i = 0; i < 1000; i++) {
                 String result = jedis.get("b");
                 System.out.println(result);
             }
         }
     }).start();

连接池的管理

  JedisPool的连接池是基于 Apache Commons PoolGenericObjectPool实现的。我们先来了解下 Apache Commons Pool的实现

Apache Commons Pool

  Apache Commons Pool是一个开源的通用对象池实现,它提供了对象池的基本功能,如对象的创建、销毁、借用和归还等。Apache Commons Pool有如下3个核心的组件,主要负责对象的通用配置、对象的创建、对象池的管理。

GenericObjectPoolConfig

  GenericObjectPoolConfig类是负责通用的对象池配置信息,比如最大对象数,最小空闲数量等。

连接池——Jedis连接池

  JedisPoolConfig通过继承 GenericObjectPoolConfig,设置了很多个性化的关于空闲连接检测的配置。

public class GenericObjectPoolConfig<T> extends BaseObjectPoolConfig<T> {

    /**
     * 对象池中最大对象数
     * @see GenericObjectPool#getMaxTotal()
     */
    public static final int DEFAULT_MAX_TOTAL = 8;

    /**
     * 对象池中最大空闲对象数
     * @see GenericObjectPool#getMaxIdle()
     */
    public static final int DEFAULT_MAX_IDLE = 8;

    /**
     * 对象池中最小空闲对象数
     * @see GenericObjectPool#getMinIdle()
     */
    public static final int DEFAULT_MIN_IDLE = 0;
    
    }
    
    
public class JedisPoolConfig extends GenericObjectPoolConfig<Jedis> {
  public JedisPoolConfig() {
    //空闲时是否进行对象有效性检查
    setTestWhileIdle(true);
    //连接空闲的最小时间
    setMinEvictableIdleTimeMillis(60000);
    //“空闲链接”检测线程,检测的周期,毫秒数
    setTimeBetweenEvictionRunsMillis(30000);
    //对所有连接做空闲监测
    setNumTestsPerEvictionRun(-1);
  }
} 

PooledObjectFactory

   PooledObjectFactory这个对象工厂主要负责对象的创建与销毁,它是一个接口,JedisFactory实现对应的接口功能。

连接池——Jedis连接池

public interface PooledObjectFactory<T> {
	//"激活"对象
    void activateObject(PooledObject<T> var1) throws Exception;
	//销毁对象
    void destroyObject(PooledObject<T> var1) throws Exception;

    default void destroyObject(PooledObject<T> p, DestroyMode destroyMode) throws Exception {
        this.destroyObject(p);
    }
	//创建一个新对象	
    PooledObject<T> makeObject() throws Exception;
    // "钝化"对象,
    void passivateObject(PooledObject<T> var1) throws Exception;
	//检测对象是否"有效"
    boolean validateObject(PooledObject<T> var1);
}

GenericObjectPool

   GenericObjectPool主要是负责操作对象池里面的对象,从对象池获取对象、归还对象等操作。而GenericObjectPool通过持有上面的PooledObjectFactory对象工厂,然后去操作对应的对象。

连接池——Jedis连接池

public interface ObjectPool<T> extends Closeable {
    
    //从池中获取对象
    T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;
            
    //清除池,池可用
    void clear() throws Exception, UnsupportedOperationException;

    //关闭池,池不可用
    @Override
    void close();
    
    //将对象放回池中
    void returnObject(T obj) throws Exception;

}

小结

  以上就是Apache Commons Pool的具体的核心组件与功能,接下来我们看下JedisPool连接池如何基于它去实现具体的功能的。

获取连接

  我们使用JedisPool时候,是使用getResource()方法去获取Jedis,如下代码

Jedis jedis = jedisPool.getResource()

  我们先看下源码,getResource()最终实际调用的还是 GenericObjectPool对象池里面的borrowObject方法。

//Pool#getResource
 public T getResource() {
    try {
      return internalPool.borrowObject();
    } catch (NoSuchElementException nse) {
      if (null == nse.getCause()) { 
        //异常是连接池耗尽导致的
        throw new JedisExhaustedPoolException(
            "Could not get a resource since the pool is exhausted", nse);
      }
      //异常是 activateObject() or ValidateObject()导致的
      throw new JedisException("Could not get a resource from the pool", nse);
    } catch (Exception e) {
      throw new JedisConnectionException("Could not get a resource from the pool", e);
    }
  }

  我们先看下整体的流程,实际做的事情就是从空闲队列获取对象,没有的话就去创建对象信息,然后激活对象实例,再校验对象的合法性,最后返回对应的一个对象实例。

连接池——Jedis连接池

  接下来我们再看下具体的源码,GenericObjectPool对象池里面的borrowObject方法实现

//GenericObjectPool#borrowObject
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
 
public T borrowObject(final Duration borrowMaxWaitDuration) throws Exception {
   //检查对象池状态,看看是否已经被关闭了
    assertOpen();
    //清除废弃的对象
    final AbandonedConfig ac = this.abandonedConfig;
    if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) &&
            (getNumActive() > getMaxTotal() - 3)) {
        removeAbandoned(ac);
    }

    PooledObject<T> p = null;
    final boolean blockWhenExhausted = getBlockWhenExhausted();

    boolean create;
    final long waitTimeMillis = System.currentTimeMillis();

    while (p == null) {
        create = false;
        //从LinkedBlockingDeque队列中拿出第一个元素
        p = idleObjects.pollFirst();
        if (p == null) {
            //创建对象
            p = create();
            if (p != null) {
                    //创建成功,创建标识置为true
                create = true;
            }
        }
        if (blockWhenExhausted) {
          //上面没有创建成功
            if (p == null) {
                    //如果maxWaitDuration设置的为负数
                if (borrowMaxWaitDuration.isNegative()) {
                    // 从空闲队列获取,但是该方法会阻塞,一直等到有可用空闲对象。
                    p = idleObjects.takeFirst();
                } else {
                    // 如果设置了一个有效的等待时间,最多等待borrowMaxWaitMillis毫秒。还取不到就返回空
                    p = idleObjects.pollFirst(borrowMaxWaitDuration);
                }
            }
            if (p == null) {
                throw new NoSuchElementException(appendStats(
                        "Timeout waiting for idle object, borrowMaxWaitDuration=" + borrowMaxWaitDuration));
            }
        } else if (p == null) {
            throw new NoSuchElementException(appendStats("Pool exhausted"));
        }
        // 如果分配失败(可认为被别人抢走了),p置为空(可以进行下一次循环遍历)
        if (!p.allocate()) {
            p = null;
        }
        if (p != null) {
            try {
            //通过对象池工厂,激活这个对象
            //jedis连接池的实现是JedisFactory,做了一个redis的select连库请求
                factory.activateObject(p);
            } catch (final Exception e) {
                try {
                 // 如果激活对象时,发生了异常,销毁对象
                    destroy(p, DestroyMode.NORMAL);
                } catch (final Exception e1) {}
                p = null;
                if (create) {
                    final NoSuchElementException nsee = new NoSuchElementException(
                            appendStats("Unable to activate object"));
                    nsee.initCause(e);
                    throw nsee;
                }
            }

            if (p != null && getTestOnBorrow()) {
                boolean validate = false;
                Throwable validationThrowable = null;
                try {
                //激活成功,开始校验对象。jedis的实现是,发一条redis的ping命令来校验连接的有效性
                    validate = factory.validateObject(p);
                } catch (final Throwable t) {
                    PoolUtils.checkRethrow(t);
                    validationThrowable = t;
                }
                if (!validate) {
                    try {
                    //校验对象失败,开始销毁对象
                        destroy(p, DestroyMode.NORMAL);
                        destroyedByBorrowValidationCount.incrementAndGet();
                    } catch (final Exception e) {
                        // Ignore - validation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                appendStats("Unable to validate object"));
                        nsee.initCause(validationThrowable);
                        throw nsee;
                    }
                }
            }
        }
    }
            //更新对象池统计信息
    updateStatsBorrow(p, Duration.ofMillis(System.currentTimeMillis() - waitTimeMillis));
            //返回对象实例
    return p.getObject();
}

归还连接

  接着我们看下怎么去归还连接,主要流程其实就是把连接加入到空闲队列。 连接池——Jedis连接池   连接归还是由Jedis里面的close()方法去触发的,实际调用的还是GenericObjectPool类里面的returnObject(),我们主要看下这个方法

//Jedis#close
public void close() {
    if (dataSource != null) {
      JedisPoolAbstract pool = this.dataSource;
      this.dataSource = null;
      if (isBroken()) {
        pool.returnBrokenResource(this);
      } else {
        pool.returnResource(this);
      }
    } else {
      super.close();
    }
  }
//GenericObjectPool#returnObject
public void returnObject(final T obj) {
    //从ConcurrentHashMap中获取原始对象的PooledObject对象
    final PooledObject<T> p = getPooledObject(obj);
    //如果p为空,说明这个要还的对象,已经不在池子中了
    if (p == null) {
        if (!isAbandonedConfig()) {
            throw new IllegalStateException(
                    "Returned object not currently part of this pool");
        }
        return;
    }
    //使用同步锁,标记返回对象的状态
    markReturningState(p);
    //获取对象使用时间
    final Duration activeTime = p.getActiveDuration();
    //如果testOnReturn配置为true,需要校验有效性
    if (getTestOnReturn() && !factory.validateObject(p)) {
        try {
            //如果校验不通过,则销毁该对象
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }
  //钝化对象,也就是反初始化,也就是释放核心资源,JedisFactory里面是什么都没有实现的
    try {
        factory.passivateObject(p);
    } catch (final Exception e1) {
        swallowException(e1);
        try {
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
        updateStatsReturn(activeTime);
        return;
    }
    //变更状态为 IDLE
    if (!p.deallocate()) {
        throw new IllegalStateException(
                "Object has already been returned to this pool or is invalid");
    }
    //获取对象池配置的最大空闲对象数量
    final int maxIdleSave = getMaxIdle();
    //目前空闲对象数量已经达到规定的最大值,直接销毁对象
    if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
        try {
            destroy(p, DestroyMode.NORMAL);
        } catch (final Exception e) {
            swallowException(e);
        }
        try {
            ensureIdle(1, false);
        } catch (final Exception e) {
            swallowException(e);
        }
    } else {
     // 如果对象可以被正常归还,那么把对象添加到空闲队列
        if (getLifo()) {
         // 如果是后进先出,那么把空闲对象添加到队列开头
            idleObjects.addFirst(p);
        } else {
        // 如果是先进先出,那么把空闲对象添加到队列末尾
            idleObjects.addLast(p);
        }
       // 判断一下对象池状态,如果是关闭状态,那么调用clear方法,清空对象池
        if (isClosed()) {
            clear();
        }
    }
    //更新统计信息
    updateStatsReturn(activeTime);
}

关闭连接

  关闭连接,实际上就是销毁对象,调用的是GenericObjectPool类中的destroy()方法,最后调用JedisFactory实现的关闭物理连接的方法,去 关闭服务器socket连接。

//GenericObjectPool#destroy 
private void destroy(final PooledObject<T> toDestroy, final DestroyMode destroyMode) throws Exception {
     //设置状态为无效状态
     toDestroy.invalidate();
     //空闲列表移除当前对象
     idleObjects.remove(toDestroy);
     //所有对象列表移除当前对象
     allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
     try {
         //调用 JedisFactory的destroyObject方法
         factory.destroyObject(toDestroy, destroyMode);
     } finally {
         //增加销毁数量
         destroyedCount.incrementAndGet();
         //减少创建的数量
         createCount.decrementAndGet();
     }
}
//JedisFactory#destroyObject
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.isConnected()) {
        try {
            jedis.close();
        } catch (RuntimeException e) {
            logger.debug("Error while close", e);
        }
    }
}
//Connection#disconnect
//关闭服务器socket连接
public void disconnect() {
    if (isConnected()) {
        try {
            outputStream.flush();
            socket.close();
        } catch (IOException ex) {
            broken = true;
            throw new JedisConnectionException(ex);
        } finally {
            IOUtils.closeQuietly(socket);
        }
    }
}

创建连接

  创建连接,其实在上面获取连接的时候,如果发现空闲列表没有空闲连接了,就会调用create()方法创建对象。

 private PooledObject<T> create() throws Exception {
     // 获取对象池的最大数量配置
     int localMaxTotal = getMaxTotal();
     if (localMaxTotal < 0) {
         //配置为负数,则为无限
         localMaxTotal = Integer.MAX_VALUE;
     }
     final long localStartTimeMillis = System.currentTimeMillis();
     final long localMaxWaitTimeMillis = Math.max(getMaxWaitDuration().toMillis(), 0);
     Boolean create = null;
     while (create == null) {
         synchronized (makeObjectCountLock) {
             // createCount先自增
             final long newCreateCount = createCount.incrementAndGet();
             //当前数量大于配置的最大数量,池子满了
             if (newCreateCount > localMaxTotal) {
                 
                 createCount.decrementAndGet();
                 //当前没有创建的对象数量,则无需创建对象
                 if (makeObjectCount == 0) {
                     create = Boolean.FALSE;
                 } else {
                     //否则等待对象的返回
                     makeObjectCountLock.wait(localMaxWaitTimeMillis);
                 }
             } else {
                 // 对象池未达到容量。创建新对象
                 makeObjectCount++;
                 create = Boolean.TRUE;
             }
         }

         //超过了最大等待时间
         if (create == null &&
             (localMaxWaitTimeMillis > 0 &&
              System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
             create = Boolean.FALSE;
         }
     }

     if (!create.booleanValue()) {
         return null;
     }

     final PooledObject<T> p;
     try {
         //创建对象调用的是JedisFactory中实现的
         p = factory.makeObject();
          //如果testOnReturn配置为true,需要校验有效性
         if (getTestOnCreate() && !factory.validateObject(p)) {
             //不合法减少创建数量,返回空
             createCount.decrementAndGet();
             return null;
         }
     } catch (final Throwable e) {
          //创建失败,减少创建数量,抛出异常
         createCount.decrementAndGet();
         throw e;
     } finally {
         //释放锁,通知其他的等待线程
         synchronized (makeObjectCountLock) {
             makeObjectCount--;
             makeObjectCountLock.notifyAll();
         }
     }
	//清除废弃的对象配置
     final AbandonedConfig ac = this.abandonedConfig;
     if (ac != null && ac.getLogAbandoned()) {
         p.setLogAbandoned(true);
         p.setRequireFullStackTrace(ac.getRequireFullStackTrace());
     }
	 //增加createdCount数量
     createdCount.incrementAndGet();
     //新的对象创建好了,需要把他添加到池子里,allObjects用的一个ConcurrentHashMap
     allObjects.put(new IdentityWrapper<>(p.getObject()), p);
     return p;
 }

  JedisFactory中实现的创建对象方法,实际上就是创建一个 Jedis实例

//JedisFactory#makeObject
public PooledObject<Jedis> makeObject() throws Exception {
    Jedis jedis = null;
    try {
        //创建redis连接
        jedis = new Jedis(jedisSocketFactory, clientConfig);
        jedis.connect();
        return new DefaultPooledObject<>(jedis);
    } catch (JedisException je) {
        if (jedis != null) {
            try {
                jedis.close();
            } catch (RuntimeException e) {
                logger.debug("Error while close", e);
            }
        }
        throw je;
    }
}

连接池的配置

  接下来我们看下常用的配置参数与建议。

参数说明默认值建议
maxTotal资源池中的最大连接数8
maxIdle资源池允许的最大空闲连接数8
minIdle资源池确保的最少空闲连接数0
blockWhenExhausted当资源池用尽后,调用者是否要等待。只有当值为true时,下面的maxWaitMillis才会生效。true建议默认值。
maxWaitMillis当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)。-1(永不超时)不建议默认值。
testOnBorrow向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。false业务量很大建议设置为false,减少一次ping的开销。
testOnReturn向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。false业务量很大建议设置为false,减少一次ping的开销。
jmxEnabled是否开启JMX监控true建议开启,请注意应用本身也需要开启
testWhileIdle是否开启空闲资源检测。falsetrue
timeBetweenEvictionRunsMillis空闲资源的检测周期(单位为毫秒)-1(不检测)建议设置,周期自行选择
minEvictableIdleTimeMillis资源池中资源的最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除。30分钟可根据自身业务决定,一般默认值即可
numTestsPerEvictionRun做空闲资源检测时,每次检测资源的个数。3可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。

  其中最主要的就是最大连接数(maxTotal)。可以先根据下面的公式估算,实际情况还是根据业务总QPS和调用Redis的客户端规模整体评估每个节点所使用的连接池大小。

最大连接数=平均命令执行耗时(S)∗业务的QPS最大连接数 = 平均命令执行耗时(S) * 业务的QPS最大连接数=平均命令执行耗时(S)业务的QPS

  假如redis命令平均耗时约为1ms,业务期望的QPS是10000,那么理论上需要的连接数大小是 0.001/10000=100.001 / 10000 = 100.001/10000=10

总结

  JedisPool的连接池是基于 Apache Commons Pool 的 GenericObjectPool实现的,相对数据库连接池HikariCP实现起来更加简单,大家也可以使用Apache Commons Pool去实现其他的连接池技术,比如FTP连接池等等

参考