Python threading 使用指南:并发编程的轻骑兵

张开发
2026/6/5 7:00:39 15 分钟阅读
Python threading 使用指南:并发编程的轻骑兵
Python threading 使用指南并发编程的轻骑兵作者书到用时方恨少发布日期2026年4月2日阅读时长约19分钟 前言在 Python 并发编程的世界里threading模块如同一匹轻骑兵——轻量、灵活、响应迅速。它允许你在单个进程中创建多个线程共享同一块内存空间特别适合I/O 密集型任务如网络爬虫、文件读写、数据库交互。然而由于全局解释器锁GIL的存在threading并不能让 CPU 密集型任务真正并行但这并不妨碍它成为提升程序响应速度和资源利用率的利器。无论你是想编写一个不卡顿的 GUI 程序还是需要并发处理成千上万个网络请求这篇博客都将带你从零开始深入理解threading的核心概念、同步机制、通信方式以及最佳实践。让我们一同探索并发编程的奥秘1. 线程是什么为什么需要线程1.1 进程与线程进程操作系统资源分配的基本单位拥有独立的内存空间、文件句柄等。进程间相互隔离通信成本高。线程CPU 调度的基本单位隶属于进程。同一个进程内的线程共享内存空间和资源创建和切换成本远低于进程。可以这样理解进程就像一家公司线程就是公司里的员工。员工共享公司的办公区、设备可以高效协作而不同公司之间的资源不共享沟通需要额外的渠道如邮件。1.2 GIL全局解释器锁—— Python 线程的“紧箍咒”CPython官方 Python 实现中有一个GIL它确保同一时刻只有一个线程执行 Python 字节码。这意味着即使在多核 CPU 上Python 的多线程也无法真正并行执行计算密集型任务。但这是否意味着 threading 毫无用处绝对不是I/O 密集型任务当线程执行 I/O 操作如网络请求、文件读写、用户输入时会释放 GIL其他线程可以趁机运行。因此多线程可以极大地提高 I/O 密集型程序的吞吐量。响应性即使有一个线程在计算另一个线程也可以处理用户界面事件防止程序“假死”。选型小贴士I/O 密集型 → 优先考虑threading或asyncioCPU 密集型 → 使用multiprocessing高并发网络服务 →asyncio可能更高效2. 快速入门创建并启动线程threading模块提供了Thread类你可以通过两种方式创建线程。2.1 方式一传入目标函数importthreadingimporttimedefworker(name,delay):print(f线程{name}开始)time.sleep(delay)print(f线程{name}结束)# 创建线程t1threading.Thread(targetworker,args(A,2))t2threading.Thread(targetworker,args(B,1))# 启动线程t1.start()t2.start()# 等待线程结束t1.join()t2.join()print(所有线程执行完毕)target可调用对象函数。args位置参数元组。kwargs关键字参数字典。start()启动线程实际开始运行。join([timeout])等待线程结束阻塞主线程。2.2 方式二继承 Thread 类importthreadingimporttimeclassMyThread(threading.Thread):def__init__(self,name,delay):super().__init__()self.namename self.delaydelaydefrun(self):# 重写 run 方法print(f线程{self.name}开始)time.sleep(self.delay)print(f线程{self.name}结束)t1MyThread(A,2)t2MyThread(B,1)t1.start()t2.start()t1.join()t2.join()2.3 守护线程Daemon Thread守护线程在主线程结束时自动终止不等待其完成。设置daemonTrue即可。defdaemon_task():whileTrue:print(守护线程运行中...)time.sleep(1)dthreading.Thread(targetdaemon_task,daemonTrue)d.start()time.sleep(3)print(主线程结束守护线程也随之结束)3. 线程同步防止数据混乱多个线程共享内存同时修改同一变量会导致数据竞争race condition。threading提供了多种同步原语。3.1 Lock互斥锁最基础的锁一次只允许一个线程持有。importthreading counter0lockthreading.Lock()defincrement():globalcounterfor_inrange(100000):withlock:# 上下文管理器自动获取和释放counter1threads[]for_inrange(5):tthreading.Thread(targetincrement)threads.append(t)t.start()fortinthreads:t.join()print(counter)# 应该是 500000如果没有锁结果会小于此值acquire(blockingTrue)获取锁。release()释放锁。with lock:是最推荐的用法避免忘记释放。3.2 RLock可重入锁允许同一线程多次获取锁适用于递归调用或同一线程需要重复加锁的场景。rlockthreading.RLock()rlock.acquire()rlock.acquire()# 同一线程可以再次获取rlock.release()rlock.release()3.3 Semaphore信号量控制同时访问资源的线程数量。常用于限制数据库连接数或并发下载数量。semthreading.Semaphore(3)# 最多允许 3 个线程同时访问deflimited_task():withsem:print(f{threading.current_thread().name}正在运行)time.sleep(1)foriinrange(10):threading.Thread(targetlimited_task).start()3.4 Event事件用于线程间的信号通知一个线程设置事件其他线程等待该事件。eventthreading.Event()defwaiter():print(等待事件...)event.wait()print(事件已发生继续执行)defsetter():time.sleep(2)print(设置事件)event.set()threading.Thread(targetwaiter).start()threading.Thread(targetsetter).start()wait([timeout])阻塞直到事件被设置。set()设置事件唤醒所有等待线程。clear()清除事件标志。3.5 Condition条件变量比 Event 更灵活允许线程等待某个条件满足后再继续。常用于生产者-消费者模式。importthreadingimporttime conditionthreading.Condition()items[]defproducer():foriinrange(5):withcondition:items.append(i)print(f生产{i})condition.notify()# 通知一个等待的消费者time.sleep(0.5)defconsumer():whileTrue:withcondition:whilenotitems:condition.wait()# 等待被通知itemitems.pop(0)print(f消费{item})time.sleep(1)threading.Thread(targetproducer).start()threading.Thread(targetconsumer).start()3.6 Barrier屏障等待所有线程都到达某个点后才一起继续执行。barrierthreading.Barrier(3)defworker():print(f{threading.current_thread().name}准备就绪)barrier.wait()print(f{threading.current_thread().name}继续执行)foriinrange(3):threading.Thread(targetworker).start()4. 线程间通信Queue 队列直接共享变量需要繁琐的锁而queue.Queue提供了线程安全的队列是生产者-消费者模式的首选。importthreadingimportqueueimporttime qqueue.Queue(maxsize10)defproducer():foriinrange(20):q.put(i)print(f生产{i})time.sleep(0.2)defconsumer():whileTrue:itemq.get()ifitemisNone:# 毒丸poison pill信号breakprint(f消费{item})q.task_done()# 标记任务完成print(消费者退出)# 启动生产者threading.Thread(targetproducer).start()# 启动多个消费者for_inrange(3):threading.Thread(targetconsumer,daemonTrue).start()# 等待所有生产任务完成q.join()# 发送结束信号for_inrange(3):q.put(None)Queue(maxsize0)先进先出队列。LifoQueue后进先出栈。PriorityQueue优先级队列。put(item, blockTrue, timeoutNone)放入元素。get(blockTrue, timeoutNone)取出元素。task_done()通知队列一个任务已完成。join()等待所有任务完成即task_done调用次数等于put次数。5. 线程池高效管理大量线程手动创建大量线程开销较大使用线程池可以复用线程提高性能。Python 标准库提供了concurrent.futures.ThreadPoolExecutor。fromconcurrent.futuresimportThreadPoolExecutorimporttimedeftask(n):time.sleep(1)returnn*nwithThreadPoolExecutor(max_workers4)asexecutor:# 方式1map保持顺序resultsexecutor.map(task,range(10))print(list(results))# 方式2submit futuresfutures[executor.submit(task,i)foriinrange(10)]forfinfutures:print(f.result())为什么使用线程池减少线程创建销毁的开销。限制并发数量防止资源耗尽。简化异常处理和结果收集。6. 实战案例案例一并发下载多个网页importthreadingimportrequests urls[https://www.python.org,https://www.github.com,https://www.stackoverflow.com,# ... 更多 URL]results{}lockthreading.Lock()deffetch_url(url):try:resprequests.get(url,timeout5)withlock:results[url]resp.status_codeprint(f完成:{url}-{resp.status_code})exceptExceptionase:withlock:results[url]str(e)threads[]forurlinurls:tthreading.Thread(targetfetch_url,args(url,))t.start()threads.append(t)fortinthreads:t.join()print(results)案例二多线程批量处理文件模拟importthreadingimportosdefprocess_file(filepath):# 模拟文件处理例如统计行数、计算哈希等withopen(filepath,r)asf:lineslen(f.readlines())print(f{filepath}:{lines}行)defmain():files[fforfinos.listdir(.)iff.endswith(.txt)]threads[]forfileinfiles:tthreading.Thread(targetprocess_file,args(file,))t.start()threads.append(t)fortinthreads:t.join()if__name____main__:main()案例三带超时的线程等待importthreadingimporttimedeflong_task():time.sleep(10)print(任务完成)tthreading.Thread(targetlong_task)t.start()t.join(timeout3)# 最多等待 3 秒ift.is_alive():print(任务超时仍在运行中)# 注意无法强制终止线程只能设计协作式退出机制7. ⚠️ 常见陷阱与注意事项7.1 死锁Deadlock多个线程互相等待对方释放资源导致程序永久阻塞。避免方法使用with语句自动释放锁。按固定顺序获取锁。使用threading.RLock或超时机制。7.2 线程无法强制终止Python 线程没有terminate()方法。通常通过标志位或队列的毒丸来让线程主动退出。stop_flagthreading.Event()defworker():whilenotstop_flag.is_set():# 执行任务passtthreading.Thread(targetworker)t.start()stop_flag.set()# 通知线程退出t.join()7.3 全局解释器锁GIL对 CPU 密集型任务的影响对于纯计算任务多线程甚至比单线程更慢因为锁竞争。请使用multiprocessing。7.4 避免使用threading.active_count()进行精确控制它返回的是当前活动线程数包括已启动但未完成的不保证完全准确。7.5 线程安全的数据结构queue.Queue是线程安全的。列表、字典等内置容器不是线程安全的需要加锁保护。7.6 异常处理线程中的异常不会传播到主线程务必在线程函数内部捕获并处理。defsafe_worker():try:# 可能出错的代码passexceptExceptionase:print(f线程出错:{e})8. threading vs asyncio vs multiprocessing特性threadingasynciomultiprocessing并发模型多线程抢占式协程协作式多进程适合任务I/O 密集型高并发 I/O网络CPU 密集型真正的并行否GIL限制否是内存开销低共享极低高编程复杂度中等需处理锁较高async/await中等IPC第三方库支持广泛需要异步版本广泛选型建议简单并发 I/O如少量爬虫 →threading成千上万个网络连接 →asyncio数值计算、图像处理 →multiprocessingGUI 程序 →threading保持界面响应9. 总结通过本文我们全面学习了 Pythonthreading模块✅线程基础创建、启动、守护线程、join✅同步机制Lock、RLock、Semaphore、Event、Condition、Barrier✅线程间通信queue.Queue 安全传递数据✅线程池ThreadPoolExecutor高效管理✅实战案例网页下载、文件处理、超时控制✅注意事项死锁、GIL、无法强制终止等threading是 Python 并发编程的基石之一掌握它将使你能够编写更高效、更响应的应用程序。但请记住没有银弹——根据任务类型选择合适的并发工具才能发挥最大的威力。希望这篇博客能帮助你理清threading的脉络。如果在实践中遇到有趣的并发问题欢迎在评论区讨论感谢阅读我们下篇见

更多文章