跳转到主要内容
Zeus AI Backend 通过 WebSocket 网关管理所有节点连接。三类客户端通过不同的 WebSocket 端点连接,使用 JSON-RPC 2.0 协议进行工具调用通信。

WebSocket 端点

端点客户端连接 URL
WS /ws/extension浏览器扩展wss://zeus-api.agentspro.cn/ws/extension?client_id={client_id}&node_id={node_id}
WS /ws/desktop桌面应用wss://zeus-api.agentspro.cn/ws/desktop?client_id={client_id}&node_id={node_id}
WS /ws/webWeb 客户端wss://zeus-api.agentspro.cn/ws/web?user_id={user_id}

连接参数

Browser Extension / Desktop App

client_id
string
必填
客户端 ID。格式为 user_{user_id}(浏览器扩展)或 desktop_{user_id}(桌面应用)。服务端会自动解析提取 user_id
node_id
string
节点 ID,唯一标识该设备。如果不提供,服务端会自动生成(ext_{user_id}desktop_{user_id})。

Web Client

user_id
string
必填
用户 ID,用于路由消息到正确的用户。

连接生命周期


消息类型

1. 节点注册 (register)

浏览器扩展和桌面应用连接后,应发送 register 消息注册节点信息: 客户端 → 服务端:
{
  "type": "register",
  "node_id": "node_abc123",
  "node_name": "Chrome Extension",
  "node_type": "extension",
  "os": "macOS",
  "os_version": "14.0",
  "app_version": "1.2.0",
  "capabilities": ["browser_control", "screenshot"],
  "available_tools": ["click", "type", "screenshot", "navigate"],
  "max_concurrent_tasks": 3
}
node_id
string
必填
节点唯一 ID
node_name
string
节点显示名称(默认 “Unknown Node”)
node_type
string
必填
节点类型:extensiondesktop
os
string
操作系统名称
os_version
string
操作系统版本
app_version
string
客户端应用版本
capabilities
string[]
节点能力列表,如 ["browser_control", "screenshot", "file_system"]
available_tools
string[]
可用工具列表,如 ["click", "type", "screenshot", "navigate"]
max_concurrent_tasks
integer
最大并发任务数(默认 3)
服务端 → 客户端:
{
  "type": "registered",
  "node_id": "node_abc123",
  "success": true
}

2. 心跳机制 (heartbeat)

节点定期发送心跳以维持连接状态。服务端会更新 NodeManager 中的节点状态。 客户端 → 服务端:
{
  "type": "heartbeat",
  "status": "online",
  "current_tasks": 0
}
status
string
节点状态:onlinebusyoffline(默认 online
current_tasks
integer
当前正在执行的任务数(默认 0)
服务端 → 客户端:
{
  "type": "heartbeat_ack"
}

3. Ping/Pong 保活

所有三种客户端(Extension、Desktop、Web)都支持 ping/pong 保活: 客户端 → 服务端:
{
  "type": "ping"
}
服务端 → 客户端:
{
  "type": "pong"
}
对于 Extension 和 Desktop 客户端,ping 消息会同时触发 NodeManager 心跳更新。

4. 工具调用(JSON-RPC 2.0)

服务端通过 WebSocket 向节点发送 JSON-RPC 2.0 请求来调用工具。这是 MCP(Model Context Protocol)格式。 服务端 → 客户端(请求):
{
  "jsonrpc": "2.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "method": "tools/call",
  "params": {
    "name": "screenshot",
    "arguments": { "full_page": true },
    "session_id": "sess_abc123"
  }
}
id
string
必填
请求 ID(UUID),用于匹配响应
method
string
必填
固定为 tools/call
params.name
string
必填
要调用的工具名称
params.arguments
object
必填
工具参数
params.session_id
string
关联的会话 ID(可选)
客户端 → 服务端(成功响应):
{
  "jsonrpc": "2.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "Screenshot taken successfully"
      },
      {
        "type": "image",
        "data": "iVBORw0KGgo...",
        "mimeType": "image/png"
      }
    ],
    "isError": false,
    "_screenshot": "base64_screenshot_data..."
  }
}
客户端 → 服务端(错误响应):
{
  "jsonrpc": "2.0",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "error": {
    "code": -32603,
    "message": "Tool execution failed: element not found"
  }
}

