Tomato 的个人博客

记录精彩的程序人生

Open Source, Open Mind,
Open Sight, Open Future!
  menu
8 文章
0 浏览
1 当前访客
ღゝ◡╹)ノ❤️

消息队列连接池实现

目标

  1. 高效管理连接
    动态复用有限的连接资源,避免重复创建和销毁连接的开销,提升性能和资源利用率。
  2. 线程安全
    确保在多线程环境下的连接获取和释放操作是线程安全的。
  3. 动态伸缩
    能够根据需求动态调整池中连接的数量,比如最小连接数、最大连接数,以及连接的超时释放机制。
  4. 连接健康检查
    确保每个连接在取出前可用,提供自动重连功能。(待实现)
  5. 多种消息队列支持(待实现)

功能点

1. 初始化

  • 配置选项
    • 最小连接数
    • 最大连接数
    • 空闲连接的最大存活时间
    • 获取连接的超时时间
    • 消息队列的连接参数
  • 延迟创建
    • 在需要连接时才初始化,而不是在连接池创建时全部生成连接。

2. 连接的获取和释放

  • 连接获取
    • 提供一个get_connection() 方法,阻塞或非阻塞地从池中取出连接。
    • 如果池中没有空闲连接:
      • 若未达到最大连接数限制,创建新连接。
      • 若达到限制,按照超时时间阻塞等待可用连接。
  • 连接释放
    • 提供一个release_connection() 方法,将连接归还池中。
    • 若连接已经失效或不可用,可以销毁并尝试补充新的连接。

3. 连接管理

  • 动态调整
    • 根据使用量自动伸缩连接数,保持池的灵活性。
  • 健康检查
    • 定期检测池中的连接是否有效,清理失效连接。
  • 连接重试
    • 如果连接失效,提供机制自动重新建立连接.

4. 多线程支持

  • 确保在多线程环境下,连接池的操作是线程安全的,避免竞争条件或死锁。

5. 错误处理

  • 处理连接超时、不可用等异常情况,提供明确的错误信息。
  • 支持日志记录,便于问题追踪和调试。

6. 资源释放

  • 在程序结束时,能够优雅地关闭所有连接,释放资源。

读写锁

连接支持同时读和竞争写,使用ReentrantLock保证读写操作的安全性

实现示例:


Map<String, Connection> activePool;
Map<String, Connection> idlePool;
Map<String, ReentrantReadWriteLock> lockMap;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
int idleTimeout = 60000; // 空闲超时时间

public xxConnectionPool {
    this.activePool = new ConcurrentHashMap<>(minPoolSize);
    this.idlePool = Collections.synchronizedMap(new LinkedHashMap<>(minPoolSize, 0.75f, true));
    this.lockMap = new ConcurrentHashMap<>();
}

@PostConstruct
void init {
    // 定期清理超时连接
    scheduler.scheduleWithFixedDelay(this::cleanupExpiredConnections, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
}

Connection getConnection(String connKey) {
        // 获取连接时,使用读取锁,允许多个线程并发读取
        ReentrantReadWriteLock lock = lockMap.computeIfAbsent(connKey, key -> new ReentrantReadWriteLock());
        lock.readLock().lock(); // 获取读锁
        ConnectionWrapper conn;
        if (idlePool.containsKey(connKey)) {
            conn = idlePool.get(connKey);
            activePool.put(connKey, idlePool.remove(connKey));
        } else {
            conn = activePool.computeIfAbsent(connKey, key -> createConnection(connKey));
        }
        return conn.getConnection;
}

//建立连接
Connection createConnection(String key) {
 ... 
}

//释放连接
void releaseConnection(String key) {
... 
}
void close(String key) {
        ReentrantReadWriteLock lock = lockMap.get(key);
        if (lock == null) return;
        lock.readLock().unlock(); // 释放读锁
        Lock writeLock = lock.writeLock(); // 获取写锁
        if (writeLock.tryLock()) { // 尝试加写锁,失败直接返回
            try {
                if (!activePool.containsKey(key)) return;
                activePool.get(key).updateLastAccessedTime(); // 更新连接访问时间,自定义实现连接包装类
                idlePool.put(key, activePool.remove(key)); // 将连接放入空池
            } finally {
                writeLock.unlock(); // 释放写锁
                lockMap.remove(key);
            }
        }
}

    /**
     * 定期清理空闲超时连接
     */
   void cleanupExpiredConnections() {
        long currentTime = System.currentTimeMillis();
        if (!idlePool.isEmpty()) {
            Iterator<Map.Entry<String, ConnectionWrapper>> iterator = idlePool.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, ConnectionWrapper> entry = iterator.next();
                ConnectionWrapper wrapper = entry.getValue();
                if (currentTime - wrapper.getLastAccessedTime() > idleTimeout) {
                    releaseConnection(wrapper.getConnection());
                    iterator.remove();
                } else {
                    break;
                }
            }
        }
    }

    // 内部封装连接对象和最后访问时间
    @Getter
    private static class ConnectionWrapper {
        private final Connection connection;
        private long lastAccessedTime;

        public ConnectionWrapper(Connection connection) {
            this.connection = connection;
            updateLastAccessedTime();
        }

        public void updateLastAccessedTime() {
            this.lastAccessedTime = System.currentTimeMillis();
        }
    }


标题:消息队列连接池实现
作者:MrTomato
地址:https://isee-you.icu/articles/2024/12/16/1734337019981.html