跳到主要内容

第十二节 — 并发与异步处理

这一节回答:FinBayes 用什么并发原语?多任务怎么并发?失败如何隔离?用户取消时发生什么?

并发原语:Python asyncio

FinBayes Runtime 采用 Python asyncio 作为基础并发原语。理由:

  • Runtime 工作是 I/O 密集 而非 CPU 密集(LLM 调用 / 工具调用 / 数据 API 都是网络 I/O)
  • asyncio 单线程事件循环避免锁竞争与死锁的常见坑
  • 与 Python 生态(FastAPI / aiohttp / httpx 等)天然契合
  • Python 3.11+ 提供 asyncio.TaskGroup 与结构化并发原语,匹配 FinBayes 业务对象 TaskGroup

例外:CPU 密集的操作(如本地 LLM 推理 / 大量 PDF 解析)走 loop.run_in_executor 抛到线程池或进程池执行,不阻塞事件循环。


业务层 TaskGroup 与异步原语的映射

业务对象 TaskGroup(用户输入命中多任务时的并发容器,详见状态对象生命周期)的工程实现:

# 示意代码(非最终实现)
async def execute_task_group(group: TaskGroup) -> MergedResult:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(execute_task(t)) for t in group.tasks
]
# asyncio.TaskGroup 保证:
# - 所有子任务并发执行
# - 任一子任务失败时其他子任务被自动取消(结构化并发)
# - 异常被 ExceptionGroup 集中抛出
return merge_results([t.result() for t in tasks if not t.exception()])

关键设计选择

选择理由
asyncio.TaskGroup 而不是 asyncio.gather(...)TaskGroup 提供结构化并发(任一失败自动取消其他),避免悬挂任务
失败 collected by ExceptionGroup不丢失任何子任务错误(vs gather(return_exceptions=True) 静默吞错)
merge_results 处理 partial success失败的子任务不阻塞 partial result 输出(业务约束:失败隔离)

Trace ID 跨任务传递机制

task_id 等 trace ID(详见 CHAP-18 trace ID 体系)通过 contextvars.ContextVar 在 TaskGroup 创建的子任务中自动继承:

# 示意代码
import contextvars
from contextvars import ContextVar

trace_task_id: ContextVar[str] = ContextVar("task_id")
trace_session_id: ContextVar[str] = ContextVar("session_id")
trace_user_id: ContextVar[str] = ContextVar("user_id")

async def execute_task(task: Task) -> TaskResult:
# asyncio 在 create_task 时自动 copy 当前 context(PEP 567)
# 因此 trace_task_id 在 TaskGroup 创建的所有子任务中自动可见
current_task_id = trace_task_id.get()
audit_event("task_started", task_id=current_task_id, ...)
...

关键约束

  • 所有 LLM / 工具 / Audit 子调用通过 ContextVar 拿到 trace ID,不通过参数显式传递(避免每个函数签名拖一串 trace 参数)
  • httpx middleware 在出站 HTTP 请求时自动从 ContextVar 读取 trace ID 并注入 X-Trace-ID / X-Task-Id 请求头(用于 Provider 端日志关联)
  • 任意 asyncio.create_task(...) 调用都自动继承 context —— 这是 asyncio 的内置行为(PEP 567),不需要工程层额外处理
  • 跨线程边界(如 asyncio.to_thread)需显式 context.run(...)contextvars.copy_context() 转移 context

失败隔离的颗粒度

FinBayes 在多个层级提供失败隔离:

层级颗粒度隔离机制
TaskGroup 内每个 Taskasyncio.TaskGroup 结构化并发 + 失败子任务的 result 标记为降级
Task 内每个证据节点(EvidencePacket)单证据节点失败只影响依赖它的下游证据节点,不影响其他证据节点
Evidence 内每个工具调用工具调用失败标记 degraded_reason,归一化为降级 EvidencePacket,不抛异常
Provider 调用内每个 LLM Provider失败触发 L1 → L1' → L2 → L3 → L4 降级(详见 Provider Adapter 子系统)

关键约束

  • 默认不让单点失败传染到整个 Session
  • 每个失败都有 degraded_reason 元数据进入 EvidencePacket / Result,前台可呈现给用户
  • 真正阻断的失败(如 State Store 完全不可用)才让 Session 进入只读模式

用户取消语义

用户主动取消(如 Ctrl+C / ESC / Web UI 取消按钮)的处理:

关键设计

  • 取消是 协作式 而非强制(asyncio 取消信号传播到协程边界)
  • 已完成的部分证据 / 部分结果保留至审计 trail(用户后续可查阅"取消前发生了什么")
  • 正在进行中的 LLM 调用通过 httpx 客户端的 CancelledError 立即释放,不浪费用户的 token 配额

TaskGroup 取消:用户取消整个 TaskGroup 时,所有子任务收到 cancel 信号;子任务已开始的工具调用允许 5 秒 grace period 完成(避免数据不一致),然后强制结束。


超时控制

不同层级的超时策略:

层级默认超时可配置
单个工具调用30s是(per-tool)
单个 LLM 调用60s是(per-task-type)
单个 Task(含所有证据 + 综合)120s
TaskGroup(用户感知的总等待)180s
Session Context Compactor30s

超时触发后:

  • 进行中的协程被 cancel
  • 已完成的部分结果保留
  • 用户看到部分结果 + 超时原因明示
  • 审计 trail 记录"哪一层超时"

配置走文件不硬编码:与战略未决问题相关的成本 / 性能参数都走配置(详见上位继承与不变量章节"不抢答"约定)。


并发限制(backpressure)

为防止本地资源被无界并发耗尽,runtime 设全局并发限制:

限制项默认值调节点
同时进行的 Task 数量5配置
单 Task 内同时调用的工具数量3配置
总 LLM 调用并发(跨 Task)10配置
数据 Provider 调用并发20配置

实现:用 asyncio.Semaphore 在各层级控流。超过限制的请求进入 FIFO 队列等待。

用户感知:高并发时新任务可能等待几秒;用户应能看到"正在排队"提示而非静默卡顿。


事件循环与阻塞调用的处理

asyncio 单线程事件循环对同步阻塞调用敏感。FinBayes 规则:

操作类型处理方式
HTTP / WebSocket / 数据库(SQLite asyncio binding)/ 文件 I/O直接 await(原生异步)
本地 LLM 推理(Ollama 等通过 HTTP,已是异步)直接 await
大量 CPU 计算(如复杂 PDF 解析 / 数据处理)loop.run_in_executor(None, sync_func, args) 抛线程池
第三方同步 SDK 调用同上
子进程调用(如 mermaid-cli 渲染)asyncio.create_subprocess_exec 异步等待

约束:任何直接调用同步阻塞 API 而不抛到线程池的代码视为 bug,进入 review gate 必查项。


测试约束

并发模型的测试覆盖:

测试类型覆盖什么
单任务并发测试TaskGroup 内任务真正并发执行(不是串行)
失败隔离测试一个子任务失败时其他子任务能完成
取消传播测试用户取消能正确传播到 Provider 调用
超时测试各层超时后部分结果保留 + 审计完整
并发限制测试超过 Semaphore 限制的请求被正确队列化

详细测试体系见质量与验收章节。