Python学习笔记-第10天:numpy模块以及Python的多线程编程

第十天 numpy模块介绍和使用

今天计划学习numpy模块以及Python的多线程编程,学习项目及练习源码地址:
GitHub源码

numpy模块

NumPy是Python中科学计算的基础包。它是一个Python库,提供多维数组对象,各种派生对象(如掩码数组和矩阵),以及用于数组快速操作的各种例程,包括数学,逻辑,形状操作,排序,选择,I/O离散傅立叶变换,基本线性代数,基本统计运算,随机模拟等等。

NumPy包的核心是ndarray对象。这封装了同构数据类型的n维数组,许多操作在编译代码中执行以提高性能。NumPy数组和标准Python序列之间有几个重要的区别:

  • NumPy数组在创建时具有固定大小,与Python列表(可以动态增长)不同。更改ndarray的大小将创建一个新数组并删除原始数组。
  • NumPy数组中的元素都需要具有相同的数据类型,因此在内存中的大小相同。例外:可以有(Python,包括NumPy)对象的数组,从而允许不同大小的元素的数组。
  • NumPy数组有助于对大量数据进行高级数学和其他类型的操作。通常,与使用Python的内置序列相比,这些操作的执行效率更高,代码更少。
  • 越来越多的基于Python的科学和数学软件包正在使用NumPy数组; 虽然这些通常支持Python序列输入,但它们在处理之前将这些输入转换为NumPy数组,并且它们通常输出NumPy数组。换句话说,为了有效地使用当今大量(甚至大多数)基于Python的科学/数学软件,只知道如何使用Python的内置序列类型是不够的 – 还需要知道如何使用NumPy数组。

安装numpy和使用示例

pip3 install numpy

import numpy
a = numpy.arange(10)
print(a) # [0 1 2 3 4 5 6 7 8 9] 看起来是个列表,实际不是
print([1,2,3]) # [1, 2, 3] 这才是列表

在这个例子中只涉及numpy模块中的一个arange函数,该函数可以传入一个整数类型的参数n,函数返回值看着像一个列表,其实返回值类型是numpy.ndarray。这是Numpy中特有的数组类型。如果传入arange函数的参数值是n,那么arange函数会返回0 到n-1的ndarray类型的数组。

numpy中的数组

numpy.array

numpy模块的array函数可以生成多维数组。例如,如果要生成一个二维数组,需要向array函数传递一个列表类型的参数。每一个列表元素是一维的ndarray类型数组,作为二维数组的行。另外,通过ndarray类的shape属性可以获得数组每一维的元素个数(元组形式),也可以通过shape[n]形式获得每一维的元素个数,其中n是维度,从0开始。

numpy.array(object,dtype=None,copy=None,order=None,subok=Flase,ndmin=0)

参数名称 描述
object 数组或嵌套的数列
dtype 数组元素的数据类型,可选
copy 对象是否需要复制,可选
order 创建数组的样式,C为行方向,F为列方向,A为任意方向(默认)
subok 默认返回一个与基类类型一致的数组
ndmin 指定生成数组的最小维度
  • 一维数组的创建

    a = numpy.array([1,2,3,4,5])
    print(a)
    print('数组的纬度:',a.shape) # (5,) 元祖
    
  • 二维数组

    a = numpy.array([[1,2],[3,4],[5,6]])
    print(a)
    print('数组的纬度:',a.shape) # (5,) 元祖   
    
  • dtyp参数的使用

    a = numpy.array([1,2,3,4,5],dtype=complex)
    print(a)
    

numpy.arange

使用 arange 函数创建数值范围并返回 ndarray 对象,函数格式如下:

numpy.arange(start,stop,step=1,dtype=None)

注意:不包含stop的值

numpy.random.random创建随机数数组

numpy.random.random(size=None)

默认返回[0.0,1.0)之间的随机数,不包含1

numpy.random.randint创建随机数数组

numpy.random.randint(low=0,high=None,size=None)
该方法有三个参数low、high、size 三个参数。默认 high 是 None,如果只有low,那范围就是[0,low)。如果有high,范围就是[low,high)。

