logo

该视频仅会员有权观看

立即开通课程「Python 入门」权限。

¥
199
/ 年

在 Python 中,要实现多进程,我们可以使用 multiprocessing 模块,这个模块提供了一个 Process 类来表示一个进程对象,然后通过 start() 方法启动一个进程,通过 join() 方法等待进程执行完毕。

创建进程

我们可以模拟一个比较耗时的操作,然后来对比下多进程和单进程的执行时间,比如下面的例子:

import os import time def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(2) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start)))

如果不使用多进程,我们如果要执行两个耗时的操作:

if __name__ == '__main__': print('Parent process %s.' % os.getpid()) start = time.time() long_time_task('A') long_time_task('B') end = time.time() print('Task runs %0.2f seconds.' % (end - start))

上面代码中我们首先输出父进程的 pid,然后调用 long_time_task 函数两次,模拟两个耗时的操作,然后输出执行时间,我们可以看到两个任务是顺序执行的,输出如下:

Parent process 40897. Run task A (40897)... Task A runs 2.00 seconds. Run task B (40897)... Task B runs 2.01 seconds. Task runs 4.01 seconds.

从输出可以看到 A、B 两个任务属于同一个进程,与主进程的 ID 是一样的,是顺序执行的,总共执行时间是 4.01 秒。

如果使用多进程,我们可以同时执行两个耗时的操作:

if __name__ == '__main__': from multiprocessing import Process print('Parent process %s.' % os.getpid()) start = time.time() p1 = Process(target=long_time_task, args=('A',)) p2 = Process(target=long_time_task, args=('B',)) print('Waiting for all subprocesses done...') p1.start() p2.start() p1.join() p2.join() end = time.time() print('All subprocesses done. %0.2f seconds.' % (end - start))

在上面的代码中我们首先输出父进程的 pid,然后使用 Process 类创建了两个进程对象 p1p2,实例化该对象的时候传入 target 参数,指定要执行的函数,然后传入 args 参数,指定函数的参数,然后调用 start() 方法启动进程,最后需要注意还需要调用 join() 方法等待进程执行完毕。

这里我们定义的 A、B 两个任务就是两个独立的进程,是并发执行的,输出如下:

Parent process 45972. Waiting for all subprocesses done... Run task B (45976)... Run task A (45975)... Task B runs 2.01 seconds. Task A runs 2.01 seconds. All subprocesses done. 2.07 seconds. # 进程创建和销毁也需要时间,所以稍微大于 2 秒

从上面的输出可以看到 A、B 两个任务属于不同的进程,与主进程的 ID 不一样(也就是我们这里总共有 3 个进程),这两个进程是并发执行的,总共执行时间是 2.07 秒,比单进程执行时间快了接近一半。可以看到使用多进程可以大大提高程序的执行效率,特别是对于多核 CPU,多进程可以真正实现并行执行多任务。

进程池

我们知道多进程可以大大提高程序的执行效率,那么是不是进程越多越好呢?当然不是的,因为进程本身也是需要资源的,如果创建的进程过多,导致系统资源不足,反而会降低程序的执行效率,而且单个 CPU 核某时刻只能执行一个进程,所以最理想的情况是 CPU 核数和进程数相等,这样可以充分利用 CPU 资源了。

所以我们可以使用进程池的方式来批量创建子进程,这样可以控制同时执行的进程数量。进城池就是一个可容纳最大进程数目的池子,当有新的请求时,如果进程池还有空闲的进程,就会将请求分配给空闲的进程执行,如果没有空闲的进程,就需要等待,直到有进程空闲为止,这样就可以控制同时执行的进程数量了,这里的上限设置成 CPU 的核数是不是就最合适了。

multiprocessing 模块提供了 Pool 类来创建进程池,比如下面的例子:

from multiprocessing import Pool, cpu_count import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__ == '__main__': cpu = cpu_count() print('Parent process %s.' % os.getpid()) print('CPU core number: %s' % cpu) start = time.time() p = Pool(cpu) for i in range(cpu*2): p.apply_async(long_time_task, args=(i,)) # 异步执行任务 print('Waiting for all subprocesses done...') p.close() # 关闭进程池,不再接受新的进程 p.join() # 等待所有进程执行完毕 end = time.time() print('All subprocesses done. Total time: %0.2f seconds.' % (end - start))

在上面代码中我们使用 cpu_count() 函数获取 CPU 的核数,然后使用 Pool 创建了一个进程池,最大进程数设置成 CPU 的核数,然后循环创建 cpu*2 个任务,然后调用 apply_async() 方法异步执行任务,最后调用 close() 方法关闭进程池,调用 join() 方法等待所有进程执行完毕。

输出如下:

