背景:
最近在将一个大数据业务迁移到华为云 MRS 平台的过程中发现一个问题:程序每次进行数据处理时都需要重新建立一个 Hive 连接。考虑到后续业务逻辑扩展,一次需要加载几百个文件,有时还需要多线程执行计算业务,这种情况下每次都新建数据库连接必然会降低业务的计算能力,因此决定自己维护一个 JDBC 连接池(至于连接池,其实可以采用阿里巴巴的 Druid 或 c3p0,但是没找到整合方式,所以自己来写)。
准备工作:引入mysql驱动jar包

编码展示:
DBBean:
package com.ditto.connectionpool;
/**
* @author SDT14325
* created at 15:10 2019/6/13
*/
/**
 * Process-wide holder for the connection-pool configuration.
 *
 * <p>The values below are demo defaults (including the credentials) and are meant to be
 * overridden through the setters before a pool is built from this bean.
 *
 * <p>Only the creation of the singleton is thread-safe; the setters themselves are plain
 * field writes and should be called during start-up, before the bean is shared.
 */
public class DBBean {
    /*
     * Database connection information.
     */
    private String driverName = "com.mysql.jdbc.Driver";
    private String url = "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8";
    private String userName = "root";
    private String password = "123456";
    private String poolName = "testPool";
    // Minimum number of idle connections.
    private Integer minConnections = 3;
    // Maximum number of idle connections.
    private Integer maxConnections = 10;
    // Number of connections opened when the pool is initialised.
    private Integer initConnection = 5;
    // Interval (ms) between repeated attempts to obtain a connection.
    private Long connTimeOut = 1000L;
    // Maximum number of connections allowed.
    private Integer maxActiveConnections = 100;
    // Connection timeout (ms).
    private Long connectionTimeOut = 1000 * 60 * 20L;
    // Whether to obtain the current connection.
    private Boolean isCurrentConnection = true;
    // Whether the pool should be checked periodically.
    private Boolean isCheckPool = true;
    // Delay (ms) before the first pool check.
    private Long lazyCheck = 1000 * 60 * 60L;
    // Interval (ms) between pool checks.
    private Long periodCheck = 1000 * 60 * 60L;

    /**
     * Initialization-on-demand holder: the JVM guarantees the nested class is initialised
     * exactly once, so concurrent first calls to {@link #getDB()} all see the same instance.
     */
    private static final class Holder {
        private static final DBBean INSTANCE = new DBBean();
    }

    // Singleton: instances are only handed out through getDB().
    private DBBean() { }

    /**
     * Returns the single shared configuration instance.
     *
     * @return the singleton {@code DBBean}, never {@code null}
     */
    public static DBBean getDB() {
        return Holder.INSTANCE;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getDriverName() {
        return driverName;
    }

    public void setDriverName(String driverName) {
        this.driverName = driverName;
    }

    public Integer getMinConnections() {
        return minConnections;
    }

    public void setMinConnections(Integer minConnections) {
        this.minConnections = minConnections;
    }

    public Integer getMaxConnections() {
        return maxConnections;
    }

    public void setMaxConnections(Integer maxConnections) {
        this.maxConnections = maxConnections;
    }

    public Integer getInitConnection() {
        return initConnection;
    }

    public void setInitConnection(Integer initConnection) {
        this.initConnection = initConnection;
    }

    public Long getConnTimeOut() {
        return connTimeOut;
    }

    public void setConnTimeOut(Long connTimeOut) {
        this.connTimeOut = connTimeOut;
    }

    public Integer getMaxActiveConnections() {
        return maxActiveConnections;
    }

    public void setMaxActiveConnections(Integer maxActiveConnections) {
        this.maxActiveConnections = maxActiveConnections;
    }

    public Long getConnectionTimeOut() {
        return connectionTimeOut;
    }

    public void setConnectionTimeOut(Long connectionTimeOut) {
        this.connectionTimeOut = connectionTimeOut;
    }

    public Boolean getCurrentConnection() {
        return isCurrentConnection;
    }

    public void setCurrentConnection(Boolean currentConnection) {
        this.isCurrentConnection = currentConnection;
    }

    public Boolean getCheckPool() {
        return isCheckPool;
    }

    public void setCheckPool(Boolean checkPool) {
        this.isCheckPool = checkPool;
    }

    public Long getLazyCheck() {
        return lazyCheck;
    }

    public void setLazyCheck(Long lazyCheck) {
        this.lazyCheck = lazyCheck;
    }

    public Long getPeriodCheck() {
        return periodCheck;
    }

    public void setPeriodCheck(Long periodCheck) {
        this.periodCheck = periodCheck;
    }
}
服务接口:ConnectionPoolInterface
package com.ditto.connectionpool;
import java.sql.Connection;
import java.sql.SQLException;
/**
* @author SDT14325
* created at 15:19 2019/6/13
*/
/**
 * Operations offered by a connection pool: hand out a connection, take it back, and
 * tear the whole pool down.
 */
public interface ConnectionPoolInterface {

    /**
     * Obtains a connection from the pool.
     *
     * @return the connection handed to the caller
     */
    Connection createConnection();

    /**
     * Gives a connection back to the pool so it can be recycled.
     *
     * @param conn the connection being returned
     * @throws SQLException if the implementation fails while recycling the connection
     */
    void close(Connection conn) throws SQLException;

