目标
- 高效管理连接
动态复用有限的连接资源,避免重复创建和销毁连接的开销,提升性能和资源利用率。 - 线程安全
确保在多线程环境下的连接获取和释放操作是线程安全的。 - 动态伸缩
能够根据需求动态调整池中连接的数量,比如最小连接数、最大连接数,以及连接的超时释放机制。 - 连接健康检查
确保每个连接在取出前可用,提供自动重连功能。(待实现) - 多种消息队列支持(待实现)
功能点
1. 初始化
- 配置选项
- 最小连接数
- 最大连接数
- 空闲连接的最大存活时间
- 获取连接的超时时间
- 消息队列的连接参数
- 延迟创建
- 在需要连接时才初始化,而不是在连接池创建时全部生成连接。
2. 连接的获取和释放
- 连接获取
- 提供一个
get_connection()
方法,阻塞或非阻塞地从池中取出连接。 - 如果池中没有空闲连接:
- 若未达到最大连接数限制,创建新连接。
- 若达到限制,按照超时时间阻塞等待可用连接。
- 提供一个
- 连接释放
- 提供一个
release_connection()
方法,将连接归还池中。 - 若连接已经失效或不可用,可以销毁并尝试补充新的连接。
- 提供一个
3. 连接管理
- 动态调整
- 根据使用量自动伸缩连接数,保持池的灵活性。
- 健康检查
- 定期检测池中的连接是否有效,清理失效连接。
- 连接重试
- 如果连接失效,提供机制自动重新建立连接.
4. 多线程支持
- 确保在多线程环境下,连接池的操作是线程安全的,避免竞争条件或死锁。
5. 错误处理
- 处理连接超时、不可用等异常情况,提供明确的错误信息。
- 支持日志记录,便于问题追踪和调试。
6. 资源释放
- 在程序结束时,能够优雅地关闭所有连接,释放资源。
读写锁
连接支持同时读和竞争写,使用ReentrantLock保证读写操作的安全性
实现示例:
Map<String, Connection> activePool;
Map<String, Connection> idlePool;
Map<String, ReentrantReadWriteLock> lockMap;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
int idleTimeout = 60000; // 空闲超时时间
public xxConnectionPool {
this.activePool = new ConcurrentHashMap<>(minPoolSize);
this.idlePool = Collections.synchronizedMap(new LinkedHashMap<>(minPoolSize, 0.75f, true));
this.lockMap = new ConcurrentHashMap<>();
}
@PostConstruct
void init {
// 定期清理超时连接
scheduler.scheduleWithFixedDelay(this::cleanupExpiredConnections, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
}
Connection getConnection(String connKey) {
// 获取连接时,使用读取锁,允许多个线程并发读取
ReentrantReadWriteLock lock = lockMap.computeIfAbsent(connKey, key -> new ReentrantReadWriteLock());
lock.readLock().lock(); // 获取读锁
ConnectionWrapper conn;
if (idlePool.containsKey(connKey)) {
conn = idlePool.get(connKey);
activePool.put(connKey, idlePool.remove(connKey));
} else {
conn = activePool.computeIfAbsent(connKey, key -> createConnection(connKey));
}
return conn.getConnection;
}
//建立连接
Connection createConnection(String key) {
...
}
//释放连接
void releaseConnection(String key) {
...
}
void close(String key) {
ReentrantReadWriteLock lock = lockMap.get(key);
if (lock == null) return;
lock.readLock().unlock(); // 释放读锁
Lock writeLock = lock.writeLock(); // 获取写锁
if (writeLock.tryLock()) { // 尝试加写锁,失败直接返回
try {
if (!activePool.containsKey(key)) return;
activePool.get(key).updateLastAccessedTime(); // 更新连接访问时间,自定义实现连接包装类
idlePool.put(key, activePool.remove(key)); // 将连接放入空池
} finally {
writeLock.unlock(); // 释放写锁
lockMap.remove(key);
}
}
}
/**
* 定期清理空闲超时连接
*/
void cleanupExpiredConnections() {
long currentTime = System.currentTimeMillis();
if (!idlePool.isEmpty()) {
Iterator<Map.Entry<String, ConnectionWrapper>> iterator = idlePool.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ConnectionWrapper> entry = iterator.next();
ConnectionWrapper wrapper = entry.getValue();
if (currentTime - wrapper.getLastAccessedTime() > idleTimeout) {
releaseConnection(wrapper.getConnection());
iterator.remove();
} else {
break;
}
}
}
}
// 内部封装连接对象和最后访问时间
@Getter
private static class ConnectionWrapper {
private final Connection connection;
private long lastAccessedTime;
public ConnectionWrapper(Connection connection) {
this.connection = connection;
updateLastAccessedTime();
}
public void updateLastAccessedTime() {
this.lastAccessedTime = System.currentTimeMillis();
}
}