likes
comments
collection
share

通过一个简单的连接池和实现事务自动控制DEMO

作者站长头像
站长
· 阅读数 3

为什么要有连接池事务自动控制

  • 连接池:因为提前初始化并可以复用,可以降低获取连接的时间开销(数据库连接打开相对耗时)。
  • 连接池:有效控制当前系统的数据库连接量,防止数据库连接被当前应用耗尽。
  • 事务的自动控制:通过一些封装控制可以减少一些重复操作,避免手工处理的一些遗漏,让开发过程更专注于业务处理。

JDBC的操作回顾

  • 框架的底层最终操作都是通过jdbc进行的,所以jdbc的手动操作我们需要了解。
  • 下面是个获取连接demo,设置事务手动提交。
private Connection getConnectionTest(){
    Connection conn=null;
    try {
        String url="jdbc:oracle:thin:@127.0.0.1:1521:orcl";
        String user="test";
        String password="test";
        Class.forName("oracle.jdbc.driver.OracleDriver");//加载数据驱动
        conn = DriverManager.getConnection(url, user, password);// 连接数据库
        conn.setAutoCommit(false);
    } catch (ClassNotFoundException e) {
        log.error(e.getMessage(),e);
    }catch(Exception e){
        log.error(e.getMessage(),e);
    }
    return conn;
}

这上面有个注意的问题是要加载下驱动:Class.forName("oracle.jdbc.driver.OracleDriver");//加载数据驱动

在Java官方文档中对Class.forName的解释是:此方法用于在运行时动态地加载一个类,返回值为生成的Class对象。而在这个过程中,除了将指定类的.class文件加载到JVM中之外,还会对类进行解释,执行类中的static代码块。

更具体地说,当我们调用Class.forName("oracle.jdbc.driver.OracleDriver")时,实际上是在调用DriverManagerregisterDriver方法来注册一个Oracle的JDBC驱动。这个驱动是从Driver接口继承的,该接口提供了若干数据库连接的方法。但这个注册过程并不是自动的,必须由可用的JDBC驱动的Driver类自己在DriverManager上进行注册。

一个简单的连接池demo

  • 管理池子的数据库连接
  • 分配一个连接
  • 连接的释放
  • 对于一些长时间不用的连接防止数据库断开连接。需要保活心跳(oracle 发送个查询 select 1 frmo dual;)
  • 空闲连接过多则要主动移除
初始化的参数
  • 空闲连接是备用的
  • 活跃连接是正在使用的
  • 连接数的控制
  • 获取连接超时主要利用队列的阻塞
  • 当前线程的事务控制主要用了:ThreadLocal
public class MyDbPool implements DbPool{
    private final static Logger log = LoggerFactory.getLogger(MyDbPool.class);
    /**
     * 空闲连接队列
     */
    private LinkedBlockingQueue<Connection> freeConnectPool;
    /**
     * 活跃连接队列
     */
    private LinkedBlockingQueue<Connection> wordConnectPool;

    /**
     * 初始空闲数
     */
    private int initialSize;
    /**
     * 最大空闲数
     */
    private int maxIdle;
    /**
     * 最大活跃数
     */
    private int maxActive;
    /**
     * 最大等待时间 单位毫秒
     */
    private int maxWait;

    /**
     * 事务控制
     */
    private static ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
    private MyDbPool(){}
    private static  final  MyDbPool instance=new MyDbPool();
    public static MyDbPool getInstance(){
        return instance;
    }
}   
连接获取

通过一个简单的连接池和实现事务自动控制DEMO

  • 这里其实还需要判断下连接是否有效
public synchronized  Connection getConnection() throws Exception {
    Connection conn=null;
    if(localConnection.get()!=null){
        conn=localConnection.get();
        log.info("---得到公用线程---");
    }
    //1.先取空闲连接
    else if(freeConnectPool.size()>0){
        conn=freeConnectPool.poll();
        if(conn!=null){
            wordConnectPool.add(conn);
        }
        log.info("---得到空闲线程---");
    //2.如果没有空闲连接,再取活跃连接
    }else if(wordConnectPool.size()<maxActive){
        conn=getConnectionTest();
        wordConnectPool.add(conn);
        log.info("---得到新线程---");
    }else{
     //3.如果没有空闲和活跃连接,则等待
        try {
            conn=freeConnectPool.poll(maxWait,java.util.concurrent.TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error(e.getMessage(),e);
        }
        log.info("---得到释放线程---");
    }
    if(conn!=null){
        localConnection.set(conn);
    };
    return conn;
}
连接释放
  • 当前线程存的变量移除
  • 空闲没达到最大则移动下存储位置复用
  • 空闲达到最大则直接关闭
/**
 * 释放先判断是否为当前的线程,防止释放了其他线程的
 * @param connection
 */
@Override
public void releaseConnection(Connection connection) {
    try {
        //判断是是否是当前的线程
        if(localConnection.get()!=null) {
            //当前线程清除
            localConnection.remove();
            if (connection != null) {
                if (connection.isClosed()) {
                    log.error("connection is closed");
                } else {
                    wordConnectPool.remove(connection);
                    if (freeConnectPool.size() < maxIdle) {
                        freeConnectPool.add(connection);
                    } else {
                        connection.close();
                    }
                }
            }
        }else{
            log.info("---当前线程已释放---");
        }
    } catch (SQLException e) {
        log.error(e.getMessage(), e);
    }
    log.info("---线程释放结束---");
}
事务提交和连接释放方法
保活心跳

为防止数据库连接长时间没用数据库直接断开,则需要起个定时任务或者起个线程定时向数据库发送个请求保持连接。

空闲连接过多则要主动移除

服务的任务不多时降低服务器开销则需要对空闲的连接减少连接数量,需要起个定时任务或者起个线程定时维护下空闲连接的保有量。

测试下效果
public class MyDbPoolTest {
    public static void main(String[] args) throws Exception {
        MyDbPool.getInstance().init();
        for(int i = 0; i < 5; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                       Connection connection = MyDbPool.getInstance().getConnection();
                       //MyDbPool.getInstance().releaseConnection(connection);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        Thread.sleep(2000);
        MyDbPool.getInstance().getConnection();
        MyDbPool.getInstance().getConnection();
        MyDbPool.getInstance().destroy();
    }
}

通过一个简单的连接池和实现事务自动控制DEMO

事务控制的简单思路

  • 用注解判断哪些需要事务自动控制。
  • AOP代理切面去控制事务的开启、提交、回滚。
  • 事务的传递。
1.如果是入口方法A开启事务    那么当前的线程调用栈是共享一个connection事务的
2.                         那么里面有单独线程那么单独线程和当前的线程事务时隔离的
3.如果是入口方法A没有开启事务方法内的执行是单独事务直接提交的
4.                         调用了开启事务的方法取决去开启事务的方法是否会触发切面代理
5                          单独起的线程同上
定义一个自己的事务注解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE })
public @interface MyTransactional {
    String value() default "";
    Class<Exception> rollbackFor() default Exception.class;
}
切面控制直接用springboot提供的
  • 切面控制连接的获取(上面数据库连接池得到连接会放到ThreadLocal里面)
  • 然后控制当前事务传递、提交和回滚
