第十一天 并发编程和异步编程(1)
今天计划学习Python的多线程编程异步编程,学习项目及练习源码地址:
GitHub源码
线程
线程也是实现多任务的一种方式,一个进程中,也经常需要同时做多件事,就需要同时运行多个‘子任务’,这些子任务就是线程。一个进程可以拥有多个并行的线程,其中每一个线程,共享当前进程的资源。
再巩固下进程和线程的区别:
区别 | 进程 | 线程 |
---|---|---|
根本区别 | 作为资源分配的单位 | 调度和执行的单位 |
开销 | 每一个进程都有独立的代码和数据空间,进程间的切换会有较大的开销 | 线程可以看出是轻量级的进程,多个线程共享内存,线程切换的开销小 |
所处环境 | 在操作系统中,同时运行的多个任务 | 在程序中多个顺序流同时执行 |
分配内存 | 系统在运行的时候为每一个进程分配不同的内存区域 | 线程所使用的资源是他所属进程的资源 |
包含关系 | 一个进程内可以拥有多个线程 | 线程是进程的一部分,所有线程有时候称为是轻量级的进程 |
进程和线程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护,而进程正相反。
在Python3程序中,可以通过_thread(兼容python2,不建议使用)和threading(推荐使用)这两个模块来处理线程。
_thread模块
可以通过两种方式来使用线程:使用函数或者使用类来包装线程对象。当使用_thread模块来处理线程时,可以调用里面的函数start_new_thread()来生成一个新的线程,语法格式如下:
thread.start_new_thread ( function, args[, kwargs] )
其中function是线程函数;args表示传递给线程函数的参数,他必须是个tuple(元祖)类型;kwargs是可选参数。
【示例】使用_thread模块创建线程:
import _thread
import time
def fun1():
print('开始运行fun1')
time.sleep(4)
print('运行fun1结束')
def fun2():
print('开始运行fun2')
time.sleep(2)
print('运行fun2结束')
if __name__=='__main__':
print('开始运行')
#启动一个线程运行函数fun1
_thread.start_new_thread(fun1,())
#启动一个线程运行函数fun2
_thread.start_new_thread(fun2,())
time.sleep(6)
'''
开始运行
开始运行fun1
开始运行fun2
运行fun2结束
运行fun1结束
'''
threading模块
Python3 通过两个标准库_thread和threading提供对线程的支持。_thread提供了低级别的、原始的线程以及一个简单的锁,它相比于threading模块的功能还是比较有限的。threading模块除了包含 _thread模块中的所有方法外,还提供的其他方法:
- threading.currentThread(): 返回当前的线程变量。
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
在Python3程序中,对多线程支持最好的是threading模块,使用这个模块,可以灵活地创建多线程程序,并且可以在多线程之间进行同步和通信。在Python3程序中,可以通过如下两种方式来创建线程:
- 通过threading.Thread直接在线程中运行函数
- 通过继承类threading.Thread来创建线程
在Python中使用threading.Thread的基本语法格式如下所示:
Thread(group=None, target=None, name=None, args=(), kwargs={})
其中target: 要执行的方法;name: 线程名;args/kwargs: 要传入方法的参数。
Thread类的方法如表所示:
方法名 | 描述 |
---|
run() | 用以表示线程活动的方法
start() | 启动线程活动
join([time]) | 等待至线程中止。这阻塞调用线程直至线程的join()方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生
isAlive() | 返回线程是否活动的
getName() | 返回线程名
setName() | 设置线程名
【示例】threading.Thread直接创建线程:
import threading
import time
def fun1(thread_name,delay):
print('线程{0}开始运行fun1'.format(thread_name))
time.sleep(delay)
print('线程{0}运行fun1结束'.format(thread_name))
def fun2(thread_name,delay):
print('线程{0}开始运行fun2'.format(thread_name))
time.sleep(delay)
print('线程{0}运行fun2结束'.format(thread_name))
if __name__=='__main__':
print('开始运行')
#创建线程
t1=threading.Thread(target=fun1,args=('thread-1',2))
t2=threading.Thread(target=fun2,args=('thread-2',4))
t1.start()
t2.start()
在Python中,通过继承类threading.Thread的方式来创建一个线程。这种方法只要重写类threading.Thread中的方法run(),然后再调用方法start()就能创建线程,并运行方法run()中的代码。
在调用Thread类的构造方法时,需要将线程函数、参数等值传入构造方法,其中name表示线程的名字,如果不指定这个参数,默认的线程名字格式为Thread-1、Thread-2。每一个传入构造方法的参数值,在Thread类中都有对应的成员变量保存这些值,这些成员变量都以下划线(_)开头,如果_target、_args等。在run方法中需要使用这些变量调用传入的线程函数,并为线程函数传递参数。
【示例】继承threading.Thread类创建线程:
import threading
import time
def fun1(delay):
print('线程{0}开始运行fun1'.format(threading.current_thread().getName()))
time.sleep(delay)
print('线程{0}运行fun1结束'.format(threading.current_thread().getName()))
def fun2(delay):
print('线程{0}开始运行fun2'.format(threading.current_thread().getName()))
time.sleep(2)
print('线程{0}运行fun2结束'.format(threading.current_thread().getName()))
#创建线程类继承threading.Thread
class MyThread(threading.Thread):
#重写父类的构造方法,其中func是线程函数,args是传入线程的参数,name是线程名
def __init__(self,func,name,args):
super().__init__(target=func,name=name,args=args)
#重写父类的run()方法
def run(self):
self._target(*self._args)
if __name__=='__main__':
print('开始运行')
#创建线程
t1=MyThread(fun1,'thread-1',(2,))
t2=MyThread(fun2,'thread-2',(4,))
t1.start()
t2.start()
线程共享全局变量
在一个进程内所有线程共享全局变量,多线程之间的数据共享比多进程要好。但是可能造成多个进程同时修改一个变量(即线程非安全),可能造成混乱。
【示例】线程共享全局变量:
import time
from threading import *
#定义全局变量num
num=10
def test1():
global num
for i in range(3):
num+=1
print('test1输出num:',num)
def test2():
global num
print('test2输出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t1.join() # 等待线程1结束才开始线程2
t2.start()
t2.join()
'''
test1输出num: 13
test2输出num: 13
'''
【示例】线程共享全局变量存在问题:
import time
from threading import *
#定义全局变量num
num=0
def test1():
global num
for i in range(100000):
num+=1
print('test1输出num:',num)
def test2():
global num
for i in range(100000):
num+=1
print('test2输出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
'''
test1输出num: 176838
test2输出num: 181299
每次结果都可能不一样,所以同一个进程中对共享变量的读取修改是不安全的。
'''
互斥锁
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。最简单的同步机制就是引入互斥锁。
锁有两种状态——锁定和未锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”状态,其他的线程才能再次锁定该资源。
互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
使用Thread对象的Lock可以实现简单的线程同步,有上锁acquire方法和释放release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire和release方法之间。
【示例】互斥锁:
import time
from threading import Thread,Lock
#定义全局变量num
num=0
#创建一把互斥锁
mutex=Lock()
def test1():
global num
'''
在两个线程中都调用上锁的方法,则这两个线程就会抢着上锁,
如果有1方成功上锁,那么导致另外一方会堵塞(一直等待)直到这个锁被解开
'''
mutex.acquire()#上锁
for i in range(100000):
num+=1
mutex.release()
print('test1输出num:',num)
def test2():
global num
mutex.acquire() # 上锁
for i in range(100000):
num+=1
mutex.release()
print('test2输出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
死锁
在线程共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。当一个线程永远地持有一个锁,并且其他线程都尝试去获得这个锁时,那么它们将永远被阻塞,这个我们都知道。如果线程A持有锁L并且想获得锁M,线程B持有锁M并且想获得锁L,那么这两个线程将永远等待下去,这种情况就是最简单的死锁形式。
【示例】死锁
import time
from threading import Thread,Lock
import threading
mutexA=threading.Lock()
mutexB=threading.Lock()
class MyThread1(Thread):
def run(self):
if mutexA.acquire():
print(self.name,'执行')
time.sleep(1)
if mutexB.acquire():
print(self.name,'执行')
mutexB.release()
mutexA.release()
class MyThread2(Thread):
def run(self):
if mutexB.acquire():
print(self.name,'执行')
time.sleep(1)
if mutexA.acquire():
print(self.name,'执行')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
t1=MyThread1()
t2=MyThread2()
t1.start()
t2.start()
避免死锁的方式
既然可能产生死锁,那么接下来,讲一下如何避免死锁。
- 让程序每次至多只能获得一个锁。当然,在多线程环境下,这种情况通常并不现实
- 设计时考虑清楚锁的顺序,尽量减少嵌在的加锁交互数量
- 既然死锁的产生是两个线程无限等待对方持有的锁,那么只要等待时间有个上限不就好了。当然synchronized不具备这个功能,但是我们可以使用Lock类中的tryLock方法去尝试获取锁,这个方法可以指定一个超时时限,在等待超过该时限之后变回返回一个失败信息
线程同步的应用
同步就是协同步调,按预定的先后次序进行运行。例如:开会。“同”字指协同、协助、互相配合。
如进程、线程同步,可以理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行,B运行后将结果给A,A继续运行。
线程生产者消费者模式
生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这个问题于是引入生产者和消费者模式生产者消费者模式通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者之间不直接通信。生产者生产商品,然后将其放到类似队列的数据结构中,消费者不找生产者要数据,而是直接从队列中取。这里使用queue模块来提供线程间通信的机制,也就是说,生产者和消费者共享一个队列。生产者生产商品后,会将商品添加到队列中。消费者消费商品,会从队列中取出商品。
ThreadLocal
我们知道多线程环境下,每一个线程均可以使用所属进程的全局变量。如果一个线程对全局变量进行了修改,将会影响到其他所有的线程对全局变量的计算操作,从而出现数据混乱,即为脏数据。为了避免多个线程同时对变量进行修改,引入了线程同步机制,通过互斥锁来控制对全局变量的访问。所以有时候线程使用局部变量比全局变量好,因为局部变量只有线程自身可以访问,同一个进程下的其他线程不可访问。
但是局部变量也是有问题,就是在函数调用的时候,传递起来很麻烦。
【示例】局部变量作为参数传递:
def process_student(name):
std=Student(name)
do_task1(std)
do_task2(std)
def do_task1(std):
do_sub_task1(std)
do_sub_task2(std)
def do_task2(std):
do_sub_task1(std)
do_sub_task2(std)
从上面的实例可以看到每个函数一层一层调用都需要传递std参数,非常麻烦,如果使用全局变量也不行,因为每个线程处理不同的Student对象,不能共享。因此Python还提供了ThreadLocal变量,它本身是一个全局变量,但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的。
【示例】ThreadLocal的使用:
import threading
# 创建全局ThreadLocal对象:
local = threading.local()
def process_student():
# 获取当前线程关联的name:
student_name = local.name
print('线程名:%s 学生姓名:%s' % (threading.current_thread().name,student_name))
def process_thread(name):
# 绑定ThreadLocal的name:
local.name = name
process_student()
t1 = threading.Thread(target=process_thread, args=('张三',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('李四',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
协程和异步编程
什么是协程
协程,英文名是Coroutine,又称为微线程,是一种用户态的轻量级线程。协程不像线程和进程那样,需要进行系统内核上的上下文切换,协程的上下文切换是由程序员决定的。协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程。众所周知,子程序(函数)都是层级调用的,如果在A中调用了B,那么B执行完毕返回后A才能执行完毕。协程与子程序有点类似,但是它在执行过程中可以中断,转而执行其他的协程,在适当的时候再回来继续执行。
协程相对于多线程的优点
多线程编程是比较困难的, 因为调度程序任何时候都能中断线程, 必须记住保留锁, 去保护程序中重要部分, 防止多线程在执行的过程中断。
而协程默认会做好全方位保护,以防止中断。我们必须显示产出才能让程序的余下部分运行。对协程来说,无需保留锁,而在多个线程之间同步操作,协程自身就会同步,因为在任意时刻,只有一个协程运行。总结下大概下面几点:
- 无需系统内核的上下文切换,减小开销;
- 无需原子操作锁定及同步的开销,不用担心资源共享的问题;
- 单线程即可实现高并发,单核CPU即便支持上万的协程都不是问题,所以很适合用于高并发处理,尤其是在应用在网络爬虫中。
协程的缺点
- 无法使用CPU的多核协程的本质是个单线程,它不能同时用上单个CPU的多个核,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,就比如网络爬虫来说,限制爬虫的速度还有其他的因素,比如网站并发量、网速等问题都会是爬虫速度限制的因素。除非做一些密集型应用,这个时候才可能会用到多进程和协程。
- 处处都要使用非阻塞代码写协程就意味着你要一值写一些非阻塞的代码,使用各种异步版本的库,比如后面的异步爬虫教程中用的aiohttp就是一个异步版本的request库等。不过这些缺点并不能影响到使用协程的优势。
写习惯NodeJS的人,还是习惯的来。
协程的实现(一)
使用yield关键字
存在yield关键字的函数是一个生成器,当调用这个函数时,函数不会立即执行,而是返回一个生成器对象。使用生成器对象时,代码块才会执行。yield 有两个关键作用:返回一个值;接收调用方传入的值,且不影响返回值。
- yield是实现生成器的重要关键字。
- yiled的三个方法及其作用, next(), send(),throw().
- yield的一般形式为:temp=yield 表达式(每次迭代要返回的值)(推荐使用:既可以返回迭代的值,也可以接受send进去的参数并使用)
例子:
def consume():
r = ''
while True:
n = yield r # 断点
if not n:
return
print('消费者 正在消费: {}'.format(n))
r = '200 RMB'
def produce(c):
c.send(None) # 启动生成器
n = 0
while n < 5:
n += 1
print('生产者 正在生产: {}'.format{n})
r = c.send(n)
print('[生产者] 消费者返回: {}'.format{r})
print('------------')
c.close()
c = consume()
produce(c)
特点:
延迟加载特性,生成器需要启动,如果用send启动,第一次需要传入None,否则报错
使用send()方法传进去的值,实际上就是yield表达式返回的值,没有传值则默认返回None
如果使用了send(value),传递进去的那个value会取代那个表达式的值,并且会将传递进去的那个值返回给yield表达式的结果temp,而send(value)的返回值就是原来的值
每次惰性返回一个值
- send 方法:send(value)是有返回值的,即迭代的那个值。send 的主要作用, 当需要手动更改生成器里面的某一个值并且使用它,则send发送进去一个数据,然后保存到yield语句的返回值,以提供使用;send(value)的返回值就是那个本来应该被迭代出来的那个值。这样既可以保证我能够传入新的值,原来的值也不会弄丢。
- throw 方法:在生成器中抛出异常,并且这个throw函数会返回下一个要迭代的值或者是StopIteration。
生成器的启动与close
生成器启动时,使用next可以直接启动并调用,但是使用 send() 第一次要传入 None,不然会报错。
- close() 表示关闭生成器,如果后续再调用此生成器,那么会抛出异常。
- 生成器的终止- StopIteration
在一个生成器中,如果没有return,则默认执行到函数完毕返回 StopIteration;
如果遇到 return,则直接抛出 StopIteration,如果 return 后面有值,会一并返回。
可以使用 except StopIteration as e 捕获异常,通过e.value 来获取值。
协程的状态查看
协程分别有四种状态,可以导入 inspect.getgeneratorstate() 模块来查看
GEN_CREATED: 被创建,等待执行
GEN_RUNNING: 解释器执行
GEN_SUSPENDED: 在 yield 表达式处暂停
GEN-CLOSED: 执行结束
从某些角度来说,协程其实就是一个可以暂停执行的函数,并且可以继续执行。那么 yield 已经可以暂停执行了,如果在暂停后有办法把一些 value 发送到暂停执行的函数中,那么这就是 Python 中的协程。
不足之处:协程函数的返回值不是特别方便获取,比如 return 的返回值需要捕获异常as e,使用e.value来获得
Python 的生成器是协程 coroutine 的一种形式,它的局限性在于只能向它的直接调用者每次 yield一个值,这意味着那么包含 yield 的代码不能像其他代码被分离出来放到一个单独的函数中,而这正是 yield from 要解决的。
yiled from关键字
从字面看是yield的升级改进版本,如果将 yield 理解成返回,那么 yield from 就是从哪里返回。
def generator2():
yield 'a'
yield 'b'
yield 'c'
yield from [11,22,33,44]
yield from (12,23,34)
yield from range(3)
for i in generator2():
print(i,end=' , ')
# a , b , c , 11 , 22 , 33 , 44 , 12 , 23 , 34 , 0 , 1 , 2 ,
yield from 返回另一个生成器。而yield 只返回一个元素。有下面的等价关系:
yield from iterable == for item in iterable: yield item
使用for循环迭代生成器时,不会显式的触发StopIteration异常,因为for循环的底层处理了这个异常,因此也就不会得到return的返回值。
def my_generator():
for i in range(5):
if i==2:
return '我被迫中断了'
else:
yield i
def main(generator):
try:
for i in generator: #不会显式触发异常,故而无法获取到return的值
print(i)
except StopIteration as exc:
print(exc.value)
g=my_generator() #调用
main(g)
# 0 1
如果使用 next 来迭代会显示的触发异常,虽然能够获取return的返回值,但是操作麻烦。
而使用yield from来实现这个需求:
def my_generator(): # 子生成器
for i in range(5):
if i==2:
return '我被迫中断了'
else:
yield i
def wrap_my_generator(generator): # 委托生成器
result = yield from generator
#自动触发StopIteration异常,并且将return的返回值赋值给yield from表达式的结果,即result
print('这是return的返回值,',result)
def main(generator): # 调用方
for j in generator:
print(j)
g=my_generator()
wrap_g=wrap_my_generator(g)
main(wrap_g) #调用
'''运行结果为:
0
1
这是return的返回值, 我被迫中断了
'''
上面的my_generator是原始的生成器即子生成器,main是调用方,wrap_my_generator是委托生成器
在使用yield from的时候,多了一个 委托生成器,调用方通过委托生成器来与子生成器进行交互。
委托生成器会为调用方和子生成器建立双向通道,即调用方可以和子生成器直接进行交互,委托生成器不参与代码的处理,只负责充当管道的作用以及接收 return 的返回值。
yield from iteration结构会在内部自动捕获 iteration生成器的StopIteration 异常,把return返回的值或者是StopIteration的value 属性的值变成 yield from 表达式的值。
一些异步编程的概念
同步(Sync)和异步(Async)
同步 :就是发出一个功能调用时,在没有得到结果之前,该调用就不返回或继续执行后续操作。
异步 :当一个异步过程调用发出后,调用者在没有得到结果之前,就可以继续执行后续操作。当这个调用完成后,一般通过状态、通知和回调来通知调用者。对于异步调用,调用的返回并不受调用者控制。
对于通知调用者的三种方式,具体如下:
- 状态,即监听被调用者的状态(轮询),调用者需要每隔一定时间检查一次,效率会很低。
- 通知,当被调用者执行完成后,发出通知告知调用者,无需消耗太多性能
- 回调,与通知类似,当被调用者执行完成后,会调用调用者提供的回调函数。
同步和异步的区别 : 请求发出后,是否需要等待结果,才能继续执行其他操作。
阻塞和非阻塞
阻塞和非阻塞这两个概念仅仅与等待消息通知时的状态有关。跟同步、异步没什么太大关系,也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的。
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
并发并行
并发: 是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。并发(Concurrent)。
并行:当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。
并发和并行的区别:并发的关键是你有处理多个任务的能力,不一定要同时;但是并行的关键是你有同时处理多个任务的能力
关键概念的区分
(1)阻塞/非阻塞:关注的是程序在等待调用结果(消息,返回值)时的状态
(2)同步/异步:关注的是消息通知的机制。即等到完全做完才通知,还是边做不后通知。
(3)同步阻塞、同步非阻塞,异步阻塞、异步非阻塞。
举个简单的例子来描述这四种情况,老张要做两件事,用水壶烧开水,看电视,两件事情即两个任务,两个函数。 同步阻塞:老张把水壶放到火上,就坐在那里等水开,开了之后再去看电视。 同步非阻塞:老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞) 老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。 异步阻塞:老张把响水壶放到火上,然后就坐在旁边等着听那个烧开的提示音。(异步阻塞) 异步非阻塞:老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞) 乍一看,这“同步阻塞、意不阻塞”似乎没有什么区别,但实际上是有区别的,所谓同步异步,指的是消息通知的机制。区别在哪里呢? 在这个例子中同步异步只是对于水壶而言。在使用普通水壶的时候,我要自己主动去观察水是不是烧开了,自己主动去获取烧开的这个结果,即所谓的同步;但是在响水壶的时候,我不需要再管水烧到什么程度了,因为只要水烧开了,那个滴滴的噪声就会通知我的,即所谓的异步。 他们的相同点是,在烧水的过程中,老王啥也没干,即“阻塞”。
(4)四种总结——同步/异步与阻塞/非阻塞
同步阻塞形式:效率是最低的。拿上面的例子来说,在烧水的过程中,什么别的事都不做。
同步非阻塞形式:实际上是效率低下的。因为老王需要不断的在看电视与烧水之间来回跑动,看一下电视,又要去看一下水烧开没有,这样来回跑很多次,在程序中,程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。
异步阻塞形式:异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。 这个效率其实跟同步阻塞差不多的。
异步非阻塞形式:效率更高。因为老王把水烧好之后就不用管了,可以安安心心去看电视,不用来回奔波看水烧开了没,因为水烧开了会有提示告诉他水烧好了,这样效率岂不是更高。 那有没有更好的办法?当然有,如果老王还有一个帮手老张,让老王自己看电视、同时老张去烧开水,这样岂不是更好?这就是所谓的并行
(5)并发/并行、同步/异步、阻塞/非阻塞
并发/并行:即能够开启多个任务,多个任务交替执行为并发,多个任务同时执行为并行
同步/异步:关注的是消息通知的机制,主动等候消息则为同步、被动听消息则为异步
阻塞/非阻塞:关注的是等候消息的过程中有没有干其他事。
总结:上面的几组概念,时刻穿插的,并没有完全的等价关系,所以经常有人说,异步就是非阻塞,同步就是阻塞,并发就是非阻塞、并行就是非阻塞,这些说法都是不完全准确地。
概念小结
并发和并行都是实现异步编程的思路,只有一个线程的并发,称之为“伪并发”;有多个线程的并发称之为“真并发”,真并发与并行是很接近的。
异步操作的优缺点 :
因为异步操作无须额外的线程负担(这里指的是单线程交替执行的“伪并发”),并且使用回调的方式进行处理,在设计良好的情况下,处理函数可以不必使用共享变量(即使无法完全不用,最起码可以减少共享变量的数量),减少了死锁的可能。当然异步操作也并非完美无暇。编写异步操作的复杂程度较高,程序主要使用回调方式进行处理,与普通人的思维方式有些出入,而且难以调试。
多线程的优缺点:
多线程的优点很明显,线程中的处理程序依然是顺序执行,符合普通人的思维习惯,所以编程简单。但是多线程的缺点也同样明显,线程的使用(滥用)会给系统带来上下文切换的额外负担。并且线程间的共享变量可能造成死锁的出现。 异步与多线程,从辩证关系上来看,异步和多线程并不时一个同等关系,(因为单线程也是可以实现异步的)异步是目的,多线程只是我们实现异步的一个手段.什么是异步:异步是当一个调用请求发送给被调用者,而调用者不用等待其结果的返回.实现异步可以采用多线程技术或则交给另外的进程来处理。
协程的实现(二)
使用asyncio模块
从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。比如在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,但是在3.5以后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差异,但是原理是一样的。
asyncio组成的基本概念
- 协程函数的作用
- result = yield from future,返回future的结果。
- result = yield from coroutine,等候另一个协程函数返回结果或者是触发异常
- result= yield from task,返回一个task的结果
- return expression,作为一个函数抛出返回值
- raise exception
- 事件循环 event_loop线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,然后又走到另外一个方法,依次进行下去,直到事件循环所有的方法执行完毕。实际上loop是BaseEventLoop的一个实例,我们可以查看定义,它到底有哪些方法可调用协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,然后由事件循环去运行,单独运行协程函数是不会有结果的。
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"开始时间为: {time.time()}") await say_after_time(1,"hello") await say_after_time(2,"world") print(f"结束时间为: {time.time()}") ''' 直接运行 ''' # >>> main() # <coroutine object main at 0x1053bb7c8> ''' 需要通过事件循环来调用''' loop=asyncio.get_event_loop() #创建事件循环对象 #loop=asyncio.new_event_loop() #与上面等价,创建新的事件循环 loop.run_until_complete(main()) #通过事件循环对象运行协程函数 loop.close()
获取事件循环对象的几种方式:
- loop=asyncio.get_running_loop(),返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误
- loop=asyncio.get_event_loop() ,获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop
- loop=asyncio.set_event_loop(loop), 设置一个事件循环为当前线程的事件循环;
- loop=asyncio.new_event_loop() ,创建一个新的事件循环
通过事件循环运行协程函数的两种方式:
- 创建事件循环对象loop,即 asyncio.get_event_loop(),通过事件循环运行协程函数
- 直接通过 asyncio.run(function_name) 运行协程函数。
但是需要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数总是会创建一个新的事件循环并在run结束之后关闭事件循环,所以,如果在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,因为同一个线程不能有两个事件循环,而且这个run函数不能同时运行两次,因为他已经创建一个了。即同一个线程中是不允许有多个事件循环loop的。 asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,因为它是高层API,后面会讲到它与asyncio.run_until_complete()的差异性,run_until_complete()是相对较低层的API。
- 什么是awaitable对象有三类对象是可等待的,即 coroutines , Tasks , and Futures .
coroutine :本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;
Tasks : 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
Future :它是一个“更底层”的概念,他代表一个异步操作的最终结果,因为异步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。
三者的关系,coroutin 可以自动封装成task ,而Task是Future的子类。
- 什么是task任务Task用来并发调度的协程, 单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控了。
(1)创建任务(两种方法):
方法一:task = asyncio.create_task(coro()) # 这是3.7版本新添加的 方法二:task = asyncio.ensure_future(coro()) ,也可以使用loop.create_future() ,loop.create_task(coro) 也是可以的。
(2)获取某一个任务的方法:
方法一:task = asyncio.current_task(loop=None);返回在某一个指定的loop中,当前正在运行的任务,如果没有任务正在运行,则返回None;如果loop为None,则默认为在当前的事件循环中获取, 方法二:asyncio.all_tasks(loop=None);返回某一个loop中还没有结束的任务;
- 什么是future?Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。 Future是Task的父类,一般情况下,已不用去管它们两者的详细区别,也没有必要去用Future,用Task就可以了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().
asyncio的基本架构
asyncio分为高层API和低层API。我们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。所谓的高层API主要是指那些asyncio.xxx()的方法。
高层API:
Coroutines Tasks Streams Synchronization Primitives Subprocesses Queues Exceptions
低层API:
Event Loop Futures Transports Protocols Policies Platform Support
- 常见的一些高层API方法
1)运行异步协程 asyncio.run(coro, *, debug=False) #运行一个异步程序 2)创建任务 task = asyncio.create_task(coro) #python3.7 task = asyncio.ensure_future(coro()) 3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而允许其他任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息 4)并发运行多个任务 await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它本身也是awaitable的。当所有的任务都完成之后,返回的结果是一个列表的形式、 5)防止任务取消 await asyncio.shield(*arg, *, loop=None) 6)设置timeout await asyncio.wait_for(aw, timeout, *, loop=None) 当异步操作需要执行的时间超过waitfor设置的timeout,就会触发异常,所以在编写程序的时候,如果要给异步操作设置timeout,一定要选择合适,如果异步操作本身的耗时较长,而你设置的timeout太短,会涉及到她还没做完,就抛出异常了。 7)多个协程函数时候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,比如: {func(),func(),func3()} 表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是可以的。 该函数的返回值是两个Tasks/Futures的集合: (done, pending) 其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示还没有完成的任务。 常见的使用方法为:done, pending = await asyncio.wait(aws)
- Task类
- 他是作为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
- 使用高层API asyncio.create_task()创建任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()创建任务对象;
- 相比于协程函数,任务时有状态的,可以使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。
import asyncio async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模拟一个耗时任务 except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): #通过协程创建一个任务,需要注意的是,在创建任务的时候,就会跳入到异步开始执行 #因为是3.7版本,创建一个任务就相当于是运行了异步函数cancel_me task = asyncio.create_task(cancel_me()) #等待一秒钟 await asyncio.sleep(1) print('main函数休息完了') #发出取消任务的请求 task.cancel() try: await task #因为任务被取消,触发了异常 except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) '''运行结果为: cancel_me(): before sleep main函数休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now '''
- 异步函数的结果获取两种方法:第一种是直接通过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。
通过result函数:
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b coroutine = hello1(10,5) loop = asyncio.get_event_loop() #第一步:创建事件循环 task = asyncio.ensure_future(coroutine) #第二步:将多个协程函数包装成任务列表 loop.run_until_complete(task) #第三步:通过事件循环运行 print('-------------------------------------') print(task.result()) loop.close() '''运行结果为 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''
通过定义回调函数:
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b def callback(future): #定义的回调函数 print(future.result()) loop = asyncio.get_event_loop() #第一步:创建事件循环 task = asyncio.ensure_future(hello1(10,5)) #第二步:将多个协程函数包装成任务 task.add_done_callback(callback) #并被任务绑定一个回调函数 loop.run_until_complete(task) #第三步:通过事件循环运行 loop.close() #第四步:关闭事件循环 '''运行结果为: Hello world 01 begin Hello again 01 end 15 '''
所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象,因为task是future的子类。
- asyncio的基本模版针对3.7之前的版本
import asyncio import time from functools import partial async def get_url(): print('start get url') await asyncio.sleep(2) # await 后面跟的必须是一个 await 对象 print('end get url') return 'stack' def test(url,future): print(url,'hello, stack') if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # loop.run_until_complete(get_url()) # 只是提交了一个请求,时间2s tasks = [get_url() for i in range(10)] # get_future = asyncio.ensure_future(get_url()) # 获得返回值用法1,源码上依然是先判断loop,然后调用create_task # get_future = loop.create_task(get_url()) # 方法2,还可以继续添加函数,执行逻辑 # get_future.add_done_callback(partial(test, 'Stack')) # 函数本身在获得调用时需要一个任意形数,参数即是 get_future 本身,否则报错 # 如果函数需要传递参数,需要通过 偏函数 partial 模块来解决,以及函数的形参需要放在前面 loop.run_until_complete(asyncio.wait(tasks)) # 提交了10次,时间也是2s # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上 # gather 和 wait 的区别 # gather是更高一级的抽象,且使用更加灵活,可以使用分组,以及取消任务 print(time.time() - start) # print(get_future.result()) # 接收返回值
针对3.7的版本
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模拟耗时任务2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模拟耗时任务4秒 print("Hello again 03 end") return a*b async def main(): results = await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result) asyncio.run(main()) '''运行结果为: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
总结:
第一步:构建一个入口函数main 它也是一个异步协程函数,即通过async定义,并且要在main函数里面await一个或者是多个协程,同前面一样,我可以通过gather或者是wait进行组合,对于有返回值的协程函数,一般就在main里面进行结果的获取。
第二步:启动主函数main 这是python3.7新添加的函数,就一句话,即 asyncio.run(main())