likes
comments
collection
share

手写动态数据源连接池管理

作者站长头像
站长
· 阅读数 52

手写动态数据源连接池管理

对于公司的中台系统而言,采集和告警的作用是很关键的。

在数据治理领域,告警可以帮助程序员对不同表的字段缺失,数量统计做有效的根因定位。对于一些重要指标的监控,我们需要对此限定阈值。

而告警以及多任务执行的源头也就是数据源的管理,一些中大型公司的后台系统,往往藏着多个数据库,有大数据领域的数据库如Doris,关系型数据库如MySQL。

对于采集任务而言,即使是同一类型的数据库,不同的的库也需要进行定位。

我们简化一下场景,现在有10条采集规则,每个采集规则30s执行一次,这10条采集规则分别都来自于不同的库,表,每次采集的时间是不定的,可能是10s,也有可能是30s,短一些的有5s。

如果说,我们对每一条采集SQL,都是先定位数据源,执行完后断开。再定位另一个数据源,执行完后再断开。每一次采集都需要去新建一个线程去创建连接,无法像固定的数据源一样去静态的把连接线程放进线程池里面,这该如何是好?

问题思考

原本我们后端系统一般是一个后端一个库,因此我们方便静态的为这个库创建数据库的连接池。为此我们需要研发一套高性能的,能将新的连接复用到连接池的东西。

手写动态数据源连接池管理

结合上图来看,若两个连接,它的(url+username+password)组成的key相同,我们就认为他是可以在同一个连接池中进行复用的。

我们可以对于这个连接池集合做一个限定,防止它来一个数据源创建一个线程池。现在有一个问题,如果连接池集合都占满了,连接池1,2都在有线程在使用,而3,4里面没有线程使用。

而此时我们会想到,把3或者4这些没有使用到的线程池移除不就好了。

没错,问题是,移除3,还是移除4?

其实这也很好想到,也就是通过观测它过去的使用频率来判定哪一个需要被移除。

想完这些我们就可以开始写一个小的Demo了。

代码实战

package org.example;

import org.apache.commons.dbcp2.BasicDataSource;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class DynamicConnPoolCache {


    private static DynamicConnPoolCache cache;

    private ConcurrentHashMap<String,MysqlConnPool> connectPoolMap;

    private ConcurrentHashMap<String,Integer> useMap;


    private ConcurrentHashMap<String,Integer> rateMap;


    private ConcurrentHashMap<String,Long> createTimeMap;

    private int maxConnectNum;

    private int connectNum;


    public DynamicConnPoolCache(){

    }

    public static DynamicConnPoolCache connPoolCache(int maxConnectNum){
        if(null == cache){
            synchronized (DynamicConnPoolCache.class){
                cache = new DynamicConnPoolCache();
                cache.connectPoolMap = new ConcurrentHashMap<>(maxConnectNum);
                cache.maxConnectNum = maxConnectNum;
                cache.connectNum = 0;
                cache.rateMap = new ConcurrentHashMap<>(maxConnectNum);
                cache.useMap = new ConcurrentHashMap<>(maxConnectNum);
                cache.createTimeMap = new ConcurrentHashMap<>(maxConnectNum);
            }
        }
        return cache;
    }

    public boolean put(String key,MysqlConnPool value){
        //删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接
        if(connectNum > maxConnectNum){
            String needRemoveKey = chooseRemoveKey();
            if(null == needRemoveKey){
                //连接池数已达上限,并且都在使用中
                return false;
            }
            deleteConnPool(needRemoveKey);
        }
        //保存连接
        connectPoolMap.put(key, value);
        useMap.put(key, 0);
        rateMap.put(key, 0);
        createTimeMap.put(key, System.currentTimeMillis());
        connectNum++;
        return true;
    }

    public void close(){
        for(String key : connectPoolMap.keySet()){
            deleteConnPool(key);
        }
    }

    public void deleteConnPool(String remove_key) {
        MysqlConnPool remove_value = connectPoolMap.get(remove_key);
        //关闭连接
        remove_value.close();
        //从maps清除key
        connectPoolMap.remove(remove_key);
        useMap.remove(remove_key);
        rateMap.remove(remove_key);
        createTimeMap.remove(remove_key);
        connectNum--;
    }

    public MysqlConnPool get(String key){
        if(connectPoolMap.containsKey(key)){
            //正在使用的用户数+1
            useMap.put(key, useMap.get(key)+1);
            //历史使用数+1
            rateMap.put(key, rateMap.get(key)+1);
        }
        return connectPoolMap.get(key);
    }

    public boolean useOver(String key,Connection conn,PreparedStatement st,ResultSet rs){
        MysqlConnPool.release(conn, st, rs);
        //正在使用的用户数-1
        try {
            useMap.put(key, useMap.get(key)-1);
        } catch (Exception e) {
            System.out.println(e.getStackTrace());
            return false;
        }
        return true;
    }

    public String chooseRemoveKey(){
        Map<String,Double> k_r = new HashMap<>();//存储未被使用的连接的使用评率(总使用数/(now-createTime))
        useMap.forEach((k,v)->{
            if(v<1){
                k_r.put(k,rateMap.get(k)/(double)System.currentTimeMillis()-createTimeMap.get(k));
            }
        });
        //取使用率最小的key
        if(k_r.size()>0){
            List<Map.Entry<String, Double>> entryList = new ArrayList(k_r.entrySet());
            Collections.sort(entryList,(e1, e2)->(e1.getValue()>=e2.getValue() ? 1 : -1));
            return entryList.get(0).getKey();
        }
        return null;
    }


}

对于这段代码,还是会有一定的并发问题的。对于put和get方法会有读写并发问题,put方法会有写写并发问题。

    private ReentrantLock lock = new ReentrantLock();
    public boolean put(String key,MysqlConnPool value){
        //删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接
        final ReentrantLock currentLock = this.lock;
        currentLock.lock();
        try {
            if(connectNum >= maxConnectNum){
                String needRemoveKey = chooseRemoveKey();
                if(null == needRemoveKey){
                    //连接池数已达上限,并且都在使用中
                    return false;
                }
                deleteConnPool(needRemoveKey);
            }
            //保存连接
            connectPoolMap.put(key, value);
            useMap.put(key, 0);
            rateMap.put(key, 0);
            createTimeMap.put(key, System.currentTimeMillis());
            connectNum++;
        }finally {
            currentLock.unlock();
        }
        return true;
    }
    public MysqlConnPool get(String key){
        final ReentrantLock currentLock = this.lock;
        currentLock.lock();
        try {
            if(connectPoolMap.containsKey(key)){
                //正在使用的用户数+1
                useMap.put(key, useMap.get(key)+1);
                //历史使用数+1
                rateMap.put(key, rateMap.get(key)+1);
            }
            return connectPoolMap.get(key);
        }finally {
            lock.unlock();
        }
    }

转载自:https://juejin.cn/post/7270864903015170082
评论
请登录