Python 协程与异步编程:从入门到精通
Python 协程与异步编程从入门到精通作为一名从Python转向Rust的后端开发者我深刻体会到Python异步编程的强大和灵活。Python的协程和异步编程不仅可以提高程序的性能还可以使代码更加简洁、优雅这让我在编写高并发服务时更加自信。今天我想分享一下Python协程与异步编程的高级应用希望能帮助大家更好地理解和使用这个强大的特性。一、异步编程的基本概念1. 什么是异步编程异步编程是一种编程范式它允许程序在等待某些操作如I/O操作完成时执行其他任务从而提高程序的并发性能。2. 协程的基本概念协程是一种可以暂停执行并在稍后恢复的函数它是实现异步编程的基础。import asyncio async def hello(): print(Hello) await asyncio.sleep(1) print(World) async def main(): await hello() asyncio.run(main()) # 输出: # Hello # (等待1秒) # World二、高级应用技巧1. 并发执行多个协程我们可以使用asyncio.gather来并发执行多个协程。import asyncio import time async def task(name, duration): print(fTask {name} started) await asyncio.sleep(duration) print(fTask {name} finished) return fTask {name} result async def main(): start time.time() # 并发执行多个协程 results await asyncio.gather( task(A, 2), task(B, 1), task(C, 3) ) end time.time() print(fTotal time: {end - start} seconds) print(fResults: {results}) asyncio.run(main()) # 输出: # Task A started # Task B started # Task C started # Task B finished # Task A finished # Task C finished # Total time: 3.001234 seconds # Results: [Task A result, Task B result, Task C result]2. 使用 async with 和 async for我们可以使用async with和async for来处理异步上下文管理器和异步迭代器。import asyncio class AsyncContextManager: async def __aenter__(self): print(Entering context) await asyncio.sleep(0.5) return self async def __aexit__(self, exc_type, exc_val, exc_tb): print(Exiting context) await asyncio.sleep(0.5) class AsyncIterator: def __init__(self, count): self.count count self.current 0 def __aiter__(self): return self async def __anext__(self): if self.current self.count: value self.current self.current 1 await asyncio.sleep(0.5) return value else: raise StopAsyncIteration async def main(): # 使用 async with async with AsyncContextManager(): print(Inside context) # 使用 async for print(Iterating:) async for value in AsyncIterator(3): print(fValue: {value}) asyncio.run(main()) # 输出: # Entering context # Inside context # Exiting context # Iterating: # Value: 0 # Value: 1 # Value: 23. 任务管理我们可以使用asyncio.create_task来创建任务并使用Task对象来管理任务的执行。import asyncio async def task(name, duration): print(fTask {name} started) await asyncio.sleep(duration) print(fTask {name} finished) return fTask {name} result async def main(): # 创建任务 task1 asyncio.create_task(task(A, 2)) task2 asyncio.create_task(task(B, 1)) # 等待任务完成 await task1 await task2 # 获取任务结果 print(fTask1 result: {task1.result()}) print(fTask2 result: {task2.result()}) asyncio.run(main()) # 输出: # Task A started # Task B started # Task B finished # Task A finished # Task1 result: Task A result # Task2 result: Task B result三、实用示例1. 异步HTTP客户端我们可以使用aiohttp库来创建异步HTTP客户端并发发送多个HTTP请求。import asyncio import aiohttp async def fetch(url, session): async with session.get(url) as response: return await response.text() async def main(): urls [ https://www.example.com, https://www.google.com, https://www.github.com ] async with aiohttp.ClientSession() as session: tasks [fetch(url, session) for url in urls] results await asyncio.gather(*tasks) for url, content in zip(urls, results): print(fURL: {url}, Content length: {len(content)}) asyncio.run(main())2. 异步文件I/O我们可以使用aiofiles库来进行异步文件I/O操作。import asyncio import aiofiles async def read_file(filename): async with aiofiles.open(filename, r) as f: content await f.read() return content async def write_file(filename, content): async with aiofiles.open(filename, w) as f: await f.write(content) async def main(): # 读取文件 content await read_file(input.txt) print(fRead content: {content}) # 写入文件 await write_file(output.txt, content) print(File written) asyncio.run(main())3. 异步数据库操作我们可以使用asyncpg库来进行异步数据库操作。import asyncio import asyncpg async def main(): # 连接数据库 conn await asyncpg.connect( hostlocalhost, port5432, userpostgres, passwordpassword, databasetest ) # 创建表 await conn.execute( CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name TEXT, email TEXT ) ) # 插入数据 await conn.execute( INSERT INTO users (name, email) VALUES ($1, $2), Alice, aliceexample.com ) # 查询数据 rows await conn.fetch(SELECT * FROM users) for row in rows: print(fID: {row[id]}, Name: {row[name]}, Email: {row[email]}) # 关闭连接 await conn.close() asyncio.run(main())四、高级异步编程技术1. 信号量我们可以使用asyncio.Semaphore来限制并发数。import asyncio async def task(name, duration, semaphore): async with semaphore: print(fTask {name} started) await asyncio.sleep(duration) print(fTask {name} finished) async def main(): # 创建信号量限制最多2个并发任务 semaphore asyncio.Semaphore(2) tasks [ task(A, 2, semaphore), task(B, 1, semaphore), task(C, 3, semaphore), task(D, 1, semaphore) ] await asyncio.gather(*tasks) asyncio.run(main()) # 输出: # Task A started # Task B started # Task B finished # Task C started # Task A finished # Task D started # Task C finished # Task D finished2. 事件我们可以使用asyncio.Event来实现任务间的通信。import asyncio async def waiter(name, event): print(fWaiter {name} waiting) await event.wait() print(fWaiter {name} received event) async def main(): event asyncio.Event() # 创建多个等待者 waiters [waiter(f{i}, event) for i in range(3)] # 启动等待者 await asyncio.gather(*waiters, return_exceptionsTrue) # 触发事件 print(Setting event) event.set() # 等待所有等待者完成 await asyncio.gather(*waiters) asyncio.run(main()) # 输出: # Waiter 0 waiting # Waiter 1 waiting # Waiter 2 waiting # Setting event # Waiter 0 received event # Waiter 1 received event # Waiter 2 received event3. 队列我们可以使用asyncio.Queue来实现任务间的消息传递。import asyncio async def producer(queue): for i in range(5): await asyncio.sleep(0.5) item fItem {i} await queue.put(item) print(fProduced: {item}) await queue.put(None) # 发送结束信号 async def consumer(queue): while True: item await queue.get() if item is None: break await asyncio.sleep(1) print(fConsumed: {item}) queue.task_done() async def main(): queue asyncio.Queue(maxsize2) # 启动生产者和消费者 producer_task asyncio.create_task(producer(queue)) consumer_task asyncio.create_task(consumer(queue)) # 等待生产者完成 await producer_task # 等待消费者处理完所有项目 await queue.join() # 取消消费者任务 consumer_task.cancel() asyncio.run(main()) # 输出: # Produced: Item 0 # Produced: Item 1 # Consumed: Item 0 # Produced: Item 2 # Consumed: Item 1 # Produced: Item 3 # Consumed: Item 2 # Produced: Item 4 # Consumed: Item 3 # Consumed: Item 4五、实战应用1. 异步Web服务器我们可以使用FastAPI或aiohttp来创建异步Web服务器。from fastapi import FastAPI import asyncio app FastAPI() async def heavy_task(): await asyncio.sleep(2) return Task completed app.get(/) async def root(): result await heavy_task() return {message: result} app.get(/items/{item_id}) async def read_item(item_id: int, q: str None): return {item_id: item_id, q: q} # 运行服务器: # uvicorn main:app --reload2. 异步爬虫我们可以使用aiohttp来创建异步爬虫并发爬取多个网页。import asyncio import aiohttp from bs4 import BeautifulSoup async def fetch(url, session): async with session.get(url) as response: return await response.text() async def parse(url, session): html await fetch(url, session) soup BeautifulSoup(html, html.parser) title soup.title.string if soup.title else No title return (url, title) async def main(): urls [ https://www.example.com, https://www.google.com, https://www.github.com, https://www.python.org, https://www.rust-lang.org ] async with aiohttp.ClientSession() as session: tasks [parse(url, session) for url in urls] results await asyncio.gather(*tasks) for url, title in results: print(fURL: {url}, Title: {title}) asyncio.run(main())3. 异步微服务我们可以使用asyncio和gRPC来创建异步微服务。# 服务端 import asyncio import grpc from protos import service_pb2 from protos import service_pb2_grpc class Service(service_pb2_grpc.ServiceServicer): async def SayHello(self, request, context): await asyncio.sleep(1) return service_pb2.HelloResponse(messagefHello, {request.name}!) async def serve(): server grpc.aio.server() service_pb2_grpc.add_ServiceServicer_to_server(Service(), server) server.add_insecure_port([::]:50051) await server.start() await server.wait_for_termination() if __name__ __main__: asyncio.run(serve()) # 客户端 import asyncio import grpc from protos import service_pb2 from protos import service_pb2_grpc async def run(): async with grpc.aio.insecure_channel(localhost:50051) as channel: stub service_pb2_grpc.ServiceStub(channel) response await stub.SayHello(service_pb2.HelloRequest(nameAlice)) print(fResponse: {response.message}) if __name__ __main__: asyncio.run(run())六、总结Python的协程与异步编程是一个非常强大的特性它可以帮助我们编写高性能、高并发的程序。通过掌握并发执行多个协程、使用async with和async for、任务管理等高级技巧我们可以编写更加高效、可维护的异步代码。作为一名从Python转向Rust的开发者我发现Python的异步编程与Rust的异步编程有一些相似之处它们都使用协程来实现异步操作。但Python的异步编程更加简洁、易用而Rust的异步编程更加类型安全、性能更高。这两种风格各有优缺点我们可以根据具体的场景选择合适的语言和技术。希望这篇文章能对你有所帮助如果你有任何问题或建议欢迎在评论区留言。