a = numpy.random.randint(4,10,size=(3,4,2))
print(a) # 第一纬度3个元素,第二纬度4个元素,第三纬度2个元素 随机数在最里面的纬度
# [
# [[4 5][5 5][5 7][9 6]]
# [[6 9][5 7][7 9][7 9]]
# [[8 7][8 7][7 6][8 6]]
# ]

numpy.random.randn 返回标准正太分布

函数返回一个或一组样本,具有标准正态分布(期望为 0,方差为 1)。 dn表格每个维度。返回值为指定维度的array。

numpy.random.randn(d0,d1,...,dn)

numpy.random.normal指定期望和方差的正太分布

numpy.random.normal(loc=0,scale=1,size=None)

ndarray对象

numpy最重要的一个特点是其N维数组对象ndarray,它是一系列同类型数据的集合,以0下标为开始进行集合中元素的索引。

  • ndarray 对象是用于存放同类型元素的多维数组。
  • ndarray 中的每个元素在内存中都有相同存储大小的区域
  • ndarray 内部由以下内容组成:
    1. 一个指向数据(内存或内存映射文件中的一块数据)的指针。
    2. 数据类型或dtype,描述在数组中的固定大小值的格子。
    3. 一个表示数组形状(shape)的元组,表示各维度大小的元组。

重要的ndarray对象属性有:

属性 说明
ndarray.ndim 秩,即轴的数量或维度的数量
ndarray.shape 数组的维度,对于矩阵,n 行 m 列
ndarray.size 数组元素的总个数,相当于 .shape 中 n*m 的值
ndarray.dtype ndarray 对象的元素类型
ndarray.itemsize ndarray 对象中每个元素的大小,以字节为单位
ndarray.flags ndarray 对象的内存信息
ndarray.real ndarray 元素的实部
ndarray.imag ndarray 元素的虚部
ndarray.data 包含实际数组元素的缓冲区,由于一般通过数组的索引获取元素,所以通常不需要使用这个属性。

numpy.zeros 创建指定大小的数组,数组元素以 0 来填充

numpy.zeros(shape, dtype = float, order = 'C')

有”C”和”F”两个选项,分别代表,行优先和列优先,在计算机内存中的存储元素的顺序。

举例:

x = numpy.zeros(5) # [0. 0. 0. 0. 0.] 默认是float
x = numpy.zeros((5,),dtype=int) # # [0 0 0 0 0]
x = numpy.zeros((2,2),dtype=str) #[['' '']['' '']]

numpy.ones 创建指定形状的数组,数组元素以1来填充

numpy.ones(shape, dtype = None, order = 'C')

类似于zeros

举例:

x = numpy.ones((5,),dtype=int) # # [1 1 1 1 1]
x = numpy.ones((2,2),dtype=str) #[['1' '1']['1' '1']]

numpy.empty 方法用来创建一个指定形状(shape)、数据类型(dtype)且未初始化的 数组,里面的元素的值是之前内存的值

numpy.empty(shape, dtype = float, order = 'C')

举例:

x = numpy.empty((5,),dtype=int) 
x = numpy.empty((2,2),dtype=str) 

切片和索引

ndarray对象的内容可以通过索引或切片来访问和修改,与 Python中list的切片操作一样。

a = numpy.arange(10)
b = a[2:7:1]
c = a[3:]

print(a[1])

改变数组的维度

处理数组的一项重要工作就是改变数组的维度,包含提高数组的维度和降低数组的维度,还包括数组的转置。Numpy提供的大量 API可以很轻松地完成这些数组的操作。例如,通过reshape方法可以将一维数组变成二维、三维或者多维数组。通过ravel方法或 flatten方法可以将多维数组变成一维数组。改变数组的维度还可以直接设置Numpy数组的shape属性(元组类型),通过 resize法也可以改变数组的维度。

a = numpy.arange(1,13)
print(a)
b = a.reshape(2,3,2) # 乘积需要等于元素个数
print(b)
c = b.ravel()
print(c)
d = c.flatten()
print(d)

数组的拼接

水平拼接