Parent process 67588. CPU core number: 10 Waiting for all subprocesses done... Run task 0 (67591)... Run task 1 (67590)... Run task 2 (67592)... Run task 3 (67595)... Run task 4 (67598)... Run task 5 (67593)... Run task 6 (67594)... Run task 7 (67596)... Run task 8 (67599)... Run task 9 (67597)... Task 9 runs 0.18 seconds. Run task 10 (67597)... Task 10 runs 0.19 seconds. Run task 11 (67597)... Task 4 runs 0.40 seconds. Run task 12 (67598)... Task 8 runs 0.60 seconds. Run task 13 (67599)... Task 0 runs 0.74 seconds. Run task 14 (67591)... Task 14 runs 0.09 seconds. Run task 15 (67591)... Task 13 runs 0.44 seconds. Run task 16 (67599)... Task 16 runs 0.22 seconds. Run task 17 (67599)... Task 17 runs 0.13 seconds. Run task 18 (67599)... Task 3 runs 1.47 seconds. Run task 19 (67595)... Task 2 runs 1.66 seconds. Task 11 runs 1.60 seconds. Task 1 runs 2.15 seconds. Task 5 runs 2.20 seconds. Task 6 runs 2.28 seconds. Task 7 runs 2.54 seconds. Task 12 runs 2.40 seconds. Task 18 runs 1.68 seconds. Task 15 runs 2.38 seconds. Task 19 runs 2.48 seconds. All subprocesses done. Total time: 4.06 seconds.

从上面输出可以看出我们的 CPU 核数是 10,进程池里面最多创建 10 个进程,然后我们创建了 20 个任务,所以可以看到最开始的 10 个任务几乎是同时执行的,然后当有任务执行完毕后,下一个任务才有机会从池子里面取出来执行,这样就可以控制同时执行的进程数量了。

进程间通信

不同进程之间是无法共享内存的,每个进程有自己独立的内存空间,如果要实现进程间通信,可以使用 multiprocessing 模块提供的 QueuePipes 等方式。

队列(queue)在典型的生产者-消费者问题中是非常有用的,比如一个进程负责生产数据,另一个进程负责消费数据,这两个进程之间通过队列来传递数据。

我们这里可以先创建一个生产者进程,然后创建一个消费者进程,然后通过队列来传递数据,比如下面的例子:

from multiprocessing import Process, Queue import os import time import random def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print(f'Put {value} to queue...') q.put(value) # 将数据放入队列 time.sleep(random.random()) # 随机休眠 def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) # 从队列中获取数据 print(f'Get {value} from queue...') if __name__ == '__main__': q = Queue() # 创建一个队列 pw = Process(target=write, args=(q,)) # 创建一个生产者进程 pr = Process(target=read, args=(q,)) # 创建一个消费者进程 pw.start() # 启动生产者进程 pr.start() # 启动消费者进程 pw.join() # 等待生产者进程执行完毕 pr.terminate() # 强制终止消费者进程

在上面代码中我们首先定义了两个函数 writeread,分别表示生产者和消费者,在生产者进程中我们将数据放入队列,然后在消费者进程中我们从队列中获取数据,然后输出,这样就实现了进程间通信。注意这里的队列我们使用的是 multiprocessing 模块提供的 Queue 类,这个队列是进程安全的,可以在多个进程之间共享数据。另外还需要注意的一点是这里消费者进程是一个死循环,所以我们只是在生产者进程加了阻塞,等待生产者进程执行完毕后我们调用 terminate() 方法强制终止消费者进程,否则消费者进程会一直等待数据。执行结果如下:

Process to write: 64466 Process to read: 64467 Put A to queue... Get A from queue... Put B to queue... Get B from queue... Put C to queue... Get C from queue...

同样对于 Pipe 也是一样的,Pipe 可以创建一个管道,两个进程之间可以通过管道来通信,比如下面的例子:

from multiprocessing import Process, Pipe import os import time import random def write(conn): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print(f'Put {value} to pipe...') conn.send(value) # 将数据放入管道 time.sleep(random.random()) # 随机休眠 def read(conn): print('Process to read: %s' % os.getpid()) while True: value = conn.recv() # 从管道中获取数据 print(f'Get {value} from pipe...') if __name__ == '__main__': parent_conn, child_conn = Pipe() # 创建一个管道 pw = Process(target=write, args=(parent_conn,)) # 创建一个生产者进程 pr = Process(target=read, args=(child_conn,)) # 创建一个消费者进程 pw.start() # 启动生产者进程 pr.start() # 启动消费者进程 pw.join() # 等待生产者进程执行完毕 pr.terminate() # 强制终止消费者进程

Pipe 函数返回两个连接对象,这两个连接对象分别表示管道的两端,一个连接对象只能用于读取,另一个连接对象只能用于写入,这样就实现了进程间通信。这里我们将数据放入管道,然后从管道中获取数据,这样就实现了进程间通信,和队列的方式类似。执行结果如下:

Process to write: 64164 Put A to pipe... Process to read: 64165 Get A from pipe... Put B to pipe... Get B from pipe... Put C to pipe... Get C from pipe...

QueuePipe 都是进程安全的(Queue 的底层也是通过 Pipe 来实现的,所以性能比 Pipe 差),可以在多个进程之间共享数据,但是 Queue 多个进程可以共享的,而 Pipe 是两个进程之间共享的。