作为一个合格的crud工程师,我真的就只记得crud,最近由于忘记了mysql账号密码(包括root账号),在找回账号的过程中顺便就梳理了一些常用的mysql用法。
忘记root密码
第一步: 关闭mysql
$ service mysql stop
第二步: 修改my.cnf文件,加上红框中两行内容
$ vim /etc/mysql/my.cnf
第三步: 重启mysql服务,并进入mysql修改密码
$ service mysql restart $ mysql mysql> use mysql; mysql> update user set authentication_string=PASSWORD("new_pass") where user='root';
第四步: 撤回对my.cnf文件的修改
Python操作MySQL
python操作MySQL的常用方法是使用pymysql模块。在单次使用时,只需要直接调用connect()方法即可。
import pymysql #链接数据库 db = pymysql.connect(host='yourhost',user='yourname',passwd='yourpasswd',db='yourdb') cursor = db.cursor() cursor.execute("select * from yourdb") result = cursor.fetchone()
但是为了达到复用的效果,可以选择对一些常用的操作进行封装。例如,新建一个操作mysql的工具类,命名文件为lib_mysql.py,类中封装exec_sql()方法用于查询。
import pymysql from urllib.parse import urlparse class Client: def __init__(self, url, **kwargs): self.url = url if not url: return url = urlparse(url) settings = { 'host': url.hostname, 'port': url.port or 3306, 'charset': 'utf8', 'db': url.path.strip().strip('/'), 'user': url.username, 'passwd': url.password, 'cursorclass': pymysql.cursors.DictCursor } self.settings = settings if kwargs: self.settings.update(kwargs) self.conn = pymysql.connect(**self.settings) self.cursor = self.conn.cursor() def exec_sql(self, sql, fetch_rows=False, commit=True, raise_exc=False): try: self.cursor.execute(sql) if commit: self.conn.commit() except Exception as exc: logger.exception('exe_sql got exception: {}'.format(str(exc))) if raise_exc: raise if commit: self.conn.rollback() if fetch_rows: rows = self.cursor.fetchall() return rows
这样之后想要查询mysql时,只需要import lib_mysql即可。例如:
>>> import lib_mysql >>> client = lib_mysql.Client(url='mysql://test:1qaz@WSX@localhost:3306/test?charset=utf8') >>> sql = '''select * from conn_pool_test''' >>> record = client.exec_sql(sql,fetch_rows=True) >>> type(record) <class 'list'> >>> record [{'id': 1, 'app_id': '001', 'app_name': 'android_test'}, {'id': 2, 'app_id': '002', 'app_name': 'ios_test'}, {'id': 3, 'app_id': '003', 'app_name': 'web_test'}, {'id': 4, 'app_id': '004', 'app_name': 'mac_test'}, {'id': 5, 'app_i
同样地,可以在工具类中封装插入方法,例如:
def upsert_to_mysql(self, items, table, fields): try: fields_str = ','.join(fields) replace_str = ','.join(['%s' for _ in range(len(fields))]) fields_update_str = ','.join(['{}=values({})'.format(field, field) for field in fields]) sql = ''' insert ignore into {table} ( {fields} ) values ({replace_str}) on duplicate key update {fields_update} '''.format(table=table, replace_str=replace_str, fields=fields_str, fields_update=fields_update_str) records = [] for item in items: row = tuple([item[field] for field in fields]) records.append(row) self.cursor.executemany(sql, records) self.conn.commit() except Exception as e: print(e) traceback.print_exc() self.conn.rollback()
调用插入方法:
>>> fields = ['id', 'app_id' ,'app_name'] >>> data = [{'id': 6, 'app_id': '006', 'app_name': 'Solor'}, {'id': 7, 'app_id': '007', 'app_name': 'Symbian'}] >>> client.upsert_to_mysql(table='conn_pool_test', fields=fields, records=data) total 2 records processed 2 records
MySQL连接池
如果每一次连接数据库只是做了简单操作,然后反复连接断连。如果短时间内连接次数过多,会给数据库带来压力。因此,可以采用连接池的方式,让连接重复使用,降低数据库资源消耗。
这里介绍采用pymysql和DBUtils实现mysql连接池的方式。
import pymysql from dbutils.pooled_db import PooledDB, SharedDBConnection from urllib.parse import urlparse class MysqlPool(object): def __init__(self, url): self.url = url if not url: return url = urlparse(url) self.POOL = PooledDB( creator=pymysql, maxconnections=10, # 连接池的最大连接数 maxcached=10, maxshared=10, blocking=True, setsession=[], host=url.hostname, port=url.port or 3306, user=url.username, password=url.password, database=url.path.strip().strip('/'), charset='utf8', ) def __new__(cls, *args, **kw): if not hasattr(cls, '_instance'): cls._instance = object.__new__(cls) return cls._instance def connect(self): conn = self.POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn, cursor def connect_close(self,conn, cursor): cursor.close() conn.close() def fetch_all(self,sql, args): conn, cursor = self.connect() if args is None: cursor.execute(sql) else: cursor.execute(sql, args) record_list = cursor.fetchall() return record_list def fetch_one(self,sql, args): conn, cursor = self.connect() cursor.execute(sql, args) result = cursor.fetchone() return result def insert(self,sql, args): conn, cursor = self.connect() row = cursor.execute(sql, args) conn.commit() self.connect_close(conn, cursor) return row
https://zhuanlan.zhihu.com/p/344642744