输入关键词开始搜索

Python 异步编程

为什么需要异步

# 同步:逐个等待
import time

def download(url):
    time.sleep(1)   # 模拟 IO 等待
    return f"{url} done"

start = time.time()
for url in ["a", "b", "c"]:
    download(url)
print(f"{time.time() - start:.1f}s")  # 3.0s
# 异步:并发等待
import asyncio

async def download(url):
    await asyncio.sleep(1)  # 不阻塞事件循环
    return f"{url} done"

async def main():
    tasks = [download(url) for url in ["a", "b", "c"]]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
# 耗时 ~1.0s,三个任务并发执行

async/await 语法

# async def 定义协程函数 — 调用它不执行,返回协程对象
async def foo():
    return 42

coro = foo()           # 不执行,只是协程对象
result = await coro    # 真正执行

# await 只能出现在 async 函数内部
# await 后面必须是 Awaitable(协程、Task、Future)

# 常见的可 await 对象:
await asyncio.sleep(1)        # 等待
await some_coroutine()        # 协程
await asyncio.gather(...)     # 并发等
await task                    # Task 对象

事件循环

# 获取当前事件循环
loop = asyncio.get_event_loop()

# 主入口(Python 3.7+)
asyncio.run(main())

# 等价于:
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

Task 并发

# create_task:立即调度,不阻塞
async def main():
    task1 = asyncio.create_task(download("a"))
    task2 = asyncio.create_task(download("b"))
    # 两个任务已经在后台跑了

    result1 = await task1    # 等 task1 完成
    result2 = await task2    # 等 task2 完成

# gather:批量等待
results = await asyncio.gather(
    download("a"),
    download("b"),
    download("c"),
    return_exceptions=True   # 单个失败不影响其他
)

# as_completed:哪个先完成先处理
for coro in asyncio.as_completed(tasks):
    result = await coro
    print(f"完成: {result}")
方法返回时机适用场景
gather全部完成批量请求
as_completed逐个完成哪个快处理哪个
wait可配置FIRST_COMPLETED / ALL_COMPLETED

实战:aiohttp 网络请求

import aiohttp
import asyncio

async def fetch(url, session):
    async with session.get(url) as resp:
        return await resp.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Got {len(results)} responses")

asyncio.run(main())  # ~1s, not 3s

常见陷阱

陷阱 1:async 函数里调同步阻塞代码

async def bad():
    time.sleep(2)          # ❌ 阻塞整个事件循环
    result = requests.get(url)  # ❌ requests 是同步库

async def good():
    await asyncio.sleep(2) # ✅
    # 用 aiohttp 替代 requests

陷阱 2:忘记 await

asyncio.create_task(download("a"))  # ⚠️ 没有 await,可能还没跑完 main 就退出了

# 解决:存引用,最后 await
task = asyncio.create_task(download("a"))
await task

陷阱 3:协程对象重复 await

coro = download("a")
await coro    # ✅ 第一次
await coro    # ❌ RuntimeError: cannot reuse already awaited coroutine

# 正确:想多次执行就每次重新调用

异步 vs 多线程

asynciothreading
适合IO 密集型IO / CPU 密集型
切换成本极低(协程切换)较高(线程切换)
并发数数千个几十个
GIL不受影响受 GIL 限制
学习曲线较高中等
调试较难相对容易