第十二天 异步编程(2)和单元测试
今天计划学习Python的多线程编程异步编程,学习项目及练习源码地址:
GitHub源码
协程
参见昨天的学习记录
无阻塞
异步程序依然会假死freezing
freezing案例:
import asyncio
import time
import threading
#定义一个异步操作
async def hello1(a,b):
print(f"异步函数开始执行")
await asyncio.sleep(3)
print("异步函数执行结束")
return a+b
#在一个异步操作里面调用另一个异步操作
async def main():
c=await hello1(10,20)
print(c)
print("主函数执行")
loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
'''运行结果为:
异步函数开始执行(在此处要等待3秒)
异步函数执行结束
30
主函数执行
'''
例子中,hello1是一个耗时3s的异步任务,main也是一个异步方法,但是main需要调用hello1的返回值,所以必须登台hello1执行完成才能继续执行main,这说明异步也是会有阻塞的。
而之前定义的异步函数不用等待是因为事件循环将所有的异步操作‘gather’起来,在多个操作间不同的游走切换,来回调用所有没有等待。
也可以理解为,事件循环只有一个异步操作在处理,没有可以切换执行的目标,所以只能等待当前的操作完成。
多线程+asyncio解决调用时freezing
为了让一个协程函数在不同的线程中执行,我们可以使用以下两个函数:
- loop.call_soon_threadsafe(callback, *args),这是一个很底层的API接口,一般很少使用
- asyncio.run_coroutine_threadsafe(coroutine,loop) 第一个参数为需要异步执行的协程函数,第二个loop参数为在新线程中创建的事件循环loop,注意一定要是在新线程中创建哦,该函数的返回值是一个concurrent.futures.Future类的对象,用来获取协程的返回结果。 future = asyncio.run_coroutine_threadsafe(coro_func(), loop) 在新线程中运行协程result = future.result()等待获取Future的结果
示例代码:
import asyncio
import asyncio,time,threading
#需要执行的耗时异步任务
async def func(num):
print(f'准备调用func,大约耗时{num}')
await asyncio.sleep(num)
print(f'耗时{num}之后,func函数运行结束')
#定义一个专门创建事件循环loop的函数,在另一个线程中启动它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
#定义一个main函数
def main():
coroutine1 = func(3)
coroutine2 = func(2)
coroutine3 = func(1)
new_loop = asyncio.new_event_loop() #在当前线程下创建时间循环,(未启用),在start_loop里面启动它
t = threading.Thread(target=start_loop,args=(new_loop,)) #通过当前线程开启新的线程去启动事件循环
t.start()
asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #这几个是关键,代表在新线程中事件循环不断“游走”执行
asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
asyncio.run_coroutine_threadsafe(coroutine3,new_loop)
for i in "iloveu":
print(str(i)+" ")
if __name__ == "__main__":
main()
'''运行结果为:
i 准备调用func,大约耗时3
l 准备调用func,大约耗时2
o 准备调用func,大约耗时1
v
e
u
耗时1之后,func函数运行结束
耗时2之后,func函数运行结束
耗时3之后,func函数运行结束
'''
第一步:定义需要异步执行的一系列操作,及一系列协程函数;
第二步:在主线程中定义一个新的线程,然后在新线程中产生一个新的事件循环;
第三步:在主线程中,通过asyncio.run_coroutine_threadsafe(coroutine,loop)这个方法,将一系列异步方法注册到新线程的loop里面去,这样就是新线程负责事件循环的执行。
使用asyncio实现一个timer 定时器
所谓的timer指的是,指定一个时间间隔,让某一个操作隔一个时间间隔执行一次,如此周而复始。很多编程语言都提供了专门的timer实现机制、包括C++、C#等。但是 Python 并没有原生支持 timer,不过可以用 asyncio.sleep 模拟。大致的思想如下,将timer定义为一个异步协程,然后通过事件循环去调用这个异步协程,让事件循环不断在这个协程中反反复调用,只不过隔几秒调用一次即可。简单的实现如下(本例基于python3.7:
import asyncio
async def delay(time):
await asyncio.sleep(time)
async def timer(time,function):
while True:
future=asyncio.ensure_future(delay(time))
await future
future.add_done_callback(function)
def func(future):
print('done')
if __name__=='__main__':
asyncio.run(timer(2,func))
aiohttp模块
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
-
安装
pip3 install aiohttp
-
示例代码
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, {}!</h1>'.format(request.match_info['name'])
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
aiomysql
Python3.7+ 下的一个异步操作mysql数据的模块,官方地址
示例:
#coding:utf-8
import aiomysql
import asyncio
import logging
import traceback
'''
mysql 异步版本
'''
logobj = logging.getLogger('mysql')
class Pmysql:
__connection = None
def __init__(self):
self.cursor = None
self.connection = None
@staticmethod
async def getconnection():
if Pmysql.__connection == None:
conn = await aiomysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
)
if conn:
Pmysql.__connection = conn
return conn
else:
raise("connect to mysql error ")
else:
return Pmysql.__connection
async def query(self,query,args=None):
self.cursor = await self.connection.cursor()
await self.cursor.execute(query,args)
r = await self.cursor.fetchall()
await self.cursor.close()
return r
async def test():
conn = await Pmysql.getconnection()
mysqlobj.connection = conn
await conn.ping()
r = await mysqlobj.query("select * from person")
for i in r:
print(i)
conn.close()
if __name__ == '__main__':
mysqlobj = Pmysql()
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
aioredis
redis异步操作库,官方地址
示例:
import aioredis
import asyncio
class Redis:
_redis = None
async def get_redis_pool(self, *args, **kwargs):
if not self._redis:
self._redis = await aioredis.create_redis_pool(*args, **kwargs)
return self._redis
async def close(self):
if self._redis:
self._redis.close()
await self._redis.wait_closed()
async def get_value(key):
redis = Redis()
r = await redis.get_redis_pool(('127.0.0.1', 6379), db=7, encoding='utf-8')
value = await r.get(key)
print(f'{key!r}: {value!r}')
await redis.close()
if __name__ == '__main__':
asyncio.run(get_value('key')) # need python3.7
测试
单元测试
单元测试是用来对一个模块、一个函数或者一个类来进行正确性检验的测试工作。
Python自带的unittest模块可以很方便的让我们编写单元测试。
编写单元测试时,我们需要编写一个测试类,从unittest.TestCase继承。
以test开头的方法就是测试方法,不以test开头的方法不被认为是测试方法,测试的时候不会被执行。
对每一类测试都需要编写一个test_xxx()方法。由于unittest.TestCase提供了很多内置的条件判断,我们只需要调用这些方法就可以断言输出是否是我们所期望的。
代码示例:
'''
定义一个要测试的类
mydict.py
'''
class MyDict(dict):
def __init__(self, **kw):
super().__init__(**kw)
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Dict' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
编写单元测试:
import unittest
from mydict import MyDict
class TestDict(unittest.TestCase):
def test_init(self):
d = MyDict(a=1, b='test')
self.assertEqual(d.a, 1)
self.assertEqual(d.b, 'test')
self.assertTrue(isinstance(d, dict))
def test_key(self):
d = MyDict()
d['key'] = 'value'
self.assertEqual(d.key, 'value')
def test_attr(self):
d = MyDict()
d.key = 'value'
self.assertTrue('key' in d)
self.assertEqual(d['key'], 'value')
def test_keyerror(self):
d = MyDict()
with self.assertRaises(KeyError):
value = d['empty']
def test_attrerror(self):
d = MyDict()
with self.assertRaises(AttributeError):
value = d.empty
运行单元测试
一旦编写好单元测试,我们就可以运行单元测试。最简单的运行方式是在mydict_test.py的最后加上两行代码:
if __name__ == '__main__':
unittest.main()
另一种方法是在命令行通过参数-m unittest直接运行单元测试:
python -m unittest mydict_test
这是推荐的做法,因为这样可以一次批量运行很多单元测试,并且,有很多工具可以自动来运行这些单元测试。
setUp() tearDown()在每次执行之前准备环境,或者在每次执行完之后需要进行一些清理。比如执行前需要连接数据库,执行完成之后需要还原数据、断开连接。
如果想要在所有case执行之前准备一次环境,并在所有case执行结束之后再清理环境,我们可以用 setUpClass() 与 tearDownClass()
跳过某个case需要用到skip装饰器一共有三个:unittest.skip(reason)、unittest.skipIf(condition, reason)、unittest.skipUnless(condition, reason),skip无条件跳过,skipIf当condition为True时跳过,skipUnless当condition为False时跳过。
在VS Code中对Python进行单元测试
Python扩展支持使用Python的内置unittest框架以及pytest和Nose进行单元测试。要使用pytest和Nose,必须将它们安装到当前的Python环境中(即,在pythonPath设置中标识的环境,请参阅环境)。
使用Python:Discover Unit Tests根据当前所选测试框架的发现模式扫描项目以进行测试(请参阅测试发现。一旦发现,VS Code提供了多种运行测试的方法(请参阅运行测试)。
单元测试输出显示在Python Test Log面板中,包括未安装测试框架时导致的错误。
在settings.json中进行设置:
{
"python.pythonPath": "/usr/local/bin/python3",
"python.testing.unittestEnabled": true,
"python.testing.unittestArgs": [
"-v",
"-s",
"./src/tests",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.nosetestsEnabled": false,
}
Unittest配置设置
设置 | 默认 | 描述 |
---|---|---|
unittestEnabled | false | 指定是否为单元测试启用UnitTest。 |
unittestArgs | [“-v”, “-s”, “.”, “-p”, “test.py”] | 传递给unittest的参数,其中由空格分隔的每个元素是列表中的单独项。有关默认值的说明,请参见下文。 |
CWD | 空值 | 指定单元测试的可选工作目录。 |
outputWindow | “Python Test Log” | 用于单元测试输出的窗口。 |
promptToConfigure | true | 指定VS代码是否在发现潜在测试时提示配置测试框架。 |
DEBUGPORT | 3000 | 用于调试UnitTest测试的端口号。 |
autoTestDiscoverOnSaveEnabled | true | 指定在保存单元测试文件时是启用还是禁用自动运行测试发现。 |
UnitTest的默认参数如下:
-v设置默认详细程度。删除此参数以获得更简单的输出。
-s .指定用于发现测试的起始目录。如果您在“test”文件夹中进行了测试,则可以将其更改为-s test(”-s”, “test”在arguments数组中)。
-p test.py是用于查找测试的发现模式。在这种情况下,它.py是包含单词“test” 的任何文件。如果以不同的方式命名测试文件,例如在每个文件名后附加“_test”,则使用类似于*_test.py数组的相应参数的模式。
要在第一次失败时停止测试运​​行,请将fail fast选项添加”-f”到arguments数组中。
文档测试
如果你经常阅读Python的官方文档,可以看到很多文档都有示例代码。比如re模块就带了很多示例代码:
>>> import re
>>> m = re.search('(?<=abc)def', 'abcdef')
>>> m.group(0)
'def'
可以把这些示例代码在Python的交互式环境下输入并执行,结果与文档中的示例代码显示的一致。
这些代码与其他说明可以写在注释中,然后,由一些工具来自动生成文档。既然这些代码本身就可以粘贴出来直接运行,那么,可不可以自动执行写在注释中的这些代码呢?
答案是肯定的。
当我们编写注释时,如果写上这样的注释:
def abs(n):
'''
Function to get absolute value of number.
Example:
>>> abs(1)
1
>>> abs(-1)
1
>>> abs(0)
0
'''
return n if n >= 0 else (-n)
无疑更明确地告诉函数的调用者该函数的期望输入和输出。
并且,Python内置的“文档测试”(doctest)模块可以直接提取注释中的代码并执行测试。
doctest严格按照Python交互式命令行的输入和输出来判断测试结果是否正确。只有测试异常的时候,可以用…表示中间一大段烦人的输出。
小结
今天主要学习了Pyton的异步编程,并简单了解了下相关的常用模块。并针对单元测试进行了详细的了解,单元测试很重要。明天打算开始学习下Pyton的函数式编程。