第十二节 — 并发与异步处理
这一节回答:FinBayes 用什么并发原语?多任务怎么并发?失败如何隔离?用户取消时发生什么?
并发原语:Python asyncio
FinBayes Runtime 采用 Python asyncio 作为基础并发原语。理由:
- Runtime 工作是 I/O 密集 而非 CPU 密集(LLM 调用 / 工具调用 / 数据 API 都是网络 I/O)
- asyncio 单线程事件循环避免锁竞争与死锁的常见坑
- 与 Python 生态(FastAPI / aiohttp / httpx 等)天然契合
- Python 3.11+ 提供
asyncio.TaskGroup与结构化并发原语,匹配 FinBayes 业务对象 TaskGroup
例外:CPU 密集的操作(如本地 LLM 推理 / 大量 PDF 解析)走 loop.run_in_executor 抛到线程池或进程池执行,不阻塞事件循环。
业务层 TaskGroup 与异步原语的映射
业务对象 TaskGroup(用户输入命中多任务时的并发容器,详见状态对象生命周期)的工程实现:
# 示意代码(非最终实现)
async def execute_task_group(group: TaskGroup) -> MergedResult:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(execute_task(t)) for t in group.tasks
]
# asyncio.TaskGroup 保证:
# - 所有子任务并发执行
# - 任一子任务失败时其他子任务被自动取消(结构化并发)
# - 异常被 ExceptionGroup 集中抛出
return merge_results([t.result() for t in tasks if not t.exception()])
关键设计选择:
| 选择 | 理由 |
|---|---|
用 asyncio.TaskGroup 而不是 asyncio.gather(...) | TaskGroup 提供结构化并发(任一失败自动取消其他),避免悬挂任务 |
| 失败 collected by ExceptionGroup | 不丢失任何子任务错误(vs gather(return_exceptions=True) 静默吞错) |
merge_results 处理 partial success | 失败的子任务不阻塞 partial result 输出(业务约束:失败隔离) |
Trace ID 跨任务传递机制
task_id 等 trace ID(详见 CHAP-18 trace ID 体系)通过 contextvars.ContextVar 在 TaskGroup 创建的子任务中自动继承:
# 示意代码
import contextvars
from contextvars import ContextVar
trace_task_id: ContextVar[str] = ContextVar("task_id")
trace_session_id: ContextVar[str] = ContextVar("session_id")
trace_user_id: ContextVar[str] = ContextVar("user_id")
async def execute_task(task: Task) -> TaskResult:
# asyncio 在 create_task 时自动 copy 当前 context(PEP 567)
# 因此 trace_task_id 在 TaskGroup 创建的所有子任务中自动可见
current_task_id = trace_task_id.get()
audit_event("task_started", task_id=current_task_id, ...)
...
关键约束:
- 所有 LLM / 工具 / Audit 子调用通过 ContextVar 拿到 trace ID,不通过参数显式传递(避免每个函数签名拖一串 trace 参数)
- httpx middleware 在出站 HTTP 请求时自动从 ContextVar 读取 trace ID 并注入
X-Trace-ID/X-Task-Id请求头(用于 Provider 端日志关联) - 任意
asyncio.create_task(...)调用都自动继承 context —— 这是 asyncio 的内置行为(PEP 567),不需要工程层额外处理 - 跨线程边界(如
asyncio.to_thread)需显式context.run(...)或contextvars.copy_context()转移 context
失败隔离的颗粒度
FinBayes 在多个层级提供失败隔离:
| 层级 | 颗粒度 | 隔离机制 |
|---|---|---|
| TaskGroup 内 | 每个 Task | asyncio.TaskGroup 结构化并发 + 失败子任务的 result 标记为降级 |
| Task 内 | 每个证据节点(EvidencePacket) | 单证据节点失败只影响依赖它的下游证据节点,不影响其他证据节点 |
| Evidence 内 | 每个工具调用 | 工具调用失败标记 degraded_reason,归一化为降级 EvidencePacket,不抛异常 |
| Provider 调用内 | 每个 LLM Provider | 失败触发 L1 → L1' → L2 → L3 → L4 降级(详见 Provider Adapter 子系统) |
关键约束:
- 默认不让单点失败传染到整个 Session
- 每个失败都有
degraded_reason元数据进入 EvidencePacket / Result,前台可呈现给用户 - 真正阻断的失败(如 State Store 完全不可用)才让 Session 进入只读模式
用户取消语义
用户主动取消(如 Ctrl+C / ESC / Web UI 取消按钮)的处理:
关键设计:
- 取消是 协作式 而非强制(asyncio 取消信号传播到协程边界)
- 已完成的部分证据 / 部分结果保留至审计 trail(用户后续可查阅"取消前发生了什么")
- 正在进行中的 LLM 调用通过 httpx 客户端的 CancelledError 立即释放,不浪费用户的 token 配额
TaskGroup 取消:用户取消整个 TaskGroup 时,所有子任务收到 cancel 信号;子任务已开始的工具调用允许 5 秒 grace period 完成(避免数据不一致),然后强制结束。
超时控制
不同层级的超时策略:
| 层级 | 默认超时 | 可配置 |
|---|---|---|
| 单个工具调用 | 30s | 是(per-tool) |
| 单个 LLM 调用 | 60s | 是(per-task-type) |
| 单个 Task(含所有证据 + 综合) | 120s | 是 |
| TaskGroup(用户感知的总等待) | 180s | 是 |
| Session Context Compactor | 30s | 是 |
超时触发后:
- 进行中的协程被 cancel
- 已完成的部分结果保留
- 用户看到部分结果 + 超时原因明示
- 审计 trail 记录"哪一层超时"
配置走文件不硬编码:与战略未决问题相关的成本 / 性能参数都走配置(详见上位继承与不变量章节"不抢答"约定)。
并发限制(backpressure)
为防止本地资源被无界并发耗尽,runtime 设全局并发限制:
| 限制项 | 默认值 | 调节点 |
|---|---|---|
| 同时进行的 Task 数量 | 5 | 配置 |
| 单 Task 内同时调用的工具数量 | 3 | 配置 |
| 总 LLM 调用并发(跨 Task) | 10 | 配置 |
| 数据 Provider 调用并发 | 20 | 配置 |
实现:用 asyncio.Semaphore 在各层级控流。超过限制的请求进入 FIFO 队列等待。
用户感知:高并发时新任务可能等待几秒;用户应能看到"正在排队"提示而非静默卡顿。
事件循环与阻塞调用的处理
asyncio 单线程事件循环对同步阻塞调用敏感。FinBayes 规则:
| 操作类型 | 处理方式 |
|---|---|
| HTTP / WebSocket / 数据库(SQLite asyncio binding)/ 文件 I/O | 直接 await(原生异步) |
| 本地 LLM 推理(Ollama 等通过 HTTP,已是异步) | 直接 await |
| 大量 CPU 计算(如复杂 PDF 解析 / 数据处理) | loop.run_in_executor(None, sync_func, args) 抛线程池 |
| 第三方同步 SDK 调用 | 同上 |
| 子进程调用(如 mermaid-cli 渲染) | asyncio.create_subprocess_exec 异步等待 |
约束:任何直接调用同步阻塞 API 而不抛到线程池的代码视为 bug,进入 review gate 必查项。
测试约束
并发模型的测试覆盖:
| 测试类型 | 覆盖什么 |
|---|---|
| 单任务并发测试 | TaskGroup 内任务真正并发执行(不是串行) |
| 失败隔离测试 | 一个子任务失败时其他子任务能完成 |
| 取消传播测试 | 用户取消能正确传播到 Provider 调用 |
| 超时测试 | 各层超时后部分结果保留 + 审计完整 |
| 并发限制测试 | 超过 Semaphore 限制的请求被正确队列化 |
详细测试体系见质量与验收章节。