Django Connection Pool
通常简单的 Django 程序并不需要考虑数据库连接池,但由于我们项目使用了大量的多线程任务,每个线程都会开启自己的数据库连接,因此频繁的创建和关闭数据库连接成为了系统的一个瓶颈。
使用数据库连接池能够有效的降低数据库连接的数量和频率。
在 Django 的官方文档中提到,mysqlclient 和 MySQL Connector/Python 两个 MySql 的 driver 提供了连接池的能力。
为了测试连接池确实生效,我找到一个可行的方法是使用 Wireshark 抓包,监控 Django 程序与 MySql 建立了多少个连接。
在 Wireshark 中可以使用下面的条件将端口 3309 的所有建立连接的包(TCP建立连接的三次握手第一个包是 SYN 包)筛选出来:
tcp.dstport == 3309 and tcp.flags.syn == 1
在没有连接池的时候,我的项目刷新首页时,抓包的结果如下:
8797 5.433141 127.0.0.1 127.0.0.1 TCP 68 56707 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=993610705 TSecr=0 SACK_PERM
8831 5.450387 127.0.0.1 127.0.0.1 TCP 68 56708 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=698213627 TSecr=0 SACK_PERM
8975 5.496565 127.0.0.1 127.0.0.1 TCP 68 56711 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=818685765 TSecr=0 SACK_PERM
9107 5.683560 127.0.0.1 127.0.0.1 TCP 68 56714 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3144230711 TSecr=0 SACK_PERM
9161 5.726263 127.0.0.1 127.0.0.1 TCP 68 56716 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3320673497 TSecr=0 SACK_PERM
9301 5.785169 127.0.0.1 127.0.0.1 TCP 68 56719 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=899920607 TSecr=0 SACK_PERM
9373 6.498582 127.0.0.1 127.0.0.1 TCP 68 56722 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3196184111 TSecr=0 SACK_PERM
9473 7.435943 127.0.0.1 127.0.0.1 TCP 68 56724 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=1471022748 TSecr=0 SACK_PERM
10887 7.940869 127.0.0.1 127.0.0.1 TCP 68 56806 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=916530966 TSecr=0 SACK_PERM
11035 8.010046 127.0.0.1 127.0.0.1 TCP 68 56815 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=257879139 TSecr=0 SACK_PERM
11069 8.019687 127.0.0.1 127.0.0.1 TCP 68 56816 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=357996506 TSecr=0 SACK_PERM
11095 8.024085 127.0.0.1 127.0.0.1 TCP 68 56817 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3462932328 TSecr=0 SACK_PERM
11180 8.106833 127.0.0.1 127.0.0.1 TCP 68 56818 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=388620014 TSecr=0 SACK_PERM
11241 8.200372 127.0.0.1 127.0.0.1 TCP 68 56821 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3503724435 TSecr=0 SACK_PERM
11277 8.209429 127.0.0.1 127.0.0.1 TCP 68 56822 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=1364832109 TSecr=0 SACK_PERM
11461 8.251246 127.0.0.1 127.0.0.1 TCP 68 56825 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=2043783406 TSecr=0 SACK_PERM
12070 8.779165 127.0.0.1 127.0.0.1 TCP 68 56844 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=4227265927 TSecr=0 SACK_PERM
共有17次数据库连接建立,与实际的 API 接口请求次数相同,也就是每一次 API 请求创建一个数据库连接。
为了调试在启用连接池后 SQL 是否有什么问题,可以将 MySQL 的所有查询输出到一个文件方便检查:
SET GLOBAL general_log = 'ON';
SET GLOBAL log_output = 'FILE';
SET GLOBAL general_log_file = '/some_dir/mysql_general.log';
SHOW VARIABLES LIKE 'general_log%';
CONN_MAX_AGE#
根据Django的文档,CONN_MAX_AGE 变量设置为大于 0 的值后,Django 会启用长连接(Persistent connections),然而我实测没有效果,无论是否开启该设置,数据库连接数量没有变化。
具体的原因和机制有待后续调研。
mysqlclient#
这是 Django 官方推荐的 MySql 连接库,它本身是将 MySql 的 C 库进行了一个简单的封装,底层还是调用 C 库,因此在安装时需要系统先安装了 MySql 的 C 的连接库。
Django 官方文档上说其具有连接池功能,但我在其官方文档上没有找到如何开启和设置。
经过实际测试发现使用该库数据库连接数量不变,依然是每个请求会产生一个数据库连接。
MySQL Connector/Python#
这是 MySQL 官方提供的使用纯 Python 实现的连接库,安装时无需其它依赖。
在它的文档中对于如何使用连接池有明确的说明。
它本身也提供了 Django Backend,但自带的 Django Backend 无法配置和启用连接池功能。
将它的 Backend 代码拷贝出来,手工魔改启用连接池后,可以看到数据库连接数明显降低。
实测刷新首页时,17 个 API 请求只有 4 个数据库连接,连接数明显降低。
14918 43.102250 127.0.0.1 127.0.0.1 TCP 68 62940 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=601162343 TSecr=0 SACK_PERM
15108 43.201124 127.0.0.1 127.0.0.1 TCP 68 62943 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=1236611962 TSecr=0 SACK_PERM
15388 45.796693 127.0.0.1 127.0.0.1 TCP 68 62959 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=366009269 TSecr=0 SACK_PERM
18323 47.241159 127.0.0.1 127.0.0.1 TCP 68 63068 → 3309 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=3329381954 TSecr=0 SACK_PERM
启用连接池#
由于其自带的 Django Backend 不支持连接池,因此需要魔改。
先将其自带的 Django Backend 代码拷贝到项目目录下:
cp -r ../env/lib/python3.10/site-packages/mysql/connector/django your_project_root/mysql_connector_pool
之后修改 Django settings.py 将 DB 的 engine 指向新的地址:
DATABASES = {
'default': {
'ENGINE': 'mysql_connector_pool',
...
魔改其代码,在创建连接时使用连接池,编辑 base.py 文件,因为是测试,这里只简单的使用固定连接池配置:
def get_new_connection(
self, conn_params: Dict[str, Any]
) -> Union[PooledMySQLConnection, "MySQLConnectionAbstract"]:
if "converter_class" not in conn_params:
conn_params["converter_class"] = DjangoMySQLConverter
# cnx = mysql.connector.connect(**conn_params)
cnx = mysql.connector.connect(pool_name='mypool', pool_size=32, **conn_params)
return cnx
在其代码内部限制了连接池的最大连接数为 32,如果设置的 pool_size 超过这个值会报错,具体为什么限制尚不清楚。
这样修改后连接池就会生效了,但是在进行更新操作时会发现有时候无法更新,通过查看数据库的SQL日志发现其没有正确设置 autocommit。
-- 正常的连接会设置 autocommit
2025-01-02T09:04:23.452529Z 81 Connect your_db@172.20.0.1 on secker using SSL/TLS
2025-01-02T09:04:23.453540Z 81 Query SET NAMES 'utf8mb4' COLLATE 'utf8mb4_general_ci'
2025-01-02T09:04:23.454465Z 81 Query SET @@session.autocommit = OFF
2025-01-02T09:04:23.455438Z 81 Query SET sql_mode='STRICT_TRANS_TABLES'
2025-01-02T09:04:23.456311Z 81 Query SET @@session.autocommit = ON
-- 魔改后没有设置 autocommit
2025-01-02T08:38:29.923688Z 3 Connect your_db@172.20.0.1 on secker using SSL/TLS
2025-01-02T08:38:29.924858Z 3 Query SET NAMES 'utf8mb4' COLLATE 'utf8mb4_general_ci'
2025-01-02T08:38:29.925871Z 3 Query SET @@session.autocommit = OFF
2025-01-02T08:38:29.933270Z 3 Query SET sql_mode='STRICT_TRANS_TABLES'
MySQL 的相关文档:https://dev.mysql.com/doc/refman/5.7/en/innodb-autocommit-commit-rollback.html
为什么缺失了后面一句设置 autocommit 的深层原因不明,为了快速测试,在 base.py 文件中的 DatabaseWrapper 类的 get_connection_params 方法中加入代码:
def get_connection_params(self) -> Dict[str, Any]:
kwargs = {
"charset": "utf8",
"use_unicode": True,
"buffered": False,
"consume_results": True,
}
settings_dict = self.settings_dict
if settings_dict["USER"]:
kwargs["user"] = settings_dict["USER"]
if settings_dict["NAME"]:
kwargs["database"] = settings_dict["NAME"]
if settings_dict["PASSWORD"]:
kwargs["passwd"] = settings_dict["PASSWORD"]
if settings_dict["HOST"].startswith("/"):
kwargs["unix_socket"] = settings_dict["HOST"]
elif settings_dict["HOST"]:
kwargs["host"] = settings_dict["HOST"]
if settings_dict["PORT"]:
kwargs["port"] = int(settings_dict["PORT"])
##### 加入代码判断连接的 autocommit 配置并设置对应的连接参数
if settings_dict["AUTOCOMMIT"]:
kwargs["autocommit"] = settings_dict["AUTOCOMMIT"]
#####
if settings_dict.get("OPTIONS", {}).get("init_command"):
kwargs["init_command"] = settings_dict["OPTIONS"]["init_command"]
...
修改后虽然正常的数据库更新可以工作,但代码中启动的事务会出现异常:
django.db.utils.ProgrammingError: (1305, '1305 (42000): SAVEPOINT s4450145792_x1 does not exist', '42000')
django-db-connection-pool#
https://github.com/altairbow/django-db-connection-pool
通过 SQLAlchemy 实现连接池的一个开源项目,最新版本是2024年4月6号发布,看起来还算活跃。
其在默认不做任何设置时,默认连接池大小为10个连接并允许再多创建10个连接,20个连接之后再连接会报错:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/20/3o7r)
SQLAlchemy的连接池也不存在大小的限制,实测设置到100也可以正常工作。
经过测试发现,django-db-connection-pool 在 MySql 关闭了连接后,获取连接时会抛出异常,之后连接可以正确重新建立,但是当时的操作会失败。
Traceback (most recent call last):
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 716, in _do_ping_w_event
return self.do_ping(dbapi_connection)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 742, in do_ping
cursor.execute(self._dialect_specific_select_one)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
res = self._query(mogrified_query)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
db.query(q)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/MySQLdb/connections.py", line 265, in query
_mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/zja/Projects/example/example/uc/middleware.py", line 133, in process_request
user = get_user_by_token(request)
File "/Users/zja/Projects/example/example/uc/auth.py", line 173, in get_user_by_token
return UserModel.objects.select_related("extension").get(username=jwt_token.get("username"))
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 646, in get
num = len(clone)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 376, in __len__
self._fetch_all()
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 1867, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/query.py", line 87, in __iter__
results = compiler.execute_sql(
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/models/sql/compiler.py", line 1396, in execute_sql
cursor = self.connection.cursor()
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/utils/asyncio.py", line 26, in inner
return func(*args, **kwargs)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 323, in cursor
return self._cursor()
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 299, in _cursor
self.ensure_connection()
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/utils/asyncio.py", line 26, in inner
return func(*args, **kwargs)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 282, in ensure_connection
self.connect()
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/utils/asyncio.py", line 26, in inner
return func(*args, **kwargs)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/django/db/backends/base/base.py", line 263, in connect
self.connection = self.get_new_connection(conn_params)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/dj_db_conn_pool/core/mixins/core.py", line 106, in get_new_connection
conn = db_pool.connect()
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 449, in connect
return _ConnectionFairy._checkout(self)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1362, in _checkout
with util.safe_reraise():
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
raise exc_value.with_traceback(exc_tb)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 1300, in _checkout
result = pool._dialect._do_ping_w_event(
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 718, in _do_ping_w_event
is_disconnect = self.is_disconnect(err, dbapi_connection, None)
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/base.py", line 2688, in is_disconnect
) and self._extract_error_code(e) in (
File "/Users/zja/Projects/example/env/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/base.py", line 2734, in _extract_error_code
raise NotImplementedError()
原因是当启动 pre_ping 机制时,获取连接时 sqlAlchemy 会检查该连接是否可用,检查时发现错误代码中会去确定是否是连接错误,如果是连接错误则进行重连:
# sqlalchemy/engine/default.py:714
def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
try:
return self.do_ping(dbapi_connection)
except self.loaded_dbapi.Error as err:
is_disconnect = self.is_disconnect(err, dbapi_connection, None)
...
在判断是否连接断开时,会调用 _extract_error_code 函数:
# sqlalchemy/dialects/mysql/base.py:2680
def is_disconnect(self, e, connection, cursor):
if isinstance(
e,
(
self.dbapi.OperationalError,
self.dbapi.ProgrammingError,
self.dbapi.InterfaceError,
),
) and self._extract_error_code(e) in (
1927,
2006,
2013,
2014,
2045,
2055,
4031,
):
return True
...
# sqlalchemy/dialects/mysql/base.py:2733
def _extract_error_code(self, exception):
raise NotImplementedError()
该函数在 mysql 的 dialect 基类中未实现,根本原因是在 django-db-connection-pool 库中使用 SqlAlchemy 时使用了 MySQLDialect 类,该类是一个基类,不应该直接使用。
经过测试将 MySQLDialect 类改为 MySQLDialect_mysqldb 类则可以正常处理断连问题。SqlAlchemy 的 Dialect 需要根据底层使用的 driver 来选择对应的实现。
我们项目底层使用的 mysqlclient 连接数据库,则应该选用 MySQLDialect_mysqldb 类。
为了修复这个问题需要修改 django-db-connection-pool 的代码,没有找到更优雅的方案。
重新造轮子#
通过一圈的调研后发现,使用 SqlAlchemy 来实现连接池相对比较简单,只需要一个很薄的封装即可。
先在 Django 项目中创建一个目录,名字随便取,这里我使用了:mysql_pool_engine
创建 init.py 和 base.py 两个文件
在 base.py 文件中输入下面的代码:
from threading import Lock
from django.db.backends.mysql import base
from django.utils.asyncio import async_unsafe
from sqlalchemy import URL, create_pool_from_url
class DatabaseWrapper(base.DatabaseWrapper):
_connect_pools = {}
_lock = Lock()
def get_url_params(self):
param_mapping = {
'username': 'USER',
'password': 'PASSWORD',
'host': 'HOST',
'port': 'PORT',
'database': 'NAME',
}
url_params = {k: self.settings_dict[v] for k, v in param_mapping.items() if v in self.settings_dict}
# modify there if using other database
url_params['drivername'] = 'mysql'
return url_params
def get_pool_params(self):
return self.settings_dict.get('POOL_OPTIONS', {})
def __init__(self, settings_dict, alias):
super().__init__(settings_dict, alias)
with self._lock:
if alias not in DatabaseWrapper._connect_pools:
url = URL.create(**self.get_url_params())
DatabaseWrapper._connect_pools[alias] = create_pool_from_url(url, **self.get_pool_params())
@async_unsafe
def get_new_connection(self, conn_params):
return DatabaseWrapper._connect_pools[self.alias].connect()
如果使用的是其它数据库,需要修改 drivername 的值。
之后修改 Django 的 settings.py 文件:
DATABASES = {
'default': {
'ENGINE': 'mysql_pool_engine',
'NAME': 'your_database_name',
'USER': 'your_database_user',
'PASSWORD': 'your_database_password',
'HOST': 'your_database_host',
'PORT': 'your_database_port',
'POOL_OPTIONS': {
'pool_size': 10,
'max_overflow': -1,
'pool_recycle': -1,
'pool_pre_ping': True
},
},
将 ENGINE 修改为创建的模块名称,这里我使用的是 mysql_pool_engine。
POOL_OPTIONS 里面配置的就是连接池的配置,具体的设置同 create_engine 的参数:https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine
之后就可以使用数据库连接池了。