多线程编程
多线程编程简介
多线程编程是Python高级编程中的重要概念,它允许程序同时执行多个任务,提高程序的执行效率和响应性。本章将介绍Python中的线程概念、创建方法、同步机制以及实际应用场景。
为什么要学习多线程编程?
- 提高程序的执行效率,充分利用多核CPU
- 增强程序的响应性,避免界面卡顿
- 处理I/O密集型任务时特别有效
- 是开发高性能应用程序的必备技能
注意事项
由于Python的GIL(全局解释器锁)机制,Python的多线程在CPU密集型任务上可能无法充分发挥多核CPU的优势。但在I/O密集型任务中,多线程仍然是非常有效的并发方案。
线程基础
线程是程序中最小的执行单位,它们共享所属进程的资源,包括内存空间和文件句柄等。Python提供了threading模块来支持多线程编程。
import threading
# 获取当前线程
current_thread = threading.current_thread()
print(f"当前线程名称:{current_thread.name}")
# 获取活动线程数量
active_count = threading.active_count()
print(f"活动线程数量:{active_count}")
# 获取所有活动线程列表
all_threads = threading.enumerate()
print(f"所有活动线程:{[thread.name for thread in all_threads]}")
线程的特点
- 线程是CPU调度的基本单位
- 同一进程中的线程共享资源
- 线程创建和切换开销较小
- 适合I/O密集型任务
线程的创建和管理
在Python中,可以通过两种方式创建线程:使用Thread类直接创建,或者继承Thread类创建自定义线程类。
方式一:使用Thread类直接创建
import threading
import time
def worker(name):
print(f"线程 {name} 开始工作")
time.sleep(2) # 模拟工作过程
print(f"线程 {name} 工作结束")
# 创建线程
thread1 = threading.Thread(target=worker, args=("Thread-1",))
thread2 = threading.Thread(target=worker, args=("Thread-2",))
# 启动线程
thread1.start()
thread2.start()
# 等待线程结束
thread1.join()
thread2.join()
方式二:继承Thread类
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f"线程 {self.name} 开始工作")
time.sleep(2) # 模拟工作过程
print(f"线程 {self.name} 工作结束")
# 创建自定义线程实例
thread1 = MyThread("Custom-1")
thread2 = MyThread("Custom-2")
# 启动线程
thread1.start()
thread2.start()
# 等待线程结束
thread1.join()
thread2.join()
线程生命周期
- 新建(New):创建线程对象
- 就绪(Runnable):调用start()方法后等待CPU调度
- 运行(Running):获得CPU时间片正在执行
- 阻塞(Blocked):等待I/O或同步条件
- 终止(Terminated):run()方法执行完毕
线程同步机制
当多个线程同时访问共享资源时,需要使用同步机制来确保数据的一致性和正确性。Python提供了多种线程同步机制。
Lock(互斥锁)
import threading
class Counter:
def __init__(self):
self.count = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
self.count += 1
def get_count(self):
return self.count
# 使用示例
counter = Counter()
threads = []
for i in range(10):
t = threading.Thread(target=counter.increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数:{counter.get_count()}")
RLock(可重入锁)
class ReentrantCounter:
def __init__(self):
self.count = 0
self.lock = threading.RLock()
def increment(self):
with self.lock:
self.count += 1
self.double() # 可以在持有锁的情况下调用其他需要获取锁的方法
def double(self):
with self.lock: # 同一个线程可以多次获取RLock
self.count *= 2
Condition(条件变量)
class Buffer:
def __init__(self, size):
self.buffer = []
self.size = size
self.condition = threading.Condition()
def put(self, item):
with self.condition:
while len(self.buffer) >= self.size:
self.condition.wait() # 缓冲区满,等待消费者消费
self.buffer.append(item)
self.condition.notify() # 通知消费者有新数据
def get(self):
with self.condition:
while not self.buffer:
self.condition.wait() # 缓冲区空,等待生产者生产
item = self.buffer.pop(0)
self.condition.notify() # 通知生产者有空间
return item
注意事项
- 使用锁时要注意避免死锁
- 尽量减少锁的粒度
- 使用with语句来确保锁的正确释放
- 避免在持有锁时执行耗时操作
线程池的使用
线程池可以有效管理线程的创建和复用,避免频繁创建和销毁线程带来的开销。Python的concurrent.futures模块提供了ThreadPoolExecutor类来实现线程池。
from concurrent.futures import ThreadPoolExecutor
import time
def process_item(item):
print(f"处理项目 {item}")
time.sleep(1) # 模拟处理过程
return f"项目 {item} 处理完成"
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务
future_to_item = {executor.submit(process_item, i): i for i in range(5)}
# 获取结果
for future in concurrent.futures.as_completed(future_to_item):
item = future_to_item[future]
try:
result = future.result()
print(result)
except Exception as e:
print(f"处理项目 {item} 时发生错误:{e}")
线程池的优势
- 重用线程,避免创建和销毁开销
- 控制并发线程数量
- 提供任务队列管理
- 支持获取异步任务的返回值
实际应用案例
案例1:多线程文件下载器
import threading
import requests
import os
class FileDownloader:
def __init__(self, url, num_threads=3):
self.url = url
self.num_threads = num_threads
self.filename = os.path.basename(url)
def download_chunk(self, start, end):
headers = {'Range': f'bytes={start}-{end}'}
response = requests.get(self.url, headers=headers, stream=True)
with open(self.filename, 'rb+') as f:
f.seek(start)
f.write(response.content)
def download(self):
# 获取文件大小
response = requests.head(self.url)
file_size = int(response.headers['content-length'])
# 创建空文件
with open(self.filename, 'wb') as f:
f.seek(file_size - 1)
f.write(b'\0')
# 计算每个线程下载的块大小
chunk_size = file_size // self.num_threads
threads = []
# 创建多个线程下载
for i in range(self.num_threads):
start = i * chunk_size
end = start + chunk_size - 1 if i < self.num_threads - 1 else file_size - 1
thread = threading.Thread(target=self.download_chunk, args=(start, end))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
案例2:多线程Web爬虫
from concurrent.futures import ThreadPoolExecutor
import requests
from bs4 import BeautifulSoup
import threading
class WebCrawler:
def __init__(self, urls, max_workers=3):
self.urls = urls
self.max_workers = max_workers
self.results = []
self.lock = threading.Lock()
def process_url(self, url):
try:
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.title.string if soup.title else "No title"
with self.lock:
self.results.append({
'url': url,
'title': title,
'status': response.status_code
})
except Exception as e:
with self.lock:
self.results.append({
'url': url,
'error': str(e)
})
def crawl(self):
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
executor.map(self.process_url, self.urls)
return self.results
案例3:多线程图片处理
from PIL import Image
import os
from concurrent.futures import ThreadPoolExecutor
class ImageProcessor:
def __init__(self, input_dir, output_dir, max_workers=3):
self.input_dir = input_dir
self.output_dir = output_dir
self.max_workers = max_workers
if not os.path.exists(output_dir):
os.makedirs(output_dir)
def process_image(self, filename):
try:
input_path = os.path.join(self.input_dir, filename)
output_path = os.path.join(self.output_dir, filename)
with Image.open(input_path) as img:
# 调整图片大小
resized = img.resize((800, 600))
# 转换为RGB模式
rgb_img = resized.convert('RGB')
# 保存处理后的图片
rgb_img.save(output_path, 'JPEG', quality=85)
return f"Successfully processed {filename}"
except Exception as e:
return f"Error processing {filename}: {str(e)}"
def process_all(self):
image_files = [f for f in os.listdir(self.input_dir)
if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = list(executor.map(self.process_image, image_files))
return results
实际应用建议
- 根据任务特点选择合适的线程数量
- 注意异常处理和资源释放
- 合理使用同步机制避免数据竞争
- 考虑使用线程池提高效率
练习与实践
练习1:计数器
实现一个线程安全的计数器类,支持多个线程同时增加和减少计数值。
提示:使用Lock或RLock来保护共享资源。
练习2:生产者-消费者模式
实现一个线程安全的队列,支持多个生产者和消费者线程同时工作。
提示:使用Condition来实现线程间的通信。
练习3:并发文件处理
编写一个程序,使用线程池并发处理多个文本文件,统计每个文件中的单词数量。
提示:使用ThreadPoolExecutor来管理线程池。
练习4:多线程网络请求
实现一个多线程的URL检查器,同时检查多个URL的可访问性。
提示:使用requests库进行HTTP请求,使用线程池管理并发请求。
练习5:线程安全的缓存系统
实现一个线程安全的缓存系统,支持多线程环境下的数据缓存和获取。
提示:使用RLock实现细粒度的锁定,考虑缓存过期机制。