Files
pit-router/README.md

19 KiB
Raw Blame History

PIT 网关路由应用

Personal Intelligent Team Router Service

项目概述

PITPersonal Intelligent Team网关路由应用是 PIT 系统的核心组件,负责连接用户交互层和 Agent 层实现消息路由、会话管理、Agent 调度等功能。

核心目标

  • 提供统一的 WebSocket 接入点
  • 实现多 Agent 智能路由
  • 管理会话生命周期
  • 支持 Agent 负载均衡
  • 保证消息可靠传输

技术栈

层级 技术 版本 说明
语言 Python 3.12 高性能异步支持
Web框架 Flask 3.0+ 轻量级、灵活
WebSocket Flask-SocketIO 5.3+ 实时双向通信
ORM SQLAlchemy 3.1+ 数据库抽象层
认证 Flask-Login + JWT - 用户身份验证
缓存 Redis 7+ 会话缓存、消息队列
数据库 PostgreSQL 15+ 生产环境持久化
部署 Gunicorn + Nginx - 生产环境部署

系统架构

用户交互层 (Clients)
    ↓ WebSocket / HTTP
PIT 网关路由应用 (Gateway Router)
    ├── 接入层 (Access Layer)
    │   ├── HTTP Server (Flask)
    │   ├── WebSocket (SocketIO)
    │   ├── Auth Middleware (JWT)
    │   └── Rate Limiter
    ├── 业务层 (Business Layer)
    │   ├── Session Manager
    │   ├── Message Router
    │   ├── Agent Scheduler (加权轮询)
    │   └── Message Queue (ACK机制)
    └── 数据层 (Data Layer)
        ├── PostgreSQL (持久化)
        ├── Redis (缓存/消息队列)
        └── Config Store
    ↓ WebSocket + PIT Channel 协议
Agent 层 (OpenClaw Gateways)

项目结构

pit-router/
├── app/
│   ├── __init__.py              # Flask 应用工厂
│   ├── config.py                # 配置管理
│   ├── models/                  # 数据模型层
│   │   ├── __init__.py
│   │   ├── user.py              # 用户模型
│   │   ├── session.py           # 会话模型
│   │   ├── agent.py             # Agent 模型
│   │   ├── gateway.py           # Gateway 模型
│   │   ├── message.py           # 消息模型
│   │   └── connection.py        # 连接模型
│   ├── routes/                  # HTTP 路由层
│   │   ├── __init__.py
│   │   ├── auth.py              # 认证路由
│   │   ├── sessions.py          # 会话路由
│   │   ├── agents.py            # Agent 路由
│   │   ├── gateways.py          # Gateway 路由
│   │   ├── messages.py          # 消息路由
│   │   └── stats.py             # 统计接口
│   ├── socketio/                # WebSocket 处理层
│   │   ├── __init__.py
│   │   ├── handlers.py          # Socket.IO 事件处理
│   │   ├── events.py            # 事件定义
│   │   └── auth.py              # WebSocket 认证
│   ├── services/                # 业务逻辑层
│   │   ├── __init__.py
│   │   ├── session_service.py   # 会话服务
│   │   ├── agent_service.py     # Agent 服务
│   │   ├── message_service.py   # 消息服务
│   │   ├── scheduler.py         # Agent 调度器
│   │   └── message_queue.py     # 消息队列管理
│   ├── utils/                   # 工具函数
│   │   ├── __init__.py
│   │   ├── validators.py        # 输入验证
│   │   ├── security.py          # 安全工具
│   │   └── helpers.py           # 辅助函数
│   └── extensions.py            # Flask 扩展初始化
├── migrations/                  # 数据库迁移
├── tests/                       # 测试
│   ├── __init__.py
│   ├── test_auth.py
│   ├── test_sessions.py
│   ├── test_messages.py
│   └── test_websocket.py
├── requirements.txt             # Python 依赖
├── requirements-dev.txt         # 开发依赖
├── config.yaml                  # 配置文件模板
├── config.example.yaml          # 配置示例
├── docker-compose.yaml          # Docker 编排
├── Dockerfile                   # Docker 镜像
└── run.py                       # 启动入口

数据模型

用户模型 (User)

字段 类型 说明
id String(36) 主键 UUID
username String(80) 用户名,唯一索引
password_hash String(256) 密码哈希 (bcrypt)
email String(120) 邮箱,唯一
nickname String(80) 昵称
role String(20) 角色admin/user
status String(20) 状态active/disabled
created_at DateTime 创建时间
last_login_at DateTime 最后登录时间

