yunxiafei b1f430aa3b fix: 修复自动部署脚本
- 修正健康检查端口为 9000
- 增加等待时间到 30 秒
- 增加重试次数到 10 次
- 排除 venv 和 .git 目录的备份
- 只保留最近 3 个备份
2026-03-15 08:17:01 +08:00
2026-03-15 08:17:01 +08:00

智队中枢

PIT 网关路由应用 - Personal Intelligent Team Gateway Router Service

中文名:智队中枢
英文名PIT Router

项目概述

智队中枢(原 PIT Router是 PITPersonal Intelligent Team系统的核心组件负责连接用户交互层和 Agent 层实现消息路由、会话管理、Agent 调度等功能。

核心目标

  • 提供统一的 WebSocket 接入点
  • 实现多 Agent 智能路由
  • 管理会话生命周期
  • 支持 Agent 负载均衡
  • 保证消息可靠传输

技术栈

层级 技术 版本 说明
语言 Python 3.12 高性能异步支持
Web框架 Flask 3.0+ 轻量级、灵活
WebSocket Flask-SocketIO 5.3+ 实时双向通信
ORM SQLAlchemy 3.1+ 数据库抽象层
认证 Flask-Login + JWT - 用户身份验证
缓存 Redis 7+ 会话缓存、消息队列
数据库 PostgreSQL 15+ 生产环境持久化
部署 Gunicorn + Nginx - 生产环境部署

系统架构

用户交互层 (Clients)
    ↓ WebSocket / HTTP
智队中枢 (PIT Router)
    ├── 接入层 (Access Layer)
    │   ├── HTTP Server (Flask)
    │   ├── WebSocket (SocketIO)
    │   ├── Auth Middleware (JWT)
    │   └── Rate Limiter
    ├── 业务层 (Business Layer)
    │   ├── Session Manager
    │   ├── Message Router
    │   ├── Agent Scheduler (加权轮询)
    │   └── Message Queue (ACK机制)
    └── 数据层 (Data Layer)
        ├── PostgreSQL (持久化)
        ├── Redis (缓存/消息队列)
        └── Config Store
    ↓ WebSocket + PIT Channel 协议
Agent 层 (OpenClaw Gateways)

项目结构

pit-router/
├── app/
│   ├── __init__.py              # Flask 应用工厂
│   ├── config.py                # 配置管理
│   ├── extensions.py            # Flask 扩展初始化
│   ├── models/                  # 数据模型层
│   │   ├── __init__.py
│   │   ├── user.py              # 用户模型
│   │   ├── session.py           # 会话模型
│   │   ├── agent.py             # Agent 模型
│   │   ├── gateway.py           # Gateway 模型
│   │   ├── message.py           # 消息模型
│   │   └── connection.py        # 连接模型
│   ├── routes/                  # HTTP 路由层
│   │   ├── __init__.py
│   │   ├── auth.py              # 认证路由
│   │   ├── sessions.py          # 会话路由
│   │   ├── agents.py            # Agent 路由
│   │   ├── gateways.py          # Gateway 路由
│   │   ├── messages.py          # 消息路由
│   │   └── stats.py             # 统计接口
│   ├── socketio/                # WebSocket 处理层
│   │   ├── __init__.py
│   │   ├── handlers.py          # Socket.IO 事件处理
│   │   ├── events.py            # 事件定义
│   │   └── auth.py              # WebSocket 认证
│   ├── services/                # 业务逻辑层
│   │   ├── __init__.py
│   │   ├── session_service.py   # 会话服务
│   │   ├── agent_service.py     # Agent 服务
│   │   ├── message_service.py   # 消息服务
│   │   ├── scheduler.py         # Agent 调度器
│   │   └── message_queue.py     # 消息队列管理
│   ├── utils/                   # 工具函数
│   │   ├── __init__.py
│   │   ├── validators.py        # 输入验证
│   │   ├── security.py          # 安全工具
│   │   └── helpers.py           # 辅助函数
│   └── extensions.py            # Flask 扩展初始化
├── migrations/                  # 数据库迁移
├── tests/                       # 测试
│   ├── __init__.py
│   ├── test_auth.py
│   ├── test_sessions.py
│   ├── test_messages.py
│   └── test_websocket.py
├── requirements.txt             # Python 依赖
├── requirements-dev.txt         # 开发依赖
├── config.yaml                  # 配置文件模板
├── config.example.yaml          # 配置示例
├── docker-compose.yaml          # Docker 编排
├── Dockerfile                   # Docker 镜像
└── run.py                       # 启动入口

数据模型

用户模型 (User)

字段 类型 说明
id String(36) 主键 UUID
username String(80) 用户名,唯一索引
password_hash String(256) 密码哈希 (bcrypt)
email String(120) 邮箱,唯一
nickname String(80) 昵称
role String(20) 角色admin/user
status String(20) 状态active/disabled
created_at DateTime 创建时间
last_login_at DateTime 最后登录时间

会话模型 (Session)

字段 类型 说明
id String(36) 主键 UUID
user_id String(36) 用户 ID外键
primary_agent_id String(36) 主 Agent ID
participating_agent_ids JSON 参与 Agent ID 列表
user_socket_id String(100) 用户 WebSocket Socket ID
title String(200) 会话标题
channel_type String(20) 渠道类型web/desktop/mobile
status String(20) 状态active/paused/closed
message_count Integer 消息计数
unread_count Integer 未读消息数
created_at DateTime 创建时间
updated_at DateTime 更新时间
last_active_at DateTime 最后活跃时间

Agent 模型

字段 类型 说明
id String(36) 主键 UUID
name String(80) Agent 名称
display_name String(80) 显示名称
gateway_id String(36) Gateway ID外键
socket_id String(100) WebSocket Socket ID
model String(80) 使用的模型
capabilities JSON 能力列表
status String(20) 状态online/offline/busy
priority Integer 优先级 (1-10)
weight Integer 权重 (用于调度)
connection_limit Integer 最大连接数
current_sessions Integer 当前会话数
last_heartbeat DateTime 最后心跳时间
created_at DateTime 创建时间

Gateway 模型

字段 类型 说明
id String(36) 主键 UUID
name String(80) Gateway 名称,唯一
url String(256) Gateway 回调地址
token_hash String(256) 认证 Token 哈希
status String(20) 状态online/offline
agent_count Integer Agent 数量
connection_limit Integer 最大连接数
heartbeat_interval Integer 心跳间隔 (秒)
allowed_ips JSON 允许连接的 IP 白名单
last_heartbeat DateTime 最后心跳时间
created_at DateTime 创建时间

消息模型 (Message) 新增

字段 类型 说明
id String(36) 主键 UUID
session_id String(36) 会话 ID外键
sender_type String(20) 发送者类型user/agent/system
sender_id String(36) 发送者 ID
message_type String(20) 消息类型text/media/system
content Text 消息内容
content_type String(20) 内容类型markdown/plain/html
reply_to String(36) 回复的消息 ID
status String(20) 状态sent/delivered/read/failed
ack_status String(20) 确认状态pending/acknowledged
retry_count Integer 重试次数
created_at DateTime 创建时间
delivered_at DateTime 送达时间

连接模型 (Connection) 新增

字段 类型 说明
id String(36) 主键 UUID
socket_id String(100) Socket.IO Socket ID
connection_type String(20) 连接类型user/agent/gateway
entity_id String(36) 关联实体 ID
entity_type String(20) 实体类型user/agent
ip_address String(45) IP 地址 (支持 IPv6)
user_agent String(500) 客户端信息
status String(20) 状态connected/disconnected
auth_token String(500) 认证 Token
connected_at DateTime 连接时间
last_activity DateTime 最后活动时间
disconnected_at DateTime 断开时间

HTTP API 设计

认证 API

POST   /api/auth/register       - 注册新用户
POST   /api/auth/login          - 用户登录,返回 JWT Token
POST   /api/auth/logout         - 用户登出
POST   /api/auth/refresh        - 刷新 Token
GET    /api/auth/me             - 获取当前用户信息
POST   /api/auth/verify         - 验证 Token 有效性

会话 API

GET    /api/sessions                - 获取会话列表
POST   /api/sessions                - 创建会话
GET    /api/sessions/:id            - 获取会话详情
PUT    /api/sessions/:id/close      - 关闭会话 (替代 DELETE)
GET    /api/sessions/:id/messages   - 获取会话消息
GET    /api/sessions/:id/participants - 获取会话参与者 ⭐ 新增
POST   /api/sessions/:id/transfer   - 会话转接 ⭐ 新增

Agent API

GET    /api/agents                 - 获取 Agent 列表
GET    /api/agents/:id             - 获取 Agent 详情
GET    /api/agents/:id/status      - 获取 Agent 实时状态
PUT    /api/agents/:id/config      - 更新 Agent 配置
POST   /api/agents/:id/heartbeat   - Agent 心跳上报 ⭐ 新增
GET    /api/agents/available       - 获取可用 Agent 列表 ⭐ 新增

Gateway API

GET    /api/gateways               - 获取 Gateway 列表
POST   /api/gateways               - 注册 Gateway
DELETE /api/gateways/:id           - 注销 Gateway
GET    /api/gateways/:id/status    - 获取 Gateway 状态
POST   /api/gateways/:id/heartbeat - Gateway 心跳上报 ⭐ 新增
PUT    /api/gateways/:id/agents    - 更新 Gateway 的 Agent 列表 ⭐ 新增

消息 API 新增

POST   /api/messages               - 发送消息 (HTTP 方式)
GET    /api/messages/:id           - 获取单条消息
PUT    /api/messages/:id/ack       - 确认消息已送达
PUT    /api/messages/:id/read      - 标记消息已读

统计 API 新增

GET    /api/stats                  - 系统统计信息
GET    /api/stats/sessions         - 会话统计
GET    /api/stats/messages         - 消息统计
GET    /api/stats/agents           - Agent 统计

WebSocket 协议

连接流程

1. 客户端连接 WebSocket
2. 服务端发送 connect 事件要求认证
3. 客户端发送 auth 事件携带 JWT Token
4. 服务端验证 Token发送 authenticated 或 error
5. 认证成功后,可以进行其他操作

认证事件

事件 方向 说明 参数
connect C→S 连接请求 -
auth C→S 认证请求 { token: string }
authenticated S→C 认证成功 { user_id: string, socket_id: string }
auth_error S→C 认证失败 { code: string, message: string }

心跳事件 新增

事件 方向 说明 参数
ping C→S 心跳请求 { timestamp: number }
pong S→C 心跳响应 { timestamp: number }
heartbeat_timeout S→C 心跳超时 { socket_id: string }

会话事件

事件 方向 说明 参数
session.create C→S 创建会话 { title, user_id, agent_id?, priority? }
session.created S→C 会话已创建 { session_id, agent_id }
session.join C→S 加入会话 { session_id }
session.joined S→C 已加入会话 { session_id, participants: [] }
session.leave C→S 离开会话 新增 { session_id }
session.left S→C 已离开会话 新增 { session_id, user_id }
session.closed S→C 会话被关闭 新增 { session_id, reason }
session.assigned S→C Agent 分配通知 新增 { session_id, agent_id }

消息事件

事件 方向 说明 参数
message.send C→S 发送消息 { session_id, content, type, reply_to? }
message S→C 收到消息 { message_id, session_id, sender, content, timestamp }
message.ack C→S 消息确认 新增 { message_id, status: delivered/read }
message.acked S→C 确认已收到 新增 { message_id, status }
message.stream S→C 流式消息 { session_id, chunk, is_end }
message.read C→S 消息已读 新增 { session_id, message_ids: [] }
typing C→S 正在输入 新增 { session_id, is_typing: boolean }

错误事件

事件 方向 说明 参数
error S→C 通用错误 { code, message, details? }
session_error S→C 会话错误 { session_id, code, message }
message_error S→C 消息错误 { message_id, code, message }

消息格式 (与 PIT Channel 兼容) 新增

{
  "type": "request" | "response" | "event" | "ack",
  "id": "uuid-string",
  "method": "send.message" | "send.media" | "ping" | "auth",
  "params": {
    "to": "user-id",
    "content": "message content",
    "timestamp": 1234567890,
    "replyTo": "message-id"
  },
  "replyTo": "original-message-id",
  "error": {
    "code": "ERROR_CODE",
    "message": "Error description"
  }
}

Agent 调度策略 新增

调度算法

class AgentScheduler:
    STRATEGIES = {
        "round_robin": RoundRobinStrategy,      # 轮询
        "weighted_round_robin": WeightedRoundRobinStrategy,  # 加权轮询
        "least_connections": LeastConnectionsStrategy,       # 最少连接
        "least_response_time": LeastResponseTimeStrategy,    # 最快响应
        "capability_match": CapabilityMatchStrategy,         # 能力匹配
    }

调度配置

scheduler:
  strategy: "weighted_round_robin"  # 调度策略
  retry_attempts: 3                 # 分配失败重试次数
  timeout: 30                       # 分配超时 (秒)
  
  # 权重配置
  weights:
    priority: 0.4      # 优先级权重 (40%)
    load: 0.3          # 负载权重 (30%)
    capability: 0.3    # 能力匹配权重 (30%)

消息可靠性机制 新增

ACK 确认流程

1. 客户端发送 message.send
2. 服务端存储消息,状态=pending
3. 服务端转发消息给 Agent
4. Agent 收到后发送 message.ack
5. 服务端更新消息状态=delivered
6. 用户阅读后发送 message.read
7. 服务端更新消息状态=read

消息重试机制

场景 策略 最大重试
Agent 离线 存入队列Agent 上线后推送 无限
发送超时 指数退避重试 3 次
ACK 超时 重新发送 3 次

消息持久化

# 消息存储策略
MESSAGE_RETENTION = {
    "active_sessions": "30_days",    # 活跃会话保留 30 天
    "closed_sessions": "7_days",     # 关闭会话保留 7 天
    "failed_messages": "3_days",     # 失败消息保留 3 天
}

安全设计

认证机制

层级 机制 说明
HTTP API JWT Bearer Token Authorization: Bearer
WebSocket JWT Query Param ws://host/ws?token=
Gateway Token + IP 白名单 双重验证

Token 策略

Token 类型 有效期 用途
Access Token 24 小时 API 访问
Refresh Token 7 天 刷新 Access Token
Gateway Token 永不过期 (可撤销) Gateway 认证

安全措施

措施 实现 说明
HTTPS/WSS Nginx 反向代理 强制 TLS 1.2+
CORS Flask-CORS 白名单配置
Rate Limit Flask-Limiter 100 req/min per IP
Input Validation Marshmallow/Pydantic 严格参数校验
SQL Injection SQLAlchemy ORM 参数化查询
XSS Jinja2 转义 自动转义输出
IP 白名单 Middleware Gateway 连接限制
连接数限制 Socket.IO 单用户最大连接数

部署架构

Docker Compose 完善

version: '3.8'

services:
  pit-router:
    build: .
    ports:
      - "9000:9000"
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=postgresql://user:pass@postgres:5432/pit
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - pit-data:/app/data
      - pit-logs:/app/logs
    depends_on:
      - postgres
      - redis
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    deploy:
      resources:
        limits:
          cpus: '2.0'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 512M
  
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${DB_PASSWORD}
      - POSTGRES_DB=pit_router
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d pit_router"]
      interval: 10s
      timeout: 5s
      retries: 5
  
  redis:
    image: redis:7-alpine
    volumes:
      - redis-data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - pit-router
    restart: unless-stopped

volumes:
  pit-data:
  pit-logs:
  postgres-data:
  redis-data:

开发计划

阶段 状态 内容
Phase 1 完成 核心功能数据模型、HTTP API、WebSocket
Phase 2 完成 服务层(调度器、消息队列、会话/消息/Agent 服务)
Phase 3 完成 工具层(验证器、安全工具、辅助函数)
Phase 4 ⚠️ 部分 测试部署(单元测试 80%,集成/性能/部署未完成)
Phase 5 待开始 运维工具Nginx + SSL、日志、监控

详细进度

  • 数据模型6个- 100%
  • HTTP API6组- 100%
  • WebSocket 事件 - 100%
  • 调度器5种策略- 100%
  • 消息队列ACK- 100%
  • 业务服务层 - 100%
  • 工具函数 - 100%
  • Docker 配置 - 100%
  • ⚠️ 单元测试 - 24/31 通过 (80%)
  • 集成测试 - 待完成
  • 性能测试 - 待完成
  • Nginx + SSL - 待完成
  • 日志系统 - 待完成
  • 监控告警 - 待完成

总进度: 约 85%


与 PIT Channel 的对接

协议兼容性

PIT Channel 发送 智队中枢处理
type: "request" 路由到对应处理器
method: "send.message" 转发给目标 Agent
method: "ping" 返回 pong
type: "ack" 更新消息状态

配置示例

// openclaw.json
{
  "plugins": {
    "entries": {
      "pit-bot": {
        "enabled": true,
        "config": {
          "routerUrl": "wss://智队中枢.example.com/ws",
          "authToken": "${PIT_ROUTER_TOKEN}",
          "heartbeatInterval": 30000,
          "heartbeatTimeout": 10000,
          "ackTimeout": 30000
        }
      }
    }
  }
}

自动部署 🔄

Gitea 仓库提交后自动更新 Docker 容器

方案对比

方案 复杂度 说明
方案 1: Webhook + 脚本 简单 推荐,实现快、依赖少
方案 2: Gitea Actions 中等 功能强大,需要配置 Runner
方案 3: Watchtower 简单 自动监控镜像更新

推荐方案Webhook + 脚本

架构流程

开发者推送代码 → Gitea 触发 Webhook → 更新脚本执行
    → 拉取代码 → 构建镜像 → 重启容器 → 健康检查

1. 更新脚本

#!/bin/bash
# /www/wwwroot/pit-router/auto-update.sh

set -e
cd /www/wwwroot/pit-router

# 拉取最新代码
git fetch origin && git reset --hard origin/main

# 重建并启动
docker-compose down
docker-compose build --no-cache
docker-compose up -d

# 健康检查
sleep 10
curl -sf http://localhost:1999/health && echo "✅ 更新成功"

2. Webhook 服务

# /www/wwwroot/pit-router/webhook-server.py

from flask import Flask, request, jsonify
import hmac, hashlib, subprocess

app = Flask(__name__)
SECRET = 'your-webhook-secret'

@app.route('/webhook/pit-router', methods=['POST'])
def handle():
    # 验证签名
    sig = request.headers.get('X-Gitea-Signature', '')
    expected = 'sha256=' + hmac.new(SECRET.encode(), request.data, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        return {'error': 'Invalid signature'}, 401
    
    # 执行更新
    result = subprocess.run(['/www/wwwroot/pit-router/auto-update.sh'], capture_output=True)
    return {'status': 'success' if result.returncode == 0 else 'error'}

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)

3. 配置 Gitea Webhook

curl -X POST "http://localhost:3000/api/v1/repos/yunxiafei/pit-router/hooks" \
  -H "Authorization: token $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "gitea",
    "config": {
      "url": "http://localhost:5001/webhook/pit-router",
      "content_type": "json",
      "secret": "your-webhook-secret"
    },
    "events": ["push"],
    "active": true
  }'

实施清单

  • 创建 auto-update.sh 脚本
  • 创建 webhook-server.py 服务
  • 配置 systemd 服务
  • 配置 Gitea Webhook
  • 测试自动更新

预计时间30 分钟


相关项目


许可证

MIT License


创建时间: 2026-03-14 | 更新: 2026-03-14 | 作者: 小白 🐶

Description
智队中枢 - PIT Gateway Router Service
Readme 2.7 MiB
Languages
Python 51.9%
HTML 27.5%
Vue 14%
TypeScript 3.6%
CSS 1.8%
Other 1.2%