e8dec248f7a9c487a0d77e8589c85b340a0d0989
智队中枢
PIT 网关路由应用 - Personal Intelligent Team Gateway Router Service
中文名:智队中枢
英文名:PIT Router
当前版本:v0.7.2
更新日志
v0.7.2 (2026-03-15)
- ✅ 修复暗黑主题切换不生效的问题
- ✅ 修复暗黑主题切换时的页面闪烁问题
- ✅ 完善 Web UI 细节(会话详情页、频道编辑页、错误页面)
- ✅ 优化自动部署脚本(备份策略、健康检查)
v0.7.1 (2026-03-15)
- ✅ 登录页面,支持外网访问 Web UI
- ✅ 添加自动部署脚本
- ✅ 添加 Webhook 服务
- ✅ 清理 venv 目录
项目概述
智队中枢(原 PIT Router)是 PIT(Personal Intelligent Team)系统的核心组件,负责连接用户交互层和 Agent 层,实现消息路由、会话管理、Agent 调度等功能。
核心目标
- 提供统一的 WebSocket 接入点
- 实现多 Agent 智能路由
- 管理会话生命周期
- 支持 Agent 负载均衡
- 保证消息可靠传输
技术栈
| 层级 | 技术 | 版本 | 说明 |
|---|---|---|---|
| 语言 | Python | 3.12 | 高性能异步支持 |
| Web框架 | Flask | 3.0+ | 轻量级、灵活 |
| WebSocket | Flask-SocketIO | 5.3+ | 实时双向通信 |
| ORM | SQLAlchemy | 3.1+ | 数据库抽象层 |
| 认证 | Flask-Login + JWT | - | 用户身份验证 |
| 缓存 | Redis | 7+ | 会话缓存、消息队列 |
| 数据库 | PostgreSQL | 15+ | 生产环境持久化 |
| 部署 | Gunicorn + Nginx | - | 生产环境部署 |
系统架构
用户交互层 (Clients)
↓ WebSocket / HTTP
智队中枢 (PIT Router)
├── 接入层 (Access Layer)
│ ├── HTTP Server (Flask)
│ ├── WebSocket (SocketIO)
│ ├── Auth Middleware (JWT)
│ └── Rate Limiter
├── 业务层 (Business Layer)
│ ├── Session Manager
│ ├── Message Router
│ ├── Agent Scheduler (加权轮询)
│ └── Message Queue (ACK机制)
└── 数据层 (Data Layer)
├── PostgreSQL (持久化)
├── Redis (缓存/消息队列)
└── Config Store
↓ WebSocket + PIT Channel 协议
Agent 层 (OpenClaw Gateways)
项目结构
pit-router/
├── app/
│ ├── __init__.py # Flask 应用工厂
│ ├── config.py # 配置管理
│ ├── extensions.py # Flask 扩展初始化
│ ├── models/ # 数据模型层
│ │ ├── __init__.py
│ │ ├── user.py # 用户模型
│ │ ├── session.py # 会话模型
│ │ ├── agent.py # Agent 模型
│ │ ├── gateway.py # Gateway 模型
│ │ ├── message.py # 消息模型
│ │ └── connection.py # 连接模型
│ ├── routes/ # HTTP 路由层
│ │ ├── __init__.py
│ │ ├── auth.py # 认证路由
│ │ ├── sessions.py # 会话路由
│ │ ├── agents.py # Agent 路由
│ │ ├── gateways.py # Gateway 路由
│ │ ├── messages.py # 消息路由
│ │ └── stats.py # 统计接口
│ ├── socketio/ # WebSocket 处理层
│ │ ├── __init__.py
│ │ ├── handlers.py # Socket.IO 事件处理
│ │ ├── events.py # 事件定义
│ │ └── auth.py # WebSocket 认证
│ ├── services/ # 业务逻辑层
│ │ ├── __init__.py
│ │ ├── session_service.py # 会话服务
│ │ ├── agent_service.py # Agent 服务
│ │ ├── message_service.py # 消息服务
│ │ ├── scheduler.py # Agent 调度器
│ │ └── message_queue.py # 消息队列管理
│ ├── utils/ # 工具函数
│ │ ├── __init__.py
│ │ ├── validators.py # 输入验证
│ │ ├── security.py # 安全工具
│ │ └── helpers.py # 辅助函数
│ └── extensions.py # Flask 扩展初始化
├── migrations/ # 数据库迁移
├── tests/ # 测试
│ ├── __init__.py
│ ├── test_auth.py
│ ├── test_sessions.py
│ ├── test_messages.py
│ └── test_websocket.py
├── requirements.txt # Python 依赖
├── requirements-dev.txt # 开发依赖
├── config.yaml # 配置文件模板
├── config.example.yaml # 配置示例
├── docker-compose.yaml # Docker 编排
├── Dockerfile # Docker 镜像
└── run.py # 启动入口
数据模型
用户模型 (User)
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String(36) | 主键 UUID |
| username | String(80) | 用户名,唯一索引 |
| password_hash | String(256) | 密码哈希 (bcrypt) |
| String(120) | 邮箱,唯一 | |
| nickname | String(80) | 昵称 |
| role | String(20) | 角色:admin/user |
| status | String(20) | 状态:active/disabled |
| created_at | DateTime | 创建时间 |
| last_login_at | DateTime | 最后登录时间 |
会话模型 (Session)
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String(36) | 主键 UUID |
| user_id | String(36) | 用户 ID,外键 |
| primary_agent_id | String(36) | 主 Agent ID |
| participating_agent_ids | JSON | 参与 Agent ID 列表 |
| user_socket_id | String(100) | 用户 WebSocket Socket ID |
| title | String(200) | 会话标题 |
| channel_type | String(20) | 渠道类型:web/desktop/mobile |
| status | String(20) | 状态:active/paused/closed |
| message_count | Integer | 消息计数 |
| unread_count | Integer | 未读消息数 |
| created_at | DateTime | 创建时间 |
| updated_at | DateTime | 更新时间 |
| last_active_at | DateTime | 最后活跃时间 |
Agent 模型
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String(36) | 主键 UUID |
| name | String(80) | Agent 名称 |
| display_name | String(80) | 显示名称 |
| gateway_id | String(36) | Gateway ID,外键 |
| socket_id | String(100) | WebSocket Socket ID |
| model | String(80) | 使用的模型 |
| capabilities | JSON | 能力列表 |
| status | String(20) | 状态:online/offline/busy |
| priority | Integer | 优先级 (1-10) |
| weight | Integer | 权重 (用于调度) |
| connection_limit | Integer | 最大连接数 |
| current_sessions | Integer | 当前会话数 |
| last_heartbeat | DateTime | 最后心跳时间 |
| created_at | DateTime | 创建时间 |
Gateway 模型
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String(36) | 主键 UUID |
| name | String(80) | Gateway 名称,唯一 |
| url | String(256) | Gateway 回调地址 |
| token_hash | String(256) | 认证 Token 哈希 |
| status | String(20) | 状态:online/offline |
| agent_count | Integer | Agent 数量 |
| connection_limit | Integer | 最大连接数 |
| heartbeat_interval | Integer | 心跳间隔 (秒) |
| allowed_ips | JSON | 允许连接的 IP 白名单 |
| last_heartbeat | DateTime | 最后心跳时间 |
| created_at | DateTime | 创建时间 |
消息模型 (Message) ⭐ 新增
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String(36) | 主键 UUID |
| session_id | String(36) | 会话 ID,外键 |
| sender_type | String(20) | 发送者类型:user/agent/system |
| sender_id | String(36) | 发送者 ID |
| message_type | String(20) | 消息类型:text/media/system |
| content | Text | 消息内容 |
| content_type | String(20) | 内容类型:markdown/plain/html |
| reply_to | String(36) | 回复的消息 ID |
| status | String(20) | 状态:sent/delivered/read/failed |
| ack_status | String(20) | 确认状态:pending/acknowledged |
| retry_count | Integer | 重试次数 |
| created_at | DateTime | 创建时间 |
| delivered_at | DateTime | 送达时间 |
连接模型 (Connection) ⭐ 新增
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String(36) | 主键 UUID |
| socket_id | String(100) | Socket.IO Socket ID |
| connection_type | String(20) | 连接类型:user/agent/gateway |
| entity_id | String(36) | 关联实体 ID |
| entity_type | String(20) | 实体类型:user/agent |
| ip_address | String(45) | IP 地址 (支持 IPv6) |
| user_agent | String(500) | 客户端信息 |
| status | String(20) | 状态:connected/disconnected |
| auth_token | String(500) | 认证 Token |
| connected_at | DateTime | 连接时间 |
| last_activity | DateTime | 最后活动时间 |
| disconnected_at | DateTime | 断开时间 |
HTTP API 设计
认证 API
POST /api/auth/register - 注册新用户
POST /api/auth/login - 用户登录,返回 JWT Token
POST /api/auth/logout - 用户登出
POST /api/auth/refresh - 刷新 Token
GET /api/auth/me - 获取当前用户信息
POST /api/auth/verify - 验证 Token 有效性
会话 API
GET /api/sessions - 获取会话列表
POST /api/sessions - 创建会话
GET /api/sessions/:id - 获取会话详情
PUT /api/sessions/:id/close - 关闭会话 (替代 DELETE)
GET /api/sessions/:id/messages - 获取会话消息
GET /api/sessions/:id/participants - 获取会话参与者 ⭐ 新增
POST /api/sessions/:id/transfer - 会话转接 ⭐ 新增
Agent API
GET /api/agents - 获取 Agent 列表
GET /api/agents/:id - 获取 Agent 详情
GET /api/agents/:id/status - 获取 Agent 实时状态
PUT /api/agents/:id/config - 更新 Agent 配置
POST /api/agents/:id/heartbeat - Agent 心跳上报 ⭐ 新增
GET /api/agents/available - 获取可用 Agent 列表 ⭐ 新增
Gateway API
GET /api/gateways - 获取 Gateway 列表
POST /api/gateways - 注册 Gateway
DELETE /api/gateways/:id - 注销 Gateway
GET /api/gateways/:id/status - 获取 Gateway 状态
POST /api/gateways/:id/heartbeat - Gateway 心跳上报 ⭐ 新增
PUT /api/gateways/:id/agents - 更新 Gateway 的 Agent 列表 ⭐ 新增
消息 API ⭐ 新增
POST /api/messages - 发送消息 (HTTP 方式)
GET /api/messages/:id - 获取单条消息
PUT /api/messages/:id/ack - 确认消息已送达
PUT /api/messages/:id/read - 标记消息已读
统计 API ⭐ 新增
GET /api/stats - 系统统计信息
GET /api/stats/sessions - 会话统计
GET /api/stats/messages - 消息统计
GET /api/stats/agents - Agent 统计
WebSocket 协议
连接流程
1. 客户端连接 WebSocket
2. 服务端发送 connect 事件要求认证
3. 客户端发送 auth 事件携带 JWT Token
4. 服务端验证 Token,发送 authenticated 或 error
5. 认证成功后,可以进行其他操作
认证事件
| 事件 | 方向 | 说明 | 参数 |
|---|---|---|---|
connect |
C→S | 连接请求 | - |
auth |
C→S | 认证请求 | { token: string } |
authenticated |
S→C | 认证成功 | { user_id: string, socket_id: string } |
auth_error |
S→C | 认证失败 | { code: string, message: string } |
心跳事件 ⭐ 新增
| 事件 | 方向 | 说明 | 参数 |
|---|---|---|---|
ping |
C→S | 心跳请求 | { timestamp: number } |
pong |
S→C | 心跳响应 | { timestamp: number } |
heartbeat_timeout |
S→C | 心跳超时 | { socket_id: string } |
会话事件
| 事件 | 方向 | 说明 | 参数 |
|---|---|---|---|
session.create |
C→S | 创建会话 | { title, user_id, agent_id?, priority? } |
session.created |
S→C | 会话已创建 | { session_id, agent_id } |
session.join |
C→S | 加入会话 | { session_id } |
session.joined |
S→C | 已加入会话 | { session_id, participants: [] } |
session.leave |
C→S | 离开会话 ⭐ 新增 | { session_id } |
session.left |
S→C | 已离开会话 ⭐ 新增 | { session_id, user_id } |
session.closed |
S→C | 会话被关闭 ⭐ 新增 | { session_id, reason } |
session.assigned |
S→C | Agent 分配通知 ⭐ 新增 | { session_id, agent_id } |
消息事件
| 事件 | 方向 | 说明 | 参数 |
|---|---|---|---|
message.send |
C→S | 发送消息 | { session_id, content, type, reply_to? } |
message |
S→C | 收到消息 | { message_id, session_id, sender, content, timestamp } |
message.ack |
C→S | 消息确认 ⭐ 新增 | { message_id, status: delivered/read } |
message.acked |
S→C | 确认已收到 ⭐ 新增 | { message_id, status } |
message.stream |
S→C | 流式消息 | { session_id, chunk, is_end } |
message.read |
C→S | 消息已读 ⭐ 新增 | { session_id, message_ids: [] } |
typing |
C→S | 正在输入 ⭐ 新增 | { session_id, is_typing: boolean } |
错误事件
| 事件 | 方向 | 说明 | 参数 |
|---|---|---|---|
error |
S→C | 通用错误 | { code, message, details? } |
session_error |
S→C | 会话错误 | { session_id, code, message } |
message_error |
S→C | 消息错误 | { message_id, code, message } |
消息格式 (与 PIT Channel 兼容) ⭐ 新增
{
"type": "request" | "response" | "event" | "ack",
"id": "uuid-string",
"method": "send.message" | "send.media" | "ping" | "auth",
"params": {
"to": "user-id",
"content": "message content",
"timestamp": 1234567890,
"replyTo": "message-id"
},
"replyTo": "original-message-id",
"error": {
"code": "ERROR_CODE",
"message": "Error description"
}
}
Agent 调度策略 ⭐ 新增
调度算法
class AgentScheduler:
STRATEGIES = {
"round_robin": RoundRobinStrategy, # 轮询
"weighted_round_robin": WeightedRoundRobinStrategy, # 加权轮询
"least_connections": LeastConnectionsStrategy, # 最少连接
"least_response_time": LeastResponseTimeStrategy, # 最快响应
"capability_match": CapabilityMatchStrategy, # 能力匹配
}
调度配置
scheduler:
strategy: "weighted_round_robin" # 调度策略
retry_attempts: 3 # 分配失败重试次数
timeout: 30 # 分配超时 (秒)
# 权重配置
weights:
priority: 0.4 # 优先级权重 (40%)
load: 0.3 # 负载权重 (30%)
capability: 0.3 # 能力匹配权重 (30%)
消息可靠性机制 ⭐ 新增
ACK 确认流程
1. 客户端发送 message.send
2. 服务端存储消息,状态=pending
3. 服务端转发消息给 Agent
4. Agent 收到后发送 message.ack
5. 服务端更新消息状态=delivered
6. 用户阅读后发送 message.read
7. 服务端更新消息状态=read
消息重试机制
| 场景 | 策略 | 最大重试 |
|---|---|---|
| Agent 离线 | 存入队列,Agent 上线后推送 | 无限 |
| 发送超时 | 指数退避重试 | 3 次 |
| ACK 超时 | 重新发送 | 3 次 |
消息持久化
# 消息存储策略
MESSAGE_RETENTION = {
"active_sessions": "30_days", # 活跃会话保留 30 天
"closed_sessions": "7_days", # 关闭会话保留 7 天
"failed_messages": "3_days", # 失败消息保留 3 天
}
安全设计
认证机制
| 层级 | 机制 | 说明 |
|---|---|---|
| HTTP API | JWT Bearer Token | Authorization: Bearer |
| WebSocket | JWT Query Param | ws://host/ws?token= |
| Gateway | Token + IP 白名单 | 双重验证 |
Token 策略
| Token 类型 | 有效期 | 用途 |
|---|---|---|
| Access Token | 24 小时 | API 访问 |
| Refresh Token | 7 天 | 刷新 Access Token |
| Gateway Token | 永不过期 (可撤销) | Gateway 认证 |
安全措施
| 措施 | 实现 | 说明 |
|---|---|---|
| HTTPS/WSS | Nginx 反向代理 | 强制 TLS 1.2+ |
| CORS | Flask-CORS | 白名单配置 |
| Rate Limit | Flask-Limiter | 100 req/min per IP |
| Input Validation | Marshmallow/Pydantic | 严格参数校验 |
| SQL Injection | SQLAlchemy ORM | 参数化查询 |
| XSS | Jinja2 转义 | 自动转义输出 |
| IP 白名单 | Middleware | Gateway 连接限制 |
| 连接数限制 | Socket.IO | 单用户最大连接数 |
部署架构
Docker Compose ⭐ 完善
version: '3.8'
services:
pit-router:
build: .
ports:
- "9000:9000"
environment:
- FLASK_ENV=production
- SECRET_KEY=${SECRET_KEY}
- JWT_SECRET_KEY=${JWT_SECRET_KEY}
- DATABASE_URL=postgresql://user:pass@postgres:5432/pit
- REDIS_URL=redis://redis:6379/0
volumes:
- pit-data:/app/data
- pit-logs:/app/logs
depends_on:
- postgres
- redis
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
deploy:
resources:
limits:
cpus: '2.0'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=${DB_PASSWORD}
- POSTGRES_DB=pit_router
volumes:
- postgres-data:/var/lib/postgresql/data
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d pit_router"]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
volumes:
- redis-data:/data
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- pit-router
restart: unless-stopped
volumes:
pit-data:
pit-logs:
postgres-data:
redis-data:
开发计划
| 阶段 | 状态 | 内容 |
|---|---|---|
| Phase 1 | ✅ 完成 | 核心功能(数据模型、HTTP API、WebSocket) |
| Phase 2 | ✅ 完成 | 服务层(调度器、消息队列、会话/消息/Agent 服务) |
| Phase 3 | ✅ 完成 | 工具层(验证器、安全工具、辅助函数) |
| Phase 4 | ⚠️ 部分 | 测试部署(单元测试 80%,集成/性能/部署未完成) |
| Phase 5 | ⏳ 待开始 | 运维工具(Nginx + SSL、日志、监控) |
详细进度
- ✅ 数据模型(6个)- 100%
- ✅ HTTP API(6组)- 100%
- ✅ WebSocket 事件 - 100%
- ✅ 调度器(5种策略)- 100%
- ✅ 消息队列(ACK)- 100%
- ✅ 业务服务层 - 100%
- ✅ 工具函数 - 100%
- ✅ Docker 配置 - 100%
- ⚠️ 单元测试 - 24/31 通过 (80%)
- ⏳ 集成测试 - 待完成
- ⏳ 性能测试 - 待完成
- ⏳ Nginx + SSL - 待完成
- ⏳ 日志系统 - 待完成
- ⏳ 监控告警 - 待完成
总进度: 约 85%
与 PIT Channel 的对接
协议兼容性
| PIT Channel 发送 | 智队中枢处理 |
|---|---|
type: "request" |
路由到对应处理器 |
method: "send.message" |
转发给目标 Agent |
method: "ping" |
返回 pong |
type: "ack" |
更新消息状态 |
配置示例
// openclaw.json
{
"plugins": {
"entries": {
"pit-bot": {
"enabled": true,
"config": {
"routerUrl": "wss://智队中枢.example.com/ws",
"authToken": "${PIT_ROUTER_TOKEN}",
"heartbeatInterval": 30000,
"heartbeatTimeout": 10000,
"ackTimeout": 30000
}
}
}
}
}
自动部署 🔄
Gitea 仓库提交后自动更新 Docker 容器
方案对比
| 方案 | 复杂度 | 说明 |
|---|---|---|
| 方案 1: Webhook + 脚本 | ⭐ 简单 | 推荐,实现快、依赖少 |
| 方案 2: Gitea Actions | ⭐⭐⭐ 中等 | 功能强大,需要配置 Runner |
| 方案 3: Watchtower | ⭐⭐ 简单 | 自动监控镜像更新 |
推荐方案:Webhook + 脚本
架构流程
开发者推送代码 → Gitea 触发 Webhook → 更新脚本执行
→ 拉取代码 → 构建镜像 → 重启容器 → 健康检查
1. 更新脚本
#!/bin/bash
# /www/wwwroot/pit-router/auto-update.sh
set -e
cd /www/wwwroot/pit-router
# 拉取最新代码
git fetch origin && git reset --hard origin/main
# 重建并启动
docker-compose down
docker-compose build --no-cache
docker-compose up -d
# 健康检查
sleep 10
curl -sf http://localhost:1999/health && echo "✅ 更新成功"
2. Webhook 服务
# /www/wwwroot/pit-router/webhook-server.py
from flask import Flask, request, jsonify
import hmac, hashlib, subprocess
app = Flask(__name__)
SECRET = 'your-webhook-secret'
@app.route('/webhook/pit-router', methods=['POST'])
def handle():
# 验证签名
sig = request.headers.get('X-Gitea-Signature', '')
expected = 'sha256=' + hmac.new(SECRET.encode(), request.data, hashlib.sha256).hexdigest()
if not hmac.compare_digest(sig, expected):
return {'error': 'Invalid signature'}, 401
# 执行更新
result = subprocess.run(['/www/wwwroot/pit-router/auto-update.sh'], capture_output=True)
return {'status': 'success' if result.returncode == 0 else 'error'}
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
3. 配置 Gitea Webhook
curl -X POST "http://localhost:3000/api/v1/repos/yunxiafei/pit-router/hooks" \
-H "Authorization: token $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"type": "gitea",
"config": {
"url": "http://localhost:5001/webhook/pit-router",
"content_type": "json",
"secret": "your-webhook-secret"
},
"events": ["push"],
"active": true
}'
实施清单
- 创建
auto-update.sh脚本 - 创建
webhook-server.py服务 - 配置 systemd 服务
- 配置 Gitea Webhook
- 测试自动更新
预计时间:30 分钟
相关项目
- PIT Channel 插件 - OpenClaw Channel 插件(智队中枢客户端)
许可证
MIT License
创建时间: 2026-03-14 | 更新: 2026-03-14 | 作者: 小白 🐶
Description
Languages
Python
51.9%
HTML
27.5%
Vue
14%
TypeScript
3.6%
CSS
1.8%
Other
1.2%