上周我接了个私活需求不复杂写个爬虫批量抓取几百个页面然后做数据清洗入库。同步写法跑了一版慢到我怀疑人生——500 个请求串行跑了将近 8 分钟。果断上 asyncio 改异步。然后就开始了长达一周的踩坑之旅。说实话asyncio 这东西文档看着挺简单async def加await就完了嘛。但真写起来各种反直觉的行为能让你怀疑自己是不是学了假 Python。这篇文章就是我这一周踩过的真实坑位记录都是血泪教训希望能帮到正在入坑的朋友。先说结论坑位症状原因解决耗时协程没被 await函数调用了但没执行忘了 await只拿到协程对象10 分钟同步代码阻塞事件循环整个程序卡住在 async 函数里调了time.sleep2 小时asyncio.run()套娃报错 RuntimeError在已运行的事件循环里再调asyncio.run()半天gather异常吞没部分任务静默失败默认不会抛出其他任务的异常1 天aiohttp session 没关ResourceWarning 刷屏没用async with管理生命周期30 分钟并发太猛被封 IP429 Too Many Requests没做并发控制大半天坑 1协程没 await代码静悄悄地不执行新手第一个会踩的坑我也没逃过。importasyncioasyncdeffetch_data():print(开始请求...)awaitasyncio.sleep(1)print(请求完成)return{data:hello}asyncdefmain():# ❌ 错误写法忘了 awaitresultfetch_data()print(f结果:{result})asyncio.run(main())运行结果结果: coroutine object fetch_data at 0x... RuntimeWarning: coroutine fetch_data was never awaitedfetch_data里的两个 print 一个都没执行。因为fetch_data()只是创建了一个协程对象并没有真正运行它。asyncdefmain():# ✅ 正确写法resultawaitfetch_data()print(f结果:{result})这个坑虽然简单但在复杂项目里特别隐蔽。比如你在某个回调里调用了协程函数但忘了 await那段逻辑就直接被跳过了还不报错只有个 Warningdebug 的时候你会疯。坑 2同步阻塞炸掉整个事件循环这个坑是真让我排查了两小时。importasyncioimporttimeasyncdeftask_a():print(f[{time.strftime(%H:%M:%S)}] Task A 开始)# ❌ 用了同步的 time.sleeptime.sleep(3)print(f[{time.strftime(%H:%M:%S)}] Task A 完成)asyncdeftask_b():print(f[{time.strftime(%H:%M:%S)}] Task B 开始)awaitasyncio.sleep(1)print(f[{time.strftime(%H:%M:%S)}] Task B 完成)asyncdefmain():awaitasyncio.gather(task_a(),task_b())asyncio.run(main())你猜输出什么[14:00:00] Task A 开始 [14:00:03] Task A 完成 # 注意B 在 A 完成后才开始 [14:00:03] Task B 开始 [14:00:04] Task B 完成time.sleep(3)直接把事件循环阻塞了Task B 根本没法并发。asyncio 是单线程的协作式并发你用同步阻塞调用就相当于一个人霸占了整条路别人谁都过不去。是 - 让出控制权否 - 同步阻塞事件循环 - 单线程当前协程是否 await?切换到其他就绪的协程整个事件循环卡住所有其他协程都在等多个协程交替执行 ✅变成串行执行 ❌正确做法asyncdeftask_a():print(f[{time.strftime(%H:%M:%S)}] Task A 开始)# ✅ 用异步的 sleepawaitasyncio.sleep(3)print(f[{time.strftime(%H:%M:%S)}] Task A 完成)但现实中不只是 sleep 的问题。你用了requests库发 HTTP 请求用了open()读大文件用了某个不支持异步的数据库驱动——这些全是同步阻塞操作会把你的事件循环卡得死死的。如果实在要用同步库用run_in_executor扔到线程池importasyncioimportrequestsasyncdeffetch_sync_api(url):loopasyncio.get_event_loop()# 把同步的 requests.get 扔到线程池执行responseawaitloop.run_in_executor(None,requests.get,url)returnresponse.text坑 3asyncio.run()嵌套调用直接炸这个坑我是在 Jupyter Notebook 里踩的。importasyncioasyncdefinner():returnhelloasyncdefouter():# ❌ 在协程里再调 asyncio.run()resultasyncio.run(inner())returnresult asyncio.run(outer())直接报RuntimeError: asyncio.run() cannot be called from a running event loop原因很简单asyncio.run()会创建一个新的事件循环但你已经在一个事件循环里了。一山不容二虎一个线程不容两个事件循环。Jupyter Notebook 里更坑因为 Notebook 自带一个运行中的事件循环你在 cell 里直接写asyncio.run()就会炸。解决方案# 在协程内部直接 await 就行了不要再 runasyncdefouter():resultawaitinner()# ✅returnresult# 在 Jupyter Notebook 里# 方案一直接 awaitJupyter 支持顶层 awaitresultawaitinner()# 方案二用 nest_asyncio不太优雅但管用importnest_asyncio nest_asyncio.apply()asyncio.run(outer())坑 4asyncio.gather的异常处理黑洞这个坑害我丢了一天数据真的痛。importasyncioasyncdeftask_ok():awaitasyncio.sleep(0.5)return成功asyncdeftask_fail():awaitasyncio.sleep(0.1)raiseValueError(出错了)asyncdeftask_also_ok():awaitasyncio.sleep(0.3)return也成功了asyncdefmain():# ❌ 默认行为一个任务抛异常其他任务的结果就拿不到了try:resultsawaitasyncio.gather(task_ok(),task_fail(),task_also_ok())exceptValueErrorase:print(f捕获到异常:{e})# 但是 task_ok 和 task_also_ok 的结果呢没了。asyncio.run(main())gather在默认行为下遇到第一个异常就会把它抛出来其他任务的结果你拿不到虽然它们其实已经执行了或者还在执行。加上return_exceptionsTrueasyncdefmain():# ✅ 异常作为返回值不会中断其他任务resultsawaitasyncio.gather(task_ok(),task_fail(),task_also_ok(),return_exceptionsTrue)fori,resultinenumerate(results):ifisinstance(result,Exception):print(f任务{i}失败:{result})else:print(f任务{i}成功:{result})asyncio.run(main())输出任务 0 成功: 成功 任务 1 失败: 出错了 任务 2 成功: 也成功了2026 年了更推荐用TaskGroupPython 3.11 引入的asyncdefmain():try:asyncwithasyncio.TaskGroup()astg:t1tg.create_task(task_ok())t2tg.create_task(task_fail())t3tg.create_task(task_also_ok())except*ValueErroraseg:forexcineg.exceptions:print(f捕获:{exc})TaskGroup配合except*ExceptionGroup 语法用起来更清晰异常处理逻辑更可控。坑 5并发数不控制直接被封最后一个大坑。我一开始写爬虫的时候500 个请求直接 gather 一把梭# ❌ 500 个请求同时发出去tasks[fetch(url)forurlinurls]# 500 个resultsawaitasyncio.gather(*tasks)结果瞬间收到一堆 429IP 还被临时封了。用asyncio.Semaphore控制并发数importasyncioimportaiohttpasyncdeffetch(session,url,semaphore):asyncwithsemaphore:# 信号量控制并发asyncwithsession.get(url)asresponse:returnawaitresponse.text()asyncdefmain():semaphoreasyncio.Semaphore(10)# 最多同时 10 个请求asyncwithaiohttp.ClientSession()assession:tasks[fetch(session,url,semaphore)forurlinurls]resultsawaitasyncio.gather(*tasks,return_exceptionsTrue)successsum(1forrinresultsifnotisinstance(r,Exception))failedsum(1forrinresultsifisinstance(r,Exception))print(f成功:{success}, 失败:{failed})asyncio.run(main())顺带一提aiohttp.ClientSession一定要用async with来管理。手动创建但忘了 close退出时会收到一堆ResourceWarning: Unclosed client session不影响功能但看着烦。我总结的 asyncio 心智模型学了一周我觉得理解 asyncio 的关键就一句话它是单线程的协作式并发所有协程共享一个线程靠 await 主动让出执行权。对比维度多线程 threading异步 asyncio并发模型抢占式协作式切换时机OS 随时切换遇到 await 才切换线程数多个1 个竞态条件容易出现几乎不会阻塞影响只阻塞当前线程阻塞整个事件循环适用场景CPU 密集 IO高并发 IO调试难度高竞态问题中异步思维代码 IO 密集网络请求、数据库查询、文件读写asyncio 能给你显著的性能提升。我那个爬虫改完之后500 个请求从 8 分钟降到了 40 秒提升了 12 倍。CPU 密集型的大量计算、图像处理asyncio 帮不了你老老实实用 multiprocessing。小结回头看这一周asyncio 的核心概念并不多难的是那些反直觉的行为和隐蔽的 bug。几条实操建议先搞清楚事件循环的运行机制不要急着写代码检查所有库是否支持异步不支持的用run_in_executor包一层gather一定要加return_exceptionsTrue不然数据丢了你都不知道并发数必须控制Semaphore 是你的好朋友Python 3.11 尽量用TaskGroup替代gather异常处理更优雅踩完这些坑之后asyncio 用起来其实挺顺手的。起码比 threading 那套锁来锁去的操作省心多了——毕竟单线程不用操心竞态条件。有问题欢迎评论区交流asyncio 数据库连接池的实践后面可能也会写一篇。