会话模型 (Session)

字段 类型 说明
id String(36) 主键 UUID
user_id String(36) 用户 ID外键
primary_agent_id String(36) 主 Agent ID
participating_agent_ids JSON 参与 Agent ID 列表
user_socket_id String(100) 用户 WebSocket Socket ID
title String(200) 会话标题
channel_type String(20) 渠道类型web/desktop/mobile
status String(20) 状态active/paused/closed
message_count Integer 消息计数
unread_count Integer 未读消息数
created_at DateTime 创建时间
updated_at DateTime 更新时间
last_active_at DateTime 最后活跃时间

Agent 模型

字段 类型 说明
id String(36) 主键 UUID
name String(80) Agent 名称
display_name String(80) 显示名称
gateway_id String(36) Gateway ID外键
socket_id String(100) WebSocket Socket ID
model String(80) 使用的模型
capabilities JSON 能力列表
status String(20) 状态online/offline/busy
priority Integer 优先级 (1-10)
weight Integer 权重 (用于调度)
connection_limit Integer 最大连接数
current_sessions Integer 当前会话数
last_heartbeat DateTime 最后心跳时间
created_at DateTime 创建时间

Gateway 模型

字段 类型 说明
id String(36) 主键 UUID
name String(80) Gateway 名称,唯一
url String(256) Gateway 回调地址
token_hash String(256) 认证 Token 哈希
status String(20) 状态online/offline
agent_count Integer Agent 数量
connection_limit Integer 最大连接数
heartbeat_interval Integer 心跳间隔 (秒)
allowed_ips JSON 允许连接的 IP 白名单
last_heartbeat DateTime 最后心跳时间
created_at DateTime 创建时间

消息模型 (Message) 新增

字段 类型 说明
id String(36) 主键 UUID
session_id String(36) 会话 ID外键
sender_type String(20) 发送者类型user/agent/system
sender_id String(36) 发送者 ID
message_type String(20) 消息类型text/media/system
content Text 消息内容
content_type String(20) 内容类型markdown/plain/html
reply_to String(36) 回复的消息 ID
status String(20) 状态sent/delivered/read/failed
ack_status String(20) 确认状态pending/acknowledged
retry_count Integer 重试次数
created_at DateTime 创建时间
delivered_at DateTime 送达时间

连接模型 (Connection) 新增

字段 类型 说明
id String(36) 主键 UUID
socket_id String(100) Socket.IO Socket ID
connection_type String(20) 连接类型user/agent/gateway
entity_id String(36) 关联实体 ID
entity_type String(20) 实体类型user/agent
ip_address String(45) IP 地址 (支持 IPv6)
user_agent String(500) 客户端信息
status String(20) 状态connected/disconnected
auth_token String(500) 认证 Token
connected_at DateTime 连接时间
last_activity DateTime 最后活动时间
disconnected_at DateTime 断开时间

HTTP API 设计

认证 API

POST   /api/auth/register       - 注册新用户
POST   /api/auth/login          - 用户登录,返回 JWT Token
POST   /api/auth/logout         - 用户登出
POST   /api/auth/refresh        - 刷新 Token
GET    /api/auth/me             - 获取当前用户信息
POST   /api/auth/verify         - 验证 Token 有效性

会话 API

GET    /api/sessions                - 获取会话列表
POST   /api/sessions                - 创建会话
GET    /api/sessions/:id            - 获取会话详情
PUT    /api/sessions/:id/close      - 关闭会话 (替代 DELETE)
GET    /api/sessions/:id/messages   - 获取会话消息
GET    /api/sessions/:id/participants - 获取会话参与者 ⭐ 新增
POST   /api/sessions/:id/transfer   - 会话转接 ⭐ 新增

Agent API

GET    /api/agents                 - 获取 Agent 列表
GET    /api/agents/:id             - 获取 Agent 详情
GET    /api/agents/:id/status      - 获取 Agent 实时状态
PUT    /api/agents/:id/config      - 更新 Agent 配置
POST   /api/agents/:id/heartbeat   - Agent 心跳上报 ⭐ 新增
GET    /api/agents/available       - 获取可用 Agent 列表 ⭐ 新增

Gateway API

