aiomysql异步操作mysql

一、概述

aiomysql是一个从asyncio(PEP-3156/tulip)框架访问MySQL数据库的库。它依赖并重用PyMySQL的大部分部分。aiomysql试图成为一个很棒的aiopg库,并保留相同的api、外观和感觉。

在内部aimysql是PyMySQL的副本,底层io调用切换到async,基本上是等待并在适当的位置添加async def coroutine。从aiopg移植的sqlalchemy支持。

安装模块

pip3 install aiomysql

简单示例

import asyncio
import aiomysql

loop = asyncio.get_event_loop()


async def test_example():
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                       user='root', password='', db='mysql',
                                       loop=loop)

    cur = await conn.cursor()
    await cur.execute("SELECT Host,User FROM user")
    print(cur.description)
    r = await cur.fetchall()
    print(r)
    await cur.close()
    conn.close()

loop.run_until_complete(test_example())
 

二、demo演示

环境说明

操作系统:centos 7.6

mysql版本:5.7

数据库名:test

数据库默认编码:utf8mb4

具体表结构以及数据,请参考链接:

https://www.cnblogs.com/xiao987334176/p/12721498.html

单次执行

执行select和update

 
#!/usr/bin/env python3
# coding: utf-8
"""
mysql 异步版本
"""
import traceback
import logging
import aiomysql
import asyncio
import time

logobj = logging.getLogger('mysql')


class Pmysql:
    def __init__(self):
        self.coon = None
        self.pool = None

    async def initpool(self):
        try:
            logobj.debug("will connect mysql~")
            __pool = await aiomysql.create_pool(
                minsize=5,  # 连接池最小值
                maxsize=10,  # 连接池最大值
                host='192.168.31.230',
                port=3306,
                user='root',
                password='abcd1234',
                db='test',
                autocommit=True,  # 自动提交模式
            )
            return __pool
        except:
            logobj.error('connect error.', exc_info=True)

    async def getCurosr(self):
        conn = await self.pool.acquire()
        # 返回字典格式
        cur = await conn.cursor(aiomysql.DictCursor)
        return conn, cur

    async def query(self, query, param=None):
        """
        查询操作
        :param query: sql语句
        :param param: 参数
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            return await cur.fetchall()
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

    async def execute(self, query, param=None):
        """
        增删改 操作
        :param query: sql语句
        :param param: 参数
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            if cur.rowcount == 0:
                return False
            else:
                return True
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)


async def getAmysqlobj():
    mysqlobj = Pmysql()
    pool = await mysqlobj.initpool()
    mysqlobj.pool = pool
    return mysqlobj


async def test_select():
    mysqlobj = await getAmysqlobj()
    # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
    exeRtn = await mysqlobj.query("select * from users")
    # print("查询结果",exeRtn)
    return exeRtn


async def test_update():
    mysqlobj = await getAmysqlobj()
    # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
    exeRtn = await mysqlobj.execute("update users set username='xiao1' where id='1'")
    # print("exeRtn", exeRtn, type(exeRtn))
    if exeRtn:
        # print('操作成功')
        return '操作成功'
    else:
        # print('操作失败')
        return '操作失败'


async def main():  # 调用方
    tasks = [test_select(), test_update()]  # 把所有任务添加到task中
    done, pending = await asyncio.wait(tasks)  # 子生成器
    for r in done:  # done和pending都是一个任务,所以返回结果需要逐个调用result()
        # print('协程无序返回值:'+r.result())
        print(r.result())


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()  # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main())  # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close()  # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time() - start))

执行输出:

操作成功
[{'id': 1, 'username': 'xiao', 'password': '123', 'phone': '12345678910', 'email': '[email protected]', 'create_time': datetime.datetime(2020, 4, 10, 1, 22, 7)}]
所有IO任务总耗时0.03948秒

批量插入

批量插入使用executemany

插入3万条数据

#!/usr/bin/env python3
# coding: utf-8

import time
import asyncio
import aiomysql

start = time.time()
loop = asyncio.get_event_loop()

async def test_example():
    conn = await aiomysql.connect(host='192.168.31.230', port=3306,
                                       user='root', password='abcd1234',
                                       db='test', loop=loop)

    # create default cursor
    cursor = await conn.cursor()

    # execute sql query
    data = []
    for i in range(1,30000):
        data.append(('xiao%s'%i, '123', '12345678910', '[email protected]', '2020-04-10 01:22:07'),)

    stmt = "INSERT INTO users (username,password,phone,email,create_time) VALUES(%s,%s,%s,%s,%s);"

    await cursor.executemany(stmt, data)
    await conn.commit()
    # detach cursor from connection
    await cursor.close()

    # close connection
    conn.close()

loop.run_until_complete(test_example())
print('所有IO任务总耗时%.5f秒' % float(time.time() - start))

执行输出:

所有IO任务总耗时11.96885秒