前言:

线程安全问题:当2个线程同时用到线程池时,会同时创建2个线程池。如果多个线程,错开用到线程池,就只会创建一个线程池,会共用一个线程池。我用的注解方式的单例模式,感觉就是这个注解的单例方式,解决了多线程问题,但是没解决线程安全问题,需要优化这个单例模式。

主要通过 PooledDB 模块实现。

一、数据库封装

1.1数据库基本配置

db_config.py

# -*- coding: UTF-8 -*-
import pymysql
 
# 数据库信息
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3308
DB_TEST_DBNAME = "bt"
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "123456"
 
# 数据库连接编码
DB_CHARSET = "utf8"
# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 5
# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 0
# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 5
# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 300
# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True
# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0
# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

# creator : 使用连接数据库的模块
DB_CREATOR = pymysql

设置连接池最大最小为5个。则启动连接池时,就会建立5个连接。

1.2 编写单例模式注解

singleton.py

#单例模式函数,用来修饰类
def singleton(cls,*args,**kw):
    instances = {}
    def _singleton():
        if cls not in instances:
            instances[cls] = cls(*args,**kw)
        return instances[cls]
    return _singleton

1.3 构建连接池

db_dbutils_init.py

from dbutils.pooled_db import PooledDB
import db_config as config
# import random

from singleton import singleton
"""
@功能:创建数据库连接池
"""

class MyConnectionPool(object):
    # 私有属性
    # 能通过对象直接访问,但是可以在本类内部访问;
    __pool = None
 
    # def __init__(self):
    #     self.conn = self.__getConn()
    #     self.cursor = self.conn.cursor()
 
    # 创建数据库连接conn和游标cursor
    def __enter__(self):
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()
 
    # 创建数据库连接池
    def __getconn(self):
        if self.__pool is None:
            # i = random.randint(1, 100)
            # print("创建线程池的数量" str(i))
            self.__pool = PooledDB(
                creator=config.DB_CREATOR,
                mincached=config.DB_MIN_CACHED,
                maxcached=config.DB_MAX_CACHED,
                maxshared=config.DB_MAX_SHARED,
                maxconnections=config.DB_MAX_CONNECYIONS,
                blocking=config.DB_BLOCKING,
                maxusage=config.DB_MAX_USAGE,
                setsession=config.DB_SET_SESSION,
                host=config.DB_TEST_HOST,
                port=config.DB_TEST_PORT,
                user=config.DB_TEST_USER,
                passwd=config.DB_TEST_PASSWORD,
                db=config.DB_TEST_DBNAME,
                use_unicode=False,
                charset=config.DB_CHARSET
            )
        return self.__pool.connection()
 
    # 释放连接池资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
    # 关闭连接归还给链接池
    # def close(self):
    #     self.cursor.close()
    #     self.conn.close()
 
    # 从连接池中取出一个连接
    def getconn(self):
        conn = self.__getconn()
        cursor = conn.cursor()
        return cursor, conn
# 获取连接池,实例化
@singleton
def get_my_connection():
    return MyConnectionPool()

1.4 封装Python操作MYSQL的代码

mysqlhelper.py

