logo

该视频仅会员有权观看

立即开通课程「Python 入门」权限。

¥
199
/ 年

多线程是一种轻量级的进程,线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,一个进程可以拥有多个线程,这些线程在共享内存的情况下相互独立,每个线程都有自己的一组寄存器、堆栈和局部变量,线程之间的切换和调度不会引起进程的切换,只是线程的切换,这样就可以实现多个线程并发执行。

在 Python 中,要实现多线程,我们可以使用 threading 模块,这个模块提供了 Thread 类来表示一个线程对象,然后通过 start() 方法启动一个线程,通过 join() 方法等待线程执行完毕。

创建线程

比如我们要开发一个爬虫程序,需要同时爬取多个网页,接下来我们就可以来对比下使用多线程和不使用多线程的区别。

首先我们定义一个爬虫函数,模拟爬取网页的操作:

import time import random def crawl(url: str): start = time.time() print(f'Crawling {url}...') time.sleep(random.randint(1, 5)) # 模拟爬取网页的操作 end = time.time() print(f'Crawling {url} done, time: {end - start} seconds.')

如果不使用多线程,我们可以这样来执行:

if __name__ == '__main__': print('Start crawling...') urls = ['http://www.baidu.com', 'http://www.qq.com', 'https://fastclass.cn'] start = time.time() for url in urls: crawl(url) end = time.time() print(f'All crawling done, time: {end - start} seconds.')

上面代码中我们定义了一个 crawl 函数,模拟爬取网页的操作,然后我们定义了 3 个网页,然后循环调用 crawl 函数,模拟爬取这 3 个网页,执行结果如下:

Start crawling... Crawling http://www.baidu.com... Crawling http://www.baidu.com done, time: 4.005162000656128 seconds. Crawling http://www.qq.com... Crawling http://www.qq.com done, time: 2.0038959980010986 seconds. Crawling https://fastclass.cn... Crawling https://fastclass.cn done, time: 1.002044916152954 seconds. All crawling done, time: 7.011729001998901 seconds.

从输出可以看到,三个网页是顺序爬取的,需要一个网页爬取完毕后才能爬取下一个网页,总共耗时 7.01 秒,基本上就是 3 个网页的爬取时间之和。

如果我们使用多线程的话,就可以同时爬取多个网页了,这样就可以大大提高程序的执行效率,减少总共耗时,我们可以这样来实现:

if __name__ == '__main__': import threading print('Start crawling...') urls = ['http://www.baidu.com', 'http://www.qq.com', 'https://fastclass.cn'] start = time.time() threads = [] for url in urls: t = threading.Thread(target=crawl, args=(url,)) threads.append(t) t.start() # 启动线程 for t in threads: t.join() # 等待线程执行完毕 end = time.time() print(f'All crawling done, time: {end - start} seconds.')

上面代码中我们通过 Thread 类创建了 3 个线程对象,然后调用 start() 方法启动线程,最后调用 join() 方法等待线程执行完毕,执行结果如下:

Start crawling... Crawling http://www.baidu.com... Crawling http://www.qq.com... Crawling https://fastclass.cn... Crawling http://www.baidu.com done, time: 1.0050981044769287 seconds. Crawling https://fastclass.cn done, time: 1.0051090717315674 seconds. Crawling http://www.qq.com done, time: 3.005125045776367 seconds. All crawling done, time: 3.005729913711548 seconds.

从输出结果可以看到,三个网页是同时爬取的,不需要等待一个网页爬取完毕后才能爬取下一个网页,总共耗时 3.01 秒左右,几乎等于耗时最长的一个任务的执行时间,大大提高了程序的执行效率。

实际上除了可以直接通过 Thread 类来创建线程外,还可以通过继承 Thread 类并重写 run() 方法的方式来自定义线程,比如我们可以这样来实现,如下所示:

import threading import time import random def crawl(url: str): start = time.time() print(f'Crawling {url}...') time.sleep(random.randint(1, 5)) # 模拟爬取网页的操作 end = time.time() print(f'Crawling {url} done, time: {end - start} seconds.') class CrawlThread(threading.Thread): def __init__(self, url): super().__init__() self.url = url def run(self): # 重写 run 方法,run 方法是线程的入口 crawl(self.url) if __name__ == '__main__': print('Start crawling...') urls = ['http://www.baidu.com', 'http://www.qq.com', 'https://fastclass.cn'] start = time.time() threads = [] for url in urls: t = CrawlThread(url) # 创建自定义线程对象 threads.append(t) t.start() # 启动线程,会自动调用 run 方法 for t in threads: t.join() # 等待线程执行完毕 end = time.time() print(f'All crawling done, time: {end - start} seconds.')

