跳转到主要内容
Zeus AI Backend 是一个基于 FastAPI 构建的 Agent 服务端,负责接收用户请求、编排 Agent 执行、管理工具调用和节点通信。它通过 HTTP REST + SSE 与前端交互,通过 WebSocket + JSON-RPC 2.0 与远程节点(浏览器扩展、桌面应用)通信。

概览

  • 一个 FastAPI 实例 同时承载 REST API 和 WebSocket 网关
  • Agent 调用 通过 /api/agent/invoke 发起,返回 SSE 流
  • 节点工具调用 通过 WebSocket 网关以 JSON-RPC 2.0(MCP 协议)格式路由
  • 每个用户拥有独立的 云端工作空间(Supabase Storage)和 会话状态(PostgreSQL Checkpointer)

组件与数据流

API Layer(路由层)

FastAPI 应用提供以下路由模块:
路由说明协议
/api/agent/invokeAgent 对话调用HTTP POST → SSE Stream
/api/agent/resumeHITL 恢复执行HTTP POST → SSE Stream
/api/tools/*工具管理 CRUDHTTP REST
/api/skill/*技能管理HTTP REST
/api/knowledge-base/*知识库管理HTTP REST
/api/node/*节点设备查询HTTP REST
/api/scheduled-task/*定时任务管理HTTP REST
/ws/extension浏览器扩展 WebSocketWebSocket
/ws/desktop桌面应用 WebSocketWebSocket
/ws/webWeb 客户端 WebSocketWebSocket
/api/feishu/*飞书渠道 WebhookHTTP REST
所有 REST 接口通过 JWT 认证(verify_jwt_token),WebSocket 通过 query parameter 传递 client_id / user_id

WebSocket Gateway(网关)

网关通过 ConnectionManager 管理所有 WebSocket 连接,支持三种节点类型:
  • Extension / Desktop 节点:每个节点通过 node_id 唯一标识,支持一个用户多个节点
  • Web 客户端:按 user_id 管理,同一用户可有多个 Web 连接
  • 工具调用:Agent 通过 call_tool() 发起 JSON-RPC 请求,Gateway 将请求路由到对应节点并等待响应(Future-based)

Agent Service(Agent 服务)

Agent 服务是核心编排层,基于 DeepAgents 框架(LangGraph 上层封装): Service 层分工:
服务文件职责
BaseServiceservices/base.py上下文初始化、工具装配、提示词组装、SSE 事件流
AgentServiceservices/agent.pyAgent 模式的 invoke/resume 入口
RAGServiceservices/rag.py知识库检索(向量 + BM25 混合搜索)
DocumentServiceservices/document.py文档处理与切块
FeishuServiceservices/feishu.py飞书渠道集成
SchedulerServiceservices/scheduler.py定时任务调度

Tool System(工具体系)

Zeus 的工具分为四层:
层级来源注册方式执行位置
Built-inutils/tools/built_in/代码内直接注册AI Backend 本地
MCP前端配置传入langchain_mcp_adaptersMCP Server(远程)
OAuth前端配置传入动态构建 LangChain ToolAI Backend → OAuth API
ConnectorWebSocket 节点上报SessionManager 绑定远程节点(Extension/Desktop)
Connector Tools 的调用链路:Agent → ToolRouter → Gateway → WebSocket → Node → 执行 → 原路返回。

Node Management(节点管理)

节点管理由三个组件协同完成:
组件说明
NodeManager节点注册/注销、心跳 TTL(60s)、周期性清理(30s)
SessionManager将 Session 绑定到具体节点,支持 preferred_node_id 指定
ToolRouter根据 Session 绑定决定工具调用路由到哪个节点
每个用户最多 10 个节点,超时未心跳的节点自动标记离线并注销。

Storage & State(存储与状态)

存储技术用途
CheckpointerPostgreSQL(PostgresSaver会话级状态持久化,支持 HITL 恢复
WorkspaceSupabase Storage用户文件(产出、上传、沙盒结果)
MemoryPostgreSQL + pgvector长期记忆(向量 + 元数据),三级作用域
Knowledge BasePostgreSQL + pgvector + BM25RAG 文档存储与混合检索
CacheRedis工作空间缓存(5min)、记忆缓存(10min)

连接生命周期

WebSocket 节点连接

Agent 调用流程

如果启用了 HITL(Human-in-the-Loop),Agent 在执行敏感工具前会暂停,发送 InterruptMessage,前端展示审批 UI,用户确认后调用 /api/agent/resume 继续执行。

通信协议

HTTP SSE(Agent 响应流)

Agent 调用返回 text/event-stream,SSE 事件类型:
事件类型data 字段说明
text{type, content}流式文本 token
tool_call{type, tool_name, tool_args, tool_call_id}Agent 发起工具调用
tool_call_result{type, tool_name, result, ...}工具执行结果
interrupt{type, tool_calls, ...}HITL 中断,等待用户审批
complete{type, finish_reason}流结束
error{type, error}错误信息

WebSocket JSON-RPC 2.0(节点工具调用)

节点工具调用遵循 MCP(Model Context Protocol) 规范: 请求:
{
  "jsonrpc": "2.0",
  "id": "uuid-string",
  "method": "tools/call",
  "params": {
    "name": "browser_click",
    "arguments": { "selector": "#submit-btn" },
    "session_id": "session_abc123"
  }
}
响应(成功):
{
  "jsonrpc": "2.0",
  "id": "uuid-string",
  "result": {
    "content": [
      { "type": "text", "text": "Clicked element successfully" },
      { "type": "image", "data": "base64...", "mimeType": "image/jpeg" }
    ],
    "isError": false
  }
}
响应(错误):
{
  "jsonrpc": "2.0",
  "id": "uuid-string",
  "error": {
    "code": -32603,
    "message": "Element not found"
  }
}

WebSocket 消息类型(非 JSON-RPC)

方向type说明
Node → Gatewayregister节点注册(capabilities, tools)
Gateway → Noderegistered注册确认
Node → Gatewayheartbeat心跳上报(status, current_tasks)
Gateway → Nodeheartbeat_ack心跳确认
Node ↔ Gatewayping / pongKeepalive
Web → Gatewayget_workflows请求工作流列表(转发至 Extension)
Web → Gatewayexecute_workflow执行工作流
Node → Gatewaytask_complete工作流执行完成

启动与生命周期

启动顺序

关键环境变量

变量说明
DATABASE_URLPostgreSQL 连接串(Checkpointer + pgvector)
SUPABASE_URL / SUPABASE_SERVICE_KEYSupabase Storage 配置
NEXTJS_API_URLNext.js 后端 API(用户数据、配置)
OPENAI_API_KEY / OPENAI_BASE_URL默认 LLM 配置
REDIS_URLRedis 缓存(可选)
LANGCHAIN_API_KEYLangSmith 追踪(可选)

健康检查

  • GET /health{"status": "ok"}
  • GET /{"name": "Zeus Backend API", "version": "1.0.0", "status": "running"}

系统不变量

  • JWT 认证:所有 REST API 必须携带有效 JWT Token
  • Session 隔离:每个 session_id 对应独立的 Checkpointer 状态,不同 Session 互不干扰
  • 节点心跳:超过 60 秒未心跳的节点自动标记离线;Gateway 在节点断连时立即注销
  • 工具调用超时:WebSocket 工具调用默认 60 秒超时,工作流执行 300 秒超时
  • SSE 不重放:Agent 调用的 SSE 流是一次性的,断连后需通过 Checkpointer 恢复上下文
  • 单实例 Gateway:当前 ConnectionManager 为进程内单例,WebSocket 连接不跨进程共享

目录结构

apps/ai-backend/src/
├── api/                          # FastAPI 路由层
│   ├── main.py                   # 入口 & 生命周期
│   ├── gateway.py                # WebSocket 网关
│   ├── agent.py                  # Agent API
│   ├── tools.py                  # 工具管理 API
│   ├── skill.py                  # 技能管理 API
│   ├── knowledge_base.py         # 知识库 API
│   ├── node.py                   # 节点查询 API
│   ├── scheduled_task.py         # 定时任务 API
│   └── channels/                 # 渠道(飞书)
├── services/                     # 业务逻辑层
│   ├── base.py                   # BaseService (Agent 核心)
│   ├── agent.py                  # AgentService
│   ├── rag.py                    # RAG 检索
│   ├── document.py               # 文档处理
│   ├── feishu.py                 # 飞书服务
│   └── scheduler.py              # 定时任务
├── repository/                   # 数据模型 & 提示词
│   ├── models/                   # Pydantic Models
│   ├── prompts/                  # System Prompts (.md)
│   └── skills/                   # 技能定义
└── utils/                        # 工具层
    ├── core/                     # LLM, Memory, Skills, HITL
    ├── infra/                    # Backend, Checkpoint, Node, Redis, Auth
    ├── tools/                    # 工具实现
    │   ├── built_in/             # 内置工具
    │   └── oauth/                # OAuth 工具
    ├── knowledge_base/           # 向量存储 & 切块
    └── channels/                 # 渠道集成