import time
from db_dbutils_init import get_my_connection
"""执行语句查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0"""
class MySqLHelper(object):
    def __init__(self):
        self.db = get_my_connection()  # 从数据池中获取连接
    #
    # def __new__(cls, *args, **kwargs):
    #     if not hasattr(cls, 'inst'):  # 单例
    #         cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
    #     return cls.inst
 
    # 封装执行命令
    def execute(self, sql, param=None, autoclose=False):
        """
        【主要判断是否有参数和是否执行完就释放连接】
        :param sql: 字符串类型,sql语句
        :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
        :param autoclose: 是否关闭连接
        :return: 返回连接conn和游标cursor
        """
        cursor, conn = self.db.getconn()  # 从连接池获取连接
        count = 0
        try:
            # count : 为改变的数据条数
            if param:
                count = cursor.execute(sql, param)
            else:
                count = cursor.execute(sql)
            conn.commit()
            if autoclose:
                self.close(cursor, conn)
        except Exception as e:
            pass
        return cursor, conn, count
 
    # 释放连接
    def close(self, cursor, conn):
        """释放连接归还给连接池"""
        cursor.close()
        conn.close()
 
    # 查询所有
    def selectall(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            res = cursor.fetchall()
            return res
        except Exception as e:
            print(e)
            self.close(cursor, conn)
            return count
 
    # 查询单条
    def selectone(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            res = cursor.fetchone()
            self.close(cursor, conn)
            return res
        except Exception as e:
            print("error_msg:", e.args)
            self.close(cursor, conn)
            return count
 
    # 增加
    def insertone(self, sql, param):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            # _id = cursor.lastrowid()  # 获取当前插入数据的主键id,该id应该为自动生成为好
            conn.commit()
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
    # 增加多行
    def insertmany(self, sql, param):
        """
        :param sql:
        :param param: 必须是元组或列表[(),()]或((),())
        :return:
        """
        cursor, conn, count = self.db.getconn()
        try:
            cursor.executemany(sql, param)
            conn.commit()
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
    # 删除
    def delete(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
 
    # 更新
    def update(self, sql, param=None):
        cursor = None
        conn = None
        count = None
        try:
            cursor, conn, count = self.execute(sql, param)
            conn.commit()
            self.close(cursor, conn)
            return count
        except Exception as e:
            print(e)
            conn.rollback()
            self.close(cursor, conn)
            return count
# if __name__ == '__main__':
#     db = MySqLHelper()
#     sql = "SELECT SLEEP(10)"
#     db.execute(sql)
#     time.sleep(20)
 
 
    # TODO 查询单条
    # sql1 = 'select * from userinfo where name=%s'
    # args = 'python'
    # ret = db.selectone(sql=sql1, param=args)
    # print(ret)  # (None, b'python', b'123456', b'0')
 
    # TODO 增加单条
    # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)'
    # ret = db.insertone(sql2, ('1', '2', '1', '2', '2'))
    # print(ret)
 
    # TODO 增加多条
    # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
    # li = li = [
    #     ('分省', '123'),
    #     ('到达','456')
    # ]
    # ret = db.insertmany(sql3,li)
    # print(ret)
 
    # TODO 删除
    # sql4 = 'delete from  userinfo WHERE name=%s'
    # args = 'xxxx'
    # ret = db.delete(sql4, args)
    # print(ret)
 
    # TODO 更新
    # sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
    # args = ('993333993', '%old%')
    # ret = db.update(sql5, args)
    # print(ret)

二、连接池测试

修改 db_dbutils_init.py 文件,在创建连接池def __getconn(self):方法下,加一个打印随机数,方便将来我们定位是否时单例的线程池。

 修改后的db_dbutils_init.py 文件:

from dbutils.pooled_db import PooledDB
import db_config as config
import random
from singleton import singleton
 
"""
@功能:创建数据库连接池
"""
class MyConnectionPool(object):
    # 私有属性
    # 能通过对象直接访问,但是可以在本类内部访问;
    __pool = None
 
    # def __init__(self):
    #     self.conn = self.__getConn()
    #     self.cursor = self.conn.cursor()
 
    # 创建数据库连接conn和游标cursor
    def __enter__(self):
        self.conn = self.__getconn()
        self.cursor = self.conn.cursor()
 
    # 创建数据库连接池
    def __getconn(self):
        if self.__pool is None:
            i = random.randint(1, 100)
            print("线程池的随机数" str(i))
            self.__pool = PooledDB(
                creator=config.DB_CREATOR,
                mincached=config.DB_MIN_CACHED,
                maxcached=config.DB_MAX_CACHED,
                maxshared=config.DB_MAX_SHARED,
                maxconnections=config.DB_MAX_CONNECYIONS,
                blocking=config.DB_BLOCKING,
                maxusage=config.DB_MAX_USAGE,
                setsession=config.DB_SET_SESSION,
                host=config.DB_TEST_HOST,
                port=config.DB_TEST_PORT,
                user=config.DB_TEST_USER,
                passwd=config.DB_TEST_PASSWORD,
                db=config.DB_TEST_DBNAME,
                use_unicode=False,
                charset=config.DB_CHARSET
            )
        return self.__pool.connection()
 
    # 释放连接池资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
 
    # 关闭连接归还给链接池
    # def close(self):
    #     self.cursor.close()
    #     self.conn.close()
 
    # 从连接池中取出一个连接
    def getconn(self):
        conn = self.__getconn()
        cursor = conn.cursor()
        return cursor, conn
 # 获取连接池,实例化
@singleton
def get_my_connection():
    return MyConnectionPool()

开始测试:

场景一:同一个实例,执行2次sql

from mysqlhelper import MySqLHelper
import time
 
if __name__ == '__main__':
    sql = "SELECT SLEEP(10)"
    sql1 = "SELECT SLEEP(15)"
 
    db = MySqLHelper()
    db.execute(sql)
    db.execute(sql1)
    time.sleep(20)

在数据库中,使用 show processlist;

show processlist;

当执行第一个sql时。数据库连接显示。

当执行第二个sql时。数据库连接显示:

 当执行完sql,程序sleep时。数据库连接显示:

程序打印结果:

线程池的随机数43

由以上可以得出结论:

线程池启动后,生成了5个连接。执行第一个sql时,使用了1个连接。执行完第一个sql后,使用了另外1个连接。 这是一个线性的,线程池中一共5个连接,但是每次执行,只使用了其中一个。

有个疑问,连接池如果不支持并发是不是就毫无意义?

如上,虽然开了线程池5个连接,但是每次执行sql,只用到了一个连接。那为何不设置线程池大小为1呢?设置线程池大小的意义何在呢?(如果在非并发的场景下,是不是设置大小无意义?)

相比于不用线程池的优点:

如果不用线程池,则每次执行一个sql都要创建、断开连接。 像我们这样使用连接池,不用反复创建、断开连接,拿现成的连接直接用就好了。

场景二:依次创建2个实例,各自执行sql

from mysqlhelper import MySqLHelper
import time
 
if __name__ == '__main__':
    db = MySqLHelper()
    db1 = MySqLHelper()
    sql = "SELECT SLEEP(10)"
    sql1 = "SELECT SLEEP(15)"
    db.execute(sql)
    db1.execute(sql1)
    time.sleep(20)

第一个实例db,执行sql。线程池启动了5个连接

第二个实例db1,执行sql:

 程序睡眠时,一共5个线程池:

 打印结果:

结果证明:

虽然我们依次创建了2个实例,但是(1)创建线程池的打印结果,只打印1次,且从始至终,线程池一共只启动了5个连接,且连接的id没有发生改变,说明一直是这5个连接。

证明,我们虽然创建了2个实例,但是这2个实例其实是一个实例。(单例模式是生效的)

场景三:启动2个线程,但是线程在创建连接池实例时,有时间间隔

import threading
from mysqlhelper import MySqLHelper
import time
def sl1():
    time.sleep(2)
    db = MySqLHelper()
    sql = "SELECT SLEEP(6)"
    db.execute(sql)
 
def sl2():
    time.sleep(4)
    db = MySqLHelper()
    sql = "SELECT SLEEP(15)"
    db.execute(sql)
if __name__ == '__main__':
    threads = []
    t1 = threading.Thread(target=sl1)
    threads.append(t1)
    t2 = threading.Thread(target=sl2)
    threads.append(t2)
 
    for t in threads:
        t.setDaemon(True)
        t.start()
    time.sleep(20)

2个线程间隔了2秒。

观察数据库的连接数量:

打印结果:

在并发执行2个sql时,共用了这5个连接,且打印结果只打印了一次,说明虽然并发创建了2次实例,但真正只创建了一个连接池。

场景四:启动2个线程,线程在创建连接池实例时,没有时间间隔

import threading
from mysqlhelper import MySqLHelper
import time

if __name__ == '__main__':
    db = MySqLHelper()
    sql = "SELECT SLEEP(6)"
    sql1 = "SELECT SLEEP(15)"
    threads = []
    t1 = threading.Thread(target=db.execute, args=(sql,))
    threads.append(t1)
    t2 = threading.Thread(target=db.execute, args=(sql1,))
    threads.append(t2)
 
    for t in threads:
        t.setDaemon(True)
        t.start()
    time.sleep(20)

观察数据库连接 :

 打印结果:

结果表明:

终端打印了2次,数据库建立了10个连接,说明创建了2个线程池。这样的单例模式,存在线程安全问题。

到此这篇关于Python封装数据库连接池详解的文章就介绍到这了,更多相关Python连接池内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Python封装数据库连接池详解的更多相关文章

  1. 详解前端HTML5几种存储方式的总结

    本篇文章主要介绍了前端HTML5几种存储方式的总结 ,主要包括本地存储localstorage,本地存储sessionstorage,离线缓存(application cache),Web SQL,IndexedDB。有兴趣的可以了解一下。

  2. PhoneGap / iOS上的SQLite数据库 – 超过5mb可能

    我误解了什么吗?Phonegap中的sqlitedbs真的有5mb的限制吗?我正在使用Phonegap1.2和iOS5.解决方法您可以使用带有phonegap插件的原生sqliteDB,您将没有任何限制.在iOS5.1中,Websql被认为是可以随时删除的临时数据…

  3. ios – 领域:如何获取数据库的当前大小

    是否有RealmAPI方法使用RealmSwift作为数据存储来获取我的RealmSwift应用程序的当前数据库大小?

  4. XCode 3.2 Ruby和Python模板

    在xcode3.2下,我的ObjectiveCPython/Ruby项目仍然可以打开更新和编译,但是你无法创建新项目.鉴于xcode3.2中缺少ruby和python的所有痕迹(即创建项目并添加新的ruby/python文件),是否有一种简单的方法可以再次安装模板?我发现了一些关于将它们复制到某个文件夹的信息,但我似乎无法让它工作,我怀疑文件夹的位置已经改变为3.2.解决方法3.2中的应用程序模板

  5. ios – Realm – 无法使用现有主键值创建对象

    我有一个对象有许多狗的人.应用程序有单独的页面,它只显示狗和其他页面显示人的狗我的模型如下我有人存储在Realm中.人有详细页面,我们取,并显示他的狗.如果狗已经存在,我会更新该狗的最新信息并将其添加到人的狗列表中,否则创建新狗,保存并将其添加到人员列表中.这适用于coredata.在尝试用他的狗更新人时,领域会抛出异常无法使用现有主键值创建对象解决方法这里的问题是,即使你正在创建一个全新的Rea

  6. ios – UIWebView中的WebSQL / SQLite数据库的最大大小(phonegap)

    我知道一般来说,Web应用程序的本地存储空间有5MB的限制.本地网页浏览应用程式是否也有这个限制?

  7. ios – Firebase离线存储高级 – 手动同步和进度信息

    >我可以提供一个捆绑数据库–安装App后我可以已经离线查询了Firebase数据?然后我有另一个关于Firebase的主要问题:>JSON存储是伟大的–但是这样我们不关心一个独特的结构,我们必须注意这一点插入总是正确的数据集?我从来没有试图显示实际的进展,但是当您从firebase中检索数据时,始终会在成功检索数据时调用onDataChange方法.https://firebase.google.com/docs/database/android/retrieve-data#read_data_onceC

  8. ios – 如何处理多用户数据库

    我的应用程序就像很多应用程序–它有一个用户输入用户名和密码的登录屏幕,以及登录按钮我的应用程序还使用CoreData来保存大多数用户的业务对象,当然也是用户特定的.我也有一个登出按钮来启用切换用户.这不会发生很多,但仍然是必要的).现在如果不同的用户登录,我需要获取他的具体数据.但是我该如何做呢?

  9. ios – Swift从Firebase数据库中获取特定价值

    我正在尝试从Firebase数据库中获取特定值.我看了一些像谷歌这样的文件,但我做不到.这是数据库的JSON文件:SWIFT代码:我想获得用户的电子邮件价值,而不是每个人.我怎样才能做到这一点?解决方法在您的代码中,快照将包含子值的字典.要访问它们,请将snapshot.value转换为Dictionary,然后访问各个子项是一个快照

  10. ios – Realm Swift:在卸载应用程序后是否可以保留数据库?

    使用realmswift,即使从设备上卸载应用程序,是否可以在设备内存中保留和维护应用程序的领域数据库文件?非常感谢您的帮助.解决方法删除应用程序时,应用程序的所有文件都是剩余的.iOS应用程序是沙盒.这意味着每个应用程序在磁盘中都有自己的空间,并有自己的目录,这些目录充当应用程序及其数据的主页.从iPhone删除应用程序会删除此沙箱,删除与该应用程序关联的所有数据.

随机推荐

  1. 10 个Python中Pip的使用技巧分享

    众所周知,pip 可以安装、更新、卸载 Python 的第三方库,非常方便。本文小编为大家总结了Python中Pip的使用技巧,需要的可以参考一下

  2. python数学建模之三大模型与十大常用算法详情

    这篇文章主要介绍了python数学建模之三大模型与十大常用算法详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感想取得小伙伴可以参考一下

  3. Python爬取奶茶店数据分析哪家最好喝以及性价比

    这篇文章主要介绍了用Python告诉你奶茶哪家最好喝性价比最高,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

  4. 使用pyinstaller打包.exe文件的详细教程

    PyInstaller是一个跨平台的Python应用打包工具,能够把 Python 脚本及其所在的 Python 解释器打包成可执行文件,下面这篇文章主要给大家介绍了关于使用pyinstaller打包.exe文件的相关资料,需要的朋友可以参考下

  5. 基于Python实现射击小游戏的制作

    这篇文章主要介绍了如何利用Python制作一个自己专属的第一人称射击小游戏,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起动手试一试

  6. Python list append方法之给列表追加元素

    这篇文章主要介绍了Python list append方法如何给列表追加元素,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  7. Pytest+Request+Allure+Jenkins实现接口自动化

    这篇文章介绍了Pytest+Request+Allure+Jenkins实现接口自动化的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  8. 利用python实现简单的情感分析实例教程

    商品评论挖掘、电影推荐、股市预测……情感分析大有用武之地,下面这篇文章主要给大家介绍了关于利用python实现简单的情感分析的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下

  9. 利用Python上传日志并监控告警的方法详解

    这篇文章将详细为大家介绍如何通过阿里云日志服务搭建一套通过Python上传日志、配置日志告警的监控服务,感兴趣的小伙伴可以了解一下

  10. Pycharm中运行程序在Python console中执行,不是直接Run问题

    这篇文章主要介绍了Pycharm中运行程序在Python console中执行,不是直接Run问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

返回
顶部