上面代码中我们定义了一个 CrawlThread 类,继承自 Thread 类,然后重写了 run() 方法,run() 方法是线程的入口,我们在这个方法中调用了 crawl 函数,然后通过 CrawlThread 类创建了 3 个线程对象,然后调用 start() 方法启动线程,最后调用 join() 方法等待线程执行完毕,执行结果和上面的代码是一样的,但是使用这种方式更加灵活,可以自定义线程的行为。

Start crawling... Crawling http://www.baidu.com... Crawling http://www.qq.com... Crawling https://fastclass.cn... Crawling https://fastclass.cn done, time: 2.001559019088745 seconds. Crawling http://www.baidu.com done, time: 3.001649856567383 seconds. Crawling http://www.qq.com done, time: 5.005148887634277 seconds. All crawling done, time: 5.005666017532349 seconds.

线程池

和多进程类似,我们同样还可以使用线程池的方式来批量创建线程,这样可以控制同时执行的线程数量,而且线程的创建和释放也会带来较大的开销,频繁的创建和释放线程通常都不是很好的选择。利用线程池,可以提前准备好若干个线程,在使用的过程中不需要再通过自定义的代码创建和释放线程,而是直接复用线程池中的线程。

Python 内置的 concurrent.futures 模块提供了对线程池的支持,比如我们可以使用线程池来爬取多个网页,如下所示:

from concurrent.futures import ThreadPoolExecutor import time import random def crawl(url: str): start = time.time() print(f'Crawling {url}...') time.sleep(random.randint(1, 5)) # 模拟爬取网页的操作 end = time.time() print(f'Crawling {url} done, time: {end - start} seconds.') if __name__ == '__main__': print('Start crawling...') urls = ['http://www.baidu.com', 'http://www.qq.com', 'https://fastclass.cn'] start = time.time() with ThreadPoolExecutor() as executor: executor.map(crawl, urls) end = time.time() print(f'All crawling done, time: {end - start} seconds.')

在上面代码中我们使用 ThreadPoolExecutor 类创建了一个线程池,注意这里我们使用了 with 语句,这样可以自动关闭线程池,然后调用 map() 方法来执行多个任务,map() 方法会自动分配任务给线程池中的线程,然后等待所有任务执行完毕,执行结果如下:

Start crawling... Crawling http://www.baidu.com... Crawling http://www.qq.com... Crawling https://fastclass.cn... Crawling http://www.qq.com done, time: 1.0050897598266602 seconds. Crawling https://fastclass.cn done, time: 2.005074977874756 seconds. Crawling http://www.baidu.com done, time: 4.005168914794922 seconds. All crawling done, time: 4.005808115005493 seconds.

需要注意 ThreadPoolExecutor 类实例化的时候包含了一个参数 max_workers,表示线程池中最多可以创建的线程数量,如果不指定,默认值为 min(32, (os.cpu_count() or 1) + 4),也就是最多创建 32 个线程,或者是 CPU 核数加 4 个线程。

比如我们这里如果指定最多创建 2 个线程,将 ThreadPoolExecutor() 改成 ThreadPoolExecutor(max_workers=2),那么执行任务的最大线程数就是 2 个,执行结果如下:

Start crawling... Crawling http://www.baidu.com... Crawling http://www.qq.com... Crawling http://www.qq.com done, time: 1.0035929679870605 seconds. Crawling https://fastclass.cn... Crawling http://www.baidu.com done, time: 5.005242824554443 seconds. Crawling https://fastclass.cn done, time: 5.005155086517334 seconds. All crawling done, time: 6.009704113006592 seconds.

从上面输出可以看到,最多只有 2 个线程在执行任务,其他任务需要等待。当然除了使用 map 方法外,我们还可以使用 submit 方法来提交任务,submit 方法会返回一个 Future 对象,可以用来获取任务的执行结果,比如下面的例子:

from concurrent.futures import ThreadPoolExecutor import time import random def crawl(url: str): start = time.time() print(f'Crawling {url}...') time.sleep(random.randint(1, 5)) # 模拟爬取网页的操作 end = time.time() print(f'Crawling {url} done, time: {end - start} seconds.') return url if __name__ == '__main__': print('Start crawling...') urls = ['http://www.baidu.com', 'http://www.qq.com', 'https://fastclass.cn'] start = time.time() with ThreadPoolExecutor() as executor: futures = [executor.submit(crawl, url) for url in urls] for future in futures: print(future.result()) # 获取任务的执行结果 end = time.time() print(f'All crawling done, time: {end - start} seconds.')

上面代码中我们使用 submit 方法提交任务,然后通过 result() 方法获取任务的执行结果。

资源竞争

多线程和多进程的一个最大的不同在于,多进程中不同进程的内存是独立的,一个进程无法访问另一个进程的内存,而多线程中不同线程是共享内存的,一个线程可以访问另一个线程的内存,所以多线程之间的资源竞争是一个非常重要的问题。

比如对于银行账户这个资源,如果多个线程同时操作一个账户,就会出现资源竞争的问题,比如多个线程同时取钱,就会导致账户余额不正确,这就是典型的资源竞争问题,如下代码所示:

import threading import random import time class Account: def __init__(self, balance): self.balance = balance # 账户余额 def withdraw(self, amount): # 取钱 new_balance = self.balance - amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance def deposit(self, amount): # 存钱 new_balance = self.balance + amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance if __name__ == '__main__': account = Account(1000) # 创建一个账户,初始余额 1000 threads = [] # 创建 100 个线程,每个线程取钱 10 元 for i in range(100): t = threading.Thread(target=account.withdraw, args=(10,)) threads.append(t) t.start() for t in threads: t.join() # 所有线程执行完毕后,期望输出账户余额 print(f'Account balance: {account.balance}')

在上面代码中我们定义了一个 Account 类,表示一个银行账户,然后定义了 withdrawdeposit 方法,分别表示取钱和存钱,然后我们初始化了一个账户,初始余额为 1000,然后创建了 100 个线程,每个线程取钱 10 元,按照预期的话账户余额应该是 0,但是由于多个线程同时操作一个账户,就会出现资源竞争的问题,执行结果如下:

Account balance: 990

这其实也很好理解,因为多个线程同时取钱,对余额进行修改,但是由于线程之间是并发执行的,所以可能会出现多个线程同时读取余额,然后同时修改余额,这样就可能会导致多个线程在相同的余额上进行操作,导致余额不正确,这就是所谓的“丢失更新”问题,即一个线程读取了共享变量,修改了共享变量,但是还没有写回共享变量,另一个线程也读取了这个共享变量,然后修改了这个共享变量,这样就会导致数据不一致的问题。

要解决这个问题,我们可以使用锁来保护共享资源,在 Python 中我们可以使用 threading 模块提供的 LockRLock 类来支持锁机制,这二者的区别在于 RLock 可以允许一个线程多次获取锁,而 Lock 不允许多次获取锁,获取一次需要释放一次,如果不知道使用哪个,建议直接使用 RLock。比如我们现在使用 RLock 来解决上面的问题,如下所示:

import threading import random import time class Account: def __init__(self, balance): self.balance = balance # 账户余额 self.lock = threading.RLock() # 创建一个锁对象 def withdraw(self, amount): # 取钱 # 获取锁 self.lock.acquire() try: new_balance = self.balance - amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance finally: # 释放锁 self.lock.release() def deposit(self, amount): # 存钱 # 获取锁 self.lock.acquire() try: new_balance = self.balance + amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance finally: # 释放锁 self.lock.release() if __name__ == '__main__': account = Account(1000) # 创建一个账户,初始余额 1000 threads = [] # 创建 100 个线程,每个线程取钱 10 元 for i in range(100): t = threading.Thread(target=account.withdraw, args=(10,)) threads.append(t) t.start() for t in threads: t.join() # 所有线程执行完毕后,期望输出账户余额 print(f'Account balance: {account.balance}')

