Python异步编程实战:asyncio深度解析
Python异步编程实战asyncio深度解析引言在Python开发中异步编程是提高程序性能的关键技术。作为一名从Rust转向Python的后端开发者我深刻体会到asyncio在处理IO密集型任务时的优势。asyncio提供了完整的异步编程框架使得编写高性能并发代码变得更加容易。asyncio核心概念什么是asyncioasyncio是Python标准库中的异步IO框架具有以下特点协程支持使用async/await语法事件循环调度协程执行任务管理并发执行多个协程异步IO非阻塞的文件和网络操作同步原语锁、信号量等架构设计┌─────────────────────────────────────────────────────────────┐ │ asyncio 架构 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 事件循环 (Event Loop) │ │ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ │ │ 协程任务 │ │ IO操作 │ │ │ │ │ │ (Task) │ │ (Non-blocking)│ │ │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ │ │ │ │ └────────┬─────────┘ │ │ │ │ ▼ │ │ │ │ 调度器 (Scheduler) │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘环境搭建与基础配置基本协程import asyncio async def hello(): print(Hello) await asyncio.sleep(1) print(World) asyncio.run(hello())并发执行import asyncio async def task(name, delay): print(fTask {name} started) await asyncio.sleep(delay) print(fTask {name} completed) return fResult from {name} async def main(): results await asyncio.gather( task(A, 2), task(B, 1), task(C, 3) ) print(results) asyncio.run(main())高级特性实战任务管理import asyncio async def long_running_task(): print(Starting long task) await asyncio.sleep(5) print(Long task completed) return Done async def main(): task asyncio.create_task(long_running_task()) print(Doing other work...) await asyncio.sleep(2) result await task print(fResult: {result}) asyncio.run(main())取消任务import asyncio async def cancellable_task(): try: print(Task started) for i in range(5): print(fWorking... {i}) await asyncio.sleep(1) return Completed except asyncio.CancelledError: print(Task was cancelled) raise async def main(): task asyncio.create_task(cancellable_task()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print(Main caught cancellation) asyncio.run(main())超时处理import asyncio async def slow_task(): await asyncio.sleep(5) return Slow result async def main(): try: result await asyncio.wait_for(slow_task(), timeout2) print(result) except asyncio.TimeoutError: print(Task timed out) asyncio.run(main())同步原语锁import asyncio async def worker(name, lock, counter): async with lock: print(f{name} acquired lock) counter[0] 1 print(f{name}: counter {counter[0]}) await asyncio.sleep(1) print(f{name} released lock) async def main(): lock asyncio.Lock() counter [0] await asyncio.gather( worker(A, lock, counter), worker(B, lock, counter), worker(C, lock, counter) ) asyncio.run(main())信号量import asyncio async def limited_worker(name, semaphore): async with semaphore: print(f{name} started) await asyncio.sleep(2) print(f{name} completed) async def main(): semaphore asyncio.Semaphore(2) await asyncio.gather( limited_worker(A, semaphore), limited_worker(B, semaphore), limited_worker(C, semaphore), limited_worker(D, semaphore) ) asyncio.run(main())实际业务场景场景一批量API请求import asyncio import aiohttp async def fetch(session, url): async with session.get(url) as response: return await response.json() async def main(): urls [ https://api.example.com/users, https://api.example.com/posts, https://api.example.com/comments ] async with aiohttp.ClientSession() as session: tasks [fetch(session, url) for url in urls] results await asyncio.gather(*tasks) for url, result in zip(urls, results): print(f{url}: {len(result)} items) asyncio.run(main())场景二文件异步读写import asyncio async def read_file(filename): async with open(filename, r) as f: content await f.read() return content async def write_file(filename, content): async with open(filename, w) as f: await f.write(content) async def main(): content await read_file(input.txt) await write_file(output.txt, content.upper()) print(Files processed) asyncio.run(main())性能优化并发控制import asyncio async def process_items(items, max_concurrent10): semaphore asyncio.Semaphore(max_concurrent) async def process_item(item): async with semaphore: return await do_processing(item) tasks [process_item(item) for item in items] results await asyncio.gather(*tasks) return results任务分组import asyncio async def group_tasks(): group1 asyncio.gather(task1(), task2()) group2 asyncio.gather(task3(), task4()) result1, result2 await asyncio.gather(group1, group2) return result1 result2总结asyncio为Python开发者提供了完整的异步编程框架。通过协程和事件循环可以高效处理IO密集型任务。从Rust开发者的角度来看asyncio的设计理念与Rust的异步运行时相似但Python的协程更加轻量和易用。在实际项目中建议合理使用同步原语来控制并发并注意任务的取消和超时处理。