pymysqlpool-Python连接Mysql的线程安全数据库池

代码演示

做web开发选择一个线程安全的数据库连接代码还是非常有必要的,可以自己写一个,也可以用别人的轮子,这里为大家分享的就是github上的一个项目。

项目地址:github:https://github.com/luvvien/pymysqlpool

MySQL 数据库连接池组件

pymysqlpool 是数据库工具包中新成员,目的是能提供一个实用的数据库连接池中间件,从而避免在应用中频繁地创建和释放数据库连接资源。

功能

  1. 连接池本身是线程安全的,可在多线程环境下使用,不必担心连接资源被多个线程共享的问题;
  2. 提供尽可能紧凑的接口用于数据库操作;
  3. 连接池的管理位于包内完成,客户端可以通过接口获取池中的连接资源(返回 pymysql.Connection);
  4. 将最大程度地与 dataobj 等兼容,便于使用;
  5. 连接池本身具备动态增加连接数的功能,即 max_pool_size 和 step_size 会用于控制每次增加的连接数和最大连接数;
  6. 连接池最大连接数亦动态增加,需要开启 enable_auto_resize 开关,此后当任何一次连接获取超时发生,均记为一次惩罚,并且将 max_pool_size 扩大一定倍数。

基本工作流程

注意,当多线程同时请求时,若池中没有可用的连接对象,则需要排队等待

  1. 初始化后优先创建一个连接对象,放在连接池中;
  2. 客户端请求连接对象,连接池会从中挑选最近没使用的连接对象返回(同时会检查连接是否正常);
  3. 客户端使用连接对象,执行相应操作后,调用接口返回连接对象;
  4. 连接池回收连接对象,并将其加入池中的队列,供其它请求使用。
|——–|                                |————–| |        | <==borrow connection object==  | Pool manager | | Client |                                |              | |        | ==return connection object==>  |  FIFO queue  | |——–|                                |————–|

参数配置

  • pool_name: 连接池的名称,多种连接参数对应多个不同的连接池对象,多单例模式;
  • host: 数据库地址
  • user: 数据库服务器用户名
  • password: 用户密码
  • database: 默认选择的数据库
  • port: 数据库服务器的端口
  • charset: 字符集,默认为 ‘utf8’
  • use_dict_cursor: 使用字典格式或者元组返回数据;
  • max_pool_size: 连接池优先最大连接数;
  • enable_auto_resize: 是否动态扩展连接池,即当超过 max_pool_size 时,自动扩展 max_pool_size;
  • pool_resize_boundary: 该配置为连接池最终可以增加的上上限大小,即时扩展也不可超过该值;
  • auto_resize_scale: 自动扩展 max_pool_size 的增益,默认为 1.5 倍扩展;
  • defer_connect_pool: 是否延迟连接到连接池,当该值为 True 时,需要显示调用 pool.connect 进行连接;
  • kwargs: 其他配置参数将会在创建连接对象时传递给 pymysql.Connection。

# -*-coding: utf-8-*-
# Author : Christopher Lee
# License: MIT License
# File   : pool.py
# Date   : 2017-06-15 14-09
# Version: 0.1
# Description: connection pool manager.

import logging
import threading
import contextlib

from pymysql.connections import Connection
from pymysql.cursors import DictCursor, Cursor

from pymysqlpool.pool import PoolContainer, PoolIsFullException, PoolIsEmptyException

__version__ = '0.1'
__author__ = 'Chris'

logger = logging.getLogger('pymysqlpool')

__all__ = ['MySQLConnectionPool']


class NoFreeConnectionFoundError(Exception):
    pass


class PoolBoundaryExceedsError(Exception):
    pass