MCP Result 内容类型

result.content 数组中的每个项可以是:
type字段说明
texttext文本内容
imagedata, mimeTypeBase64 编码的图片数据
服务端解析响应时,会提取 textimage_screenshot 字段,并将 isError 映射为 success 状态。默认超时时间为 60 秒。

5. Legacy MCP 响应 (mcp_response)

此格式已废弃,仅为向后兼容保留。新客户端应使用 JSON-RPC 2.0 格式。
{
  "type": "mcp_response",
  "request_id": "req_001",
  "result": { "screenshot": "base64..." },
  "error": null
}

6. 状态更新 (status)

客户端可以发送状态更新消息:
{
  "type": "status",
  "status": "busy"
}

7. Workflow 执行

Web 客户端请求 Workflow 列表

Web 客户端 → 服务端:
{
  "type": "get_workflows"
}
服务端会将请求转发给浏览器扩展。扩展返回的 workflows_list 消息会自动转发给 Web 客户端。如果扩展未连接,返回:
{
  "type": "workflows_list",
  "success": false,
  "error": "Extension not connected"
}

Web 客户端执行 Workflow

Web 客户端 → 服务端:
{
  "type": "execute_workflow",
  "workflow_id": "wf_abc123",
  "variables": {
    "url": "https://example.com",
    "query": "search term"
  }
}
服务端 → Web 客户端(确认启动):
{
  "type": "workflow_started",
  "workflow_id": "wf_abc123"
}
服务端 → Web 客户端(执行完成): 当扩展发送 task_complete 消息后,服务端会通知 Web 客户端:
{
  "type": "workflow_complete",
  "task_id": "task_456",
  "success": true,
  "result": { "data": "..." },
  "output": "Workflow completed successfully"
}

扩展端 Workflow 完成通知

Extension → 服务端:
{
  "type": "task_complete",
  "task_id": "task_456",
  "result": { "data": "..." },
  "output": "Workflow completed"
}
Workflow 执行的默认超时时间为 300 秒(5 分钟)。

错误处理

连接拒绝

当缺少必要参数时,服务端会关闭连接:
错误码原因
4001缺少 client_id(Extension/Desktop)或 user_id(Web)

断线处理

当 WebSocket 连接断开时,服务端会自动:
  1. 从 ConnectionManager 中移除连接
  2. 从 NodeManager 中注销节点(Extension/Desktop)
  3. 清理所有挂起的请求

ConnectionManager API

ConnectionManager 是 WebSocket 网关的核心单例,管理所有连接的生命周期。

连接管理

方法说明
connect_extension(user_id, node_id, ws, register_request?)注册浏览器扩展连接,可选传入注册信息
connect_desktop(user_id, node_id, ws, register_request?)注册桌面应用连接,可选传入注册信息
connect_web(user_id, ws)注册 Web 客户端连接
disconnect_extension(user_id, node_id)断开扩展连接并注销节点
disconnect_desktop(user_id, node_id)断开桌面连接并注销节点
disconnect_web(user_id, ws)断开 Web 客户端连接

连接查询

方法说明
is_extension_connected(user_id, node_id?)检查扩展连接状态。不传 node_id 则检查该用户是否有任意扩展连接
is_desktop_connected(user_id, node_id?)检查桌面应用连接状态
get_extension_node_ids(user_id)获取用户所有 Extension 节点 ID 列表
get_desktop_node_ids(user_id)获取用户所有 Desktop 节点 ID 列表
get_websocket(user_id, node_id, node_type)获取指定节点的 WebSocket 连接对象
get_stats()获取连接统计信息

消息发送

方法说明
send_to_extension(user_id, message, node_id?)发送消息到浏览器扩展。指定 node_id 发送到特定节点,否则发送到第一个可用节点
send_to_desktop(user_id, message, node_id?)发送消息到桌面应用。逻辑同上
send_to_web(user_id, message)发送消息到该用户的所有 Web 客户端连接

工具调用