    /**
     * Destroys the connection pool.
     */
    void destroy();
}
构建数据库连接池: ConnectionPool
package com.ditto.connectionpool;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* @author SDT14325
* created at 15:20 2019/6/13
*/
public class ConnectionPool implements ConnectionPoolInterface {
private DBBean dbBean;
private Boolean isActive = false;
//建立连线总数
private Integer totalActiveConnections = 0;
//空闲连结
private List freeConnections = new Vector();
//活动连线
private List activeConnections = new Vector();
public ConnectionPool(DBBean dbBean) {
this.dbBean = dbBean;
init();
}
/**
* 初始化连线池
*/
private void init(){
try{
Class.forName(dbBean.getDriverName());
for(int i = 0;i Connection conn = newConnection();
if (conn != null) {
freeConnections.add(conn);
totalActiveConnections++;
}
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 建立新的连线
* @return
* @throws ClassNotFoundException
* @throws SQLException
*/
private Connection newConnection() throws ClassNotFoundException, SQLException{
Connection conn = null;
if(dbBean != null){
Class.forName(dbBean.getDriverName());
conn = DriverManager.getConnection(dbBean.getUrl(),dbBean.getUserName(),dbBean.getPassword());
}
return conn;
}
/**
* 此连线是否可用
* @param conn
* @return
*/
private Boolean isValid(Connection conn){
try{
if(conn == null || conn.isClosed()) {
return false;
}
}catch(SQLException e){
e.printStackTrace();
}
return true;
}
/**
* 从执行绪池中获取连线,如果则自行建立
* @return
*/
private Connection getConnection() {
Connection conn = null;
try {
if(totalActiveConnections if(freeConnections.size() > 0){
conn = freeConnections.get(0);
freeConnections.remove(0);
}else{
conn = newConnection();
}
}else{
synchronized (this) {
this.wait(this.dbBean.getConnTimeOut());
conn = getConnection();
}
}
if(isValid(conn)){
activeConnections.add(conn);
totalActiveConnections++;
}
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
@Override
public Connection createConnection() {
return getConnection();
}
public void getConn() {
}
@Override
public synchronized void close(Connection conn) {
if(isValid(conn) && freeConnections.size() freeConnections.add(conn);
activeConnections.remove(conn);
totalActiveConnections--;
this.notifyAll();
}
}
@Override
public void destroy() {
for (Connection conn : freeConnections) {
try {
if(isValid(conn)){conn.close();}
} catch (Exception e) {
e.printStackTrace();
}
}
for(Connection conn : activeConnections){
try {
if(isValid(conn)){ conn.close();}
} catch (Exception e) {
e.printStackTrace();
}
}
isActive = false;
totalActiveConnections = 0;
}
private Boolean isActive() {
return isActive;
}
public void checkPool() {
System.out.println("空闲连线数"+freeConnections.size());
System.out.println("活动连线数"+activeConnections.size());
System.out.println("总连线数"+ totalActiveConnections);
}
}
测试用例:
package com.ditto.connectionpool;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* @author ASUS
* Created at 10:33.2019/6/16
*/
/**
 * Manual smoke test for {@link ConnectionPool}: checks a connection out, runs one query,
 * returns the connection, checks it out again and prints the pool counters along the way.
 * Requires a reachable database matching the {@link DBBean} defaults.
 */
public class TestPool {
    /**
     * Runs the smoke test.
     *
     * @param args unused
     * @throws SQLException if no connection can be obtained or the query fails
     * @throws InterruptedException if the final pause is interrupted
     */
    public static void main(String[] args) throws SQLException, InterruptedException {
        ConnectionPool pool = new ConnectionPool(DBBean.getDB());
        try {
            Connection conn = pool.createConnection();
            pool.checkPool();
            System.out.println("1111111111:" + conn);
            if (conn == null) {
                // createConnection() signals failure with null; fail with a clear message
                // instead of a NullPointerException on the next line.
                throw new SQLException("Connection pool did not return a connection");
            }
            // Query test: statement and result set are closed automatically, and the
            // connection goes back to the pool even if the query throws.
            try (PreparedStatement ps = conn.prepareStatement("select * from test_tb_grade limit 1");
                    ResultSet resultSet = ps.executeQuery()) {
                while (resultSet.next()) {
                    String s1 = resultSet.getString(1);
                    System.out.println("22222222222:" + s1);
                }
            } finally {
                pool.close(conn);
            }
            Connection conn2 = pool.createConnection();
            System.out.println("333333333333:" + conn2);
            pool.checkPool(); // inspect the pool state
            System.out.println("444444444444444");
            pool.close(conn2);
            pool.checkPool();
            Thread.sleep(20 * 1000L);
            System.out.println("5555555555555555");
            pool.checkPool();
        } finally {
            // Release every physical connection before the JVM exits.
            pool.destroy();
        }
    }
}
总结:
1.连接池中存放的空闲连接和活跃连接需确保线程安全,避免多个线程共用同一个连接;
2.当连接池中的总连接数超过了允许的最大连接数时,需要让线程等待,直到有连接被关闭(归还),此线程才被唤醒;
3.每一个连接下可以开启多个 statement 进行数据交互查询,但是对于数据写入和更新操作,需要考虑每一个连接下的事务是否相互影响;
4.练手demo,如有不正确的地方,劳烦指出,共同学习进步。





























