# 智队中枢 > PIT 网关路由应用 - Personal Intelligent Team Gateway Router Service **中文名**:智队中枢 **英文名**:PIT Router **当前版本**:v0.7.2 --- ## 更新日志 ### v0.7.2 (2026-03-15) - ✅ 修复暗黑主题切换不生效的问题 - ✅ 修复暗黑主题切换时的页面闪烁问题 - ✅ 完善 Web UI 细节(会话详情页、频道编辑页、错误页面) - ✅ 优化自动部署脚本(备份策略、健康检查) ### v0.7.1 (2026-03-15) - ✅ 登录页面,支持外网访问 Web UI - ✅ 添加自动部署脚本 - ✅ 添加 Webhook 服务 - ✅ 清理 venv 目录 --- ## 项目概述 **智队中枢**(原 PIT Router)是 PIT(Personal Intelligent Team)系统的核心组件,负责连接用户交互层和 Agent 层,实现消息路由、会话管理、Agent 调度等功能。 ### 核心目标 - 提供统一的 WebSocket 接入点 - 实现多 Agent 智能路由 - 管理会话生命周期 - 支持 Agent 负载均衡 - 保证消息可靠传输 --- ## 技术栈 | 层级 | 技术 | 版本 | 说明 | |------|------|------|------| | 语言 | Python | 3.12 | 高性能异步支持 | | Web框架 | Flask | 3.0+ | 轻量级、灵活 | | WebSocket | Flask-SocketIO | 5.3+ | 实时双向通信 | | ORM | SQLAlchemy | 3.1+ | 数据库抽象层 | | 认证 | Flask-Login + JWT | - | 用户身份验证 | | 缓存 | Redis | 7+ | 会话缓存、消息队列 | | 数据库 | PostgreSQL | 15+ | 生产环境持久化 | | 部署 | Gunicorn + Nginx | - | 生产环境部署 | --- ## 系统架构 ``` 用户交互层 (Clients) ↓ WebSocket / HTTP 智队中枢 (PIT Router) ├── 接入层 (Access Layer) │ ├── HTTP Server (Flask) │ ├── WebSocket (SocketIO) │ ├── Auth Middleware (JWT) │ └── Rate Limiter ├── 业务层 (Business Layer) │ ├── Session Manager │ ├── Message Router │ ├── Agent Scheduler (加权轮询) │ └── Message Queue (ACK机制) └── 数据层 (Data Layer) ├── PostgreSQL (持久化) ├── Redis (缓存/消息队列) └── Config Store ↓ WebSocket + PIT Channel 协议 Agent 层 (OpenClaw Gateways) ``` --- ## 项目结构 ``` pit-router/ ├── app/ │ ├── __init__.py # Flask 应用工厂 │ ├── config.py # 配置管理 │ ├── extensions.py # Flask 扩展初始化 │ ├── models/ # 数据模型层 │ │ ├── __init__.py │ │ ├── user.py # 用户模型 │ │ ├── session.py # 会话模型 │ │ ├── agent.py # Agent 模型 │ │ ├── gateway.py # Gateway 模型 │ │ ├── message.py # 消息模型 │ │ └── connection.py # 连接模型 │ ├── routes/ # HTTP 路由层 │ │ ├── __init__.py │ │ ├── auth.py # 认证路由 │ │ ├── sessions.py # 会话路由 │ │ ├── agents.py # Agent 路由 │ │ ├── gateways.py # Gateway 路由 │ │ ├── messages.py # 消息路由 │ │ └── stats.py # 统计接口 │ ├── socketio/ # WebSocket 处理层 │ │ ├── __init__.py │ │ ├── handlers.py # Socket.IO 事件处理 │ │ ├── events.py # 事件定义 │ │ └── auth.py # WebSocket 认证 │ ├── services/ # 业务逻辑层 │ │ ├── __init__.py │ │ ├── session_service.py # 会话服务 │ │ ├── agent_service.py # Agent 服务 │ │ ├── message_service.py # 消息服务 │ │ ├── scheduler.py # Agent 调度器 │ │ └── message_queue.py # 消息队列管理 │ ├── utils/ # 工具函数 │ │ ├── __init__.py │ │ ├── validators.py # 输入验证 │ │ ├── security.py # 安全工具 │ │ └── helpers.py # 辅助函数 │ └── extensions.py # Flask 扩展初始化 ├── migrations/ # 数据库迁移 ├── tests/ # 测试 │ ├── __init__.py │ ├── test_auth.py │ ├── test_sessions.py │ ├── test_messages.py │ └── test_websocket.py ├── requirements.txt # Python 依赖 ├── requirements-dev.txt # 开发依赖 ├── config.yaml # 配置文件模板 ├── config.example.yaml # 配置示例 ├── docker-compose.yaml # Docker 编排 ├── Dockerfile # Docker 镜像 └── run.py # 启动入口 ``` --- ## 数据模型 ### 用户模型 (User) | 字段 | 类型 | 说明 | |------|------|------| | id | String(36) | 主键 UUID | | username | String(80) | 用户名,唯一索引 | | password_hash | String(256) | 密码哈希 (bcrypt) | | email | String(120) | 邮箱,唯一 | | nickname | String(80) | 昵称 | | role | String(20) | 角色:admin/user | | status | String(20) | 状态:active/disabled | | created_at | DateTime | 创建时间 | | last_login_at | DateTime | 最后登录时间 | ### 会话模型 (Session) | 字段 | 类型 | 说明 | |------|------|------| | id | String(36) | 主键 UUID | | user_id | String(36) | 用户 ID,外键 | | primary_agent_id | String(36) | 主 Agent ID | | participating_agent_ids | JSON | 参与 Agent ID 列表 | | user_socket_id | String(100) | 用户 WebSocket Socket ID | | title | String(200) | 会话标题 | | channel_type | String(20) | 渠道类型:web/desktop/mobile | | status | String(20) | 状态:active/paused/closed | | message_count | Integer | 消息计数 | | unread_count | Integer | 未读消息数 | | created_at | DateTime | 创建时间 | | updated_at | DateTime | 更新时间 | | last_active_at | DateTime | 最后活跃时间 | ### Agent 模型 | 字段 | 类型 | 说明 | |------|------|------| | id | String(36) | 主键 UUID | | name | String(80) | Agent 名称 | | display_name | String(80) | 显示名称 | | gateway_id | String(36) | Gateway ID,外键 | | socket_id | String(100) | WebSocket Socket ID | | model | String(80) | 使用的模型 | | capabilities | JSON | 能力列表 | | status | String(20) | 状态:online/offline/busy | | priority | Integer | 优先级 (1-10) | | weight | Integer | 权重 (用于调度) | | connection_limit | Integer | 最大连接数 | | current_sessions | Integer | 当前会话数 | | last_heartbeat | DateTime | 最后心跳时间 | | created_at | DateTime | 创建时间 | ### Gateway 模型 | 字段 | 类型 | 说明 | |------|------|------| | id | String(36) | 主键 UUID | | name | String(80) | Gateway 名称,唯一 | | url | String(256) | Gateway 回调地址 | | token_hash | String(256) | 认证 Token 哈希 | | status | String(20) | 状态:online/offline | | agent_count | Integer | Agent 数量 | | connection_limit | Integer | 最大连接数 | | heartbeat_interval | Integer | 心跳间隔 (秒) | | allowed_ips | JSON | 允许连接的 IP 白名单 | | last_heartbeat | DateTime | 最后心跳时间 | | created_at | DateTime | 创建时间 | ### 消息模型 (Message) ⭐ 新增 | 字段 | 类型 | 说明 | |------|------|------| | id | String(36) | 主键 UUID | | session_id | String(36) | 会话 ID,外键 | | sender_type | String(20) | 发送者类型:user/agent/system | | sender_id | String(36) | 发送者 ID | | message_type | String(20) | 消息类型:text/media/system | | content | Text | 消息内容 | | content_type | String(20) | 内容类型:markdown/plain/html | | reply_to | String(36) | 回复的消息 ID | | status | String(20) | 状态:sent/delivered/read/failed | | ack_status | String(20) | 确认状态:pending/acknowledged | | retry_count | Integer | 重试次数 | | created_at | DateTime | 创建时间 | | delivered_at | DateTime | 送达时间 | ### 连接模型 (Connection) ⭐ 新增 | 字段 | 类型 | 说明 | |------|------|------| | id | String(36) | 主键 UUID | | socket_id | String(100) | Socket.IO Socket ID | | connection_type | String(20) | 连接类型:user/agent/gateway | | entity_id | String(36) | 关联实体 ID | | entity_type | String(20) | 实体类型:user/agent | | ip_address | String(45) | IP 地址 (支持 IPv6) | | user_agent | String(500) | 客户端信息 | | status | String(20) | 状态:connected/disconnected | | auth_token | String(500) | 认证 Token | | connected_at | DateTime | 连接时间 | | last_activity | DateTime | 最后活动时间 | | disconnected_at | DateTime | 断开时间 | --- ## HTTP API 设计 ### 认证 API ``` POST /api/auth/register - 注册新用户 POST /api/auth/login - 用户登录,返回 JWT Token POST /api/auth/logout - 用户登出 POST /api/auth/refresh - 刷新 Token GET /api/auth/me - 获取当前用户信息 POST /api/auth/verify - 验证 Token 有效性 ``` ### 会话 API ``` GET /api/sessions - 获取会话列表 POST /api/sessions - 创建会话 GET /api/sessions/:id - 获取会话详情 PUT /api/sessions/:id/close - 关闭会话 (替代 DELETE) GET /api/sessions/:id/messages - 获取会话消息 GET /api/sessions/:id/participants - 获取会话参与者 ⭐ 新增 POST /api/sessions/:id/transfer - 会话转接 ⭐ 新增 ``` ### Agent API ``` GET /api/agents - 获取 Agent 列表 GET /api/agents/:id - 获取 Agent 详情 GET /api/agents/:id/status - 获取 Agent 实时状态 PUT /api/agents/:id/config - 更新 Agent 配置 POST /api/agents/:id/heartbeat - Agent 心跳上报 ⭐ 新增 GET /api/agents/available - 获取可用 Agent 列表 ⭐ 新增 ``` ### Gateway API ``` GET /api/gateways - 获取 Gateway 列表 POST /api/gateways - 注册 Gateway DELETE /api/gateways/:id - 注销 Gateway GET /api/gateways/:id/status - 获取 Gateway 状态 POST /api/gateways/:id/heartbeat - Gateway 心跳上报 ⭐ 新增 PUT /api/gateways/:id/agents - 更新 Gateway 的 Agent 列表 ⭐ 新增 ``` ### 消息 API ⭐ 新增 ``` POST /api/messages - 发送消息 (HTTP 方式) GET /api/messages/:id - 获取单条消息 PUT /api/messages/:id/ack - 确认消息已送达 PUT /api/messages/:id/read - 标记消息已读 ``` ### 统计 API ⭐ 新增 ``` GET /api/stats - 系统统计信息 GET /api/stats/sessions - 会话统计 GET /api/stats/messages - 消息统计 GET /api/stats/agents - Agent 统计 ``` --- ## WebSocket 协议 ### 连接流程 ``` 1. 客户端连接 WebSocket 2. 服务端发送 connect 事件要求认证 3. 客户端发送 auth 事件携带 JWT Token 4. 服务端验证 Token,发送 authenticated 或 error 5. 认证成功后,可以进行其他操作 ``` ### 认证事件 | 事件 | 方向 | 说明 | 参数 | |------|------|------|------| | `connect` | C→S | 连接请求 | - | | `auth` | C→S | 认证请求 | `{ token: string }` | | `authenticated` | S→C | 认证成功 | `{ user_id: string, socket_id: string }` | | `auth_error` | S→C | 认证失败 | `{ code: string, message: string }` | ### 心跳事件 ⭐ 新增 | 事件 | 方向 | 说明 | 参数 | |------|------|------|------| | `ping` | C→S | 心跳请求 | `{ timestamp: number }` | | `pong` | S→C | 心跳响应 | `{ timestamp: number }` | | `heartbeat_timeout` | S→C | 心跳超时 | `{ socket_id: string }` | ### 会话事件 | 事件 | 方向 | 说明 | 参数 | |------|------|------|------| | `session.create` | C→S | 创建会话 | `{ title, user_id, agent_id?, priority? }` | | `session.created` | S→C | 会话已创建 | `{ session_id, agent_id }` | | `session.join` | C→S | 加入会话 | `{ session_id }` | | `session.joined` | S→C | 已加入会话 | `{ session_id, participants: [] }` | | `session.leave` | C→S | 离开会话 ⭐ 新增 | `{ session_id }` | | `session.left` | S→C | 已离开会话 ⭐ 新增 | `{ session_id, user_id }` | | `session.closed` | S→C | 会话被关闭 ⭐ 新增 | `{ session_id, reason }` | | `session.assigned` | S→C | Agent 分配通知 ⭐ 新增 | `{ session_id, agent_id }` | ### 消息事件 | 事件 | 方向 | 说明 | 参数 | |------|------|------|------| | `message.send` | C→S | 发送消息 | `{ session_id, content, type, reply_to? }` | | `message` | S→C | 收到消息 | `{ message_id, session_id, sender, content, timestamp }` | | `message.ack` | C→S | 消息确认 ⭐ 新增 | `{ message_id, status: delivered/read }` | | `message.acked` | S→C | 确认已收到 ⭐ 新增 | `{ message_id, status }` | | `message.stream` | S→C | 流式消息 | `{ session_id, chunk, is_end }` | | `message.read` | C→S | 消息已读 ⭐ 新增 | `{ session_id, message_ids: [] }` | | `typing` | C→S | 正在输入 ⭐ 新增 | `{ session_id, is_typing: boolean }` | ### 错误事件 | 事件 | 方向 | 说明 | 参数 | |------|------|------|------| | `error` | S→C | 通用错误 | `{ code, message, details? }` | | `session_error` | S→C | 会话错误 | `{ session_id, code, message }` | | `message_error` | S→C | 消息错误 | `{ message_id, code, message }` | ### 消息格式 (与 PIT Channel 兼容) ⭐ 新增 ```json { "type": "request" | "response" | "event" | "ack", "id": "uuid-string", "method": "send.message" | "send.media" | "ping" | "auth", "params": { "to": "user-id", "content": "message content", "timestamp": 1234567890, "replyTo": "message-id" }, "replyTo": "original-message-id", "error": { "code": "ERROR_CODE", "message": "Error description" } } ``` --- ## Agent 调度策略 ⭐ 新增 ### 调度算法 ```python class AgentScheduler: STRATEGIES = { "round_robin": RoundRobinStrategy, # 轮询 "weighted_round_robin": WeightedRoundRobinStrategy, # 加权轮询 "least_connections": LeastConnectionsStrategy, # 最少连接 "least_response_time": LeastResponseTimeStrategy, # 最快响应 "capability_match": CapabilityMatchStrategy, # 能力匹配 } ``` ### 调度配置 ```yaml scheduler: strategy: "weighted_round_robin" # 调度策略 retry_attempts: 3 # 分配失败重试次数 timeout: 30 # 分配超时 (秒) # 权重配置 weights: priority: 0.4 # 优先级权重 (40%) load: 0.3 # 负载权重 (30%) capability: 0.3 # 能力匹配权重 (30%) ``` --- ## 消息可靠性机制 ⭐ 新增 ### ACK 确认流程 ``` 1. 客户端发送 message.send 2. 服务端存储消息,状态=pending 3. 服务端转发消息给 Agent 4. Agent 收到后发送 message.ack 5. 服务端更新消息状态=delivered 6. 用户阅读后发送 message.read 7. 服务端更新消息状态=read ``` ### 消息重试机制 | 场景 | 策略 | 最大重试 | |------|------|----------| | Agent 离线 | 存入队列,Agent 上线后推送 | 无限 | | 发送超时 | 指数退避重试 | 3 次 | | ACK 超时 | 重新发送 | 3 次 | ### 消息持久化 ```python # 消息存储策略 MESSAGE_RETENTION = { "active_sessions": "30_days", # 活跃会话保留 30 天 "closed_sessions": "7_days", # 关闭会话保留 7 天 "failed_messages": "3_days", # 失败消息保留 3 天 } ``` --- ## 安全设计 ### 认证机制 | 层级 | 机制 | 说明 | |------|------|------| | HTTP API | JWT Bearer Token | Authorization: Bearer | | WebSocket | JWT Query Param | ws://host/ws?token= | | Gateway | Token + IP 白名单 | 双重验证 | ### Token 策略 | Token 类型 | 有效期 | 用途 | |------------|--------|------| | Access Token | 24 小时 | API 访问 | | Refresh Token | 7 天 | 刷新 Access Token | | Gateway Token | 永不过期 (可撤销) | Gateway 认证 | ### 安全措施 | 措施 | 实现 | 说明 | |------|------|------| | HTTPS/WSS | Nginx 反向代理 | 强制 TLS 1.2+ | | CORS | Flask-CORS | 白名单配置 | | Rate Limit | Flask-Limiter | 100 req/min per IP | | Input Validation | Marshmallow/Pydantic | 严格参数校验 | | SQL Injection | SQLAlchemy ORM | 参数化查询 | | XSS | Jinja2 转义 | 自动转义输出 | | IP 白名单 | Middleware | Gateway 连接限制 | | 连接数限制 | Socket.IO | 单用户最大连接数 | --- ## 部署架构 ### Docker Compose ⭐ 完善 ```yaml version: '3.8' services: pit-router: build: . ports: - "9000:9000" environment: - FLASK_ENV=production - SECRET_KEY=${SECRET_KEY} - JWT_SECRET_KEY=${JWT_SECRET_KEY} - DATABASE_URL=postgresql://user:pass@postgres:5432/pit - REDIS_URL=redis://redis:6379/0 volumes: - pit-data:/app/data - pit-logs:/app/logs depends_on: - postgres - redis restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/health"] interval: 30s timeout: 10s retries: 3 start_period: 40s deploy: resources: limits: cpus: '2.0' memory: 2G reservations: cpus: '0.5' memory: 512M postgres: image: postgres:15-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=${DB_PASSWORD} - POSTGRES_DB=pit_router volumes: - postgres-data:/var/lib/postgresql/data restart: unless-stopped healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres -d pit_router"] interval: 10s timeout: 5s retries: 5 redis: image: redis:7-alpine volumes: - redis-data:/data restart: unless-stopped healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 5s retries: 5 nginx: image: nginx:alpine ports: - "80:80" - "443:443" volumes: - ./nginx.conf:/etc/nginx/nginx.conf:ro - ./ssl:/etc/nginx/ssl:ro depends_on: - pit-router restart: unless-stopped volumes: pit-data: pit-logs: postgres-data: redis-data: ``` --- ## 开发计划 | 阶段 | 状态 | 内容 | |------|------|------| | **Phase 1** | ✅ 完成 | 核心功能(数据模型、HTTP API、WebSocket) | | **Phase 2** | ✅ 完成 | 服务层(调度器、消息队列、会话/消息/Agent 服务) | | **Phase 3** | ✅ 完成 | 工具层(验证器、安全工具、辅助函数) | | **Phase 4** | ⚠️ 部分 | 测试部署(单元测试 80%,集成/性能/部署未完成) | | **Phase 5** | ⏳ 待开始 | 运维工具(Nginx + SSL、日志、监控) | ### 详细进度 - ✅ 数据模型(6个)- 100% - ✅ HTTP API(6组)- 100% - ✅ WebSocket 事件 - 100% - ✅ 调度器(5种策略)- 100% - ✅ 消息队列(ACK)- 100% - ✅ 业务服务层 - 100% - ✅ 工具函数 - 100% - ✅ Docker 配置 - 100% - ⚠️ 单元测试 - 24/31 通过 (80%) - ⏳ 集成测试 - 待完成 - ⏳ 性能测试 - 待完成 - ⏳ Nginx + SSL - 待完成 - ⏳ 日志系统 - 待完成 - ⏳ 监控告警 - 待完成 **总进度**: 约 85% --- ## 与 PIT Channel 的对接 ### 协议兼容性 | PIT Channel 发送 | 智队中枢处理 | |------------------|----------------| | `type: "request"` | 路由到对应处理器 | | `method: "send.message"` | 转发给目标 Agent | | `method: "ping"` | 返回 `pong` | | `type: "ack"` | 更新消息状态 | ### 配置示例 ```json // openclaw.json { "plugins": { "entries": { "pit-bot": { "enabled": true, "config": { "routerUrl": "wss://智队中枢.example.com/ws", "authToken": "${PIT_ROUTER_TOKEN}", "heartbeatInterval": 30000, "heartbeatTimeout": 10000, "ackTimeout": 30000 } } } } } ``` --- ## 自动部署 🔄 > Gitea 仓库提交后自动更新 Docker 容器 ### 方案对比 | 方案 | 复杂度 | 说明 | |------|--------|------| | **方案 1: Webhook + 脚本** | ⭐ 简单 | 推荐,实现快、依赖少 | | **方案 2: Gitea Actions** | ⭐⭐⭐ 中等 | 功能强大,需要配置 Runner | | **方案 3: Watchtower** | ⭐⭐ 简单 | 自动监控镜像更新 | ### 推荐方案:Webhook + 脚本 #### 架构流程 ``` 开发者推送代码 → Gitea 触发 Webhook → 更新脚本执行 → 拉取代码 → 构建镜像 → 重启容器 → 健康检查 ``` #### 1. 更新脚本 ```bash #!/bin/bash # /www/wwwroot/pit-router/auto-update.sh set -e cd /www/wwwroot/pit-router # 拉取最新代码 git fetch origin && git reset --hard origin/main # 重建并启动 docker-compose down docker-compose build --no-cache docker-compose up -d # 健康检查 sleep 10 curl -sf http://localhost:1999/health && echo "✅ 更新成功" ``` #### 2. Webhook 服务 ```python # /www/wwwroot/pit-router/webhook-server.py from flask import Flask, request, jsonify import hmac, hashlib, subprocess app = Flask(__name__) SECRET = 'your-webhook-secret' @app.route('/webhook/pit-router', methods=['POST']) def handle(): # 验证签名 sig = request.headers.get('X-Gitea-Signature', '') expected = 'sha256=' + hmac.new(SECRET.encode(), request.data, hashlib.sha256).hexdigest() if not hmac.compare_digest(sig, expected): return {'error': 'Invalid signature'}, 401 # 执行更新 result = subprocess.run(['/www/wwwroot/pit-router/auto-update.sh'], capture_output=True) return {'status': 'success' if result.returncode == 0 else 'error'} if __name__ == '__main__': app.run(host='0.0.0.0', port=5001) ``` #### 3. 配置 Gitea Webhook ```bash curl -X POST "http://localhost:3000/api/v1/repos/yunxiafei/pit-router/hooks" \ -H "Authorization: token $TOKEN" \ -H "Content-Type: application/json" \ -d '{ "type": "gitea", "config": { "url": "http://localhost:5001/webhook/pit-router", "content_type": "json", "secret": "your-webhook-secret" }, "events": ["push"], "active": true }' ``` ### 实施清单 - [ ] 创建 `auto-update.sh` 脚本 - [ ] 创建 `webhook-server.py` 服务 - [ ] 配置 systemd 服务 - [ ] 配置 Gitea Webhook - [ ] 测试自动更新 **预计时间**:30 分钟 --- ## 相关项目 - [PIT Channel 插件](http://1.14.58.157:3000/yunxiafei/PIT_Channel) - OpenClaw Channel 插件(智队中枢客户端) --- ## 许可证 MIT License --- *创建时间: 2026-03-14 | 更新: 2026-03-14 | 作者: 小白 🐶* # Test webhook