Django Connection Pool

通常简单的 Django 程序并不需要考虑数据库连接池,但由于我们项目使用了大量的多线程任务,每个线程都会开启自己的数据库连接,因此频繁的创建和关闭数据库连接成为了系统的一个瓶颈。

使用数据库连接池能够有效的降低数据库连接的数量和频率。

在 Django 的官方文档中提到,mysqlclient 和 MySQL Connector/Python 两个 MySql 的 driver 提供了连接池的能力。

为了测试连接池确实生效,我找到一个可行的方法是使用 Wireshark 抓包,监控 Django 程序与 MySql 建立了多少个连接。

在 Wireshark 中可以使用下面的条件将端口 3309 的所有建立连接的包(TCP建立连接的三次握手第一个包是 SYN 包)筛选出来:

tcp.dstport == 3309 and tcp.flags.syn == 1

在没有连接池的时候,我的项目刷新首页时,抓包的结果如下:

8797	5.433141	127.0.0.1	127.0.0.1	TCP	68	56707 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=993610705 TSecr=0 SACK_PERM
8831	5.450387	127.0.0.1	127.0.0.1	TCP	68	56708 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=698213627 TSecr=0 SACK_PERM
8975	5.496565	127.0.0.1	127.0.0.1	TCP	68	56711 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=818685765 TSecr=0 SACK_PERM
9107	5.683560	127.0.0.1	127.0.0.1	TCP	68	56714 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3144230711 TSecr=0 SACK_PERM
9161	5.726263	127.0.0.1	127.0.0.1	TCP	68	56716 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3320673497 TSecr=0 SACK_PERM
9301	5.785169	127.0.0.1	127.0.0.1	TCP	68	56719 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=899920607 TSecr=0 SACK_PERM
9373	6.498582	127.0.0.1	127.0.0.1	TCP	68	56722 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3196184111 TSecr=0 SACK_PERM
9473	7.435943	127.0.0.1	127.0.0.1	TCP	68	56724 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=1471022748 TSecr=0 SACK_PERM
10887	7.940869	127.0.0.1	127.0.0.1	TCP	68	56806 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=916530966 TSecr=0 SACK_PERM
11035	8.010046	127.0.0.1	127.0.0.1	TCP	68	56815 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=257879139 TSecr=0 SACK_PERM
11069	8.019687	127.0.0.1	127.0.0.1	TCP	68	56816 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=357996506 TSecr=0 SACK_PERM
11095	8.024085	127.0.0.1	127.0.0.1	TCP	68	56817 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3462932328 TSecr=0 SACK_PERM
11180	8.106833	127.0.0.1	127.0.0.1	TCP	68	56818 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=388620014 TSecr=0 SACK_PERM
11241	8.200372	127.0.0.1	127.0.0.1	TCP	68	56821 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3503724435 TSecr=0 SACK_PERM
11277	8.209429	127.0.0.1	127.0.0.1	TCP	68	56822 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=1364832109 TSecr=0 SACK_PERM
11461	8.251246	127.0.0.1	127.0.0.1	TCP	68	56825 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=2043783406 TSecr=0 SACK_PERM
12070	8.779165	127.0.0.1	127.0.0.1	TCP	68	56844 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=4227265927 TSecr=0 SACK_PERM

共有17次数据库连接建立,与实际的 API 接口请求次数相同,也就是每一次 API 请求创建一个数据库连接。

为了调试在启用连接池后 SQL 是否有什么问题,可以将 MySQL 的所有查询输出到一个文件方便检查:

SET GLOBAL general_log = 'ON';
SET GLOBAL log_output = 'FILE';
SET GLOBAL general_log_file = '/some_dir/mysql_general.log';
SHOW VARIABLES LIKE 'general_log%';

CONN_MAX_AGE#

根据Django的文档,CONN_MAX_AGE 变量设置为大于 0 的值后,Django 会启用长连接(Persistent connections),然而我实测没有效果,无论是否开启该设置,数据库连接数量没有变化。

具体的原因和机制有待后续调研。