/**
 *  这里定义一个PointCut,用于描述满足条件的方法
 */
@Pointcut("execution(public * com.pingpang.services..*(*))")
private void servicesPointCut() {
    System.out.println("pointCut");
}

@Around("servicesPointCut() &&(@within(com.pingpang.db.annotation.MyTransactional) || @annotation(com.pingpang.db.annotation.MyTransactional))")
public Object roundServices(ProceedingJoinPoint pjp) throws NoSuchMethodException {

    Signature sig = pjp.getSignature();
    MethodSignature msig = (MethodSignature) sig;
    Object target = pjp.getTarget();
    Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
    log.info("-------当前处理方法:"+currentMethod.getName());

    log.info("获取连接");
    Object result = null;
    Connection con=null;
    try {
        //连接池拿到连接
        con= MyDbPool.getInstance().getConnection();
        result = pjp.proceed();
        if(con!=null){
            //事务提交
            MyDbPool.getInstance().commit(con);
        }
        //连接释放
        MyDbPool.getInstance().releaseConnection(con);
    } catch (Throwable e) {
        log.error(e.getMessage(),e);
        if(con!=null){
            try {
                //异常事务回滚
                MyDbPool.getInstance().rollback(con);
            } catch (Exception e1) {
                log.error(e1.getMessage(),e1);
            }
        }
        throw new RuntimeException(e);
    }
   return result;
}
DAO的处理
  • 需要拿到connection连接
  • 如何区分当前的事务是走的事务控制,还是主动获取的(这里我做了个取巧的操作,纯粹是个demo演示)。
  • 因为AOP会在执行到dao操作数据库之前拿到事务,在拿事务前做了个判断(是AOP的就AOP去控制,不是的直接控制提交)
/**
 * 测试先用个取巧的方法吧
 * @return
 * @throws Exception
 */
public MyDbBean getConnection() throws Exception {
    MyDbBean dbBean=new MyDbBean();
    //事务AOP进来会首先得到连接的
    dbBean.setAop(MyDbPool.getInstance().isCurrentThread());
    dbBean.setConnection(MyDbPool.getInstance().getConnection());
    return dbBean;
}

public int insertTest(String sql, List<Object> values) throws Exception {
    int result=-1;
    MyDbBean myDbBean=null;
    Connection con=null;
    PreparedStatement pre=null;
    try {
        myDbBean = this.getConnection();
        con=myDbBean.getConnection();
        pre=con.prepareStatement(sql);
        for (int index = 0; index < values.size(); index++) {
            Object obj = values.get(index);
            if (obj.getClass().isInstance(Integer.class)) {
                pre.setInt(index + 1, (Integer) obj);
            } else if (obj.getClass().isInstance(Double.class)) {
                pre.setDouble(index + 1, (Double) obj);
            } else if (obj.getClass().isInstance(BigDecimal.class)) {
                pre.setBigDecimal(index + 1, (BigDecimal) obj);
            } else if (obj.getClass().isInstance(java.sql.Date.class)) {
                pre.setDate(index + 1, (java.sql.Date) obj);
            } else if (obj.getClass().isInstance(java.util.Date.class)) {
                pre.setDate(index + 1, new java.sql.Date(((java.util.Date) obj).getTime()));
            } else {
                pre.setString(index + 1, obj.toString());
            }
        }
        //result = pre.executeUpdate();
        if(!myDbBean.isAop()) {
            MyDbPool.getInstance().commit(con);
        }
    }catch(Exception e){
        if(null!=myDbBean) {
            if (!myDbBean.isAop()) {
                MyDbPool.getInstance().rollback(con);
            } else {
                throw e;//事务自动控制
            }
        }
    }finally {
        if(null!=myDbBean) {
            if (myDbBean.isAop()) {//事务自动控制
                MyDbPool.getInstance().close(null, pre, null);
            } else {
                MyDbPool.getInstance().releaseConnection(con);
                MyDbPool.getInstance().close(null, pre, null);
            }
        }
    }
    return result;
}
事务自动控制测试

通过一个简单的连接池和实现事务自动控制DEMO