上面代码中我们在 Account 类中定义了一个 RLock 对象,然后在 withdrawdeposit 方法中使用 acquire() 方法获取锁,然后在 finally 代码块中使用 release() 方法释放锁,这样就可以保证同一时刻只有一个线程可以访问共享资源,执行结果如下:

Account balance: 0

上面我们是手动获取和释放锁,其实我们还可以使用 with 语句来简化代码,with 语句会自动获取和释放锁,如下所示:

import threading import random import time class Account: def __init__(self, balance): self.balance = balance # 账户余额 self.lock = threading.RLock() # 创建一个锁对象 def withdraw(self, amount): # 取钱 with self.lock: # 使用 with 语句获取和释放锁 new_balance = self.balance - amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance def deposit(self, amount): # 存钱 with self.lock: # 使用 with 语句获取和释放锁 new_balance = self.balance + amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance

Condition

现在如果我们有多个线程同时向银行账户存钱和取钱,如果在银行账户余额不足时,取钱的线程需要停下来等待存钱的线程将钱存入后再尝试取钱,那么我们应该如何来实现呢?这里我们可以使用 threading 模块提供的 Condition 类来实现,Condition 类可以用来实现线程间的同步,比如下面的例子:

import threading import random import time class Account: def __init__(self, balance): self.balance = balance # 账户余额 self.lock = threading.RLock() # 创建一个锁对象 self.condition = threading.Condition(self.lock) # 创建一个条件对象 def withdraw(self, amount): # 取钱 with self.condition: # 使用 with 语句获取和释放锁 while self.balance < amount: self.condition.wait() # 等待条件变量 new_balance = self.balance - amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance print(f'Withdraw {amount} done, balance: {self.balance}, current thread: {threading.current_thread().name}') def deposit(self, amount): # 存钱 with self.condition: # 使用 with 语句获取和释放锁 new_balance = self.balance + amount time.sleep(random.random()) # 模拟存钱耗时 self.balance = new_balance self.condition.notify_all() # 通知所有等待的线程 print(f'Deposit {amount} done, balance: {self.balance}, current thread: {threading.current_thread().name}') if __name__ == '__main__': account = Account(1000) # 创建一个账户,初始余额 1000 threads = [] # 创建 5 个线程,每个线程取钱 300 元 for i in range(5): t = threading.Thread(target=account.withdraw, args=(300,)) threads.append(t) t.start() # 创建 5 个线程,每个线程存钱 200 元 for i in range(5): t = threading.Thread(target=account.deposit, args=(200,)) threads.append(t) t.start() for t in threads: t.join() print(f'Account balance: {account.balance}')

上面代码中我们在 Account 类中定义了一个 Condition 对象,也就是条件变量,条件变量允许一个或多个线程等待,直到它们由另一个线程通知,初始化时需要传入一个锁对象,如果不传入锁对象,则会创建一个新的 RLock 对象,用作底层锁。

同样在使用条件变量的时候我们可以使用 with 语句来获取和释放锁,因为我们需要在取钱的时候账户要有足够的余额,如果余额不足,取钱的线程就需要等待(需要死循环),这时我们可以调用 wait() 方法来等待条件变量,然后在存钱的时候我们可以调用 notify_all() 方法来通知所有等待的线程,如果达到条件就可以继续执行,执行结果如下:

Withdraw 300 done, balance: 700, current thread: Thread-1 (withdraw) Withdraw 300 done, balance: 400, current thread: Thread-2 (withdraw) Withdraw 300 done, balance: 100, current thread: Thread-3 (withdraw) Deposit 200 done, balance: 300, current thread: Thread-6 (deposit) Deposit 200 done, balance: 500, current thread: Thread-7 (deposit) Deposit 200 done, balance: 700, current thread: Thread-8 (deposit) Deposit 200 done, balance: 900, current thread: Thread-9 (deposit) Deposit 200 done, balance: 1100, current thread: Thread-10 (deposit) Withdraw 300 done, balance: 800, current thread: Thread-4 (withdraw) Withdraw 300 done, balance: 500, current thread: Thread-5 (withdraw) Account balance: 500

从上面输出可以看到,取钱的线程在余额不足时会等待,然后存钱的线程存入钱后会通知所有等待的线程,这样取钱的线程就可以继续执行,最后账户余额为 500,也是符合预期的。