class MySQLConnectionPool(object):
    """
    A connection pool manager.
    """

    def __init__(self, pool_name, host=None, user=None, password="", database=None, port=3306,
                 charset='utf8', use_dict_cursor=True, max_pool_size=16,
                 enable_auto_resize=True, auto_resize_scale=1.5,
                 pool_resize_boundary=48,
                 defer_connect_pool=False, **kwargs):

        """
        Initialize the connection pool.

        Update: 2017.06.19
            1. remove `step_size` argument
            2. remove `wait_timeout` argument

        :param pool_name: a unique pool_name for this connection pool.
        :param host: host to your database server
        :param user: username to your database server
        :param password: password to access the database server
        :param database: select a default database(optional)
        :param port: port of your database server
        :param charset: default charset is 'utf8'
        :param use_dict_cursor: whether to use a dict cursor instead of a default one
        :param max_pool_size: maximum connection pool size (max pool size can be changed dynamically)
        :param enable_auto_resize: if set to True, the max_pool_size will be changed dynamically
        :param pool_resize_boundary: !!this is related to the max connections of your mysql server!!
        :param auto_resize_scale: `max_pool_size * auto_resize_scale` is the new max_pool_size.
                                The max_pool_size will be changed dynamically only if `enable_auto_resize` is True.
        :param defer_connect_pool: don't connect to pool on construction, wait for explicit call. Default is False.
        :param kwargs: other keyword arguments to be passed to `pymysql.Connection`
        """
        # config for a database connection
        self._host = host
        self._user = user
        self._password = password
        self._database = database
        self._port = port
        self._charset = charset
        self._cursor_class = DictCursor if use_dict_cursor else Cursor
        self._other_kwargs = kwargs

        # config for the connection pool
        self._pool_name = pool_name
        self._max_pool_size = max_pool_size if max_pool_size < pool_resize_boundary else pool_resize_boundary
        # self._step_size = step_size
        self._enable_auto_resize = enable_auto_resize
        self._pool_resize_boundary = pool_resize_boundary
        if auto_resize_scale < 1:
            raise ValueError(
                "Invalid scale {}, must be bigger than 1".format(auto_resize_scale))

        self._auto_resize_scale = int(round(auto_resize_scale, 0))
        # self.wait_timeout = wait_timeout
        self._pool_container = PoolContainer(self._max_pool_size)

        self.__safe_lock = threading.RLock()
        self.__is_killed = False
        self.__is_connected = False

        if not defer_connect_pool:
            self.connect()

    def __repr__(self):
        return '<MySQLConnectionPool ' \
               'name={!r}, size={!r}>'.format(self.pool_name, self.size)

    def __del__(self):
        self.close()

    def __iter__(self):
        """Iterate each connection item"""
        return iter(self._pool_container)

    @property
    def pool_name(self):
        return self._pool_name

    @property
    def pool_size(self):
        return self._pool_container.pool_size

    @property
    def free_size(self):
        return self._pool_container.free_size

    @property
    def size(self):
        return '<boundary={}, max={}, current={}, free={}>'.format(self._pool_resize_boundary,
                                                                   self._max_pool_size,
                                                                   self.pool_size,
                                                                   self.free_size)

    @contextlib.contextmanager
    def cursor(self, cursor=None):
        """Shortcut to get a cursor object from a free connection.
        It's not that efficient to get cursor object in this way for
        too many times.
        """
        with self.connection(autocommit=True) as conn:
            assert isinstance(conn, Connection)
            cursor = conn.cursor(cursor)

            try:
                yield cursor
            except Exception as err:
                conn.rollback()
                raise err
            finally:
                cursor.close()

    @contextlib.contextmanager
    def connection(self, autocommit=False):
        conn = self.borrow_connection()
        assert isinstance(conn, Connection)
        old_value = conn.get_autocommit()
        conn.autocommit(autocommit)
        try:
            yield conn
        except Exception as err:
            # logger.error(err, exc_info=True)
            raise err
        finally:
            conn.autocommit(old_value)
            self.return_connection(conn)

    def connect(self):
        """Connect to this connection pool
        """
        if self.__is_connected:
            return

        logger.info('[{}] Connect to connection pool'.format(self))

        test_conn = self._create_connection()
        try:
            test_conn.ping()
        except Exception as err:
            raise err
        else:
            with self.__safe_lock:
                self.__is_connected = True

            self._adjust_connection_pool()
        finally:
            test_conn.close()

    def close(self):
        """Close this connection pool"""
        try:
            logger.info('[{}] Close connection pool'.format(self))
        except Exception:
            pass

        with self.__safe_lock:
            if self.__is_killed is True:
                return True

        self._free()

        with self.__safe_lock:
            self.__is_killed = True

    def borrow_connection(self):
        """
        Get a free connection item from current pool. It's a little confused here, but it works as expected now.
        """
        block = False

        while True:
            conn = self._borrow(block)
            if conn is None:
                block = not self._adjust_connection_pool()
            else:
                return conn

    def _borrow(self, block):
        try:
            connection = self._pool_container.get(block, None)
        except PoolIsEmptyException:
            return None
        else:
            # check if the connection is alive or not
            connection.ping(reconnect=True)
            return connection

    def return_connection(self, connection):
        """Return a connection to the pool"""
        return self._pool_container.return_(connection)

    def _adjust_connection_pool(self):
        """
        Adjust the connection pool.
        """
        # Create several new connections
        logger.debug('[{}] Adjust connection pool, '
                     'current size is "{}"'.format(self, self.size))

        if self.pool_size >= self._max_pool_size:
            if self._enable_auto_resize:
                self._adjust_max_pool_size()

        try:
            connection = self._create_connection()
        except Exception as err:
            logger.error(err)
            return False
        else:
            try:
                self._pool_container.add(connection)
            except PoolIsFullException:
                # logger.debug('[{}] Connection pool is full now'.format(self.pool_name))
                return False
            else:
                return True

    def _adjust_max_pool_size(self):
        with self.__safe_lock:
            self._max_pool_size *= self._auto_resize_scale
            if self._max_pool_size > self._pool_resize_boundary:
                self._max_pool_size = self._pool_resize_boundary
            logger.debug('[{}] Max pool size adjusted to {}'.format(self, self._max_pool_size))
            self._pool_container.max_pool_size = self._max_pool_size

    def _free(self):
        """
        Release all the connections in the pool
        """
        for connection in self:
            try:
                connection.close()
            except Exception as err:
                _ = err

    def _create_connection(self):
        """Create a pymysql connection object
        """
        return Connection(host=self._host,
                          user=self._user,
                          password=self._password,
                          database=self._database,
                          port=self._port,
                          charset=self._charset,
                          cursorclass=self._cursor_class,
                          **self._other_kwargs)

代码下载

链接:https://pan.baidu.com/s/1-Aha_cOAEEk5eIbekqUmAQ 

提取码:ltkj 

关于我

网名:大T,【大T笔记】站长,12年互联网从业经验,2018年开启全栈自由职业之路,专注互联网创业、网络营销推广、个人IP打造、私域流量、自媒体、短视频、知识付费等各类互联网掘金项目。请点击了解更多我的经历

大T笔记所有文章均为本人原创,转载请您注明来源,并留下原文链接地址,是对我的尊重,也是对知识的尊重,谢谢!
大T笔记 » pymysqlpool-Python连接Mysql的线程安全数据库池

大T笔记-我的个人互联网创业和自由职业之路

关于我 我的百宝箱
享更多特权,建议使用 QQ 登录