GET    /api/gateways               - 获取 Gateway 列表
POST   /api/gateways               - 注册 Gateway
DELETE /api/gateways/:id           - 注销 Gateway
GET    /api/gateways/:id/status    - 获取 Gateway 状态
POST   /api/gateways/:id/heartbeat - Gateway 心跳上报 ⭐ 新增
PUT    /api/gateways/:id/agents    - 更新 Gateway 的 Agent 列表 ⭐ 新增

消息 API 新增

POST   /api/messages               - 发送消息 (HTTP 方式)
GET    /api/messages/:id           - 获取单条消息
PUT    /api/messages/:id/ack       - 确认消息已送达
PUT    /api/messages/:id/read      - 标记消息已读

统计 API 新增

GET    /api/stats                  - 系统统计信息
GET    /api/stats/sessions         - 会话统计
GET    /api/stats/messages         - 消息统计
GET    /api/stats/agents           - Agent 统计

WebSocket 协议

连接流程

1. 客户端连接 WebSocket
2. 服务端发送 connect 事件要求认证
3. 客户端发送 auth 事件携带 JWT Token
4. 服务端验证 Token发送 authenticated 或 error
5. 认证成功后,可以进行其他操作

认证事件

事件 方向 说明 参数
connect C→S 连接请求 -
auth C→S 认证请求 { token: string }
authenticated S→C 认证成功 { user_id: string, socket_id: string }
auth_error S→C 认证失败 { code: string, message: string }

心跳事件 新增

事件 方向 说明 参数
ping C→S 心跳请求 { timestamp: number }
pong S→C 心跳响应 { timestamp: number }
heartbeat_timeout S→C 心跳超时 { socket_id: string }

会话事件

事件 方向 说明 参数
session.create C→S 创建会话 { title, user_id, agent_id?, priority? }
session.created S→C 会话已创建 { session_id, agent_id }
session.join C→S 加入会话 { session_id }
session.joined S→C 已加入会话 { session_id, participants: [] }
session.leave C→S 离开会话 新增 { session_id }
session.left S→C 已离开会话 新增 { session_id, user_id }
session.closed S→C 会话被关闭 新增 { session_id, reason }
session.assigned S→C Agent 分配通知 新增 { session_id, agent_id }

消息事件

事件 方向 说明 参数
message.send C→S 发送消息 { session_id, content, type, reply_to? }
message S→C 收到消息 { message_id, session_id, sender, content, timestamp }
message.ack C→S 消息确认 新增 { message_id, status: delivered/read }
message.acked S→C 确认已收到 新增 { message_id, status }
message.stream S→C 流式消息 { session_id, chunk, is_end }
message.read C→S 消息已读 新增 { session_id, message_ids: [] }
typing C→S 正在输入 新增 { session_id, is_typing: boolean }

错误事件

事件 方向 说明 参数
error S→C 通用错误 { code, message, details? }
session_error S→C 会话错误 { session_id, code, message }
message_error S→C 消息错误 { message_id, code, message }

消息格式 (与 PIT Channel 兼容) 新增

{
  "type": "request" | "response" | "event" | "ack",
  "id": "uuid-string",
  "method": "send.message" | "send.media" | "ping" | "auth",
  "params": {
    "to": "user-id",
    "content": "message content",
    "timestamp": 1234567890,
    "replyTo": "message-id"
  },
  "replyTo": "original-message-id",
  "error": {
    "code": "ERROR_CODE",
    "message": "Error description"
  }
}

Agent 调度策略 新增

调度算法

class AgentScheduler:
    STRATEGIES = {
        "round_robin": RoundRobinStrategy,      # 轮询
        "weighted_round_robin": WeightedRoundRobinStrategy,  # 加权轮询
        "least_connections": LeastConnectionsStrategy,       # 最少连接
        "least_response_time": LeastResponseTimeStrategy,    # 最快响应
        "capability_match": CapabilityMatchStrategy,         # 能力匹配
    }

调度配置

scheduler:
  strategy: "weighted_round_robin"  # 调度策略
  retry_attempts: 3                 # 分配失败重试次数
  timeout: 30                       # 分配超时 (秒)
  
  # 权重配置
  weights:
    priority: 0.4      # 优先级权重 (40%)
    load: 0.3          # 负载权重 (30%)
    capability: 0.3    # 能力匹配权重 (30%)

消息可靠性机制 新增