通过hstack函数可以将两个或多个数组水平组合起来形成一个数组。水平组合必须要满足一个条件,就是所有参与水平组合的数组的行数必须相同,否则进行水平组合会抛出异常。

垂直拼接

通过vstack函数可以将两个或多个数组垂直组合起来形成一个数组。水平组合必须要满足一个条件,就是所有参与水平组合的数组的列数必须相同,否则进行水平组合会抛出异常。

numpy.concatenate函数用于沿指定轴连接相同形状的两个或多个数组

格式如下:
numpy.concatenate((a1, a2, ...), axis)

axis = 0 默认 vstack

axis = 1 hstack

axis = 2 dstack

多线程和并发编程

进程

在学习线程前,先了解下Python关于进程的知识。

什么是进程:程序编写完没有运行称之为程序。正在运行的代码就是进程。

在Python3语言中,对多进程支持的是multiprocessing模块和subprocess模块。multiprocessing模块为在子进程中运行任务、通讯和共享数据,以及执行各种形式的同步提供支持。

进程的创建

Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据。语法格式如下:

Process([group [, target [, name [, args [, kwargs]]]]])

其中target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group参数未使用,值始终为None。

构造函数简单地构造了一个Process进程,Process的实例方法、Process的实例属性如下表所示:

方法 描述
is_alive() 如果p仍然运行,返回True
join([timeout]) 等待进程p终止。Timeout是可选的超时期限,进程可以被链接无数次,但如果连接自身则会出错
run() 进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是继承Process类并重新实现run()函数
start() 启动进程,这将运行代表进程的子进程,并调用该子进程中的run()函数
terminate() 强制终止进程。如果调用此函数,进程p将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需要特别小心。如果p保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或I/O损坏

【示例】创建子进程并执行

from multiprocessing import Process,set_start_method
set_start_method('spawn',True) # 在vscode中运行时需要加上这个,否则要报错
#定义子进程代码
def run_proc():
    print('子进程运行中')

if __name__=='__main__':
    print('父进程运行')
    p=Process(target=run_proc)
    print('子进程将要执行')
    p.start()

【示例】创建子进程,传递参数

from multiprocessing import Process,set_start_method
import os
from time import sleep

set_start_method('spawn',True)
#创建子进程代码
def run_proc(name,age,**kwargs):
    for i in range(5):
        print('子进程运行中,参数name:%s,age:%d'%(name,age))
        print('字典参数kwargs:',kwargs)
        sleep(0.5)

if __name__=='__main__':
    print('主进程开始运行')
    p=Process(target=run_proc,args=('张山',38),kwargs={'book1':'test1','chuban':'xin'})
    print('子进程将要执行')
    p.start()

【示例】join()方法的使用

from multiprocessing import Process,set_start_method
from time import sleep

set_start_method('spawn',True)
def worker(interval):
    print("work start")
    sleep(interval)
    print("work end")

if __name__ == "__main__":
    p = Process(target = worker, args = (10,))
    p.start()
    print('等待进程p终止')
    p.join(3) # 如果子进程未结束 程序已经结束,子程序还是会继续执行的
    print("主进程结束!")
    '''
    等待进程p终止
    work start
    主进程结束!
    work end
    '''

Process实例属性:

方法 描述
name 进程的名称
pid 进程的整数进程ID

【示例】创建函数并将其作为多个进程

from multiprocessing import Process,set_start_method
from time import sleep

set_start_method('spawn',True)
#创建进程调用函数
def work1(interval):
    print('work1')
    sleep(interval)
    print('end work1')
def work2(interval):
    print('work2')
    sleep(interval)
    print('end work2')
def work3(interval):
    print('work3')
    sleep(interval)
    print('end work3')
if __name__=='__main__':
    #创建进程对象
    p1=multiprocessing.Process(target=work1,args=(4,))
    p2=multiprocessing.Process(target=work2,args=(3,))
    p3=multiprocessing.Process(target=work3,args=(2,))
    #启动进程
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")
    print('主进程结束')

创建自定义Process子类

创建进程的方式还可以使用类的方式,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象,示例如下:

from multiprocessing import Process
import time
#定义线程类
class MyProcess(Process):
    def __init__(self,interval):
        Process.__init__(self)
        self.interval=interval
    def run(self): # 一定要重新这个方法
        print('子进程开始执行的时间:{}'.format(time.ctime()))
        time.sleep(self.interval)
        print('子进程结束的时间:{}'.format(time.ctime()))

if __name__=='__main__':
    #创建进程
    p=MyProcess(3)
    #启动进程
    p.start()
    p.join()
    print('主进程结束')

进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。Pool的语法格式如下:

Pool([numprocess [, initializer [, initargs]]])

其中numprocess是要创建的进程数。如果省略此参数,将使用cpu_count()的值。Initializer是每个工作进程启动时要执行的可调用对象。Initargs是要传递给initializer的参数元祖。Initializer默认为None。

Pool类的实例方法如下表所示:

方法 描述
apply(func [,args [,kwargs]]) 在一个池工作进程中执行函数(args,*kwargs),然后返回结果。
apply_async (func [, args [,kwargs [,callback ] ] ]) 在一个池工作进程中异步地执行函数(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,稍后可用于获得最终结果。Callback是可调用对象,接受输入参数。当func的结果变为可用时,将立即传递给callback。Callback禁止执行任何阻塞操作,否则将阻塞接收其他异步操作中的结果
close() 关闭进程池,防止进行进一步操作。如果还有挂起的操作,它们将在工作进程终止之前完成
join() 等待所有工作进程退出。此方法只能在close()或者terminate()方法之后调用
imap( func,iterable [ ,chunksize] ) map()函数的版本之一,返回迭代器而非结果列表
imap_unordered( func,iterable [,chunksize] ) 同imap()函数一样,只是结果的顺序根据从工作进程接收到的时间任意确定
map( func,iterable [,chunksize] ) 将可调用对象func应用给iterable中的所有项,然后以列表的形式返回结果。通过将iterable划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize指定每块中的项数。如果数量较大,可以增大chunksize的值来提升性能
map_async( func,iterable [,chunksize [,callback]] ) 同map()函数,但结果的返回是异步的。返回值是AsyncResult类的实例,稍后可用与获取结果。Callback是指接受一个参数的可调对象。如果提供callable,当结果变为可用时,将使用结果调用callable
terminate() 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
get( [ timeout] ) 返回结果,如果有必要则等待结果到达。Timeout是可选的超时。如果结果在指定时间内没有到达,将引发multiprocessing.TimeoutError异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发
ready() 如果调用完成,则返回True
sucessful() 如果调用完成且没有引发异常,返回True。如果在结果就绪之前调用此方法,将引发AssertionError异常
wait( [timeout] ) 等待结果变为可用。Timeout是可选的超时

非阻塞(apply_async)进程池示例:

from multiprocessing import Process,set_start_method,Pool

import time,os

set_start_method('spawn',True)

def func(msg):
    print("start:", msg)
    time.sleep(3)
    print("end:",msg)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(5):
        msg = "hello %d" %(i)
        #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply_async(func, (msg, ))

    pool.close() #进程池关闭之后不再接收新的请求
    # 调用join之前,先调用close函数,否则会出错。
    # 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    pool.join() # 如果不等待,主程序会提前结束,未完的子程序会报错
    print('主进程结束')

【示例】使用进程池(阻塞)apply:

from multiprocessing import Process,set_start_method,Pool

import time,os

set_start_method('spawn',True)

def func(msg):
    print("start:", msg)
    time.sleep(3)
    print("end",msg)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(5):
        msg = "hello %d" %(i)
        #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply(func, (msg, ))

    pool.close()
    #调用join之前,先调用close函数,否则会出错。
    # 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    pool.join()

很明显,在这里进程是一个添加等待执行完毕之后才会添加下一个进程。

进程间通信

  • 全局变量在多个进程中不共享,进程之间的数据是独立的,默认情况下互不影响。

【示例】多个进程中数据不共享

from multiprocessing import Process
num=1
def work1():
    global num
    num+=5
    print('子进程1运行,num:',num)

def work2():
    global num
    num += 10
    print('子进程2运行,num:',num)

if __name__=='__main__':
    print('父进程开始运行')
    p1=Process(target=work1)
    p2=Process(target=work2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
'''
子进程1运行,num: 6
子进程2运行,num: 11
'''
  • Queue实现进程间的通信

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

Queue实例方法表:

方法 描述
cancle_join_thread() 不会在进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞
close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列尚未写入数据,但将在此方法完成时马上关闭
empty() 如果调用此方法时q为空,返回True
full() 如果q已满,返回True
get([block [,timeout]) 返回q中的一个项。如果q为空,此方法将阻塞,直到队列中有项可用为止。Block用于控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。Timeout是可选超时时间,用在阻塞模式中。如果在指定的时间间隔内没有项变为可用,将引发Queue.Empty异常
join_thread() 连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下此方法由不是q的原始创建者的所有进程调用。调用q.cancle_join_thread()方法可以禁止这种行为
put(item [ , block [, timeout]]) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。Block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。Timeout指定在阻塞模式中等待可用时空间的时间长短。超时后将引发Queue.Full异常。
qsize() 返回目前队列中项的正确数量。
joinableQueue([maxsize]) 创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
task_done() 消费者使用此方法发出信号,表示q.get()返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常
join() 生产者使用此方法进行阻塞,知道队列中的所有项均被处理。阻塞将持续到位队列中的每个项均调用q.task_done()方法为止

【示例】Queue的方法使用

from multiprocessing import Queue
q=Queue(3)
q.put('消息1')
q.put('消息2')
print('消息队列是否已满:',q.full())
q.put('消息3')
print('消息队列是否已满:',q.full())

# q.put('消息4')因为消息队列已满,需要直接写入需要等待,如果超时会抛出异常,
# 所以写入时候需判断,消息队列是否已满
if not q.full():
    q.put('消息4')

#同理读取消息时,先判断消息队列是否为空,再读取
# qsize在MacOS上无法运行
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
#return self._maxsize – # self._sem._semlock._get_value()

# 所以 不能这样用
#for i in range(q.qsize()):
#    print(q.get())

while(not q.empty()):
    print(q.get())

    

【示例】Queue队列实现进程间通信

from multiprocessing import *
import time
def write(q):
    #将列表中的元素写入队列中
    for i in ["a","b","c"]:
        print('开始写入值%s' % i)
        q.put(i)
        time.sleep(1)

#读取
def read(q):
    print('开始读取')
    while True:
        if not q.empty():
            print('读取到:',q.get())
            time.sleep(1)
        else:
            break
if __name__=='__main__':
    #创建队列
    q=Queue()
    #创建写入进程
    pw=Process(target=write,args=(q,))
    pr=Process(target=read,args=(q,))
    #启动进程
    pw.start()
    pw.join()
    pr.start() # 注意:测试一下将这个放到pw.join()之前
    pr.join()

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产和消费的工作能力来提高程序的整体处理数据的速度。

  1. 为什么要使用生产者和消费者模式

在并发世界里,生产者就是生产数据的进(线)程,消费者就是消费数据的进(线)程。在多进(线)程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

  1. 什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

生产和消费的示例代码

Pool池里面的进程间通信

如果使用Pool创建进程,就需要使multiprocessing.Manager()中的Queue()来完成进程间的通信,而不是multiprocessing.Queue(),否则会抛出如下异常。

【示例】进程池创建进程完成进程之间的通信

from multiprocessing import Manager,Pool
import time
def write(q):
    #将列表中的元素写入队列中
    for i in ["a","b","c"]:
        print('开始写入值%s' % i)
        q.put(i)
        time.sleep(1)

#读取
def read(q):
    print('开始读取')
    while True:
        if not q.empty():
            print('读取到:',q.get())
            time.sleep(1)
        else:
            break
if __name__=='__main__':
    #创建队列
    q=Manager().Queue()
    #创建进程池
    p=Pool(3)
    #使用阻塞模式创建进程
    p.apply(write,(q,))
    p.apply(read,(q,)) # 如果使用异步,需要修改read保证读取
    p.close() # 不要忘了关闭
    p.join()

小结

明天开始学习线程