方法说明
call_extension_tool(user_id, tool_name, tool_args, session_id?, node_id?, timeout=60)调用浏览器扩展工具并等待响应
call_desktop_tool(user_id, tool_name, tool_args, session_id?, node_id?, timeout=60)调用桌面应用工具并等待响应
call_tool(user_id, node_id, node_type, tool_name, tool_args, session_id?, timeout=60)统一工具调用方法,根据 node_type 自动路由
execute_workflow(user_id, workflow_id, variables?, timeout=300)在浏览器扩展上执行 Workflow
resolve_request(request_id, result)解析挂起的请求(由消息处理循环调用)

连接统计 (get_stats)

{
  "extension_connections": 3,
  "desktop_connections": 1,
  "web_connections": 5,
  "pending_requests": 2
}

数据结构

多节点连接模型

ConnectionManager 使用嵌套字典管理连接,支持每个用户拥有多个节点:
extension_connections: { user_id -> { node_id -> WebSocket } }
desktop_connections:   { user_id -> { node_id -> WebSocket } }
web_connections:       { user_id -> Set[WebSocket] }
pending_requests:      { request_id -> asyncio.Future }
  • Extension / Desktop:每个用户可连接多个节点(多设备),通过 node_id 区分
  • Web:每个用户可有多个 Web 客户端(多标签页),使用 Set 存储

NodeType 枚举

说明
extension浏览器扩展节点
desktop桌面应用节点

NodeStatus 枚举

说明
online节点在线,可接受任务
busy节点忙碌,正在执行任务
offline节点离线

连接示例

JavaScript (Browser Extension)

const ws = new WebSocket(
  'wss://zeus-api.agentspro.cn/ws/extension?client_id=user_abc123&node_id=ext_001'
);

ws.onopen = () => {
  // 注册节点信息
  ws.send(JSON.stringify({
    type: 'register',
    node_id: 'ext_001',
    node_name: 'Chrome Extension',
    node_type: 'extension',
    os: navigator.platform,
    app_version: '1.0.0',
    capabilities: ['browser_control', 'screenshot'],
    available_tools: ['click', 'type', 'screenshot', 'navigate']
  }));

  // 启动心跳
  setInterval(() => {
    ws.send(JSON.stringify({ type: 'heartbeat', status: 'online' }));
  }, 30000);
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  // 处理 JSON-RPC 工具调用
  if (message.jsonrpc === '2.0' && message.method === 'tools/call') {
    const { id, params } = message;
    
    // 执行工具...
    executeToolCall(params.name, params.arguments).then(result => {
      ws.send(JSON.stringify({
        jsonrpc: '2.0',
        id: id,
        result: {
          content: [{ type: 'text', text: 'Tool executed successfully' }],
          isError: false
        }
      }));
    }).catch(error => {
      ws.send(JSON.stringify({
        jsonrpc: '2.0',
        id: id,
        error: { code: -32603, message: error.message }
      }));
    });
  }
};

Python (Desktop App)

import asyncio
import websockets
import json

async def connect_desktop():
    uri = "wss://zeus-api.agentspro.cn/ws/desktop?client_id=desktop_abc123&node_id=desk_001"
    
    async with websockets.connect(uri) as ws:
        # 注册节点
        await ws.send(json.dumps({
            "type": "register",
            "node_id": "desk_001",
            "node_name": "My Desktop",
            "node_type": "desktop",
            "os": "macOS",
            "os_version": "14.0",
            "app_version": "1.0.0",
            "capabilities": ["file_system", "screen_capture"],
            "available_tools": ["read_file", "write_file", "screenshot"]
        }))
        
        async for message in ws:
            data = json.loads(message)
            
            if data.get("type") == "heartbeat_ack":
                continue
            
            # 处理 JSON-RPC 工具调用
            if data.get("jsonrpc") == "2.0" and "method" in data:
                request_id = data["id"]
                tool_name = data["params"]["name"]
                tool_args = data["params"]["arguments"]
                
                try:
                    result = await execute_tool(tool_name, tool_args)
                    await ws.send(json.dumps({
                        "jsonrpc": "2.0",
                        "id": request_id,
                        "result": {
                            "content": [{"type": "text", "text": str(result)}],
                            "isError": False
                        }
                    }))
                except Exception as e:
                    await ws.send(json.dumps({
                        "jsonrpc": "2.0",
                        "id": request_id,
                        "error": {"code": -32603, "message": str(e)}
                    }))