手写动态数据源连接池管理
手写动态数据源连接池管理
对于公司的中台系统而言,采集和告警的作用是很关键的。
在数据治理领域,告警可以帮助程序员对不同表的字段缺失,数量统计做有效的根因定位。对于一些重要指标的监控,我们需要对此限定阈值。
而告警以及多任务执行的源头也就是数据源的管理,一些中大型公司的后台系统,往往藏着多个数据库,有大数据领域的数据库如Doris,关系型数据库如MySQL。
对于采集任务而言,即使是同一类型的数据库,不同的的库也需要进行定位。
我们简化一下场景,现在有10条采集规则,每个采集规则30s执行一次,这10条采集规则分别都来自于不同的库,表,每次采集的时间是不定的,可能是10s,也有可能是30s,短一些的有5s。
如果说,我们对每一条采集SQL,都是先定位数据源,执行完后断开。再定位另一个数据源,执行完后再断开。每一次采集都需要去新建一个线程去创建连接,无法像固定的数据源一样去静态的把连接线程放进线程池里面,这该如何是好?
问题思考
原本我们后端系统一般是一个后端一个库,因此我们方便静态的为这个库创建数据库的连接池。为此我们需要研发一套高性能的,能将新的连接复用到连接池的东西。
结合上图来看,若两个连接,它的(url+username+password)组成的key相同,我们就认为他是可以在同一个连接池中进行复用的。
我们可以对于这个连接池集合做一个限定,防止它来一个数据源创建一个线程池。现在有一个问题,如果连接池集合都占满了,连接池1,2都在有线程在使用,而3,4里面没有线程使用。
而此时我们会想到,把3或者4这些没有使用到的线程池移除不就好了。
没错,问题是,移除3,还是移除4?
其实这也很好想到,也就是通过观测它过去的使用频率来判定哪一个需要被移除。
想完这些我们就可以开始写一个小的Demo了。
代码实战
package org.example;
import org.apache.commons.dbcp2.BasicDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class DynamicConnPoolCache {
private static DynamicConnPoolCache cache;
private ConcurrentHashMap<String,MysqlConnPool> connectPoolMap;
private ConcurrentHashMap<String,Integer> useMap;
private ConcurrentHashMap<String,Integer> rateMap;
private ConcurrentHashMap<String,Long> createTimeMap;
private int maxConnectNum;
private int connectNum;
public DynamicConnPoolCache(){
}
public static DynamicConnPoolCache connPoolCache(int maxConnectNum){
if(null == cache){
synchronized (DynamicConnPoolCache.class){
cache = new DynamicConnPoolCache();
cache.connectPoolMap = new ConcurrentHashMap<>(maxConnectNum);
cache.maxConnectNum = maxConnectNum;
cache.connectNum = 0;
cache.rateMap = new ConcurrentHashMap<>(maxConnectNum);
cache.useMap = new ConcurrentHashMap<>(maxConnectNum);
cache.createTimeMap = new ConcurrentHashMap<>(maxConnectNum);
}
}
return cache;
}
public boolean put(String key,MysqlConnPool value){
//删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接
if(connectNum > maxConnectNum){
String needRemoveKey = chooseRemoveKey();
if(null == needRemoveKey){
//连接池数已达上限,并且都在使用中
return false;
}
deleteConnPool(needRemoveKey);
}
//保存连接
connectPoolMap.put(key, value);
useMap.put(key, 0);
rateMap.put(key, 0);
createTimeMap.put(key, System.currentTimeMillis());
connectNum++;
return true;
}
public void close(){
for(String key : connectPoolMap.keySet()){
deleteConnPool(key);
}
}
public void deleteConnPool(String remove_key) {
MysqlConnPool remove_value = connectPoolMap.get(remove_key);
//关闭连接
remove_value.close();
//从maps清除key
connectPoolMap.remove(remove_key);
useMap.remove(remove_key);
rateMap.remove(remove_key);
createTimeMap.remove(remove_key);
connectNum--;
}
public MysqlConnPool get(String key){
if(connectPoolMap.containsKey(key)){
//正在使用的用户数+1
useMap.put(key, useMap.get(key)+1);
//历史使用数+1
rateMap.put(key, rateMap.get(key)+1);
}
return connectPoolMap.get(key);
}
public boolean useOver(String key,Connection conn,PreparedStatement st,ResultSet rs){
MysqlConnPool.release(conn, st, rs);
//正在使用的用户数-1
try {
useMap.put(key, useMap.get(key)-1);
} catch (Exception e) {
System.out.println(e.getStackTrace());
return false;
}
return true;
}
public String chooseRemoveKey(){
Map<String,Double> k_r = new HashMap<>();//存储未被使用的连接的使用评率(总使用数/(now-createTime))
useMap.forEach((k,v)->{
if(v<1){
k_r.put(k,rateMap.get(k)/(double)System.currentTimeMillis()-createTimeMap.get(k));
}
});
//取使用率最小的key
if(k_r.size()>0){
List<Map.Entry<String, Double>> entryList = new ArrayList(k_r.entrySet());
Collections.sort(entryList,(e1, e2)->(e1.getValue()>=e2.getValue() ? 1 : -1));
return entryList.get(0).getKey();
}
return null;
}
}
对于这段代码,还是会有一定的并发问题的。对于put和get方法会有读写并发问题,put方法会有写写并发问题。
private ReentrantLock lock = new ReentrantLock();
public boolean put(String key,MysqlConnPool value){
//删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接
final ReentrantLock currentLock = this.lock;
currentLock.lock();
try {
if(connectNum >= maxConnectNum){
String needRemoveKey = chooseRemoveKey();
if(null == needRemoveKey){
//连接池数已达上限,并且都在使用中
return false;
}
deleteConnPool(needRemoveKey);
}
//保存连接
connectPoolMap.put(key, value);
useMap.put(key, 0);
rateMap.put(key, 0);
createTimeMap.put(key, System.currentTimeMillis());
connectNum++;
}finally {
currentLock.unlock();
}
return true;
}
public MysqlConnPool get(String key){
final ReentrantLock currentLock = this.lock;
currentLock.lock();
try {
if(connectPoolMap.containsKey(key)){
//正在使用的用户数+1
useMap.put(key, useMap.get(key)+1);
//历史使用数+1
rateMap.put(key, rateMap.get(key)+1);
}
return connectPoolMap.get(key);
}finally {
lock.unlock();
}
}
转载自:https://juejin.cn/post/7270864903015170082