ACK 确认流程

1. 客户端发送 message.send
2. 服务端存储消息,状态=pending
3. 服务端转发消息给 Agent
4. Agent 收到后发送 message.ack
5. 服务端更新消息状态=delivered
6. 用户阅读后发送 message.read
7. 服务端更新消息状态=read

消息重试机制

场景 策略 最大重试
Agent 离线 存入队列Agent 上线后推送 无限
发送超时 指数退避重试 3 次
ACK 超时 重新发送 3 次

消息持久化

# 消息存储策略
MESSAGE_RETENTION = {
    "active_sessions": "30_days",    # 活跃会话保留 30 天
    "closed_sessions": "7_days",     # 关闭会话保留 7 天
    "failed_messages": "3_days",     # 失败消息保留 3 天
}

安全设计

认证机制

层级 机制 说明
HTTP API JWT Bearer Token Authorization: Bearer
WebSocket JWT Query Param ws://host/ws?token=
Gateway Token + IP 白名单 双重验证

Token 策略

Token 类型 有效期 用途
Access Token 24 小时 API 访问
Refresh Token 7 天 刷新 Access Token
Gateway Token 永不过期 (可撤销) Gateway 认证

安全措施

措施 实现 说明
HTTPS/WSS Nginx 反向代理 强制 TLS 1.2+
CORS Flask-CORS 白名单配置
Rate Limit Flask-Limiter 100 req/min per IP
Input Validation Marshmallow/Pydantic 严格参数校验
SQL Injection SQLAlchemy ORM 参数化查询
XSS Jinja2 转义 自动转义输出
IP 白名单 Middleware Gateway 连接限制
连接数限制 Socket.IO 单用户最大连接数

部署架构

Docker Compose 完善

version: '3.8'

services:
  pit-router:
    build: .
    ports:
      - "9000:9000"
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - JWT_SECRET=${JWT_SECRET}
      - DATABASE_URL=postgresql://user:pass@postgres:5432/pit
      - REDIS_URL=redis://redis:6379
    volumes:
      - pit-data:/app/data
      - pit-logs:/app/logs
    depends_on:
      - postgres
      - redis
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    deploy:
      resources:
        limits:
          cpus: '2.0'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 512M
  
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=${DB_PASSWORD}
      - POSTGRES_DB=pit
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d pit"]
      interval: 10s
      timeout: 5s
      retries: 5
  
  redis:
    image: redis:7-alpine
    volumes:
      - redis-data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - pit-router
    restart: unless-stopped

volumes:
  pit-data:
  pit-logs:
  postgres-data:
  redis-data:

开发计划

Phase 1: 核心功能 (3-4 天) 调整

  • 项目结构和配置文件
  • 数据模型实现 (User/Session/Agent/Gateway/Message/Connection)
  • 数据库迁移
  • 认证 API (注册/登录/JWT)
  • WebSocket 认证中间件
  • 基础会话管理
  • 消息路由基础功能

Phase 2: 核心功能完善 (2-3 天) 新增阶段

  • Agent 调度器实现
  • 消息 ACK 机制
  • 消息持久化
  • Gateway 心跳管理
  • 错误处理和重试机制

Phase 3: 扩展功能 (2 天)

  • 统计和监控接口
  • 日志系统
  • 配置热更新
  • 消息历史查询

Phase 4: 测试部署 (2-3 天) 调整

  • 单元测试 (pytest)
  • 集成测试 (WebSocket)
  • 性能测试
  • Docker 部署
  • Nginx + SSL 配置

总计9-12 天 (更实际的估算)


与 PIT Channel 的对接

协议兼容性

PIT Channel 发送 PIT Router 处理
type: "request" 路由到对应处理器
method: "send.message" 转发给目标 Agent
method: "ping" 返回 pong
type: "ack" 更新消息状态

配置示例

// openclaw.json
{
  "plugins": {
    "entries": {
      "pit-bot": {
        "enabled": true,
        "config": {
          "routerUrl": "wss://pit-router.example.com/ws",
          "authToken": "${PIT_ROUTER_TOKEN}",
          "heartbeatInterval": 30000,
          "heartbeatTimeout": 10000,
          "ackTimeout": 30000
        }
      }
    }
  }
}

相关项目


许可证

MIT License


创建时间: 2026-03-14 | 更新: 2026-03-14 | 作者: 小白 🐶