Python深入探究(二)并发编程!面试挂在这!
[TOC]
来点干货:Python并发编程探究(面试挂在这了55555)
大框架:
concurrent.futures
— 启动并行任务(都是基于Executor抽象类的)- Executor 对象
- ThreadPoolExecutor 线程池
- ProcessPoolExecutor
- Future 对象
subprocess — 子进程管理最多用于shell命令,构建管道获得输出
*==用这个提纲,去学习,能够有更深刻的理解==**
后台进程
使用threading模块来进行新线程的创建,如果默认情况下的话,是会守护子线程的,也就是,哪怕主线程已经结束了,子线程没有退出,python的解释器就不会退出:
1 | import time |
输出为:
1 | T-minus主线程 |
可以看到,再主线程打印了4次”主线程”之后,还是会等子线程打印”T-minus “打完才退出。
而我们可以通过设置daemon=True来使子线程为后台进程,我们看看会有什么不同:
t=Thread(*target*=countdown,*args*=(10,),*daemon*=True)
1 | T-minus 10 |
在主线程退出之后,子线程也退出了。
后台线程的责任是为整个主线程提供服务,如保持网络连接(发送keep-alive心跳包),负责内存管理与垃圾回收(实际上JVM就是这样做的). 因此这些线程与实际提供应用服务的线程有了逻辑上的”前/后”的概念,而如果主线程已经退出,那么后台线程也没有存在的必要.
如果没有这一机制,那么我们在主线程完成之后,还必须逐个地检查后台线程,然后在主线程退出之前,逐个地关闭它们. 有了前后线程的区分, 我们只需要负责管理前台线程, 完成主要的逻辑处理之后退出即可.
利用循环条件控制线程
上述后台线程的方法帮我们解决了部分线程退出的问题,但是我们还是没法控制它,比如让它在特定的点退出,下面是一个写法,利用了类——相当于给线程每次一个查询的操作:
1 | class CountdownTask: |
在这里面,利用这个类的terminate
函数,以及while循环中的条件,实现了我们的手动主动退出。
利用超时机制控制线程
但是还存在一个问题:
上述利用循环控制线程的方法
的确可以做到对特定情况下线程的退出,但是如果是开启一个网络服务器这样的呢,根本不会循环判断,怎么办:
如果线程执行一些像I/O这样的阻塞操作,那么通过轮询来终止线程将使得线程之间的协调变得非常棘手。比如,如果一个线程一直阻塞在一个I/O操作上,它就永远无法返回,也就无法检查自己是否已经被结束了。要正确处理这些问题,你需要利用超时循环来小心操作线程。 例子如下:
1 | class IOTask: |
利用socket的超时函数,来实现自我检查
GIL锁(初了解)
- GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也只允许同时只能有一个CPU 上运行该进程的一个线程。
- CPython中
- IO密集型,某个线程阻塞,就会调度其他就绪线程;
- CPU密集型,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。
- 在CPython中由于有GIL存在,IO密集型,使用多线程较为合算;CPU密集型,使用多进程,要绕开GIL。
print()等就是IO输出,应该避免,否则线程会阻塞,会释放GIL锁,其他线程被调度。
由于全局解释锁(GIL)的原因,Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以,Python 的线程更适用于处理I/O和其他需要并发执行的阻塞操作(比如等待I/O、等待从数据库获取数据等等),而不是需要多处理器并行的计算密集型任务。
event对象协调线程运行
只是上述链接里面的例子,注意看清楚他是初始化了三个一模一样的线程,然后再event.set()之前,都是被event.wait()了,处于阻塞状态。
event对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程。如果你只想唤醒单个线程,最好是使用信号量或者
Condition
对象来替代。考虑一下这段使用信号量实现的代码:
1 | # Worker thread |
关于信号量的,下一次再写 2021.2.22
Condition类用于上锁
首先,谁拿到锁,那么谁才能够使用当前代码和内存!!
来一个很经典的生产者消费者模型:
假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。
1 | import threading |
输出:
1 | Thread-6 produce 100, count=600 |
acquire()/release():获得/释放 Lock
wait([timeout]):线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。调用wait()会释放Lock,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock.
notify(n=1):通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
每次进入一个线程的时候,首先看能不能获得锁
con.acquire()
,如果拿到了,那么现在这段时间就只有我能访问count
了。然后判断
count
数值是不是满足自己操作的条件,如果不是,则就让自己先睡会con.wait()
,让别的线程知道可以下手了。如果满足条件,就进行相应的输出。
我用完了,就需要把锁给别人,首先
con.notify()
,唤醒waiting状态的线程,从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。然后
con.release()
正式释放锁。
总结:
Condition
实现了一个线程之间能够通信的模型,生产者拿到锁,生产完了之后就通知别人可以来访问共享资源了。
每个线程想获取资源进行操作的时候,都先acquire申请,申请不到就阻塞等待,让别的线程进行使用,用完了轮到自己拿到锁🔒了,就可以进行操作资源了,使用完资源之后又把锁给别人。这样就保证了共享资源的干净。
同时Condition
相比于event
,能够只唤醒一个线程,不用每次唤醒所有阻塞的线程。
编写涉及到大量的线程间同步问题的代码会让你痛不欲生。比较合适的方式是使用队列来进行线程间通信或者每个把线程当作一个Actor,利用Actor模型来控制并发。
利用队列线程间通信
前面我们在用event或者condition进行通信,但是当共享数据是列表形的时候就很麻烦,这是我们便可以利用队列:
我们创建一个能够被多线程共享的Queue
对象,然后线程使用put()get()
来操作元素,一个最简单的生产者消费者的例子:
1 | from queue import Queue |
Queue
对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。 当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。例如:
1 | from queue import Queue |
Queue中的join()与task_done()
使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的
task_done()
和join()
:
实际上就是没法判断队列什么时候清空了,而:
Queue.task_done()
在完成一项工作之后,Queue.task_done()
函数向任务已经完成的队列发送一个信号
Queue.join()
实际上意味着等到队列为空,再执行别的操作
如果线程里每从队列里取一次,但没有执行task_done(),则join无法判断队列到底有没有结束,在最后执行个join()是等不到结果的,会一直挂起。
可以理解为,每task_done一次 就从队列里删掉一个元素,这样在最后join的时候根据队列长度是否为零来判断队列是否结束,从而执行主线程。
下面这个例子,会在join()的地方无限挂起,因为join在等队列清空,但是由于没有task_done,它认为队列还没有清空,还在一直等。
重点:
那说了半天,我为啥要用join呢,直接判断那个队列是不是空不就完了么,下面几个回答
join()只看task_done(),和队列空不空其实没关系
queue.join()
会一直阻塞,直到队列中所有的message都被get出来并且调用task_done才会返回。通常用在等待所有的任务都处理完了,然后退出进程。
empty会立马返回,用你的while循环检查时,如果队列为空线程会一直用死循环。循环等待会很耗CPU。
上例子:
1 | from queue import Queue |
在我这样 # self.queue.task_done()
的时候,就算任务完成了,线程还是不会退出的:
然后我把注释去掉,就终于可以自动退出,执行别的任务啦
哎,今天一下午就看了这么一点,终于搞懂了
线程池ThreadPoolExecutor
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
我们除了可以使用队列,通过不断的添加线程而形成一个线程池,利用队列的最大长度来实现管理,但是在python的库中使用
concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
Exectuor 提供了如下常用方法:
- submit(fn, args, **kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
- map(func, iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。这就是著名的MapReduce思想啊!并发问题**
- shutdown(wait=True):关闭线程池。
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
- cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
- cancelled():返回 Future 代表的线程任务是否被成功取消。
- running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
- done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
- result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
- exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
- add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
下面是一个简单的TCP服务器,使用了一个线程池来响应客户端:
1 | from socket import AF_INET, SOCK_STREAM, socket |
使用 ThreadPoolExecutor
相对于手动实现的一个好处在于它使得 任务提交者更方便的从被调用函数中获取返回值。例如,你可能会像下面这样写:
1 | from concurrent.futures import ThreadPoolExecutor |
特别的,a.result()
操作会阻塞进程直到对应的函数执行完成并返回一个结果。
要是不想阻塞,就把print(a.result())
换成future.add_done_callback(test_result)
即可.
另外
1 | import threading |
可以控制线程所使用的虚拟内存的大小,用来保护。
concurrent.futures中的密集并发处理
还记得我们提到的MapReduce思想吗,我在另一篇博客中详细讨论了这个思想,在并行编程中,我们用futures.map()来代替map(),就可以利用多核CPU的效果,使得避免了GIL锁的限制,提升了计算效率。例子在这里
ProcessPoolExecutor用法总结
典型用法如下:
1 | from concurrent.futures import ProcessPoolExecutor |
其原理是,一个 ProcessPoolExecutor
创建N个独立的Python解释器, N是系统上面可用CPU的个数。你可以通过提供可选参数给 ProcessPoolExecutor(N)
来修改 处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成, 然后处理池被关闭。不过,程序会一直等待直到所有提交的工作被处理完成。
异步执行可以由
ThreadPoolExecutor
使用线程或由ProcessPoolExecutor
使用单独的进程来实现。 两者都是实现抽像类Executor
定义的接口。
被提交到池中的工作必须被定义为一个函数。有两种方法去提交。 如果你想让一个列表推导或一个 map()
操作并行执行的话,可使用 pool.map()
:
1 | # A function that performs a lot of work |
另外,你可以使用 pool.submit()
来手动的提交单个任务:
1 | # Some function |
如果你手动提交一个任务,结果是一个 Future
实例。 要获取最终结果,你需要调用它的 result()
方法。 它会阻塞进程直到结果被返回来。
如果不想阻塞,你还可以使用一个回调函数,例如:
1 | def when_done(r): |
回调函数接受一个 Future
实例,被用来获取最终的结果(比如通过调用它的result()方法)。 尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:
- 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
- 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。
- 函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
- 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情,
一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。
- 在Unix上进程池通过调用
fork()
系统调用被创建,
它会克隆Python解释器,包括fork时的所有程序状态。 而在Windows上,克隆解释器时不会克隆状态。 实际的fork操作会在第一次调用 pool.map()
或 pool.submit()
后发生。
- 当你混合使用进程池和多线程的时候要特别小心。
你应该在创建任何线程之前先创建并激活进程池(比如在程序启动的main线程中创建进程池)。
进程的并行multiprocess!!!
multiprocessing
是一个用与threading
模块相似API的支持产生进程的包。multiprocessing
包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响(ProcessPoolExecutor
其实也是一样的原理)。因此,multiprocessing
模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。
利用池子
我们先看看之前用线程怎么写一个并发:
1 | from concurrent.futures import ThreadPoolExecutor |
输出为1,4,9,16
再看看使用进程池:
1 | from multiprocessing import Pool |
直接创建进程对象
在 multiprocessing
中,通过创建一个 Process
对象然后调用它的 start()
方法来生成进程。
1 | from multiprocessing import Process |
在进程之间通信
multiprocessing
支持进程之间的两种通信通道:
队列
Queue
类是一个近似queue.Queue
的克隆。 例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from multiprocessing import Process, Queue
def f(q):
q.put([1, None, 'hello'])
def f_2(q):
q.put([2, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p2 = Process(target=f_2, args=(q,))
p.start()
p2.start()
print(q.get()) # prints "[42, None, 'hello']"
print(q.get())
p.join()
p2.join()
输出:
[2, None, 'hello']
[1, None, 'hello']队列是线程和进程安全的。
管道
Pipe()
函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 from multiprocessing import Process, Pipe
import time
def f(conn):
conn.send([1, None, 'hello'])
conn.send([2, None, 'hello'])
conn.send([3, None, 'hello'])
time.sleep(2)
conn.send([4, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
while True:
try:
data=parent_conn.recv()
print(data)
except:
print('no')
p.join()返回的两个连接对象
Pipe()
表示管道的两端。每个连接对象都有send()
和recv()
方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
管道原理:
上面的代码中,我做了个小实验,最终搞明白了管道的特点:一旦建成,除非主动销毁,在管道没有东西的时候,Pipe.rec()
是读不出来东西的,但是此时管道并没有被关闭,联想起做过的项目,也都是我把发端的进程强行停掉了,这个管道才会销毁。
信号量——终极一战
互斥锁就是n=1的信号量!
所谓信号量,其实就是为了控制同一时间内,正在运行的线程的多少的量。
threading.Semaphore
类提供了主要两种方法:acquire()release()
分别是申请和释放信号量,当信号量为0的时候,后面的申请的线程就会阻塞,处于等待状态
信号量与池的区别:
是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程,但是只有几个处于运行状态其余在阻塞等待
最后想说的话
学海无涯,越深入学习越能够认识到自己的不足,在并行方面还仍有可提升之处,下一步将要带着项目进行学习,还需要了解操作系统中进程与线程的行为,路漫漫其修远兮。
2.线程池ThreadPoolExecutor
https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p07_creating_thread_pool.html
https://www.cnblogs.com/hoojjack/p/10846010.html
3.进程池multiprocess https://docs.python.org/zh-cn/3.7/library/multiprocessing.html
multiprocess 基于进程的并发
subprocess 最多用于shell命令,构建管道获得输出
异步执行可以由
ThreadPoolExecutor
使用线程或由ProcessPoolExecutor
使用单独的进程来实现。 两者都是实现抽像类Executor
定义的接口。用这个提纲,去学习,能够有更深刻的理解