多线程编程

学习时长:120分钟 练习题:5个

多线程编程简介

多线程编程是Python高级编程中的重要概念,它允许程序同时执行多个任务,提高程序的执行效率和响应性。本章将介绍Python中的线程概念、创建方法、同步机制以及实际应用场景。

为什么要学习多线程编程?

  • 提高程序的执行效率,充分利用多核CPU
  • 增强程序的响应性,避免界面卡顿
  • 处理I/O密集型任务时特别有效
  • 是开发高性能应用程序的必备技能

注意事项

由于Python的GIL(全局解释器锁)机制,Python的多线程在CPU密集型任务上可能无法充分发挥多核CPU的优势。但在I/O密集型任务中,多线程仍然是非常有效的并发方案。

线程基础

线程是程序中最小的执行单位,它们共享所属进程的资源,包括内存空间和文件句柄等。Python提供了threading模块来支持多线程编程。

import threading

# 获取当前线程
current_thread = threading.current_thread()
print(f"当前线程名称:{current_thread.name}")

# 获取活动线程数量
active_count = threading.active_count()
print(f"活动线程数量:{active_count}")

# 获取所有活动线程列表
all_threads = threading.enumerate()
print(f"所有活动线程:{[thread.name for thread in all_threads]}")

线程的特点

  • 线程是CPU调度的基本单位
  • 同一进程中的线程共享资源
  • 线程创建和切换开销较小
  • 适合I/O密集型任务

线程的创建和管理

在Python中,可以通过两种方式创建线程:使用Thread类直接创建,或者继承Thread类创建自定义线程类。

方式一:使用Thread类直接创建

import threading
import time

def worker(name):
    print(f"线程 {name} 开始工作")
    time.sleep(2)  # 模拟工作过程
    print(f"线程 {name} 工作结束")

# 创建线程
thread1 = threading.Thread(target=worker, args=("Thread-1",))
thread2 = threading.Thread(target=worker, args=("Thread-2",))

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

方式二:继承Thread类

class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    
    def run(self):
        print(f"线程 {self.name} 开始工作")
        time.sleep(2)  # 模拟工作过程
        print(f"线程 {self.name} 工作结束")

# 创建自定义线程实例
thread1 = MyThread("Custom-1")
thread2 = MyThread("Custom-2")

# 启动线程
thread1.start()
thread2.start()

# 等待线程结束
thread1.join()
thread2.join()

线程生命周期

  • 新建(New):创建线程对象
  • 就绪(Runnable):调用start()方法后等待CPU调度
  • 运行(Running):获得CPU时间片正在执行
  • 阻塞(Blocked):等待I/O或同步条件
  • 终止(Terminated):run()方法执行完毕

线程同步机制

当多个线程同时访问共享资源时,需要使用同步机制来确保数据的一致性和正确性。Python提供了多种线程同步机制。

Lock(互斥锁)

import threading

class Counter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:
            self.count += 1
    
    def get_count(self):
        return self.count

# 使用示例
counter = Counter()
threads = []

for i in range(10):
    t = threading.Thread(target=counter.increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"最终计数:{counter.get_count()}")

RLock(可重入锁)

class ReentrantCounter:
    def __init__(self):
        self.count = 0
        self.lock = threading.RLock()
    
    def increment(self):
        with self.lock:
            self.count += 1
            self.double()  # 可以在持有锁的情况下调用其他需要获取锁的方法
    
    def double(self):
        with self.lock:  # 同一个线程可以多次获取RLock
            self.count *= 2

Condition(条件变量)

class Buffer:
    def __init__(self, size):
        self.buffer = []
        self.size = size
        self.condition = threading.Condition()
    
    def put(self, item):
        with self.condition:
            while len(self.buffer) >= self.size:
                self.condition.wait()  # 缓冲区满,等待消费者消费
            self.buffer.append(item)
            self.condition.notify()  # 通知消费者有新数据
    
    def get(self):
        with self.condition:
            while not self.buffer:
                self.condition.wait()  # 缓冲区空,等待生产者生产
            item = self.buffer.pop(0)
            self.condition.notify()  # 通知生产者有空间
            return item

注意事项

  • 使用锁时要注意避免死锁
  • 尽量减少锁的粒度
  • 使用with语句来确保锁的正确释放
  • 避免在持有锁时执行耗时操作

线程池的使用

线程池可以有效管理线程的创建和复用,避免频繁创建和销毁线程带来的开销。Python的concurrent.futures模块提供了ThreadPoolExecutor类来实现线程池。