mysqlclient#

这是 Django 官方推荐的 MySql 连接库,它本身是将 MySql 的 C 库进行了一个简单的封装,底层还是调用 C 库,因此在安装时需要系统先安装了 MySql 的 C 的连接库。

Django 官方文档上说其具有连接池功能,但我在其官方文档上没有找到如何开启和设置。

经过实际测试发现使用该库数据库连接数量不变,依然是每个请求会产生一个数据库连接。

MySQL Connector/Python#

这是 MySQL 官方提供的使用纯 Python 实现的连接库,安装时无需其它依赖。

它的文档中对于如何使用连接池有明确的说明。

它本身也提供了 Django Backend,但自带的 Django Backend 无法配置和启用连接池功能。

将它的 Backend 代码拷贝出来,手工魔改启用连接池后,可以看到数据库连接数明显降低。

实测刷新首页时,17 个 API 请求只有 4 个数据库连接,连接数明显降低。

14918	43.102250	127.0.0.1	127.0.0.1	TCP	68	62940 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=601162343 TSecr=0 SACK_PERM
15108	43.201124	127.0.0.1	127.0.0.1	TCP	68	62943 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=1236611962 TSecr=0 SACK_PERM
15388	45.796693	127.0.0.1	127.0.0.1	TCP	68	62959 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=366009269 TSecr=0 SACK_PERM
18323	47.241159	127.0.0.1	127.0.0.1	TCP	68	63068 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3329381954 TSecr=0 SACK_PERM

启用连接池#

由于其自带的 Django Backend 不支持连接池,因此需要魔改。

先将其自带的 Django Backend 代码拷贝到项目目录下:

cp -r ../env/lib/python3.10/site-packages/mysql/connector/django your_project_root/mysql_connector_pool

之后修改 Django settings.py 将 DB 的 engine 指向新的地址:

DATABASES = {
    'default': {
        'ENGINE': 'mysql_connector_pool',
        ...

魔改其代码,在创建连接时使用连接池,编辑 base.py 文件,因为是测试,这里只简单的使用固定连接池配置:

def get_new_connection(
    self, conn_params: Dict[str, Any]
) -> Union[PooledMySQLConnection, "MySQLConnectionAbstract"]:
    if "converter_class" not in conn_params:
        conn_params["converter_class"] = DjangoMySQLConverter

    # cnx = mysql.connector.connect(**conn_params)
    cnx = mysql.connector.connect(pool_name='mypool', pool_size=32, **conn_params)

    return cnx

在其代码内部限制了连接池的最大连接数为 32,如果设置的 pool_size 超过这个值会报错,具体为什么限制尚不清楚。

这样修改后连接池就会生效了,但是在进行更新操作时会发现有时候无法更新,通过查看数据库的SQL日志发现其没有正确设置 autocommit。

-- 正常的连接会设置 autocommit
2025-01-02T09:04:23.452529Z	   81 Connect	your_db@172.20.0.1 on secker using SSL/TLS
2025-01-02T09:04:23.453540Z	   81 Query	SET NAMES 'utf8mb4' COLLATE 'utf8mb4_general_ci'
2025-01-02T09:04:23.454465Z	   81 Query	SET @@session.autocommit = OFF
2025-01-02T09:04:23.455438Z	   81 Query	SET sql_mode='STRICT_TRANS_TABLES'
2025-01-02T09:04:23.456311Z	   81 Query	SET @@session.autocommit = ON

-- 魔改后没有设置 autocommit
2025-01-02T08:38:29.923688Z	    3 Connect	your_db@172.20.0.1 on secker using SSL/TLS
2025-01-02T08:38:29.924858Z	    3 Query	SET NAMES 'utf8mb4' COLLATE 'utf8mb4_general_ci'
2025-01-02T08:38:29.925871Z	    3 Query	SET @@session.autocommit = OFF
2025-01-02T08:38:29.933270Z	    3 Query	SET sql_mode='STRICT_TRANS_TABLES'

MySQL 的相关文档:https://dev.mysql.com/doc/refman/5.7/en/innodb-autocommit-commit-rollback.html

为什么缺失了后面一句设置 autocommit 的深层原因不明,为了快速测试,在 base.py 文件中的 DatabaseWrapper 类的 get_connection_params 方法中加入代码:

def get_connection_params(self) -> Dict[str, Any]:
    kwargs = {
        "charset": "utf8",
        "use_unicode": True,
        "buffered": False,
        "consume_results": True,
    }

    settings_dict = self.settings_dict

    if settings_dict["USER"]:
        kwargs["user"] = settings_dict["USER"]
    if settings_dict["NAME"]:
        kwargs["database"] = settings_dict["NAME"]
    if settings_dict["PASSWORD"]:
        kwargs["passwd"] = settings_dict["PASSWORD"]
    if settings_dict["HOST"].startswith("/"):
        kwargs["unix_socket"] = settings_dict["HOST"]
    elif settings_dict["HOST"]:
        kwargs["host"] = settings_dict["HOST"]
    if settings_dict["PORT"]:
        kwargs["port"] = int(settings_dict["PORT"])
    ##### 加入代码判断连接的 autocommit 配置并设置对应的连接参数
    if settings_dict["AUTOCOMMIT"]:
        kwargs["autocommit"] = settings_dict["AUTOCOMMIT"]
    #####
    if settings_dict.get("OPTIONS", {}).get("init_command"):
        kwargs["init_command"] = settings_dict["OPTIONS"]["init_command"]
    ...

修改后虽然正常的数据库更新可以工作,但代码中启动的事务会出现异常:

django.db.utils.ProgrammingError: (1305, '1305 (42000): SAVEPOINT s4450145792_x1 does not exist', '42000')

django-db-connection-pool#

https://github.com/altairbow/django-db-connection-pool

通过 SQLAlchemy 实现连接池的一个开源项目,最新版本是2024年4月6号发布,看起来还算活跃。

其在默认不做任何设置时,默认连接池大小为10个连接并允许再多创建10个连接,20个连接之后再连接会报错:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/20/3o7r)

SQLAlchemy的连接池也不存在大小的限制,实测设置到100也可以正常工作。

经过测试发现,django-db-connection-pool 在 MySql 关闭了连接后,获取连接时会抛出异常,之后连接可以正确重新建立,但是当时的操作会失败。

Traceback (most recent call last):
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 716, in _do_ping_w_event
    return self.do_ping(dbapi_connection)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 742, in do_ping
    cursor.execute(self._dialect_specific_select_one)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/MySQLdb/connections.py", line 265, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/zja/Projects/example/example/uc/middleware.py", line 133, in process_request
    user = get_user_by_token(request)
  File "/Users/zja/Projects/example/example/uc/auth.py", line 173, in get_user_by_token
    return UserModel.objects.select_related("extension").get(username=jwt_token.get("username"))
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 646, in get
    num = len(clone)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 376, in __len__
    self._fetch_all()
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 1867, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 87, in __iter__
    results = compiler.execute_sql(
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/sql/compiler.py", line 1396, in execute_sql
    cursor = self.connection.cursor()
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 323, in cursor
    return self._cursor()
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 299, in _cursor
    self.ensure_connection()
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 282, in ensure_connection
    self.connect()
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 263, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/dj_db_conn_pool/core/mixins/core.py", line 106, in get_new_connection
    conn = db_pool.connect()
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionFairy._checkout(self)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1362, in _checkout
    with util.safe_reraise():
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1300, in _checkout
    result = pool._dialect._do_ping_w_event(
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 718, in _do_ping_w_event
    is_disconnect = self.is_disconnect(err, dbapi_connection, None)
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/base.py", line 2688, in is_disconnect
    ) and self._extract_error_code(e) in (
  File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/base.py", line 2734, in _extract_error_code
    raise NotImplementedError()

原因是当启动 pre_ping 机制时,获取连接时 sqlAlchemy 会检查该连接是否可用,检查时发现错误代码中会去确定是否是连接错误,如果是连接错误则进行重连:

# sqlalchemy/engine/default.py:714
def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
    try:
        return self.do_ping(dbapi_connection)
    except self.loaded_dbapi.Error as err:
        is_disconnect = self.is_disconnect(err, dbapi_connection, None)
    ...

在判断是否连接断开时,会调用 _extract_error_code 函数:

# sqlalchemy/dialects/mysql/base.py:2680
def is_disconnect(self, e, connection, cursor):
    if isinstance(
        e,
        (
            self.dbapi.OperationalError,
            self.dbapi.ProgrammingError,
            self.dbapi.InterfaceError,
        ),
    ) and self._extract_error_code(e) in (
        1927,
        2006,
        2013,
        2014,
        2045,
        2055,
        4031,
    ):
        return True
    ...
# sqlalchemy/dialects/mysql/base.py:2733
def _extract_error_code(self, exception):
    raise NotImplementedError()

该函数在 mysql 的 dialect 基类中未实现,根本原因是在 django-db-connection-pool 库中使用 SqlAlchemy 时使用了 MySQLDialect 类,该类是一个基类,不应该直接使用。

经过测试将 MySQLDialect 类改为 MySQLDialect_mysqldb 类则可以正常处理断连问题。SqlAlchemy 的 Dialect 需要根据底层使用的 driver 来选择对应的实现。

我们项目底层使用的 mysqlclient 连接数据库,则应该选用 MySQLDialect_mysqldb 类。

为了修复这个问题需要修改 django-db-connection-pool 的代码,没有找到更优雅的方案。

重新造轮子#

通过一圈的调研后发现,使用 SqlAlchemy 来实现连接池相对比较简单,只需要一个很薄的封装即可。

先在 Django 项目中创建一个目录,名字随便取,这里我使用了:mysql_pool_engine

创建 init.py 和 base.py 两个文件

在 base.py 文件中输入下面的代码:

from threading import Lock

from django.db.backends.mysql import base
from django.utils.asyncio import async_unsafe
from sqlalchemy import URL, create_pool_from_url


class DatabaseWrapper(base.DatabaseWrapper):
    _connect_pools = {}
    _lock = Lock()

    def get_url_params(self):
        param_mapping = {
            'username': 'USER',
            'password': 'PASSWORD',
            'host': 'HOST',
            'port': 'PORT',
            'database': 'NAME',
        }
        url_params = {k: self.settings_dict[v] for k, v in param_mapping.items() if v in self.settings_dict}
        # modify there if using other database
        url_params['drivername'] = 'mysql'
        return url_params

    def get_pool_params(self):
        return self.settings_dict.get('POOL_OPTIONS', {})

    def __init__(self, settings_dict, alias):
        super().__init__(settings_dict, alias)

        with self._lock:
            if alias not in DatabaseWrapper._connect_pools:
                url = URL.create(**self.get_url_params())
                DatabaseWrapper._connect_pools[alias] = create_pool_from_url(url, **self.get_pool_params())

    @async_unsafe
    def get_new_connection(self, conn_params):
        return DatabaseWrapper._connect_pools[self.alias].connect()

如果使用的是其它数据库,需要修改 drivername 的值。

之后修改 Django 的 settings.py 文件:

DATABASES = {
    'default': {
        'ENGINE': 'mysql_pool_engine',
        'NAME': 'your_database_name',
        'USER': 'your_database_user',
        'PASSWORD': 'your_database_password',
        'HOST': 'your_database_host',
        'PORT': 'your_database_port',
        'POOL_OPTIONS': {
            'pool_size': 10,
            'max_overflow': -1,
            'pool_recycle': -1,
            'pool_pre_ping': True
        },
    },

将 ENGINE 修改为创建的模块名称,这里我使用的是 mysql_pool_engine。

POOL_OPTIONS 里面配置的就是连接池的配置,具体的设置同 create_engine 的参数:https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine

之后就可以使用数据库连接池了。

comments powered by Disqus