from concurrent.futures import ThreadPoolExecutor
import time

def process_item(item):
    print(f"处理项目 {item}")
    time.sleep(1)  # 模拟处理过程
    return f"项目 {item} 处理完成"

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务
    future_to_item = {executor.submit(process_item, i): i for i in range(5)}
    
    # 获取结果
    for future in concurrent.futures.as_completed(future_to_item):
        item = future_to_item[future]
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"处理项目 {item} 时发生错误:{e}")

线程池的优势

  • 重用线程,避免创建和销毁开销
  • 控制并发线程数量
  • 提供任务队列管理
  • 支持获取异步任务的返回值

实际应用案例

案例1:多线程文件下载器

import threading
import requests
import os

class FileDownloader:
    def __init__(self, url, num_threads=3):
        self.url = url
        self.num_threads = num_threads
        self.filename = os.path.basename(url)
        
    def download_chunk(self, start, end):
        headers = {'Range': f'bytes={start}-{end}'}
        response = requests.get(self.url, headers=headers, stream=True)
        
        with open(self.filename, 'rb+') as f:
            f.seek(start)
            f.write(response.content)
    
    def download(self):
        # 获取文件大小
        response = requests.head(self.url)
        file_size = int(response.headers['content-length'])
        
        # 创建空文件
        with open(self.filename, 'wb') as f:
            f.seek(file_size - 1)
            f.write(b'\0')
        
        # 计算每个线程下载的块大小
        chunk_size = file_size // self.num_threads
        threads = []
        
        # 创建多个线程下载
        for i in range(self.num_threads):
            start = i * chunk_size
            end = start + chunk_size - 1 if i < self.num_threads - 1 else file_size - 1
            
            thread = threading.Thread(target=self.download_chunk, args=(start, end))
            threads.append(thread)
            thread.start()
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()

案例2:多线程Web爬虫

from concurrent.futures import ThreadPoolExecutor
import requests
from bs4 import BeautifulSoup
import threading

class WebCrawler:
    def __init__(self, urls, max_workers=3):
        self.urls = urls
        self.max_workers = max_workers
        self.results = []
        self.lock = threading.Lock()
    
    def process_url(self, url):
        try:
            response = requests.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string if soup.title else "No title"
            
            with self.lock:
                self.results.append({
                    'url': url,
                    'title': title,
                    'status': response.status_code
                })
        except Exception as e:
            with self.lock:
                self.results.append({
                    'url': url,
                    'error': str(e)
                })
    
    def crawl(self):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.process_url, self.urls)
        return self.results

案例3:多线程图片处理

from PIL import Image
import os
from concurrent.futures import ThreadPoolExecutor

class ImageProcessor:
    def __init__(self, input_dir, output_dir, max_workers=3):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.max_workers = max_workers
        
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
    
    def process_image(self, filename):
        try:
            input_path = os.path.join(self.input_dir, filename)
            output_path = os.path.join(self.output_dir, filename)
            
            with Image.open(input_path) as img:
                # 调整图片大小
                resized = img.resize((800, 600))
                # 转换为RGB模式
                rgb_img = resized.convert('RGB')
                # 保存处理后的图片
                rgb_img.save(output_path, 'JPEG', quality=85)
                
            return f"Successfully processed {filename}"
        except Exception as e:
            return f"Error processing {filename}: {str(e)}"
    
    def process_all(self):
        image_files = [f for f in os.listdir(self.input_dir)
                      if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self.process_image, image_files))
        
        return results

实际应用建议

  • 根据任务特点选择合适的线程数量
  • 注意异常处理和资源释放
  • 合理使用同步机制避免数据竞争
  • 考虑使用线程池提高效率

练习与实践

练习1:计数器

实现一个线程安全的计数器类,支持多个线程同时增加和减少计数值。

提示:使用Lock或RLock来保护共享资源。

练习2:生产者-消费者模式

实现一个线程安全的队列,支持多个生产者和消费者线程同时工作。

提示:使用Condition来实现线程间的通信。

练习3:并发文件处理

编写一个程序,使用线程池并发处理多个文本文件,统计每个文件中的单词数量。

提示:使用ThreadPoolExecutor来管理线程池。

练习4:多线程网络请求

实现一个多线程的URL检查器,同时检查多个URL的可访问性。

提示:使用requests库进行HTTP请求,使用线程池管理并发请求。

练习5:线程安全的缓存系统

实现一个线程安全的缓存系统,支持多线程环境下的数据缓存和获取。

提示:使用RLock实现细粒度的锁定,考虑缓存过期机制。